Whamcloud - gitweb
14b49edb521963b16783a280adfb55c0290ed246
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
47
48 inline int
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
50 {
51         /* if memcmp() == 0, it is NULL */
52         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
53 }
54
55 inline void mxlnd_noop(char *s, ...)
56 {
57         return;
58 }
59
60 char *
61 mxlnd_ctxstate_to_str(int mxc_state)
62 {
63         switch (mxc_state) {
64         case MXLND_CTX_INIT:
65                 return "MXLND_CTX_INIT";
66         case MXLND_CTX_IDLE:
67                 return "MXLND_CTX_IDLE";
68         case MXLND_CTX_PREP:
69                 return "MXLND_CTX_PREP";
70         case MXLND_CTX_PENDING:
71                 return "MXLND_CTX_PENDING";
72         case MXLND_CTX_COMPLETED:
73                 return "MXLND_CTX_COMPLETED";
74         case MXLND_CTX_CANCELED:
75                 return "MXLND_CTX_CANCELED";
76         default:
77                 return "*unknown*";
78         }
79 }
80
81 char *
82 mxlnd_connstatus_to_str(int mxk_status)
83 {
84         switch (mxk_status) {
85         case MXLND_CONN_READY:
86                 return "MXLND_CONN_READY";
87         case MXLND_CONN_INIT:
88                 return "MXLND_CONN_INIT";
89         case MXLND_CONN_REQ:
90                 return "MXLND_CONN_REQ";
91         case MXLND_CONN_ACK:
92                 return "MXLND_CONN_ACK";
93         case MXLND_CONN_WAIT:
94                 return "MXLND_CONN_WAIT";
95         case MXLND_CONN_DISCONNECT:
96                 return "MXLND_CONN_DISCONNECT";
97         case MXLND_CONN_FAIL:
98                 return "MXLND_CONN_FAIL";
99         default:
100                 return "unknown";
101         }
102 }
103
104 char *
105 mxlnd_msgtype_to_str(int type) {
106         switch (type) {
107         case MXLND_MSG_EAGER:
108                 return "MXLND_MSG_EAGER";
109         case MXLND_MSG_CONN_REQ:
110                 return "MXLND_MSG_CONN_REQ";
111         case MXLND_MSG_CONN_ACK:
112                 return "MXLND_MSG_CONN_ACK";
113         case MXLND_MSG_BYE:
114                 return "MXLND_MSG_BYE";
115         case MXLND_MSG_NOOP:
116                 return "MXLND_MSG_NOOP";
117         case MXLND_MSG_PUT_REQ:
118                 return "MXLND_MSG_PUT_REQ";
119         case MXLND_MSG_PUT_ACK:
120                 return "MXLND_MSG_PUT_ACK";
121         case MXLND_MSG_PUT_DATA:
122                 return "MXLND_MSG_PUT_DATA";
123         case MXLND_MSG_GET_REQ:
124                 return "MXLND_MSG_GET_REQ";
125         case MXLND_MSG_GET_DATA:
126                 return "MXLND_MSG_GET_DATA";
127         default:
128                 return "unknown";
129         }
130 }
131
132 char *
133 mxlnd_lnetmsg_to_str(int type)
134 {
135         switch (type) {
136         case LNET_MSG_ACK:
137                 return "LNET_MSG_ACK";
138         case LNET_MSG_PUT:
139                 return "LNET_MSG_PUT";
140         case LNET_MSG_GET:
141                 return "LNET_MSG_GET";
142         case LNET_MSG_REPLY:
143                 return "LNET_MSG_REPLY";
144         case LNET_MSG_HELLO:
145                 return "LNET_MSG_HELLO";
146         default:
147                 LBUG();
148                 return "*unknown*";
149         }
150 }
151
152 static inline u64
153 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
154 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
155 {
156         u64 type        = (u64) ctx->mxc_msg_type;
157         u64 err         = (u64) error;
158         u64 match       = 0ULL;
159
160         LASSERT(ctx->mxc_msg_type != 0);
161         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
162         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
163         return match;
164 }
165
166 static inline void
167 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
168 {
169         *msg_type = (u8) MXLND_MSG_TYPE(match);
170         *error    = (u8) MXLND_ERROR_VAL(match);
171         *cookie   = match & MXLND_MAX_COOKIE;
172         LASSERT(*msg_type == MXLND_MSG_EAGER    ||
173                 *msg_type == MXLND_MSG_ICON_REQ ||
174                 *msg_type == MXLND_MSG_CONN_REQ ||
175                 *msg_type == MXLND_MSG_ICON_ACK ||
176                 *msg_type == MXLND_MSG_CONN_ACK ||
177                 *msg_type == MXLND_MSG_BYE      ||
178                 *msg_type == MXLND_MSG_NOOP     ||
179                 *msg_type == MXLND_MSG_PUT_REQ  ||
180                 *msg_type == MXLND_MSG_PUT_ACK  ||
181                 *msg_type == MXLND_MSG_PUT_DATA ||
182                 *msg_type == MXLND_MSG_GET_REQ  ||
183                 *msg_type == MXLND_MSG_GET_DATA);
184         return;
185 }
186
187 struct kmx_ctx *
188 mxlnd_get_idle_rx(void)
189 {
190         struct list_head        *tmp    = NULL;
191         struct kmx_ctx          *rx     = NULL;
192
193         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
194
195         if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
196                 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
197                 return NULL;
198         }
199
200         tmp = &kmxlnd_data.kmx_rx_idle;
201         rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
202         list_del_init(&rx->mxc_list);
203         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
204
205 #if MXLND_DEBUG
206         if (rx->mxc_get != rx->mxc_put) {
207                 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
208                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
209                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
210                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
211                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
212                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
213                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
214                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
215                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
216                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
217         }
218 #endif
219         LASSERT (rx->mxc_get == rx->mxc_put);
220
221         rx->mxc_get++;
222
223         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
224         rx->mxc_state = MXLND_CTX_PREP;
225
226         return rx;
227 }
228
229 int
230 mxlnd_put_idle_rx(struct kmx_ctx *rx)
231 {
232         if (rx == NULL) {
233                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
234                 return -EINVAL;
235         } else if (rx->mxc_type != MXLND_REQ_RX) {
236                 CDEBUG(D_NETERROR, "called with tx\n");
237                 return -EINVAL;
238         }
239         LASSERT(rx->mxc_get == rx->mxc_put + 1);
240         mxlnd_ctx_init(rx);
241         rx->mxc_put++;
242         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
243         list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
244         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
245         return 0;
246 }
247
248 int
249 mxlnd_reduce_idle_rxs(__u32 count)
250 {
251         __u32                   i       = 0;
252         struct kmx_ctx          *rx     = NULL;
253
254         spin_lock(&kmxlnd_data.kmx_rxs_lock);
255         for (i = 0; i < count; i++) {
256                 rx = mxlnd_get_idle_rx();
257                 if (rx != NULL) {
258                         struct list_head *tmp = &rx->mxc_global_list;
259                         list_del_init(tmp);
260                         mxlnd_ctx_free(rx);
261                 } else {
262                         CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
263                         break;
264                 }
265         }
266         spin_unlock(&kmxlnd_data.kmx_rxs_lock);
267         return 0;
268 }
269
270 struct kmx_ctx *
271 mxlnd_get_idle_tx(void)
272 {
273         struct list_head        *tmp    = NULL;
274         struct kmx_ctx          *tx     = NULL;
275
276         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
277
278         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
279                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
280                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
281                 return NULL;
282         }
283
284         tmp = &kmxlnd_data.kmx_tx_idle;
285         tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
286         list_del_init(&tx->mxc_list);
287
288         /* Allocate a new completion cookie.  It might not be needed,
289          * but we've got a lock right now and we're unlikely to
290          * wrap... */
291         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
292         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
293                 kmxlnd_data.kmx_tx_next_cookie = 1;
294         }
295         kmxlnd_data.kmx_tx_used++;
296         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
297
298         LASSERT (tx->mxc_get == tx->mxc_put);
299
300         tx->mxc_get++;
301
302         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
303         LASSERT (tx->mxc_lntmsg[0] == NULL);
304         LASSERT (tx->mxc_lntmsg[1] == NULL);
305
306         tx->mxc_state = MXLND_CTX_PREP;
307
308         return tx;
309 }
310
311 void
312 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye);
313
314 int
315 mxlnd_put_idle_tx(struct kmx_ctx *tx)
316 {
317         //int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
318         int             result  = 0;
319         lnet_msg_t      *lntmsg[2];
320
321         if (tx == NULL) {
322                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
323                 return -EINVAL;
324         } else if (tx->mxc_type != MXLND_REQ_TX) {
325                 CDEBUG(D_NETERROR, "called with rx\n");
326                 return -EINVAL;
327         }
328         if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
329               tx->mxc_status.code == MX_STATUS_TRUNCATED)) {
330                 struct kmx_conn *conn   = tx->mxc_conn;
331
332                 result = -EIO;
333                 mxlnd_conn_disconnect(conn, 0, 1);
334         }
335
336         lntmsg[0] = tx->mxc_lntmsg[0];
337         lntmsg[1] = tx->mxc_lntmsg[1];
338
339         LASSERT(tx->mxc_get == tx->mxc_put + 1);
340         mxlnd_ctx_init(tx);
341         tx->mxc_put++;
342         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
343         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
344         kmxlnd_data.kmx_tx_used--;
345         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
346         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
347         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
348         return 0;
349 }
350
351 /**
352  * Free the conn
353  * \param conn a kmx_conn pointer
354  *
355  * The calling function should remove the conn from the conns list first
356  * then destroy it.
357  */
358 void
359 mxlnd_conn_free(struct kmx_conn *conn)
360 {
361         struct kmx_peer *peer   = conn->mxk_peer;
362
363         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
365                  list_empty (&conn->mxk_tx_free_queue) &&
366                  list_empty (&conn->mxk_pending));
367         if (!list_empty(&conn->mxk_list)) {
368                 list_del_init(&conn->mxk_list);
369                 if (peer->mxp_conn == conn) {
370                         peer->mxp_conn = NULL;
371                         if (!mxlnd_endpoint_addr_null(conn->mxk_epa))
372                                 mx_set_endpoint_addr_context(conn->mxk_epa,
373                                                              (void *) NULL);
374                 }
375         }
376         mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
377         MXLND_FREE (conn, sizeof (*conn));
378         return;
379 }
380
381
382 void
383 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
384 {
385         int                     found   = 0;
386         struct kmx_ctx          *ctx    = NULL;
387         struct kmx_ctx          *next   = NULL;
388         mx_return_t             mxret   = MX_SUCCESS;
389         u32                     result  = 0;
390
391         do {
392                 found = 0;
393                 spin_lock(&conn->mxk_lock);
394                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
395                         /* we will delete all including txs */
396                         list_del_init(&ctx->mxc_list);
397                         if (ctx->mxc_type == MXLND_REQ_RX) {
398                                 found = 1;
399                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
400                                                   &ctx->mxc_mxreq,
401                                                   &result);
402                                 if (mxret != MX_SUCCESS) {
403                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
404                                 }
405                                 if (result == 1) {
406                                         ctx->mxc_status.code = -ECONNABORTED;
407                                         ctx->mxc_state = MXLND_CTX_CANCELED;
408                                         /* NOTE this calls lnet_finalize() and
409                                          * we cannot hold any locks when calling it.
410                                          * It also calls mxlnd_conn_decref(conn) */
411                                         spin_unlock(&conn->mxk_lock);
412                                         mxlnd_handle_rx_completion(ctx);
413                                         spin_lock(&conn->mxk_lock);
414                                 }
415                                 break;
416                         }
417                 }
418                 spin_unlock(&conn->mxk_lock);
419         }
420         while (found);
421
422         return;
423 }
424
425 /**
426  * Shutdown a connection
427  * \param conn a kmx_conn pointer
428  * \param mx_dis call mx_disconnect()
429  * \param send_bye send peer a BYE msg
430  *
431  * This function sets the status to DISCONNECT, completes queued
432  * txs with failure, calls mx_disconnect, which will complete
433  * pending txs and matched rxs with failure.
434  */
435 void
436 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye)
437 {
438         mx_endpoint_addr_t      epa     = conn->mxk_epa;
439         struct list_head        *tmp    = NULL;
440         int                     valid   = !mxlnd_endpoint_addr_null(epa);
441
442         spin_lock(&conn->mxk_lock);
443         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
444                 spin_unlock(&conn->mxk_lock);
445                 return;
446         }
447         conn->mxk_status = MXLND_CONN_DISCONNECT;
448         conn->mxk_timeout = 0;
449
450         while (!list_empty(&conn->mxk_tx_free_queue) ||
451                !list_empty(&conn->mxk_tx_credit_queue)) {
452
453                 struct kmx_ctx          *tx     = NULL;
454
455                 if (!list_empty(&conn->mxk_tx_free_queue)) {
456                         tmp = &conn->mxk_tx_free_queue;
457                 } else {
458                         tmp = &conn->mxk_tx_credit_queue;
459                 }
460
461                 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
462                 list_del_init(&tx->mxc_list);
463                 tx->mxc_status.code = -ECONNABORTED;
464                 spin_unlock(&conn->mxk_lock);
465                 mxlnd_put_idle_tx(tx);
466                 mxlnd_conn_decref(conn); /* for this tx */
467                 spin_lock(&conn->mxk_lock);
468         }
469
470         spin_unlock(&conn->mxk_lock);
471
472         /* cancel pending rxs */
473         mxlnd_conn_cancel_pending_rxs(conn);
474
475         if (send_bye && valid) {
476                 u64 match = ((u64) MXLND_MSG_BYE) << MXLND_MSG_OFFSET;
477                 /* send a BYE to the peer */
478                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
479                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
480                 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
481                                 epa, match, NULL, NULL);
482                 /* wait to allow the peer to ack our message */
483                 mxlnd_sleep(msecs_to_jiffies(20));
484         }
485
486         if (kmxlnd_data.kmx_shutdown != 1) {
487                 time_t          last_alive      = 0;
488                 unsigned long   last_msg        = 0;
489
490                 /* notify LNET that we are giving up on this peer */
491                 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
492                         last_msg = conn->mxk_last_rx;
493                 else
494                         last_msg = conn->mxk_last_tx;
495
496                 last_alive = cfs_time_current_sec() -
497                              cfs_duration_sec(cfs_time_current() - last_msg);
498                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
499
500                 if (mx_dis && valid)
501                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
502         }
503         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
504
505         return;
506 }
507
508 /**
509  * Allocate and initialize a new conn struct
510  * \param connp address of a kmx_conn pointer
511  * \param peer owning kmx_peer
512  *
513  * Returns 0 on success and -ENOMEM on failure
514  */
515 int
516 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
517 {
518         struct kmx_conn *conn    = NULL;
519
520         LASSERT(peer != NULL);
521
522         MXLND_ALLOC(conn, sizeof (*conn));
523         if (conn == NULL) {
524                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
525                 return -ENOMEM;
526         }
527         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
528
529         memset(conn, 0, sizeof(*conn));
530
531         /* conn->mxk_incarnation = 0 - will be set by peer */
532         atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer 
533                                                    and one for the caller */
534         conn->mxk_peer = peer;
535         /* mxk_epa - to be set after mx_iconnect() */
536         INIT_LIST_HEAD(&conn->mxk_list);
537         spin_lock_init(&conn->mxk_lock);
538         /* conn->mxk_timeout = 0 */
539         conn->mxk_last_tx = jiffies;
540         conn->mxk_last_rx = conn->mxk_last_tx;
541         conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
542         /* mxk_outstanding = 0 */
543         conn->mxk_status = MXLND_CONN_INIT;
544         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
545         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
546         /* conn->mxk_ntx_msgs = 0 */
547         /* conn->mxk_ntx_data = 0 */
548         /* conn->mxk_ntx_posted = 0 */
549         /* conn->mxk_data_posted = 0 */
550         INIT_LIST_HEAD(&conn->mxk_pending);
551
552         *connp = conn;
553
554         mxlnd_peer_addref(peer);        /* add a ref for this conn */
555
556         /* add to front of peer's conns list */
557         list_add(&conn->mxk_list, &peer->mxp_conns);
558         peer->mxp_conn = conn;
559         return 0;
560 }
561
562 int
563 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
564 {
565         int ret = 0;
566         write_lock(&kmxlnd_data.kmx_global_lock);
567         ret = mxlnd_conn_alloc_locked(connp, peer);
568         write_unlock(&kmxlnd_data.kmx_global_lock);
569         return ret;
570 }
571
572 int
573 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
574 {
575         int             ret     = 0;
576         struct kmx_conn *conn   = ctx->mxc_conn;
577
578         ctx->mxc_state = MXLND_CTX_PENDING;
579         if (conn != NULL) {
580                 spin_lock(&conn->mxk_lock);
581                 if (conn->mxk_status >= MXLND_CONN_INIT) {
582                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
583                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
584                                 conn->mxk_timeout = ctx->mxc_deadline;
585                         }
586                 } else {
587                         ctx->mxc_state = MXLND_CTX_COMPLETED;
588                         ret = -1;
589                 }
590                 spin_unlock(&conn->mxk_lock);
591         }
592         return ret;
593 }
594
595 int
596 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
597 {
598         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
599                 ctx->mxc_state == MXLND_CTX_COMPLETED);
600         if (ctx->mxc_state != MXLND_CTX_PENDING &&
601             ctx->mxc_state != MXLND_CTX_COMPLETED) {
602                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", 
603                        mxlnd_ctxstate_to_str(ctx->mxc_state));
604         }
605         ctx->mxc_state = MXLND_CTX_COMPLETED;
606         if (!list_empty(&ctx->mxc_list)) {
607                 struct kmx_conn *conn = ctx->mxc_conn;
608                 struct kmx_ctx *next = NULL;
609                 LASSERT(conn != NULL);
610                 spin_lock(&conn->mxk_lock);
611                 list_del_init(&ctx->mxc_list);
612                 conn->mxk_timeout = 0;
613                 if (!list_empty(&conn->mxk_pending)) {
614                         next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
615                         conn->mxk_timeout = next->mxc_deadline;
616                 }
617                 spin_unlock(&conn->mxk_lock);
618         }
619         return 0;
620 }
621
622 /**
623  * Free the peer
624  * \param peer a kmx_peer pointer
625  *
626  * The calling function should decrement the rxs, drain the tx queues and
627  * remove the peer from the peers list first then destroy it.
628  */
629 void
630 mxlnd_peer_free(struct kmx_peer *peer)
631 {
632         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer,
633                         peer == kmxlnd_data.kmx_localhost ? "(*** localhost ***)" : "");
634
635         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
636
637         if (peer == kmxlnd_data.kmx_localhost)
638                 LASSERT(kmxlnd_data.kmx_shutdown);
639
640         if (!list_empty(&peer->mxp_peers)) {
641                 /* assume we are locked */
642                 list_del_init(&peer->mxp_peers);
643         }
644
645         MXLND_FREE (peer, sizeof (*peer));
646         atomic_dec(&kmxlnd_data.kmx_npeers);
647         return;
648 }
649
650 #define MXLND_LOOKUP_COUNT 10
651
652 /* We only want the MAC address of the peer's Myricom NIC. We
653  * require that each node has the IPoMX interface (myriN) up.
654  * We will not pass any traffic over IPoMX, but it allows us
655  * to get the MAC address. */
656 static int
657 mxlnd_ip2nic_id(u32 ip, u64 *nic_id)
658 {
659         int                     ret     = 0;
660         int                     try     = 1;
661         int                     fatal   = 0;
662         u64                     tmp_id  = 0ULL;
663         unsigned char           *haddr  = NULL;
664         struct net_device       *dev    = NULL;
665         struct neighbour        *n      = NULL;
666         cfs_socket_t            *sock   = NULL;
667         __be32                  dst_ip  = htonl(ip);
668
669         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
670         if (dev == NULL) {
671                 return -ENODEV;
672         }
673
674         haddr = (unsigned char *) &tmp_id + 2; /* MAC is only 6 bytes */
675
676         do {
677                 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
678                 if (n) {
679                         n->used = jiffies;
680                         if (n->nud_state & NUD_VALID) {
681                                 memcpy(haddr, n->ha, dev->addr_len);
682                                 neigh_release(n);
683                                 ret = 0;
684                                 break;
685                         }
686                 }
687                 /* not found, try to connect (force an arp) */
688                 libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
689                 if (!fatal)
690                         libcfs_sock_release(sock);
691                 schedule_timeout_interruptible(HZ/10 * try); /* add a little backoff */
692         } while (try++ < MXLND_LOOKUP_COUNT);
693
694         dev_put(dev);
695
696         if (tmp_id == 0ULL)
697                 ret = -EHOSTUNREACH;
698 #ifdef __LITTLE_ENDIAN
699         *nic_id = ___arch__swab64(tmp_id);
700 #else
701         *nic_id = tmp_id;
702 #endif
703         return ret;
704 }
705
706 /**
707  * Allocate and initialize a new peer struct
708  * \param peerp  address of a kmx_peer pointer
709  * \param nid  LNET node id
710  * Returns 0 on success and -ENOMEM on failure
711  */
712 int
713 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
714 {
715         int                     i       = 0;
716         int                     ret     = 0;
717         u32                     ip      = LNET_NIDADDR(nid);
718         struct kmx_peer        *peer    = NULL;
719
720         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
721
722         MXLND_ALLOC(peer, sizeof (*peer));
723         if (peer == NULL) {
724                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
725                 return -ENOMEM;
726         }
727         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
728
729         memset(peer, 0, sizeof(*peer));
730
731         peer->mxp_nid = nid;
732         /* peer->mxp_incarnation */
733         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
734
735         peer->mxp_ip = ip;
736         peer->mxp_ep_id = *kmxlnd_tunables.kmx_ep_id;
737         peer->mxp_board = board;
738         peer->mxp_nic_id = nic_id;
739
740         if (nic_id == 0ULL) {
741                 ret = mxlnd_ip2nic_id(ip, &nic_id);
742                 if (ret != 0)
743                         CERROR("%s: mxlnd_ip2nic_id() returned %d\n", __func__, ret);
744                 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
745         }
746
747         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
748
749         INIT_LIST_HEAD(&peer->mxp_peers);
750         INIT_LIST_HEAD(&peer->mxp_conns);
751         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
752         if (ret != 0) {
753                 mxlnd_peer_decref(peer);
754                 return ret;
755         }
756
757         for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
758                 struct kmx_ctx   *rx     = NULL;
759                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
760                 if (ret != 0) {
761                         mxlnd_reduce_idle_rxs(i);
762                         mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
763                         mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
764                         mxlnd_peer_decref(peer);
765                         return ret;
766                 }
767                 spin_lock(&kmxlnd_data.kmx_rxs_lock);
768                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
769                 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
770                 rx->mxc_put = -1;
771                 mxlnd_put_idle_rx(rx);
772         }
773         /* peer->mxp_reconnect_time = 0 */
774         /* peer->mxp_incompatible = 0 */
775
776         *peerp = peer;
777         return 0;
778 }
779
780 static inline struct kmx_peer *
781 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
782 {
783         int                     found   = 0;
784         int                     hash    = 0;
785         struct kmx_peer         *peer   = NULL;
786
787         hash = mxlnd_nid_to_hash(nid);
788
789         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
790                 if (peer->mxp_nid == nid) {
791                         found = 1;
792                         mxlnd_peer_addref(peer);
793                         break;
794                 }
795         }
796         return (found ? peer : NULL);
797 }
798
799 static inline struct kmx_peer *
800 mxlnd_find_peer_by_nid(lnet_nid_t nid)
801 {
802         struct kmx_peer *peer   = NULL;
803
804         read_lock(&kmxlnd_data.kmx_global_lock);
805         peer = mxlnd_find_peer_by_nid_locked(nid);
806         read_unlock(&kmxlnd_data.kmx_global_lock);
807         return peer;
808 }
809
810 static inline int
811 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
812 {
813         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
814                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
815                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
816                 tx->mxc_msg_type == MXLND_MSG_NOOP);
817 }
818
819 /**
820  * Set type and number of bytes
821  * \param msg  msg pointer
822  * \param type  of message
823  * \param body_nob  bytes in msg body
824  */
825 static inline void
826 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
827 {
828         msg->mxm_type = type;
829         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
830 }
831
832 static inline void
833 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
834 {
835         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
836         struct kmx_msg  *msg    = NULL;
837
838         LASSERT (tx != NULL);
839         LASSERT (nob <= MXLND_EAGER_SIZE);
840
841         tx->mxc_nid = nid;
842         /* tx->mxc_peer should have already been set if we know it */
843         tx->mxc_msg_type = type;
844         tx->mxc_nseg = 1;
845         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
846         tx->mxc_seg.segment_length = nob;
847         tx->mxc_pin_type = MX_PIN_PHYSICAL;
848         //tx->mxc_state = MXLND_CTX_PENDING;
849
850         msg = tx->mxc_msg;
851         msg->mxm_type = type;
852         msg->mxm_nob  = nob;
853
854         return;
855 }
856
857 static inline __u32
858 mxlnd_cksum (void *ptr, int nob)
859 {
860         char  *c  = ptr;
861         __u32  sum = 0;
862
863         while (nob-- > 0)
864                 sum = ((sum << 1) | (sum >> 31)) + *c++;
865
866         /* ensure I don't return 0 (== no checksum) */
867         return (sum == 0) ? 1 : sum;
868 }
869
870 /**
871  * Complete msg info
872  * \param tx  msg to send
873  */
874 static inline void
875 mxlnd_pack_msg(struct kmx_ctx *tx)
876 {
877         struct kmx_msg  *msg    = tx->mxc_msg;
878
879         /* type and nob should already be set in init_msg() */
880         msg->mxm_magic    = MXLND_MSG_MAGIC;
881         msg->mxm_version  = MXLND_MSG_VERSION;
882         /*   mxm_type */
883         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
884          * return credits as well */
885         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
886             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
887                 spin_lock(&tx->mxc_conn->mxk_lock);
888                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
889                 tx->mxc_conn->mxk_outstanding = 0;
890                 spin_unlock(&tx->mxc_conn->mxk_lock);
891         } else {
892                 msg->mxm_credits  = 0;
893         }
894         /*   mxm_nob */
895         msg->mxm_cksum    = 0;
896         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
897         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
898         msg->mxm_dstnid   = tx->mxc_nid;
899         /* if it is a new peer, the dststamp will be 0 */
900         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
901         msg->mxm_seq      = tx->mxc_cookie;
902
903         if (*kmxlnd_tunables.kmx_cksum) {
904                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
905         }
906 }
907
908 int
909 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
910 {
911         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
912         __u32     msg_cksum     = 0;
913         int       flip          = 0;
914         int       msg_nob       = 0;
915
916         /* 6 bytes are enough to have received magic + version */
917         if (nob < 6) {
918                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
919                 return -EPROTO;
920         }
921
922         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
923                 flip = 0;
924         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
925                 flip = 1;
926         } else {
927                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
928                 return -EPROTO;
929         }
930
931         if (msg->mxm_version !=
932             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
933                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
934                 return -EPROTO;
935         }
936
937         if (nob < hdr_size) {
938                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
939                 return -EPROTO;
940         }
941
942         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
943         if (msg_nob > nob) {
944                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
945                 return -EPROTO;
946         }
947
948         /* checksum must be computed with mxm_cksum zero and BEFORE anything
949          * gets flipped */
950         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
951         msg->mxm_cksum = 0;
952         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
953                 CDEBUG(D_NETERROR, "Bad checksum\n");
954                 return -EPROTO;
955         }
956         msg->mxm_cksum = msg_cksum;
957
958         if (flip) {
959                 /* leave magic unflipped as a clue to peer endianness */
960                 __swab16s(&msg->mxm_version);
961                 CLASSERT (sizeof(msg->mxm_type) == 1);
962                 CLASSERT (sizeof(msg->mxm_credits) == 1);
963                 msg->mxm_nob = msg_nob;
964                 __swab64s(&msg->mxm_srcnid);
965                 __swab64s(&msg->mxm_srcstamp);
966                 __swab64s(&msg->mxm_dstnid);
967                 __swab64s(&msg->mxm_dststamp);
968                 __swab64s(&msg->mxm_seq);
969         }
970
971         if (msg->mxm_srcnid == LNET_NID_ANY) {
972                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
973                 return -EPROTO;
974         }
975
976         switch (msg->mxm_type) {
977         default:
978                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
979                 return -EPROTO;
980
981         case MXLND_MSG_NOOP:
982                 break;
983
984         case MXLND_MSG_EAGER:
985                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
986                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
987                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
988                         return -EPROTO;
989                 }
990                 break;
991
992         case MXLND_MSG_PUT_REQ:
993                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
994                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
995                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
996                         return -EPROTO;
997                 }
998                 if (flip)
999                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1000                 break;
1001
1002         case MXLND_MSG_PUT_ACK:
1003                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1004                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1005                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1006                         return -EPROTO;
1007                 }
1008                 if (flip) {
1009                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1010                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1011                 }
1012                 break;
1013
1014         case MXLND_MSG_GET_REQ:
1015                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1016                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1017                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1018                         return -EPROTO;
1019                 }
1020                 if (flip) {
1021                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1022                 }
1023                 break;
1024
1025         case MXLND_MSG_CONN_REQ:
1026         case MXLND_MSG_CONN_ACK:
1027                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1028                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1029                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1030                         return -EPROTO;
1031                 }
1032                 if (flip) {
1033                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1034                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1035                 }
1036                 break;
1037         }
1038         return 0;
1039 }
1040
1041 /**
1042  * Receive a message
1043  * \param lntmsg  the LNET msg that this is continuing. If EAGER, then NULL.
1044  * \param length  length of incoming message
1045  *
1046  * The caller gets the rx and sets nid, peer and conn if known.
1047  *
1048  * Returns 0 on success and -1 on failure
1049  */
1050 int
1051 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1052 {
1053         int             ret     = 0;
1054         mx_return_t     mxret   = MX_SUCCESS;
1055         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1056
1057         rx->mxc_msg_type = msg_type;
1058         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1059         rx->mxc_cookie = cookie;
1060         /* rx->mxc_match may already be set */
1061         /* rx->mxc_seg.segment_ptr is already set */
1062         rx->mxc_seg.segment_length = length;
1063         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1064         ret = mxlnd_q_pending_ctx(rx);
1065         if (ret == -1) {
1066                 /* the caller is responsible for calling conn_decref() if needed */
1067                 return -1;
1068         }
1069         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1070                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1071         if (mxret != MX_SUCCESS) {
1072                 mxlnd_deq_pending_ctx(rx);
1073                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1074                                    mx_strerror(mxret), (int) mxret);
1075                 return -1;
1076         }
1077         return 0;
1078 }
1079
1080
1081 /**
1082  * This is the callback function that will handle unexpected receives
1083  * \param context  NULL, ignore
1084  * \param source  the peer's mx_endpoint_addr_t
1085  * \param match_value  the msg's bit, should be MXLND_MSG_EAGER
1086  * \param length  length of incoming message
1087  * \param data_if_available  ignore
1088  *
1089  * If it is an eager-sized msg, we will call recv_msg() with the actual
1090  * length. If it is a large message, we will call recv_msg() with a
1091  * length of 0 bytes to drop it because we should never have a large,
1092  * unexpected message.
1093  *
1094  * NOTE - The MX library blocks until this function completes. Make it as fast as
1095  * possible. DO NOT allocate memory which can block!
1096  *
1097  * If we cannot get a rx or the conn is closed, drop the message on the floor
1098  * (i.e. recv 0 bytes and ignore).
1099  */
1100 mx_unexp_handler_action_t
1101 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1102                  uint64_t match_value, uint32_t length, void *data_if_available)
1103 {
1104         int             ret             = 0;
1105         struct kmx_ctx  *rx             = NULL;
1106         mx_ksegment_t   seg;
1107         u8              msg_type        = 0;
1108         u8              error           = 0;
1109         u64             cookie          = 0ULL;
1110
1111         if (context != NULL) {
1112                 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1113         }
1114
1115 #if MXLND_DEBUG
1116         CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1117 #endif
1118
1119         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1120         if (msg_type == MXLND_MSG_BYE) {
1121                 struct kmx_peer *peer   = NULL;
1122
1123                 mx_get_endpoint_addr_context(source, (void **) &peer);
1124                 if (peer && peer->mxp_conn) {
1125                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1126                                         libcfs_nid2str(peer->mxp_nid));
1127                         mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1128                 }
1129
1130                 return MX_RECV_FINISHED;
1131         }
1132
1133         rx = mxlnd_get_idle_rx();
1134         if (rx != NULL) {
1135                 if (length <= MXLND_EAGER_SIZE) {
1136                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1137                 } else {
1138                         CDEBUG(D_NETERROR, "unexpected large receive with "
1139                                            "match_value=0x%llx length=%d\n",
1140                                            match_value, length);
1141                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1142                 }
1143
1144                 if (ret == 0) {
1145                         struct kmx_peer *peer   = NULL;
1146                         struct kmx_conn *conn   = NULL;
1147
1148                         /* NOTE to avoid a peer disappearing out from under us,
1149                          *      read lock the peers lock first */
1150                         read_lock(&kmxlnd_data.kmx_global_lock);
1151                         mx_get_endpoint_addr_context(source, (void **) &peer);
1152                         if (peer != NULL) {
1153                                 mxlnd_peer_addref(peer); /* add a ref... */
1154                                 conn = peer->mxp_conn;
1155                                 if (conn) {
1156                                         mxlnd_conn_addref(conn); /* add ref until rx completed */
1157                                         mxlnd_peer_decref(peer); /* and drop peer ref */
1158                                         rx->mxc_conn = conn;
1159                                 }
1160                                 rx->mxc_peer = peer;
1161                                 rx->mxc_nid = peer->mxp_nid;
1162                         }
1163                         read_unlock(&kmxlnd_data.kmx_global_lock);
1164                 } else {
1165                         CDEBUG(D_NETERROR, "could not post receive\n");
1166                         mxlnd_put_idle_rx(rx);
1167                 }
1168         }
1169
1170         if (rx == NULL || ret != 0) {
1171                 if (rx == NULL) {
1172                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1173                 } else {
1174                         /* ret != 0 */
1175                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1176                 }
1177                 seg.segment_ptr = 0ULL;
1178                 seg.segment_length = 0;
1179                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1180                           match_value, ~0ULL, NULL, NULL);
1181         }
1182
1183         return MX_RECV_CONTINUE;
1184 }
1185
1186
1187 int
1188 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1189 {
1190         int                      i      = 0;
1191         int                      ret    = -ENOENT;
1192         struct kmx_peer         *peer   = NULL;
1193
1194         read_lock(&kmxlnd_data.kmx_global_lock);
1195         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1196                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1197                         if (index-- == 0) {
1198                                 *nidp = peer->mxp_nid;
1199                                 *count = atomic_read(&peer->mxp_refcount);
1200                                 ret = 0;
1201                                 break;
1202                         }
1203                 }
1204         }
1205         read_unlock(&kmxlnd_data.kmx_global_lock);
1206
1207         return ret;
1208 }
1209
1210 void
1211 mxlnd_del_peer_locked(struct kmx_peer *peer)
1212 {
1213         list_del_init(&peer->mxp_peers); /* remove from the global list */
1214         if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1215         mxlnd_peer_decref(peer); /* drop global list ref */
1216         return;
1217 }
1218
1219 int
1220 mxlnd_del_peer(lnet_nid_t nid)
1221 {
1222         int             i       = 0;
1223         int             ret     = 0;
1224         struct kmx_peer *peer   = NULL;
1225         struct kmx_peer *next   = NULL;
1226
1227         if (nid != LNET_NID_ANY) {
1228                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1229         }
1230         write_lock(&kmxlnd_data.kmx_global_lock);
1231         if (nid != LNET_NID_ANY) {
1232                 if (peer == NULL) {
1233                         ret = -ENOENT;
1234                 } if (peer == kmxlnd_data.kmx_localhost) {
1235                         mxlnd_peer_decref(peer); /* and drops it */
1236                         CERROR("cannot free this host's NID 0x%llx\n", nid);
1237                 } else {
1238                         mxlnd_peer_decref(peer); /* and drops it */
1239                         mxlnd_del_peer_locked(peer);
1240                 }
1241         } else { /* LNET_NID_ANY */
1242                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1243                         list_for_each_entry_safe(peer, next,
1244                                                  &kmxlnd_data.kmx_peers[i], mxp_peers) {
1245                                 if (peer != kmxlnd_data.kmx_localhost)
1246                                         mxlnd_del_peer_locked(peer);
1247                         }
1248                 }
1249         }
1250         write_unlock(&kmxlnd_data.kmx_global_lock);
1251
1252         return ret;
1253 }
1254
1255 struct kmx_conn *
1256 mxlnd_get_conn_by_idx(int index)
1257 {
1258         int                      i      = 0;
1259         struct kmx_peer         *peer   = NULL;
1260         struct kmx_conn         *conn   = NULL;
1261
1262         read_lock(&kmxlnd_data.kmx_global_lock);
1263         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1264                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1265                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1266                                 if (index-- > 0) {
1267                                         continue;
1268                                 }
1269
1270                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1271                                 read_unlock(&kmxlnd_data.kmx_global_lock);
1272                                 return conn;
1273                         }
1274                 }
1275         }
1276         read_unlock(&kmxlnd_data.kmx_global_lock);
1277
1278         return NULL;
1279 }
1280
1281 void
1282 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1283 {
1284         struct kmx_conn *conn   = NULL;
1285         struct kmx_conn *next   = NULL;
1286
1287         if (peer == kmxlnd_data.kmx_localhost) return;
1288
1289         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1290                 mxlnd_conn_disconnect(conn, 0, 1);
1291
1292         return;
1293 }
1294
1295 int
1296 mxlnd_close_matching_conns(lnet_nid_t nid)
1297 {
1298         int             i       = 0;
1299         int             ret     = 0;
1300         struct kmx_peer *peer   = NULL;
1301
1302         read_lock(&kmxlnd_data.kmx_global_lock);
1303         if (nid != LNET_NID_ANY) {
1304                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1305                 if (peer == NULL) {
1306                         ret = -ENOENT;
1307                 } else {
1308                         mxlnd_close_matching_conns_locked(peer);
1309                         mxlnd_peer_decref(peer); /* and drops it here */
1310                 }
1311         } else { /* LNET_NID_ANY */
1312                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1313                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1314                                 mxlnd_close_matching_conns_locked(peer);
1315                 }
1316         }
1317         read_unlock(&kmxlnd_data.kmx_global_lock);
1318
1319         return ret;
1320 }
1321
1322 /**
1323  * Modify MXLND parameters
1324  * \param ni  LNET interface handle
1325  * \param cmd command to change
1326  * \param arg the ioctl data
1327  *
1328  * Not implemented yet.
1329  */
1330 int
1331 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1332 {
1333         struct libcfs_ioctl_data *data  = arg;
1334         int                       ret   = -EINVAL;
1335
1336         LASSERT (ni == kmxlnd_data.kmx_ni);
1337
1338         switch (cmd) {
1339         case IOC_LIBCFS_GET_PEER: {
1340                 lnet_nid_t      nid     = 0;
1341                 int             count   = 0;
1342
1343                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1344                 data->ioc_nid    = nid;
1345                 data->ioc_count  = count;
1346                 break;
1347         }
1348         case IOC_LIBCFS_DEL_PEER: {
1349                 ret = mxlnd_del_peer(data->ioc_nid);
1350                 break;
1351         }
1352         case IOC_LIBCFS_GET_CONN: {
1353                 struct kmx_conn *conn = NULL;
1354
1355                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1356                 if (conn == NULL) {
1357                         ret = -ENOENT;
1358                 } else {
1359                         ret = 0;
1360                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1361                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1362                 }
1363                 break;
1364         }
1365         case IOC_LIBCFS_CLOSE_CONNECTION: {
1366                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1367                 break;
1368         }
1369         default:
1370                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1371                 break;
1372         }
1373
1374         return ret;
1375 }
1376
1377 /**
1378  * Add the tx to the global tx queue
1379  *
1380  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1381  */
1382 void
1383 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1384 {
1385         u8                      msg_type        = tx->mxc_msg_type;
1386         //struct kmx_peer         *peer           = tx->mxc_peer;
1387         struct kmx_conn         *conn           = tx->mxc_conn;
1388
1389         LASSERT (msg_type != 0);
1390         LASSERT (tx->mxc_nid != 0);
1391         LASSERT (tx->mxc_peer != NULL);
1392         LASSERT (tx->mxc_conn != NULL);
1393
1394         tx->mxc_incarnation = conn->mxk_incarnation;
1395
1396         if (msg_type != MXLND_MSG_PUT_DATA &&
1397             msg_type != MXLND_MSG_GET_DATA) {
1398                 /* msg style tx */
1399                 if (mxlnd_tx_requires_credit(tx)) {
1400                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1401                         conn->mxk_ntx_msgs++;
1402                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1403                            msg_type == MXLND_MSG_CONN_ACK) {
1404                         /* put conn msgs at the front of the queue */
1405                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1406                 } else {
1407                         /* PUT_ACK, PUT_NAK */
1408                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1409                         conn->mxk_ntx_msgs++;
1410                 }
1411         } else {
1412                 /* data style tx */
1413                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1414                 conn->mxk_ntx_data++;
1415         }
1416
1417         return;
1418 }
1419
1420 /**
1421  * Add the tx to the global tx queue
1422  *
1423  * Add the tx to the peer's msg or data queue
1424  */
1425 static inline void
1426 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1427 {
1428         LASSERT(tx->mxc_peer != NULL);
1429         LASSERT(tx->mxc_conn != NULL);
1430         spin_lock(&tx->mxc_conn->mxk_lock);
1431         mxlnd_peer_queue_tx_locked(tx);
1432         spin_unlock(&tx->mxc_conn->mxk_lock);
1433
1434         return;
1435 }
1436
1437 /**
1438  * Add the tx to the global tx queue
1439  *
1440  * Add the tx to the global queue and up the tx_queue_sem
1441  */
1442 void
1443 mxlnd_queue_tx(struct kmx_ctx *tx)
1444 {
1445         struct kmx_peer *peer   = tx->mxc_peer;
1446         LASSERT (tx->mxc_nid != 0);
1447
1448         if (peer != NULL) {
1449                 if (peer->mxp_incompatible &&
1450                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1451                         /* let this fail now */
1452                         tx->mxc_status.code = -ECONNABORTED;
1453                         mxlnd_conn_decref(peer->mxp_conn);
1454                         mxlnd_put_idle_tx(tx);
1455                         return;
1456                 }
1457                 if (tx->mxc_conn == NULL) {
1458                         int             ret     = 0;
1459                         struct kmx_conn *conn   = NULL;
1460
1461                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1462                         if (ret != 0) {
1463                                 tx->mxc_status.code = ret;
1464                                 mxlnd_put_idle_tx(tx);
1465                                 goto done;
1466                         }
1467                         tx->mxc_conn = conn;
1468                         mxlnd_peer_decref(peer); /* and takes it from peer */
1469                 }
1470                 LASSERT(tx->mxc_conn != NULL);
1471                 mxlnd_peer_queue_tx(tx);
1472                 mxlnd_check_sends(peer);
1473         } else {
1474                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1475                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1476                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1477                 up(&kmxlnd_data.kmx_tx_queue_sem);
1478         }
1479 done:
1480         return;
1481 }
1482
1483 int
1484 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1485 {
1486         int             i                       = 0;
1487         int             sum                     = 0;
1488         int             old_sum                 = 0;
1489         int             nseg                    = 0;
1490         int             first_iov               = -1;
1491         int             first_iov_offset        = 0;
1492         int             first_found             = 0;
1493         int             last_iov                = -1;
1494         int             last_iov_length         = 0;
1495         mx_ksegment_t  *seg                     = NULL;
1496
1497         if (niov == 0) return 0;
1498         LASSERT(iov != NULL);
1499
1500         for (i = 0; i < niov; i++) {
1501                 sum = old_sum + (u32) iov[i].iov_len;
1502                 if (!first_found && (sum > offset)) {
1503                         first_iov = i;
1504                         first_iov_offset = offset - old_sum;
1505                         first_found = 1;
1506                         sum = (u32) iov[i].iov_len - first_iov_offset;
1507                         old_sum = 0;
1508                 }
1509                 if (sum >= nob) {
1510                         last_iov = i;
1511                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1512                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1513                         break;
1514                 }
1515                 old_sum = sum;
1516         }
1517         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1518         nseg = last_iov - first_iov + 1;
1519         LASSERT(nseg > 0);
1520
1521         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1522         if (seg == NULL) {
1523                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1524                 return -1;
1525         }
1526         memset(seg, 0, nseg * sizeof(*seg));
1527         ctx->mxc_nseg = nseg;
1528         sum = 0;
1529         for (i = 0; i < nseg; i++) {
1530                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1531                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1532                 if (i == 0) {
1533                         seg[i].segment_ptr += (u64) first_iov_offset;
1534                         seg[i].segment_length -= (u32) first_iov_offset;
1535                 }
1536                 if (i == (nseg - 1)) {
1537                         seg[i].segment_length = (u32) last_iov_length;
1538                 }
1539                 sum += seg[i].segment_length;
1540         }
1541         ctx->mxc_seg_list = seg;
1542         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1543 #ifdef MX_PIN_FULLPAGES
1544         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1545 #endif
1546         LASSERT(nob == sum);
1547         return 0;
1548 }
1549
1550 int
1551 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1552 {
1553         int             i                       = 0;
1554         int             sum                     = 0;
1555         int             old_sum                 = 0;
1556         int             nseg                    = 0;
1557         int             first_kiov              = -1;
1558         int             first_kiov_offset       = 0;
1559         int             first_found             = 0;
1560         int             last_kiov               = -1;
1561         int             last_kiov_length        = 0;
1562         mx_ksegment_t  *seg                     = NULL;
1563
1564         if (niov == 0) return 0;
1565         LASSERT(kiov != NULL);
1566
1567         for (i = 0; i < niov; i++) {
1568                 sum = old_sum + kiov[i].kiov_len;
1569                 if (i == 0) sum -= kiov[i].kiov_offset;
1570                 if (!first_found && (sum > offset)) {
1571                         first_kiov = i;
1572                         first_kiov_offset = offset - old_sum;
1573                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1574                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1575                         first_found = 1;
1576                         sum = kiov[i].kiov_len - first_kiov_offset;
1577                         old_sum = 0;
1578                 }
1579                 if (sum >= nob) {
1580                         last_kiov = i;
1581                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1582                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1583                         break;
1584                 }
1585                 old_sum = sum;
1586         }
1587         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1588         nseg = last_kiov - first_kiov + 1;
1589         LASSERT(nseg > 0);
1590
1591         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1592         if (seg == NULL) {
1593                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1594                 return -1;
1595         }
1596         memset(seg, 0, niov * sizeof(*seg));
1597         ctx->mxc_nseg = niov;
1598         sum = 0;
1599         for (i = 0; i < niov; i++) {
1600                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1601                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1602                 if (i == 0) {
1603                         seg[i].segment_ptr += (u64) first_kiov_offset;
1604                         /* we have to add back the original kiov_offset */
1605                         seg[i].segment_length -= first_kiov_offset +
1606                                                  kiov[first_kiov].kiov_offset;
1607                 }
1608                 if (i == (nseg - 1)) {
1609                         seg[i].segment_length = last_kiov_length;
1610                 }
1611                 sum += seg[i].segment_length;
1612         }
1613         ctx->mxc_seg_list = seg;
1614         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1615 #ifdef MX_PIN_FULLPAGES
1616         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1617 #endif
1618         LASSERT(nob == sum);
1619         return 0;
1620 }
1621
1622 void
1623 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1624 {
1625         LASSERT(type == MXLND_MSG_PUT_ACK);
1626         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1627         tx->mxc_cookie = cookie;
1628         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1629         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1630         tx->mxc_match = mxlnd_create_match(tx, status);
1631
1632         mxlnd_queue_tx(tx);
1633 }
1634
1635
1636 /**
1637  * Get tx, map [k]iov, queue tx
1638  *
1639  * This setups the DATA send for PUT or GET.
1640  *
1641  * On success, it queues the tx, on failure it calls lnet_finalize()
1642  */
1643 void
1644 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1645 {
1646         int                     ret             = 0;
1647         lnet_process_id_t       target          = lntmsg->msg_target;
1648         unsigned int            niov            = lntmsg->msg_niov;
1649         struct iovec           *iov             = lntmsg->msg_iov;
1650         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1651         unsigned int            offset          = lntmsg->msg_offset;
1652         unsigned int            nob             = lntmsg->msg_len;
1653         struct kmx_ctx         *tx              = NULL;
1654
1655         LASSERT(lntmsg != NULL);
1656         LASSERT(peer != NULL);
1657         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1658         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1659
1660         tx = mxlnd_get_idle_tx();
1661         if (tx == NULL) {
1662                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1663                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1664                         libcfs_nid2str(target.nid));
1665                 goto failed_0;
1666         }
1667         tx->mxc_nid = target.nid;
1668         /* NOTE called when we have a ref on the conn, get one for this tx */
1669         mxlnd_conn_addref(peer->mxp_conn);
1670         tx->mxc_peer = peer;
1671         tx->mxc_conn = peer->mxp_conn;
1672         tx->mxc_msg_type = msg_type;
1673         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1674         tx->mxc_state = MXLND_CTX_PENDING;
1675         tx->mxc_lntmsg[0] = lntmsg;
1676         tx->mxc_cookie = cookie;
1677         tx->mxc_match = mxlnd_create_match(tx, 0);
1678
1679         /* This setups up the mx_ksegment_t to send the DATA payload  */
1680         if (nob == 0) {
1681                 /* do not setup the segments */
1682                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1683                                    "to %s?\n", libcfs_nid2str(target.nid));
1684                 ret = 0;
1685         } else if (kiov == NULL) {
1686                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1687         } else {
1688                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1689         }
1690         if (ret != 0) {
1691                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", 
1692                                    libcfs_nid2str(target.nid));
1693                 tx->mxc_status.code = -EIO;
1694                 goto failed_1;
1695         }
1696         mxlnd_queue_tx(tx);
1697         return;
1698
1699 failed_1:
1700         mxlnd_conn_decref(peer->mxp_conn);
1701         mxlnd_put_idle_tx(tx);
1702         return;
1703
1704 failed_0:
1705         CDEBUG(D_NETERROR, "no tx avail\n");
1706         lnet_finalize(ni, lntmsg, -EIO);
1707         return;
1708 }
1709
1710 /**
1711  * Map [k]iov, post rx
1712  *
1713  * This setups the DATA receive for PUT or GET.
1714  *
1715  * On success, it returns 0, on failure it returns -1
1716  */
1717 int
1718 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1719 {
1720         int                     ret     = 0;
1721         lnet_process_id_t       target  = lntmsg->msg_target;
1722         unsigned int            niov    = lntmsg->msg_niov;
1723         struct iovec           *iov     = lntmsg->msg_iov;
1724         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1725         unsigned int            offset  = lntmsg->msg_offset;
1726         unsigned int            nob     = lntmsg->msg_len;
1727         mx_return_t             mxret   = MX_SUCCESS;
1728         u64                     mask    = ~(MXLND_ERROR_MASK);
1729
1730         /* above assumes MXLND_MSG_PUT_DATA */
1731         if (msg_type == MXLND_MSG_GET_DATA) {
1732                 niov = lntmsg->msg_md->md_niov;
1733                 iov = lntmsg->msg_md->md_iov.iov;
1734                 kiov = lntmsg->msg_md->md_iov.kiov;
1735                 offset = 0;
1736                 nob = lntmsg->msg_md->md_length;
1737         }
1738
1739         LASSERT(lntmsg != NULL);
1740         LASSERT(rx != NULL);
1741         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1742         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
1743
1744         rx->mxc_msg_type = msg_type;
1745         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1746         rx->mxc_state = MXLND_CTX_PENDING;
1747         rx->mxc_nid = target.nid;
1748         /* if posting a GET_DATA, we may not yet know the peer */
1749         if (rx->mxc_peer != NULL) {
1750                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1751         }
1752         rx->mxc_lntmsg[0] = lntmsg;
1753         rx->mxc_cookie = cookie;
1754         rx->mxc_match = mxlnd_create_match(rx, 0);
1755         /* This setups up the mx_ksegment_t to receive the DATA payload  */
1756         if (kiov == NULL) {
1757                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1758         } else {
1759                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1760         }
1761         if (msg_type == MXLND_MSG_GET_DATA) {
1762                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1763                 if (rx->mxc_lntmsg[1] == NULL) {
1764                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1765                                            libcfs_nid2str(target.nid));
1766                         ret = -1;
1767                 }
1768         }
1769         if (ret != 0) {
1770                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1771                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1772                        libcfs_nid2str(target.nid));
1773                 return -1;
1774         }
1775         ret = mxlnd_q_pending_ctx(rx);
1776         if (ret == -1) {
1777                 return -1;
1778         }
1779         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1780         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1781                           rx->mxc_seg_list, rx->mxc_nseg,
1782                           rx->mxc_pin_type, rx->mxc_match,
1783                           mask, (void *) rx,
1784                           &rx->mxc_mxreq);
1785         if (mxret != MX_SUCCESS) {
1786                 if (rx->mxc_conn != NULL) {
1787                         mxlnd_deq_pending_ctx(rx);
1788                 }
1789                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1790                                    (int) mxret, libcfs_nid2str(target.nid));
1791                 return -1;
1792         }
1793
1794         return 0;
1795 }
1796
1797 /**
1798  * The LND required send function
1799  *
1800  * This must not block. Since we may not have a peer struct for the receiver,
1801  * it will append send messages on a global tx list. We will then up the
1802  * tx_queued's semaphore to notify it of the new send. 
1803  */
1804 int
1805 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1806 {
1807         int                     ret             = 0;
1808         int                     type            = lntmsg->msg_type;
1809         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
1810         lnet_process_id_t       target          = lntmsg->msg_target;
1811         lnet_nid_t              nid             = target.nid;
1812         int                     target_is_router = lntmsg->msg_target_is_router;
1813         int                     routing         = lntmsg->msg_routing;
1814         unsigned int            payload_niov    = lntmsg->msg_niov;
1815         struct iovec           *payload_iov     = lntmsg->msg_iov;
1816         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
1817         unsigned int            payload_offset  = lntmsg->msg_offset;
1818         unsigned int            payload_nob     = lntmsg->msg_len;
1819         struct kmx_ctx         *tx              = NULL;
1820         struct kmx_msg         *txmsg           = NULL;
1821         struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */
1822         struct kmx_ctx         *rx_data         = NULL;
1823         struct kmx_conn        *conn            = NULL;
1824         int                     nob             = 0;
1825         uint32_t                length          = 0;
1826         struct kmx_peer         *peer           = NULL;
1827
1828         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1829                        payload_nob, payload_niov, libcfs_id2str(target));
1830
1831         LASSERT (payload_nob == 0 || payload_niov > 0);
1832         LASSERT (payload_niov <= LNET_MAX_IOV);
1833         /* payload is either all vaddrs or all pages */
1834         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1835
1836         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1837
1838         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1839          * to a new peer, use the nid */
1840         peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1841         if (peer != NULL) {
1842                 if (unlikely(peer->mxp_incompatible)) {
1843                         mxlnd_peer_decref(peer); /* drop ref taken above */
1844                 } else {
1845                         read_lock(&kmxlnd_data.kmx_global_lock);
1846                         conn = peer->mxp_conn;
1847                         if (conn) {
1848                                 mxlnd_conn_addref(conn);
1849                                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1850                         }
1851                         read_unlock(&kmxlnd_data.kmx_global_lock);
1852                 }
1853         }
1854         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
1855         if (conn == NULL && peer != NULL) {
1856                 CDEBUG(D_NET, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1857                        peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1858         }
1859
1860         switch (type) {
1861         case LNET_MSG_ACK:
1862                 LASSERT (payload_nob == 0);
1863                 break;
1864
1865         case LNET_MSG_REPLY:
1866         case LNET_MSG_PUT:
1867                 /* Is the payload small enough not to need DATA? */
1868                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1869                 if (nob <= MXLND_EAGER_SIZE)
1870                         break;                  /* send EAGER */
1871
1872                 tx = mxlnd_get_idle_tx();
1873                 if (unlikely(tx == NULL)) {
1874                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1875                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1876                                libcfs_nid2str(nid));
1877                         if (conn) mxlnd_conn_decref(conn);
1878                         return -ENOMEM;
1879                 }
1880
1881                 /* the peer may be NULL */
1882                 tx->mxc_peer = peer;
1883                 tx->mxc_conn = conn; /* may be NULL */
1884                 /* we added a conn ref above */
1885                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1886                 txmsg = tx->mxc_msg;
1887                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1888                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1889                 tx->mxc_match = mxlnd_create_match(tx, 0);
1890
1891                 /* we must post a receive _before_ sending the request.
1892                  * we need to determine how much to receive, it will be either
1893                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1894
1895                 rx = mxlnd_get_idle_rx();
1896                 if (unlikely(rx == NULL)) {
1897                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1898                                            libcfs_nid2str(nid));
1899                         mxlnd_put_idle_tx(tx);
1900                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1901                         return -ENOMEM;
1902                 }
1903                 rx->mxc_nid = nid;
1904                 rx->mxc_peer = peer;
1905                 /* conn may be NULL but unlikely since the first msg is always small */
1906                 /* NOTE no need to lock peer before adding conn ref since we took
1907                  * a conn ref for the tx (it cannot be freed between there and here ) */
1908                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1909                 rx->mxc_conn = conn;
1910                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1911                 rx->mxc_cookie = tx->mxc_cookie;
1912                 rx->mxc_match = mxlnd_create_match(rx, 0);
1913
1914                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1915                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1916                 if (unlikely(ret != 0)) {
1917                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1918                                            libcfs_nid2str(nid));
1919                         rx->mxc_lntmsg[0] = NULL;
1920                         mxlnd_put_idle_rx(rx);
1921                         mxlnd_put_idle_tx(tx);
1922                         if (conn) {
1923                                 mxlnd_conn_decref(conn); /* for the rx... */
1924                                 mxlnd_conn_decref(conn); /* and for the tx */
1925                         }
1926                         return -EHOSTUNREACH;
1927                 }
1928
1929                 mxlnd_queue_tx(tx);
1930                 return 0;
1931
1932         case LNET_MSG_GET:
1933                 if (routing || target_is_router)
1934                         break;                  /* send EAGER */
1935
1936                 /* is the REPLY message too small for DATA? */
1937                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1938                 if (nob <= MXLND_EAGER_SIZE)
1939                         break;                  /* send EAGER */
1940
1941                 /* get tx (we need the cookie) , post rx for incoming DATA, 
1942                  * then post GET_REQ tx */
1943                 tx = mxlnd_get_idle_tx();
1944                 if (unlikely(tx == NULL)) {
1945                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1946                                            libcfs_nid2str(nid));
1947                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1948                         return -ENOMEM;
1949                 }
1950                 rx_data = mxlnd_get_idle_rx();
1951                 if (unlikely(rx_data == NULL)) {
1952                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1953                                            libcfs_nid2str(nid));
1954                         mxlnd_put_idle_tx(tx);
1955                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1956                         return -ENOMEM;
1957                 }
1958                 rx_data->mxc_peer = peer;
1959                 /* NOTE no need to lock peer before adding conn ref since we took
1960                  * a conn ref for the tx (it cannot be freed between there and here ) */
1961                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1962                 rx_data->mxc_conn = conn; /* may be NULL */
1963
1964                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1965                 if (unlikely(ret != 0)) {
1966                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1967                                            libcfs_nid2str(nid));
1968                         mxlnd_put_idle_rx(rx_data);
1969                         mxlnd_put_idle_tx(tx);
1970                         if (conn) {
1971                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1972                                 mxlnd_conn_decref(conn); /* and for the tx */
1973                         }
1974                         return -EIO;
1975                 }
1976
1977                 tx->mxc_peer = peer;
1978                 tx->mxc_conn = conn; /* may be NULL */
1979                 /* conn ref taken above */
1980                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1981                 txmsg = tx->mxc_msg;
1982                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1983                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1984                 tx->mxc_match = mxlnd_create_match(tx, 0);
1985
1986                 mxlnd_queue_tx(tx);
1987                 return 0;
1988
1989         default:
1990                 LBUG();
1991                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1992                 return -EIO;
1993         }
1994
1995         /* send EAGER */
1996
1997         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1998                 <= MXLND_EAGER_SIZE);
1999
2000         tx = mxlnd_get_idle_tx();
2001         if (unlikely(tx == NULL)) {
2002                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2003                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2004                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
2005                 return -ENOMEM;
2006         }
2007
2008         tx->mxc_peer = peer;
2009         tx->mxc_conn = conn; /* may be NULL */
2010         /* conn ref taken above */
2011         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2012         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2013         tx->mxc_match = mxlnd_create_match(tx, 0);
2014
2015         txmsg = tx->mxc_msg;
2016         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2017
2018         if (payload_kiov != NULL)
2019                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
2020                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2021                             payload_niov, payload_kiov, payload_offset, payload_nob);
2022         else
2023                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
2024                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2025                             payload_niov, payload_iov, payload_offset, payload_nob);
2026
2027         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2028         mxlnd_queue_tx(tx);
2029         return 0;
2030 }
2031
2032 /**
2033  * The LND required recv function
2034  *
2035  * This must not block.
2036  */
2037 int
2038 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2039              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2040              unsigned int offset, unsigned int mlen, unsigned int rlen)
2041 {
2042         int                     ret             = 0;
2043         int                     nob             = 0;
2044         int                     len             = 0;
2045         struct kmx_ctx          *rx             = private;
2046         struct kmx_msg          *rxmsg          = rx->mxc_msg;
2047         lnet_nid_t               nid            = rx->mxc_nid;
2048         struct kmx_ctx          *tx             = NULL;
2049         struct kmx_msg          *txmsg          = NULL;
2050         struct kmx_peer         *peer           = rx->mxc_peer;
2051         struct kmx_conn         *conn           = peer->mxp_conn;
2052         u64                      cookie         = 0ULL;
2053         int                      msg_type       = rxmsg->mxm_type;
2054         int                      repost         = 1;
2055         int                      credit         = 0;
2056         int                      finalize       = 0;
2057
2058         LASSERT (mlen <= rlen);
2059         /* Either all pages or all vaddrs */
2060         LASSERT (!(kiov != NULL && iov != NULL));
2061         LASSERT (peer != NULL);
2062
2063         /* conn_addref(conn) already taken for the primary rx */
2064
2065         switch (msg_type) {
2066         case MXLND_MSG_EAGER:
2067                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2068                 len = rx->mxc_status.xfer_length;
2069                 if (unlikely(nob > len)) {
2070                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2071                                            libcfs_nid2str(nid), nob, len);
2072                         ret = -EPROTO;
2073                         break;
2074                 }
2075
2076                 if (kiov != NULL)
2077                         lnet_copy_flat2kiov(niov, kiov, offset,
2078                                 MXLND_EAGER_SIZE, rxmsg,
2079                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2080                                 mlen);
2081                 else
2082                         lnet_copy_flat2iov(niov, iov, offset,
2083                                 MXLND_EAGER_SIZE, rxmsg,
2084                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2085                                 mlen);
2086                 finalize = 1;
2087                 credit = 1;
2088                 break;
2089
2090         case MXLND_MSG_PUT_REQ:
2091                 /* we are going to reuse the rx, store the needed info */
2092                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2093
2094                 /* get tx, post rx, send PUT_ACK */
2095
2096                 tx = mxlnd_get_idle_tx();
2097                 if (unlikely(tx == NULL)) {
2098                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2099                         /* Not replying will break the connection */
2100                         ret = -ENOMEM;
2101                         break;
2102                 }
2103                 if (unlikely(mlen == 0)) {
2104                         finalize = 1;
2105                         tx->mxc_peer = peer;
2106                         tx->mxc_conn = conn;
2107                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2108                         /* repost = 1 */
2109                         break;
2110                 }
2111
2112                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2113                 tx->mxc_peer = peer;
2114                 tx->mxc_conn = conn;
2115                 /* no need to lock peer first since we already have a ref */
2116                 mxlnd_conn_addref(conn); /* for the tx */
2117                 txmsg = tx->mxc_msg;
2118                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2119                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2120                 tx->mxc_cookie = cookie;
2121                 tx->mxc_match = mxlnd_create_match(tx, 0);
2122
2123                 /* we must post a receive _before_ sending the PUT_ACK */
2124                 mxlnd_ctx_init(rx);
2125                 rx->mxc_state = MXLND_CTX_PREP;
2126                 rx->mxc_peer = peer;
2127                 rx->mxc_conn = conn;
2128                 /* do not take another ref for this rx, it is already taken */
2129                 rx->mxc_nid = peer->mxp_nid;
2130                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, 
2131                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2132
2133                 if (unlikely(ret != 0)) {
2134                         /* Notify peer that it's over */
2135                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", 
2136                                            libcfs_nid2str(nid), ret);
2137                         mxlnd_ctx_init(tx);
2138                         tx->mxc_state = MXLND_CTX_PREP;
2139                         tx->mxc_peer = peer;
2140                         tx->mxc_conn = conn;
2141                         /* finalize = 0, let the PUT_ACK tx finalize this */
2142                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2143                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2144                         /* conn ref already taken above */
2145                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2146                         /* repost = 1 */
2147                         break;
2148                 }
2149
2150                 mxlnd_queue_tx(tx);
2151                 /* do not return a credit until after PUT_DATA returns */
2152                 repost = 0;
2153                 break;
2154
2155         case MXLND_MSG_GET_REQ:
2156                 if (likely(lntmsg != NULL)) {
2157                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2158                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2159                 } else {
2160                         /* GET didn't match anything */
2161                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2162                          * We have to embed the error code in the match bits.
2163                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2164                         u64             cookie  = rxmsg->mxm_u.get_req.mxgrm_cookie;
2165
2166                         tx = mxlnd_get_idle_tx();
2167                         if (unlikely(tx == NULL)) {
2168                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2169                                                    libcfs_nid2str(nid));
2170                                 ret = -ENOMEM;
2171                                 break;
2172                         }
2173                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2174                         tx->mxc_state = MXLND_CTX_PENDING;
2175                         tx->mxc_nid = nid;
2176                         tx->mxc_peer = peer;
2177                         tx->mxc_conn = conn;
2178                         /* no need to lock peer first since we already have a ref */
2179                         mxlnd_conn_addref(conn); /* for this tx */
2180                         tx->mxc_cookie = cookie;
2181                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2182                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2183                         mxlnd_queue_tx(tx);
2184                 }
2185                 /* finalize lntmsg after tx completes */
2186                 break;
2187
2188         default:
2189                 LBUG();
2190         }
2191
2192         if (repost) {
2193                 /* we received a message, increment peer's outstanding credits */
2194                 if (credit == 1) {
2195                         spin_lock(&conn->mxk_lock);
2196                         conn->mxk_outstanding++;
2197                         spin_unlock(&conn->mxk_lock);
2198                 }
2199                 /* we are done with the rx */
2200                 mxlnd_put_idle_rx(rx);
2201                 mxlnd_conn_decref(conn);
2202         }
2203
2204         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); 
2205
2206         /* we received a credit, see if we can use it to send a msg */
2207         if (credit) mxlnd_check_sends(peer);
2208
2209         return ret;
2210 }
2211
2212 void
2213 mxlnd_sleep(unsigned long timeout)
2214 {
2215         set_current_state(TASK_INTERRUPTIBLE);
2216         schedule_timeout(timeout);
2217         return;
2218 }
2219
2220 /**
2221  * The generic send queue thread
2222  * \param arg  thread id (as a void *)
2223  *
2224  * This thread moves send messages from the global tx_queue to the owning
2225  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2226  * it to the global peer list.
2227  */
2228 int
2229 mxlnd_tx_queued(void *arg)
2230 {
2231         long                    id      = (long) arg;
2232         int                     ret     = 0;
2233         int                     found   = 0;
2234         struct kmx_ctx         *tx      = NULL;
2235         struct kmx_peer        *peer    = NULL;
2236         struct list_head       *tmp_tx  = NULL;
2237
2238         cfs_daemonize("mxlnd_tx_queued");
2239         //cfs_block_allsigs();
2240
2241         while (!kmxlnd_data.kmx_shutdown) {
2242                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2243                 if (kmxlnd_data.kmx_shutdown)
2244                         break;
2245                 if (ret != 0) // Should we check for -EINTR?
2246                         continue;
2247                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2248                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2249                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2250                         continue;
2251                 }
2252                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2253                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2254                 list_del_init(&tx->mxc_list);
2255                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2256
2257                 found = 0;
2258                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
2259                 if (peer != NULL) {
2260                         tx->mxc_peer = peer;
2261                         write_lock(&kmxlnd_data.kmx_global_lock);
2262                         if (peer->mxp_conn == NULL) {
2263                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2264                                 if (ret != 0) {
2265                                         /* out of memory, give up and fail tx */
2266                                         tx->mxc_status.code = -ENOMEM;
2267                                         write_unlock(&kmxlnd_data.kmx_global_lock);
2268                                         mxlnd_peer_decref(peer);
2269                                         mxlnd_put_idle_tx(tx);
2270                                         continue;
2271                                 }
2272                         }
2273                         tx->mxc_conn = peer->mxp_conn;
2274                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2275                         write_unlock(&kmxlnd_data.kmx_global_lock);
2276                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2277                         mxlnd_queue_tx(tx);
2278                         found = 1;
2279                 }
2280                 if (found == 0) {
2281                         int              hash   = 0;
2282                         struct kmx_peer *peer = NULL;
2283                         struct kmx_peer *old = NULL;
2284
2285                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2286
2287                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2288                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2289                         /* create peer */
2290                         /* adds conn ref for this function */
2291                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2292                                         *kmxlnd_tunables.kmx_board,
2293                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2294                         if (ret != 0) {
2295                                 /* finalize message */
2296                                 tx->mxc_status.code = ret;
2297                                 mxlnd_put_idle_tx(tx);
2298                                 continue;
2299                         }
2300                         tx->mxc_peer = peer;
2301                         tx->mxc_conn = peer->mxp_conn;
2302                         /* this tx will keep the conn ref taken in peer_alloc() */
2303
2304                         /* add peer to global peer list, but look to see
2305                          * if someone already created it after we released
2306                          * the read lock */
2307                         write_lock(&kmxlnd_data.kmx_global_lock);
2308                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2309                                 if (old->mxp_nid == peer->mxp_nid) {
2310                                         /* somebody beat us here, we created a duplicate */
2311                                         found = 1;
2312                                         break;
2313                                 }
2314                         }
2315
2316                         if (found == 0) {
2317                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2318                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2319                         } else {
2320                                 tx->mxc_peer = old;
2321                                 tx->mxc_conn = old->mxp_conn;
2322                                 /* FIXME can conn be NULL? */
2323                                 LASSERT(old->mxp_conn != NULL);
2324                                 mxlnd_conn_addref(old->mxp_conn);
2325                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2326                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2327                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2328                                 mxlnd_peer_decref(peer);
2329                         }
2330                         write_unlock(&kmxlnd_data.kmx_global_lock);
2331
2332                         mxlnd_queue_tx(tx);
2333                 }
2334         }
2335         mxlnd_thread_stop(id);
2336         return 0;
2337 }
2338
2339 /* When calling this, we must not have the peer lock. */
2340 void
2341 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2342 {
2343         mx_return_t     mxret           = MX_SUCCESS;
2344         mx_request_t    request;
2345         struct kmx_conn *conn           = peer->mxp_conn;
2346         u8              msg_type        = (u8) MXLND_MSG_TYPE(mask);
2347
2348         /* NOTE we are holding a conn ref every time we call this function,
2349          * we do not need to lock the peer before taking another ref */
2350         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2351
2352         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2353
2354         if (peer->mxp_reconnect_time == 0) {
2355                 peer->mxp_reconnect_time = jiffies;
2356         }
2357
2358         if (peer->mxp_nic_id == 0ULL) {
2359                 int             ret     = 0;
2360
2361                 ret = mxlnd_ip2nic_id(peer->mxp_ip, &peer->mxp_nic_id);
2362                 if (ret == 0) {
2363                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2364                 }
2365                 if (peer->mxp_nic_id == 0ULL) {
2366                         /* not mapped yet, return */
2367                         spin_lock(&conn->mxk_lock);
2368                         conn->mxk_status = MXLND_CONN_INIT;
2369                         spin_unlock(&conn->mxk_lock);
2370                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2371                                 /* give up and notify LNET */
2372                                 mxlnd_conn_disconnect(conn, 0, 0);
2373                                 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2374                                                                             function... */
2375                                 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2376                                                                       longer need */
2377                         }
2378                         mxlnd_conn_decref(conn);
2379                         return;
2380                 }
2381         }
2382
2383         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2384                             peer->mxp_ep_id, MXLND_MSG_MAGIC, mask,
2385                             (void *) peer, &request);
2386         if (unlikely(mxret != MX_SUCCESS)) {
2387                 spin_lock(&conn->mxk_lock);
2388                 conn->mxk_status = MXLND_CONN_FAIL;
2389                 spin_unlock(&conn->mxk_lock);
2390                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2391                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2392                 mxlnd_conn_decref(conn);
2393         }
2394         return;
2395 }
2396
2397 #define MXLND_STATS 0
2398
2399 int
2400 mxlnd_check_sends(struct kmx_peer *peer)
2401 {
2402         int                     ret             = 0;
2403         int                     found           = 0;
2404         mx_return_t             mxret           = MX_SUCCESS;
2405         struct kmx_ctx          *tx             = NULL;
2406         struct kmx_conn         *conn           = NULL;
2407         u8                      msg_type        = 0;
2408         int                     credit          = 0;
2409         int                     status          = 0;
2410         int                     ntx_posted      = 0;
2411         int                     credits         = 0;
2412 #if MXLND_STATS
2413         static unsigned long    last            = 0;
2414 #endif
2415
2416         if (unlikely(peer == NULL)) {
2417                 LASSERT(peer != NULL);
2418                 return -1;
2419         }
2420         write_lock(&kmxlnd_data.kmx_global_lock);
2421         conn = peer->mxp_conn;
2422         /* NOTE take a ref for the duration of this function since it is called
2423          * when there might not be any queued txs for this peer */
2424         if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2425         write_unlock(&kmxlnd_data.kmx_global_lock);
2426
2427         /* do not add another ref for this tx */
2428
2429         if (conn == NULL) {
2430                 /* we do not have any conns */
2431                 return -1;
2432         }
2433
2434 #if MXLND_STATS
2435         if (time_after(jiffies, last)) {
2436                 last = jiffies + HZ;
2437                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2438                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2439                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2440                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2441                               conn->mxk_ntx_data, conn->mxk_data_posted);
2442         }
2443 #endif
2444
2445         /* cache peer state for asserts */
2446         spin_lock(&conn->mxk_lock);
2447         ntx_posted = conn->mxk_ntx_posted;
2448         credits = conn->mxk_credits;
2449         spin_unlock(&conn->mxk_lock);
2450
2451         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2452         LASSERT(ntx_posted >= 0);
2453
2454         LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2455         LASSERT(credits >= 0);
2456
2457         /* check number of queued msgs, ignore data */
2458         spin_lock(&conn->mxk_lock);
2459         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2460                 /* check if any txs queued that could return credits... */
2461                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2462                         /* if not, send a NOOP */
2463                         tx = mxlnd_get_idle_tx();
2464                         if (likely(tx != NULL)) {
2465                                 tx->mxc_peer = peer;
2466                                 tx->mxc_conn = peer->mxp_conn;
2467                                 mxlnd_conn_addref(conn); /* for this tx */
2468                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2469                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2470                                 mxlnd_peer_queue_tx_locked(tx);
2471                                 found = 1;
2472                                 goto done_locked;
2473                         }
2474                 }
2475         }
2476         spin_unlock(&conn->mxk_lock);
2477
2478         /* if the peer is not ready, try to connect */
2479         spin_lock(&conn->mxk_lock);
2480         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2481             conn->mxk_status == MXLND_CONN_FAIL ||
2482             conn->mxk_status == MXLND_CONN_REQ)) {
2483                 u64 match = (u64) MXLND_MSG_ICON_REQ << MXLND_MSG_OFFSET;
2484                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2485                 conn->mxk_status = MXLND_CONN_WAIT;
2486                 spin_unlock(&conn->mxk_lock);
2487                 mxlnd_iconnect(peer, match);
2488                 goto done;
2489         }
2490         spin_unlock(&conn->mxk_lock);
2491
2492         spin_lock(&conn->mxk_lock);
2493         while (!list_empty(&conn->mxk_tx_free_queue) ||
2494                !list_empty(&conn->mxk_tx_credit_queue)) {
2495                 /* We have something to send. If we have a queued tx that does not
2496                  * require a credit (free), choose it since its completion will 
2497                  * return a credit (here or at the peer), complete a DATA or 
2498                  * CONN_REQ or CONN_ACK. */
2499                 struct list_head *tmp_tx = NULL;
2500                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2501                         tmp_tx = &conn->mxk_tx_free_queue;
2502                 } else {
2503                         tmp_tx = &conn->mxk_tx_credit_queue;
2504                 }
2505                 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2506
2507                 msg_type = tx->mxc_msg_type;
2508
2509                 /* don't try to send a rx */
2510                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2511
2512                 /* ensure that it is a valid msg type */
2513                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2514                         msg_type == MXLND_MSG_CONN_ACK ||
2515                         msg_type == MXLND_MSG_NOOP     ||
2516                         msg_type == MXLND_MSG_EAGER    ||
2517                         msg_type == MXLND_MSG_PUT_REQ  ||
2518                         msg_type == MXLND_MSG_PUT_ACK  ||
2519                         msg_type == MXLND_MSG_PUT_DATA ||
2520                         msg_type == MXLND_MSG_GET_REQ  ||
2521                         msg_type == MXLND_MSG_GET_DATA);
2522                 LASSERT(tx->mxc_peer == peer);
2523                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2524
2525                 credit = mxlnd_tx_requires_credit(tx);
2526                 if (credit) {
2527
2528                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2529                                 CDEBUG(D_NET, "%s: posted enough\n",
2530                                               libcfs_nid2str(peer->mxp_nid));
2531                                 goto done_locked;
2532                         }
2533
2534                         if (conn->mxk_credits == 0) {
2535                                 CDEBUG(D_NET, "%s: no credits\n",
2536                                               libcfs_nid2str(peer->mxp_nid));
2537                                 goto done_locked;
2538                         }
2539
2540                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2541                             conn->mxk_outstanding == 0) {  /* giving back credits */
2542                                 CDEBUG(D_NET, "%s: not using last credit\n",
2543                                               libcfs_nid2str(peer->mxp_nid));
2544                                 goto done_locked;
2545                         }
2546                 }
2547
2548                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2549                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2550                                 msg_type == MXLND_MSG_CONN_ACK)) {
2551                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2552                                              mxlnd_connstatus_to_str(conn->mxk_status),
2553                                              tx->mxc_cookie,
2554                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2555                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2556                                         list_del_init(&tx->mxc_list);
2557                                         tx->mxc_status.code = -ECONNABORTED;
2558                                         mxlnd_put_idle_tx(tx);
2559                                         mxlnd_conn_decref(conn);
2560                                 }
2561                                 goto done_locked;
2562                         }
2563                 }
2564
2565                 list_del_init(&tx->mxc_list);
2566
2567                 /* handle credits, etc now while we have the lock to avoid races */
2568                 if (credit) {
2569                         conn->mxk_credits--;
2570                         conn->mxk_ntx_posted++;
2571                 }
2572                 if (msg_type != MXLND_MSG_PUT_DATA &&
2573                     msg_type != MXLND_MSG_GET_DATA) {
2574                         if (msg_type != MXLND_MSG_CONN_REQ &&
2575                             msg_type != MXLND_MSG_CONN_ACK) {
2576                                 conn->mxk_ntx_msgs--;
2577                         }
2578                 }
2579                 if (tx->mxc_incarnation == 0 &&
2580                     conn->mxk_incarnation != 0) {
2581                         tx->mxc_incarnation = conn->mxk_incarnation;
2582                 }
2583                 spin_unlock(&conn->mxk_lock);
2584
2585                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER 
2586                  * or (2) there is a non-DATA msg that can return credits in the 
2587                  * queue, then drop this duplicate NOOP */
2588                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2589                         spin_lock(&conn->mxk_lock);
2590                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2591                             (conn->mxk_ntx_msgs >= 1)) {
2592                                 conn->mxk_credits++;
2593                                 conn->mxk_ntx_posted--;
2594                                 spin_unlock(&conn->mxk_lock);
2595                                 /* redundant NOOP */
2596                                 mxlnd_put_idle_tx(tx);
2597                                 mxlnd_conn_decref(conn);
2598                                 CDEBUG(D_NET, "%s: redundant noop\n",
2599                                               libcfs_nid2str(peer->mxp_nid));
2600                                 found = 1;
2601                                 goto done;
2602                         }
2603                         spin_unlock(&conn->mxk_lock);
2604                 }
2605
2606                 found = 1;
2607                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2608                     (msg_type != MXLND_MSG_GET_DATA))) {
2609                         mxlnd_pack_msg(tx);
2610                 }
2611
2612                 //ret = -ECONNABORTED;
2613                 mxret = MX_SUCCESS;
2614
2615                 spin_lock(&conn->mxk_lock);
2616                 status = conn->mxk_status;
2617                 spin_unlock(&conn->mxk_lock);
2618
2619                 if (likely((status == MXLND_CONN_READY) ||
2620                     (msg_type == MXLND_MSG_CONN_REQ) ||
2621                     (msg_type == MXLND_MSG_CONN_ACK))) {
2622                         ret = 0;
2623                         if (msg_type != MXLND_MSG_CONN_REQ &&
2624                             msg_type != MXLND_MSG_CONN_ACK) {
2625                                 /* add to the pending list */
2626                                 ret = mxlnd_q_pending_ctx(tx);
2627                                 if (ret == -1) {
2628                                         /* FIXME the conn is disconnected, now what? */
2629                                 }
2630                         } else {
2631                                 /* CONN_REQ/ACK */
2632                                 tx->mxc_state = MXLND_CTX_PENDING;
2633                         }
2634
2635                         if (ret == 0) {
2636                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2637                                     msg_type != MXLND_MSG_GET_DATA)) {
2638                                         /* send a msg style tx */
2639                                         LASSERT(tx->mxc_nseg == 1);
2640                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2641                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2642                                                mxlnd_msgtype_to_str(msg_type),
2643                                                tx->mxc_cookie);
2644                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2645                                                           &tx->mxc_seg,
2646                                                           tx->mxc_nseg,
2647                                                           tx->mxc_pin_type,
2648                                                           conn->mxk_epa,
2649                                                           tx->mxc_match,
2650                                                           (void *) tx,
2651                                                           &tx->mxc_mxreq);
2652                                 } else {
2653                                         /* send a DATA tx */
2654                                         spin_lock(&conn->mxk_lock);
2655                                         conn->mxk_ntx_data--;
2656                                         conn->mxk_data_posted++;
2657                                         spin_unlock(&conn->mxk_lock);
2658                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2659                                                mxlnd_msgtype_to_str(msg_type),
2660                                                tx->mxc_cookie);
2661                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2662                                                           tx->mxc_seg_list,
2663                                                           tx->mxc_nseg,
2664                                                           tx->mxc_pin_type,
2665                                                           conn->mxk_epa,
2666                                                           tx->mxc_match,
2667                                                           (void *) tx,
2668                                                           &tx->mxc_mxreq);
2669                                 }
2670                         } else {
2671                                 mxret = MX_CONNECTION_FAILED;
2672                         }
2673                         if (likely(mxret == MX_SUCCESS)) {
2674                                 ret = 0;
2675                         } else {
2676                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2677                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
2678                                        libcfs_nid2str(peer->mxp_nid));
2679                                 /* NOTE mx_kisend() only fails if there are not enough 
2680                                 * resources. Do not change the connection status. */
2681                                 if (mxret == MX_NO_RESOURCES) {
2682                                         tx->mxc_status.code = -ENOMEM;
2683                                 } else {
2684                                         tx->mxc_status.code = -ECONNABORTED;
2685                                 }
2686                                 if (credit) {
2687                                         spin_lock(&conn->mxk_lock);
2688                                         conn->mxk_ntx_posted--;
2689                                         conn->mxk_credits++;
2690                                         spin_unlock(&conn->mxk_lock);
2691                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2692                                         msg_type == MXLND_MSG_GET_DATA) {
2693                                         spin_lock(&conn->mxk_lock);
2694                                         conn->mxk_data_posted--;
2695                                         spin_unlock(&conn->mxk_lock);
2696                                 }
2697                                 if (msg_type != MXLND_MSG_PUT_DATA &&
2698                                     msg_type != MXLND_MSG_GET_DATA &&
2699                                     msg_type != MXLND_MSG_CONN_REQ &&
2700                                     msg_type != MXLND_MSG_CONN_ACK) {
2701                                         spin_lock(&conn->mxk_lock);
2702                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2703                                         spin_unlock(&conn->mxk_lock);
2704                                 }
2705                                 if (msg_type != MXLND_MSG_CONN_REQ &&
2706                                     msg_type != MXLND_MSG_CONN_ACK) {
2707                                         /* remove from the pending list */
2708                                         mxlnd_deq_pending_ctx(tx);
2709                                 }
2710                                 mxlnd_put_idle_tx(tx);
2711                                 mxlnd_conn_decref(conn);
2712                         }
2713                 }
2714                 spin_lock(&conn->mxk_lock);
2715         }
2716 done_locked:
2717         spin_unlock(&conn->mxk_lock);
2718 done:
2719         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2720         return found;
2721 }
2722
2723
2724 /**
2725  * A tx completed, progress or complete the msg
2726  * \param tx  the tx descriptor
2727  * 
2728  * Determine which type of send request it was and start the next step, if needed,
2729  * or, if done, signal completion to LNET. After we are done, put back on the
2730  * idle tx list.
2731  */
2732 void
2733 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2734 {
2735         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2736         struct kmx_msg  *msg    = tx->mxc_msg;
2737         struct kmx_peer *peer   = tx->mxc_peer;
2738         struct kmx_conn *conn   = tx->mxc_conn;
2739         u8              type    = tx->mxc_msg_type;
2740         int             credit  = mxlnd_tx_requires_credit(tx);
2741         u64             cookie  = tx->mxc_cookie;
2742
2743         CDEBUG(D_NET, "entering %s (0x%llx):\n",
2744                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2745
2746         if (unlikely(conn == NULL)) {
2747                 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2748                 conn = peer->mxp_conn;
2749                 if (conn != NULL) {
2750                         /* do not add a ref for the tx, it was set before sending */
2751                         tx->mxc_conn = conn;
2752                         tx->mxc_peer = conn->mxk_peer;
2753                 }
2754         }
2755         LASSERT (peer != NULL);
2756         LASSERT (conn != NULL);
2757
2758         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2759                 LASSERT (type == msg->mxm_type);
2760         }
2761
2762         if (failed) {
2763                 tx->mxc_status.code = -EIO;
2764         } else {
2765                 spin_lock(&conn->mxk_lock);
2766                 conn->mxk_last_tx = jiffies;
2767                 spin_unlock(&conn->mxk_lock);
2768         }
2769
2770         switch (type) {
2771
2772         case MXLND_MSG_GET_DATA:
2773                 spin_lock(&conn->mxk_lock);
2774                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2775                         conn->mxk_outstanding++;
2776                         conn->mxk_data_posted--;
2777                 }
2778                 spin_unlock(&conn->mxk_lock);
2779                 break;
2780
2781         case MXLND_MSG_PUT_DATA:
2782                 spin_lock(&conn->mxk_lock);
2783                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2784                         conn->mxk_data_posted--;
2785                 }
2786                 spin_unlock(&conn->mxk_lock);
2787                 break;
2788
2789         case MXLND_MSG_NOOP:
2790         case MXLND_MSG_PUT_REQ:
2791         case MXLND_MSG_PUT_ACK:
2792         case MXLND_MSG_GET_REQ:
2793         case MXLND_MSG_EAGER:
2794         //case MXLND_MSG_NAK:
2795                 break;
2796
2797         case MXLND_MSG_CONN_ACK:
2798                 if (peer->mxp_incompatible) {
2799                         /* we sent our params, now close this conn */
2800                         mxlnd_conn_disconnect(conn, 0, 1);
2801                 }
2802         case MXLND_MSG_CONN_REQ:
2803                 if (failed) {
2804                         CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2805                                "failed with %s (%d) to %s\n",
2806                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2807                                mx_strstatus(tx->mxc_status.code),
2808                                tx->mxc_status.code,
2809                                libcfs_nid2str(tx->mxc_nid));
2810                         if (!peer->mxp_incompatible) {
2811                                 spin_lock(&conn->mxk_lock);
2812                                 conn->mxk_status = MXLND_CONN_FAIL;
2813                                 spin_unlock(&conn->mxk_lock);
2814                         }
2815                 }
2816                 break;
2817
2818         default:
2819                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2820                 LBUG();
2821         }
2822
2823         if (credit) {
2824                 spin_lock(&conn->mxk_lock);
2825                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2826                         conn->mxk_ntx_posted--;
2827                 }
2828                 spin_unlock(&conn->mxk_lock);
2829         }
2830
2831         CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2832         mxlnd_put_idle_tx(tx);
2833         mxlnd_conn_decref(conn);
2834
2835         mxlnd_check_sends(peer);
2836
2837         return;
2838 }
2839
2840 void
2841 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2842 {
2843         int                     ret             = 0;
2844         int                     repost          = 1;
2845         int                     credit          = 1;
2846         u32                     nob             = rx->mxc_status.xfer_length;
2847         u64                     bits            = rx->mxc_status.match_info;
2848         struct kmx_msg         *msg             = rx->mxc_msg;
2849         struct kmx_peer        *peer            = rx->mxc_peer;
2850         struct kmx_conn        *conn            = rx->mxc_conn;
2851         u8                      type            = rx->mxc_msg_type;
2852         u64                     seq             = 0ULL;
2853         lnet_msg_t             *lntmsg[2];
2854         int                     result          = 0;
2855         u64                     nic_id          = 0ULL;
2856         u32                     ep_id           = 0;
2857         u32                     sid             = 0;
2858         int                     peer_ref        = 0;
2859         int                     conn_ref        = 0;
2860         int                     incompatible    = 0;
2861         u64                     match           = 0ULL;
2862
2863         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, 
2864          * failed GET reply, CONN_REQ, or a CONN_ACK */
2865
2866         /* NOTE peer may still be NULL if it is a new peer and
2867          *      conn may be NULL if this is a re-connect */
2868         if (likely(peer != NULL && conn != NULL)) {
2869                 /* we have a reference on the conn */
2870                 conn_ref = 1;
2871         } else if (peer != NULL && conn == NULL) {
2872                 /* we have a reference on the peer */
2873                 peer_ref = 1;
2874         } else if (peer == NULL && conn != NULL) {
2875                 /* fatal error */
2876                 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2877                 LBUG();
2878         } /* else peer and conn == NULL */
2879
2880 #if 0
2881         if (peer == NULL || conn == NULL) {
2882                 /* if the peer was disconnected, the peer may exist but
2883                  * not have any valid conns */
2884                 decref = 0; /* no peer means no ref was taken for this rx */
2885         }
2886 #endif
2887
2888         if (conn == NULL && peer != NULL) {
2889                 write_lock(&kmxlnd_data.kmx_global_lock);
2890                 conn = peer->mxp_conn;
2891                 if (conn) {
2892                         mxlnd_conn_addref(conn); /* conn takes ref... */
2893                         mxlnd_peer_decref(peer); /* from peer */
2894                         conn_ref = 1;
2895                         peer_ref = 0;
2896                 }
2897                 write_unlock(&kmxlnd_data.kmx_global_lock);
2898                 rx->mxc_conn = conn;
2899         }
2900
2901 #if MXLND_DEBUG
2902         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2903 #endif
2904
2905         lntmsg[0] = NULL;
2906         lntmsg[1] = NULL;
2907
2908         if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2909                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2910                                    libcfs_nid2str(rx->mxc_nid),
2911                                    mx_strstatus(rx->mxc_status.code),
2912                                    (int) rx->mxc_status.code);
2913                 credit = 0;
2914                 goto cleanup;
2915         }
2916
2917         if (nob == 0) {
2918                 /* this may be a failed GET reply */
2919                 if (type == MXLND_MSG_GET_DATA) {
2920                         /* get the error (52-59) bits from the match bits */
2921                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
2922                         lntmsg[0] = rx->mxc_lntmsg[0];
2923                         result = -ret;
2924                         goto cleanup;
2925                 } else {
2926                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
2927                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2928                                            libcfs_nid2str(rx->mxc_nid));
2929                         goto cleanup;
2930                 }
2931         }
2932
2933         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2934         if (type == MXLND_MSG_PUT_DATA) {
2935                 result = rx->mxc_status.code;
2936                 lntmsg[0] = rx->mxc_lntmsg[0];
2937                 goto cleanup;
2938         } else if (type == MXLND_MSG_GET_DATA) {
2939                 result = rx->mxc_status.code;
2940                 lntmsg[0] = rx->mxc_lntmsg[0];
2941                 lntmsg[1] = rx->mxc_lntmsg[1];
2942                 goto cleanup;
2943         }
2944
2945         ret = mxlnd_unpack_msg(msg, nob);
2946         if (ret != 0) {
2947                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2948                                    ret, libcfs_nid2str(rx->mxc_nid));
2949                 goto cleanup;
2950         }
2951         rx->mxc_nob = nob;
2952         type = msg->mxm_type;
2953         seq = msg->mxm_seq;
2954
2955         if (type != MXLND_MSG_CONN_REQ &&
2956             (rx->mxc_nid != msg->mxm_srcnid ||
2957              kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2958                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2959                        "0x%llx and rx msg dst is 0x%llx)\n",
2960                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2961                        msg->mxm_dstnid);
2962                 goto cleanup;
2963         }
2964
2965         if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2966                 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2967                     msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2968                         if (conn != NULL) {
2969                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2970                                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2971                                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2972                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2973                                        msg->mxm_srcstamp, conn->mxk_incarnation,
2974                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2975                         } else {
2976                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2977                                        "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2978                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2979                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2980                         }
2981                         credit = 0;
2982                         goto cleanup;
2983                 }
2984         }
2985
2986         CDEBUG(D_NET, "Received %s with %d credits\n",
2987                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
2988
2989         if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2990             msg->mxm_type != MXLND_MSG_CONN_ACK) {
2991                 LASSERT(peer != NULL);
2992                 LASSERT(conn != NULL);
2993                 if (msg->mxm_credits != 0) {
2994                         spin_lock(&conn->mxk_lock);
2995                         if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2996                                 if ((conn->mxk_credits + msg->mxm_credits) > 
2997                                      *kmxlnd_tunables.kmx_credits) {
2998                                         CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
2999                                                conn->mxk_credits, msg->mxm_credits);
3000                                 }
3001                                 conn->mxk_credits += msg->mxm_credits;
3002                                 LASSERT(conn->mxk_credits >= 0);
3003                                 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
3004                         }
3005                         spin_unlock(&conn->mxk_lock);
3006                 }
3007         }
3008
3009         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3010         switch (type) {
3011         case MXLND_MSG_NOOP:
3012                 break;
3013
3014         case MXLND_MSG_EAGER:
3015                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3016                                         msg->mxm_srcnid, rx, 0);
3017                 repost = ret < 0;
3018                 break;
3019
3020         case MXLND_MSG_PUT_REQ:
3021                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3022                                         msg->mxm_srcnid, rx, 1);
3023                 repost = ret < 0;
3024                 break;
3025
3026         case MXLND_MSG_PUT_ACK: {
3027                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3028                 if (cookie > MXLND_MAX_COOKIE) {
3029                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3030                                            libcfs_nid2str(rx->mxc_nid));
3031                         result = -((u32) MXLND_ERROR_VAL(cookie));
3032                         lntmsg[0] = rx->mxc_lntmsg[0];
3033                 } else {
3034                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3035                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3036                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3037                 }
3038                 /* repost == 1 */
3039                 break;
3040         }
3041         case MXLND_MSG_GET_REQ:
3042                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3043                                         msg->mxm_srcnid, rx, 1);
3044                 repost = ret < 0;
3045                 break;
3046
3047         case MXLND_MSG_CONN_REQ:
3048                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3049                         CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3050                                         libcfs_nid2str(msg->mxm_srcnid),
3051                                         libcfs_nid2str(msg->mxm_dstnid));
3052                         goto cleanup;
3053                 }
3054                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3055                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3056                                     "%d (%d wanted)\n",
3057                                         libcfs_nid2str(msg->mxm_srcnid),
3058                                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3059                                         *kmxlnd_tunables.kmx_credits);
3060                         incompatible = 1;
3061                 }
3062                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3063                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3064                                     "%d (%d wanted)\n",
3065                                         libcfs_nid2str(msg->mxm_srcnid),
3066                                         msg->mxm_u.conn_req.mxcrm_eager_size,
3067                                         (int) MXLND_EAGER_SIZE);
3068                         incompatible = 1;
3069                 }
3070                 mx_decompose_endpoint_addr2(rx->mxc_status.source, &nic_id, &ep_id, &sid);
3071                 if (peer == NULL) {
3072                         peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3073                         if (peer == NULL) {
3074                                 int             hash    = 0;
3075                                 struct kmx_peer *existing_peer    = NULL;
3076                                 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3077
3078                                 rx->mxc_nid = msg->mxm_srcnid;
3079
3080                                 /* adds conn ref for peer and one for this function */
3081                                 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3082                                                 *kmxlnd_tunables.kmx_board,
3083                                                 *kmxlnd_tunables.kmx_ep_id, 0ULL);
3084                                 if (ret != 0) {
3085                                         goto cleanup;
3086                                 }
3087                                 peer->mxp_sid = sid;
3088                                 LASSERT(peer->mxp_ep_id == ep_id);
3089                                 write_lock(&kmxlnd_data.kmx_global_lock);
3090                                 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3091                                 if (existing_peer) {
3092                                         mxlnd_conn_decref(peer->mxp_conn);
3093                                         mxlnd_peer_decref(peer);
3094                                         peer = existing_peer;
3095                                         mxlnd_conn_addref(peer->mxp_conn);
3096                                 } else {
3097                                         list_add_tail(&peer->mxp_peers,
3098                                                       &kmxlnd_data.kmx_peers[hash]);
3099                                         atomic_inc(&kmxlnd_data.kmx_npeers);
3100                                 }
3101                                 write_unlock(&kmxlnd_data.kmx_global_lock);
3102                         } else {
3103                                 /* FIXME should write lock here */
3104                                 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3105                                 mxlnd_peer_decref(peer); /* drop ref taken above */
3106                                 if (ret != 0) {
3107                                         CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3108                                         goto cleanup;
3109                                 }
3110                         }
3111                         conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3112                         conn = peer->mxp_conn;
3113                 } else { /* found peer */
3114                         struct kmx_conn *old_conn       = conn;
3115
3116                         if (sid != peer->mxp_sid) {
3117                                 /* do not call mx_disconnect() or send a BYE */
3118                                 mxlnd_conn_disconnect(old_conn, 0, 0);
3119
3120                                 /* the ref for this rx was taken on the old_conn */
3121                                 mxlnd_conn_decref(old_conn);
3122
3123                                 /* This allocs a conn, points peer->mxp_conn to this one.
3124                                 * The old conn is still on the peer->mxp_conns list.
3125                                 * As the pending requests complete, they will call
3126                                 * conn_decref() which will eventually free it. */
3127                                 ret = mxlnd_conn_alloc(&conn, peer);
3128                                 if (ret != 0) {
3129                                         CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3130                                         goto cleanup;
3131                                 }
3132                                 /* conn_alloc() adds one ref for the peer and one
3133                                  * for this function */
3134                                 conn_ref = 1;
3135
3136                                 peer->mxp_sid = sid;
3137                         }
3138                 }
3139                 write_lock(&kmxlnd_data.kmx_global_lock);
3140                 peer->mxp_incarnation = msg->mxm_srcstamp;
3141                 peer->mxp_incompatible = incompatible;
3142                 write_unlock(&kmxlnd_data.kmx_global_lock);
3143                 spin_lock(&conn->mxk_lock);
3144                 conn->mxk_incarnation = msg->mxm_srcstamp;
3145                 conn->mxk_status = MXLND_CONN_WAIT;
3146                 spin_unlock(&conn->mxk_lock);
3147
3148                 /* handle_conn_ack() will create the CONN_ACK msg */
3149                 match = (u64) MXLND_MSG_ICON_ACK << MXLND_MSG_OFFSET;
3150                 mxlnd_iconnect(peer, match);
3151
3152                 break;
3153
3154         case MXLND_MSG_CONN_ACK:
3155                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3156                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3157                                "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3158                                 libcfs_nid2str(msg->mxm_dstnid));
3159                         ret = -1;
3160                         goto failed;
3161                 }
3162                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3163                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3164                                "incompatible queue depth %d (%d wanted)\n",
3165                                 libcfs_nid2str(msg->mxm_srcnid),
3166                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3167                                 *kmxlnd_tunables.kmx_credits);
3168                         spin_lock(&conn->mxk_lock);
3169                         conn->mxk_status = MXLND_CONN_FAIL;
3170                         spin_unlock(&conn->mxk_lock);
3171                         incompatible = 1;
3172                         ret = -1;
3173                 }
3174                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3175                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3176                                "incompatible EAGER size %d (%d wanted)\n",
3177                                 libcfs_nid2str(msg->mxm_srcnid),
3178                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3179                                 (int) MXLND_EAGER_SIZE);
3180                         spin_lock(&conn->mxk_lock);
3181                         conn->mxk_status = MXLND_CONN_FAIL;
3182                         spin_unlock(&conn->mxk_lock);
3183                         incompatible = 1;
3184                         ret = -1;
3185                 }
3186                 write_lock(&kmxlnd_data.kmx_global_lock);
3187                 peer->mxp_incarnation = msg->mxm_srcstamp;
3188                 peer->mxp_incompatible = incompatible;
3189                 write_unlock(&kmxlnd_data.kmx_global_lock);
3190                 spin_lock(&conn->mxk_lock);
3191                 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3192                 conn->mxk_outstanding = 0;
3193                 conn->mxk_incarnation = msg->mxm_srcstamp;
3194                 conn->mxk_timeout = 0;
3195                 if (!incompatible) {
3196                         conn->mxk_status = MXLND_CONN_READY;
3197                 }
3198                 spin_unlock(&conn->mxk_lock);
3199                 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3200                 break;
3201
3202         default:
3203                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3204                                 libcfs_nid2str(rx->mxc_nid));
3205                 ret = -EPROTO;
3206                 break;
3207         }
3208
3209 failed:
3210         if (ret < 0) {
3211                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3212                 spin_lock(&conn->mxk_lock);
3213                 conn->mxk_status = MXLND_CONN_FAIL;
3214                 spin_unlock(&conn->mxk_lock);
3215         }
3216
3217 cleanup:
3218         if (conn != NULL) {
3219                 spin_lock(&conn->mxk_lock);
3220                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3221                 spin_unlock(&conn->mxk_lock);
3222         }
3223
3224         if (repost) {
3225                 /* lnet_parse() failed, etc., repost now */
3226                 mxlnd_put_idle_rx(rx);
3227                 if (conn != NULL && credit == 1) {
3228                         if (type == MXLND_MSG_PUT_DATA) {
3229                                 spin_lock(&conn->mxk_lock);
3230                                 conn->mxk_outstanding++;
3231                                 spin_unlock(&conn->mxk_lock);
3232                         } else if (type != MXLND_MSG_GET_DATA &&
3233                                   (type == MXLND_MSG_EAGER ||
3234                                    type == MXLND_MSG_PUT_REQ ||
3235                                    type == MXLND_MSG_NOOP)) {
3236                                 spin_lock(&conn->mxk_lock);
3237                                 conn->mxk_outstanding++;
3238                                 spin_unlock(&conn->mxk_lock);
3239                         }
3240                 }
3241                 if (conn_ref) mxlnd_conn_decref(conn);
3242                 LASSERT(peer_ref == 0);
3243         }
3244
3245         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3246                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3247         } else {
3248                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3249         }
3250
3251         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3252         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3253
3254         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3255
3256         return;
3257 }
3258
3259
3260
3261 void
3262 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3263 {
3264         struct kmx_ctx  *tx     = NULL;
3265         struct kmx_msg  *txmsg   = NULL;
3266         struct kmx_conn *conn   = peer->mxp_conn;
3267
3268         /* a conn ref was taken when calling mx_iconnect(), 
3269          * hold it until CONN_REQ or CONN_ACK completes */
3270
3271         CDEBUG(D_NET, "entering\n");
3272         if (status.code != MX_STATUS_SUCCESS) {
3273                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3274                         mx_strstatus(status.code), status.code,
3275                         libcfs_nid2str(peer->mxp_nid));
3276                 spin_lock(&conn->mxk_lock);
3277                 conn->mxk_status = MXLND_CONN_FAIL;
3278                 spin_unlock(&conn->mxk_lock);
3279
3280                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3281                         struct kmx_conn *new_conn       = NULL;
3282                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3283                         /* FIXME write lock here ? */
3284                         mxlnd_conn_disconnect(conn, 0, 0);
3285                         mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3286                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3287                         peer->mxp_reconnect_time = 0;
3288                 }
3289
3290                 mxlnd_conn_decref(conn);
3291                 return;
3292         }
3293
3294         spin_lock(&conn->mxk_lock);
3295         conn->mxk_epa = status.source;
3296         spin_unlock(&conn->mxk_lock);
3297         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3298          *      we should not need to lock the peer */
3299         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3300
3301         /* mx_iconnect() succeeded, reset delay to 0 */
3302         write_lock(&kmxlnd_data.kmx_global_lock);
3303         peer->mxp_reconnect_time = 0;
3304         write_unlock(&kmxlnd_data.kmx_global_lock);
3305
3306         /* marshal CONN_REQ msg */
3307         /* we are still using the conn ref from iconnect() - do not take another */
3308         tx = mxlnd_get_idle_tx();
3309         if (tx == NULL) {
3310                 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3311                                    libcfs_nid2str(peer->mxp_nid));
3312                 spin_lock(&conn->mxk_lock);
3313                 conn->mxk_status = MXLND_CONN_FAIL;
3314                 spin_unlock(&conn->mxk_lock);
3315                 mxlnd_conn_decref(conn);
3316                 return;
3317         }
3318
3319         tx->mxc_peer = peer;
3320         tx->mxc_conn = conn;
3321         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3322         txmsg = tx->mxc_msg;
3323         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3324         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3325         tx->mxc_match = mxlnd_create_match(tx, 0);
3326
3327         CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3328         mxlnd_queue_tx(tx);
3329         return;
3330 }
3331
3332 void
3333 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3334 {
3335         struct kmx_ctx  *tx     = NULL;
3336         struct kmx_msg  *txmsg   = NULL;
3337         struct kmx_conn *conn   = peer->mxp_conn;
3338         u64             nic_id  = 0ULL;
3339         u32             ep_id   = 0;
3340         u32             sid     = 0;
3341
3342         /* a conn ref was taken when calling mx_iconnect(), 
3343          * hold it until CONN_REQ or CONN_ACK completes */
3344
3345         CDEBUG(D_NET, "entering\n");
3346         if (status.code != MX_STATUS_SUCCESS) {
3347                 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3348                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3349                         mx_strstatus(status.code), status.code,
3350                         libcfs_nid2str(peer->mxp_nid),
3351                         peer->mxp_nid,
3352                         peer->mxp_nic_id,
3353                         peer->mxp_ep_id);
3354                 spin_lock(&conn->mxk_lock);
3355                 conn->mxk_status = MXLND_CONN_FAIL;
3356                 spin_unlock(&conn->mxk_lock);
3357
3358                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3359                         struct kmx_conn *new_conn       = NULL;
3360                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3361                         /* FIXME write lock here? */
3362                         mxlnd_conn_disconnect(conn, 0, 1);
3363                         mxlnd_conn_alloc(&new_conn, peer); /* adds ref for 
3364                                                               this function... */
3365                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3366                         peer->mxp_reconnect_time = 0;
3367                 }
3368
3369                 mxlnd_conn_decref(conn);
3370                 return;
3371         }
3372         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3373         spin_lock(&conn->mxk_lock);
3374         conn->mxk_epa = status.source;
3375         if (likely(!peer->mxp_incompatible)) {
3376                 conn->mxk_status = MXLND_CONN_READY;
3377         }
3378         spin_unlock(&conn->mxk_lock);
3379         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3380          *      we should not have to lock the peer */
3381         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3382
3383         /* mx_iconnect() succeeded, reset delay to 0 */
3384         write_lock(&kmxlnd_data.kmx_global_lock);
3385         peer->mxp_reconnect_time = 0;
3386         peer->mxp_sid = sid;
3387         write_unlock(&kmxlnd_data.kmx_global_lock);
3388
3389         /* marshal CONN_ACK msg */
3390         tx = mxlnd_get_idle_tx();
3391         if (tx == NULL) {
3392                 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3393                                    libcfs_nid2str(peer->mxp_nid));
3394                 spin_lock(&conn->mxk_lock);
3395                 conn->mxk_status = MXLND_CONN_FAIL;
3396                 spin_unlock(&conn->mxk_lock);
3397                 mxlnd_conn_decref(conn);
3398                 return;
3399         }
3400
3401         tx->mxc_peer = peer;
3402         tx->mxc_conn = conn;
3403         CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3404         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3405         txmsg = tx->mxc_msg;
3406         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3407         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3408         tx->mxc_match = mxlnd_create_match(tx, 0);
3409
3410         mxlnd_queue_tx(tx);
3411         return;
3412 }
3413
3414 /**
3415  * The MX request completion thread(s)
3416  * \param arg  thread id (as a void *)
3417  *
3418  * This thread waits for a MX completion and then completes the request.
3419  * We will create one thread per CPU.
3420  */
3421 int
3422 mxlnd_request_waitd(void *arg)
3423 {
3424         long                    id              = (long) arg;
3425         char                    name[24];
3426         __u32                   result          = 0;
3427         mx_return_t             mxret           = MX_SUCCESS;
3428         mx_status_t             status;
3429         struct kmx_ctx         *ctx             = NULL;
3430         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3431         struct kmx_peer        *peer            = NULL;
3432         struct kmx_conn        *conn            = NULL;
3433 #if MXLND_POLLING
3434         int                     count           = 0;
3435 #endif
3436
3437         memset(name, 0, sizeof(name));
3438         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3439         cfs_daemonize(name);
3440         //cfs_block_allsigs();
3441
3442         memset(&status, 0, sizeof(status));
3443
3444         CDEBUG(D_NET, "%s starting\n", name);
3445
3446         while (!kmxlnd_data.kmx_shutdown) {
3447                 u8      msg_type        = 0;
3448
3449                 mxret = MX_SUCCESS;
3450                 result = 0;
3451 #if MXLND_POLLING
3452                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3453                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3454                                             &status, &result);
3455                 } else {
3456                         count = 0;
3457                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3458                                             0ULL, 0ULL, &status, &result);
3459                 }
3460 #else
3461                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3462                                     0ULL, 0ULL, &status, &result);
3463 #endif
3464                 if (unlikely(kmxlnd_data.kmx_shutdown))
3465                         break;
3466
3467                 if (result != 1) {
3468                         /* nothing completed... */
3469                         continue;
3470                 }
3471
3472                 if (status.code != MX_STATUS_SUCCESS) {
3473                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3474                                "match_info 0x%llx and length %d\n",
3475                                mx_strstatus(status.code), status.code,
3476                                (u64) status.match_info, status.msg_length);
3477                 }
3478
3479                 msg_type = MXLND_MSG_TYPE(status.match_info);
3480
3481                 /* This may be a mx_iconnect() request completing,
3482                  * check the bit mask for CONN_REQ and CONN_ACK */
3483                 if (msg_type == MXLND_MSG_ICON_REQ ||
3484                     msg_type == MXLND_MSG_ICON_ACK) {
3485                         peer = (struct kmx_peer*) status.context;
3486                         if (msg_type == MXLND_MSG_ICON_REQ) {
3487                                 mxlnd_handle_conn_req(peer, status);
3488                         } else {
3489                                 mxlnd_handle_conn_ack(peer, status);
3490                         }
3491                         continue;
3492                 }
3493
3494                 /* This must be a tx or rx */
3495
3496                 /* NOTE: if this is a RX from the unexpected callback, it may
3497                  * have very little info. If we dropped it in unexpected_recv(),
3498                  * it will not have a context. If so, ignore it. */
3499                 ctx = (struct kmx_ctx *) status.context;
3500                 if (ctx != NULL) {
3501
3502                         req_type = ctx->mxc_type;
3503                         conn = ctx->mxc_conn; /* this may be NULL */
3504                         mxlnd_deq_pending_ctx(ctx);
3505
3506                         /* copy status to ctx->mxc_status */
3507                         memcpy(&ctx->mxc_status, &status, sizeof(status));
3508
3509                         switch (req_type) {
3510                         case MXLND_REQ_TX:
3511                                 mxlnd_handle_tx_completion(ctx);
3512                                 break;
3513                         case MXLND_REQ_RX:
3514                                 mxlnd_handle_rx_completion(ctx);
3515                                 break;
3516                         default:
3517                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3518                                 LBUG();
3519                                 break;
3520                         }
3521
3522                         /* FIXME may need to reconsider this */
3523                         /* conn is always set except for the first CONN_REQ rx
3524                          * from a new peer */
3525                         if (!(status.code == MX_STATUS_SUCCESS ||
3526                               status.code == MX_STATUS_TRUNCATED) &&
3527                               conn != NULL) {
3528                                 mxlnd_conn_disconnect(conn, 1, 1);
3529                         }
3530                 }
3531                 CDEBUG(D_NET, "waitd() completed task\n");
3532         }
3533         CDEBUG(D_NET, "%s stopping\n", name);
3534         mxlnd_thread_stop(id);
3535         return 0;
3536 }
3537
3538
3539 unsigned long
3540 mxlnd_check_timeouts(unsigned long now)
3541 {
3542         int                     i               = 0;
3543         int                     disconnect      = 0;
3544         unsigned long           next            = 0;
3545         struct  kmx_peer        *peer           = NULL;
3546         struct  kmx_conn        *conn           = NULL;
3547
3548         read_lock(&kmxlnd_data.kmx_global_lock);
3549         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3550                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3551
3552                         if (unlikely(kmxlnd_data.kmx_shutdown)) {
3553                                 read_unlock(&kmxlnd_data.kmx_global_lock);
3554                                 return next;
3555                         }
3556
3557                         conn = peer->mxp_conn;
3558                         if (conn) {
3559                                 mxlnd_conn_addref(conn);
3560                         } else {
3561                                 continue;
3562                         }
3563
3564                         /* FIXMEis this needed? */
3565                         spin_lock(&conn->mxk_lock);
3566
3567                         /* if nothing pending (timeout == 0) or
3568                          * if conn is already disconnected,
3569                          * skip this conn */
3570                         if (conn->mxk_timeout == 0 ||
3571                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3572                                 /* FIXME is this needed? */
3573                                 spin_unlock(&conn->mxk_lock);
3574                                 mxlnd_conn_decref(conn);
3575                                 continue;
3576                         }
3577
3578                         /* we want to find the timeout that will occur first.
3579                          * if it is in the future, we will sleep until then.
3580                          * if it is in the past, then we will sleep one
3581                          * second and repeat the process. */
3582                         if ((next == 0) || (conn->mxk_timeout < next)) {
3583                                 next = conn->mxk_timeout;
3584                         }
3585
3586                         disconnect = 0;
3587
3588                         if (time_after_eq(now, conn->mxk_timeout))  {
3589                                 disconnect = 1;
3590                         }
3591                         spin_unlock(&conn->mxk_lock);
3592
3593                         if (disconnect) {
3594                                 mxlnd_conn_disconnect(conn, 1, 1);
3595                         }
3596                         mxlnd_conn_decref(conn);
3597                 }
3598         }
3599         read_unlock(&kmxlnd_data.kmx_global_lock);
3600         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3601
3602         return next;
3603 }
3604
3605 /**
3606  * Enforces timeouts on messages
3607  * \param arg  thread id (as a void *)
3608  *
3609  * This thread queries each peer for its earliest timeout. If a peer has timed out,
3610  * it calls mxlnd_conn_disconnect().
3611  *
3612  * After checking for timeouts, try progressing sends (call check_sends()).
3613  */
3614 int
3615 mxlnd_timeoutd(void *arg)
3616 {
3617         int                     i       = 0;
3618         long                    id      = (long) arg;
3619         unsigned long           now     = 0;
3620         unsigned long           next    = 0;
3621         unsigned long           delay   = HZ;
3622         struct kmx_peer        *peer    = NULL;
3623         struct kmx_conn        *conn    = NULL;
3624
3625         cfs_daemonize("mxlnd_timeoutd");
3626         //cfs_block_allsigs();
3627
3628         CDEBUG(D_NET, "timeoutd starting\n");
3629
3630         while (!kmxlnd_data.kmx_shutdown) {
3631
3632                 now = jiffies;
3633                 /* if the next timeout has not arrived, go back to sleep */
3634                 if (time_after(now, next)) {
3635                         next = mxlnd_check_timeouts(now);
3636                 }
3637
3638                read_lock(&kmxlnd_data.kmx_global_lock);
3639                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3640                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3641                                 /* FIXME upgrade to write lock?
3642                                  * is any lock needed? */
3643                                 conn = peer->mxp_conn;
3644                                 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3645
3646                                 if (conn == NULL)
3647                                         continue;
3648
3649                                 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3650                                     time_after(now, conn->mxk_last_tx + HZ)) {
3651                                         /* FIXME drop lock or call check_sends_locked */
3652                                         read_unlock(&kmxlnd_data.kmx_global_lock);
3653                                         mxlnd_check_sends(peer);
3654                                         read_lock(&kmxlnd_data.kmx_global_lock);
3655                                 }
3656                                 mxlnd_conn_decref(conn); /* until here */
3657                         }
3658                 }
3659                 read_unlock(&kmxlnd_data.kmx_global_lock);
3660
3661                 mxlnd_sleep(delay);
3662         }
3663         CDEBUG(D_NET, "timeoutd stopping\n");
3664         mxlnd_thread_stop(id);
3665         return 0;
3666 }