Whamcloud - gitweb
i=maxim,b=18460,b=20171:
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
47
48 inline int
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
50 {
51         /* if memcmp() == 0, it is NULL */
52         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
53 }
54
55 inline void mxlnd_noop(char *s, ...)
56 {
57         return;
58 }
59
60 char *
61 mxlnd_ctxstate_to_str(int mxc_state)
62 {
63         switch (mxc_state) {
64         case MXLND_CTX_INIT:
65                 return "MXLND_CTX_INIT";
66         case MXLND_CTX_IDLE:
67                 return "MXLND_CTX_IDLE";
68         case MXLND_CTX_PREP:
69                 return "MXLND_CTX_PREP";
70         case MXLND_CTX_PENDING:
71                 return "MXLND_CTX_PENDING";
72         case MXLND_CTX_COMPLETED:
73                 return "MXLND_CTX_COMPLETED";
74         case MXLND_CTX_CANCELED:
75                 return "MXLND_CTX_CANCELED";
76         default:
77                 return "*unknown*";
78         }
79 }
80
81 char *
82 mxlnd_connstatus_to_str(int mxk_status)
83 {
84         switch (mxk_status) {
85         case MXLND_CONN_READY:
86                 return "MXLND_CONN_READY";
87         case MXLND_CONN_INIT:
88                 return "MXLND_CONN_INIT";
89         case MXLND_CONN_REQ:
90                 return "MXLND_CONN_REQ";
91         case MXLND_CONN_ACK:
92                 return "MXLND_CONN_ACK";
93         case MXLND_CONN_WAIT:
94                 return "MXLND_CONN_WAIT";
95         case MXLND_CONN_DISCONNECT:
96                 return "MXLND_CONN_DISCONNECT";
97         case MXLND_CONN_FAIL:
98                 return "MXLND_CONN_FAIL";
99         default:
100                 return "unknown";
101         }
102 }
103
104 char *
105 mxlnd_msgtype_to_str(int type) {
106         switch (type) {
107         case MXLND_MSG_EAGER:
108                 return "MXLND_MSG_EAGER";
109         case MXLND_MSG_CONN_REQ:
110                 return "MXLND_MSG_CONN_REQ";
111         case MXLND_MSG_CONN_ACK:
112                 return "MXLND_MSG_CONN_ACK";
113         case MXLND_MSG_BYE:
114                 return "MXLND_MSG_BYE";
115         case MXLND_MSG_NOOP:
116                 return "MXLND_MSG_NOOP";
117         case MXLND_MSG_PUT_REQ:
118                 return "MXLND_MSG_PUT_REQ";
119         case MXLND_MSG_PUT_ACK:
120                 return "MXLND_MSG_PUT_ACK";
121         case MXLND_MSG_PUT_DATA:
122                 return "MXLND_MSG_PUT_DATA";
123         case MXLND_MSG_GET_REQ:
124                 return "MXLND_MSG_GET_REQ";
125         case MXLND_MSG_GET_DATA:
126                 return "MXLND_MSG_GET_DATA";
127         default:
128                 return "unknown";
129         }
130 }
131
132 char *
133 mxlnd_lnetmsg_to_str(int type)
134 {
135         switch (type) {
136         case LNET_MSG_ACK:
137                 return "LNET_MSG_ACK";
138         case LNET_MSG_PUT:
139                 return "LNET_MSG_PUT";
140         case LNET_MSG_GET:
141                 return "LNET_MSG_GET";
142         case LNET_MSG_REPLY:
143                 return "LNET_MSG_REPLY";
144         case LNET_MSG_HELLO:
145                 return "LNET_MSG_HELLO";
146         default:
147                 LBUG();
148                 return "*unknown*";
149         }
150 }
151
152 static inline u64
153 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
154 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
155 {
156         u64 type        = (u64) ctx->mxc_msg_type;
157         u64 err         = (u64) error;
158         u64 match       = 0ULL;
159
160         LASSERT(ctx->mxc_msg_type != 0);
161         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
162         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
163         return match;
164 }
165
166 static inline void
167 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
168 {
169         *msg_type = (u8) MXLND_MSG_TYPE(match);
170         *error    = (u8) MXLND_ERROR_VAL(match);
171         *cookie   = match & MXLND_MAX_COOKIE;
172         LASSERT(*msg_type == MXLND_MSG_EAGER    ||
173                 *msg_type == MXLND_MSG_ICON_REQ ||
174                 *msg_type == MXLND_MSG_CONN_REQ ||
175                 *msg_type == MXLND_MSG_ICON_ACK ||
176                 *msg_type == MXLND_MSG_CONN_ACK ||
177                 *msg_type == MXLND_MSG_BYE      ||
178                 *msg_type == MXLND_MSG_NOOP     ||
179                 *msg_type == MXLND_MSG_PUT_REQ  ||
180                 *msg_type == MXLND_MSG_PUT_ACK  ||
181                 *msg_type == MXLND_MSG_PUT_DATA ||
182                 *msg_type == MXLND_MSG_GET_REQ  ||
183                 *msg_type == MXLND_MSG_GET_DATA);
184         return;
185 }
186
187 struct kmx_ctx *
188 mxlnd_get_idle_rx(void)
189 {
190         struct list_head        *tmp    = NULL;
191         struct kmx_ctx          *rx     = NULL;
192
193         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
194
195         if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
196                 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
197                 return NULL;
198         }
199
200         tmp = &kmxlnd_data.kmx_rx_idle;
201         rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
202         list_del_init(&rx->mxc_list);
203         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
204
205 #if MXLND_DEBUG
206         if (rx->mxc_get != rx->mxc_put) {
207                 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
208                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
209                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
210                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
211                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
212                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
213                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
214                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
215                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
216                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
217         }
218 #endif
219         LASSERT (rx->mxc_get == rx->mxc_put);
220
221         rx->mxc_get++;
222
223         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
224         rx->mxc_state = MXLND_CTX_PREP;
225
226         return rx;
227 }
228
229 int
230 mxlnd_put_idle_rx(struct kmx_ctx *rx)
231 {
232         if (rx == NULL) {
233                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
234                 return -EINVAL;
235         } else if (rx->mxc_type != MXLND_REQ_RX) {
236                 CDEBUG(D_NETERROR, "called with tx\n");
237                 return -EINVAL;
238         }
239         LASSERT(rx->mxc_get == rx->mxc_put + 1);
240         mxlnd_ctx_init(rx);
241         rx->mxc_put++;
242         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
243         list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
244         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
245         return 0;
246 }
247
248 int
249 mxlnd_reduce_idle_rxs(__u32 count)
250 {
251         __u32                   i       = 0;
252         struct kmx_ctx          *rx     = NULL;
253
254         spin_lock(&kmxlnd_data.kmx_rxs_lock);
255         for (i = 0; i < count; i++) {
256                 rx = mxlnd_get_idle_rx();
257                 if (rx != NULL) {
258                         struct list_head *tmp = &rx->mxc_global_list;
259                         list_del_init(tmp);
260                         mxlnd_ctx_free(rx);
261                 } else {
262                         CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
263                         break;
264                 }
265         }
266         spin_unlock(&kmxlnd_data.kmx_rxs_lock);
267         return 0;
268 }
269
270 struct kmx_ctx *
271 mxlnd_get_idle_tx(void)
272 {
273         struct list_head        *tmp    = NULL;
274         struct kmx_ctx          *tx     = NULL;
275
276         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
277
278         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
279                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
280                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
281                 return NULL;
282         }
283
284         tmp = &kmxlnd_data.kmx_tx_idle;
285         tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
286         list_del_init(&tx->mxc_list);
287
288         /* Allocate a new completion cookie.  It might not be needed,
289          * but we've got a lock right now and we're unlikely to
290          * wrap... */
291         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
292         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
293                 kmxlnd_data.kmx_tx_next_cookie = 1;
294         }
295         kmxlnd_data.kmx_tx_used++;
296         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
297
298         LASSERT (tx->mxc_get == tx->mxc_put);
299
300         tx->mxc_get++;
301
302         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
303         LASSERT (tx->mxc_lntmsg[0] == NULL);
304         LASSERT (tx->mxc_lntmsg[1] == NULL);
305
306         tx->mxc_state = MXLND_CTX_PREP;
307
308         return tx;
309 }
310
311 void
312 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye);
313
314 int
315 mxlnd_put_idle_tx(struct kmx_ctx *tx)
316 {
317         //int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
318         int             result  = 0;
319         lnet_msg_t      *lntmsg[2];
320
321         if (tx == NULL) {
322                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
323                 return -EINVAL;
324         } else if (tx->mxc_type != MXLND_REQ_TX) {
325                 CDEBUG(D_NETERROR, "called with rx\n");
326                 return -EINVAL;
327         }
328         if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
329               tx->mxc_status.code == MX_STATUS_TRUNCATED)) {
330                 struct kmx_conn *conn   = tx->mxc_conn;
331
332                 result = -EIO;
333                 mxlnd_conn_disconnect(conn, 0, 1);
334         }
335
336         lntmsg[0] = tx->mxc_lntmsg[0];
337         lntmsg[1] = tx->mxc_lntmsg[1];
338
339         LASSERT(tx->mxc_get == tx->mxc_put + 1);
340         mxlnd_ctx_init(tx);
341         tx->mxc_put++;
342         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
343         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
344         kmxlnd_data.kmx_tx_used--;
345         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
346         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
347         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
348         return 0;
349 }
350
351 /**
352  * Free the conn
353  * \param conn a kmx_conn pointer
354  *
355  * The calling function should remove the conn from the conns list first
356  * then destroy it.
357  */
358 void
359 mxlnd_conn_free(struct kmx_conn *conn)
360 {
361         struct kmx_peer *peer   = conn->mxk_peer;
362
363         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
365                  list_empty (&conn->mxk_tx_free_queue) &&
366                  list_empty (&conn->mxk_pending));
367         if (!list_empty(&conn->mxk_list)) {
368                 list_del_init(&conn->mxk_list);
369                 if (peer->mxp_conn == conn) {
370                         peer->mxp_conn = NULL;
371                         if (!mxlnd_endpoint_addr_null(conn->mxk_epa))
372                                 mx_set_endpoint_addr_context(conn->mxk_epa,
373                                                              (void *) NULL);
374                 }
375         }
376         mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
377         MXLND_FREE (conn, sizeof (*conn));
378         return;
379 }
380
381
382 void
383 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
384 {
385         int                     found   = 0;
386         struct kmx_ctx          *ctx    = NULL;
387         struct kmx_ctx          *next   = NULL;
388         mx_return_t             mxret   = MX_SUCCESS;
389         u32                     result  = 0;
390
391         do {
392                 found = 0;
393                 spin_lock(&conn->mxk_lock);
394                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
395                         /* we will delete all including txs */
396                         list_del_init(&ctx->mxc_list);
397                         if (ctx->mxc_type == MXLND_REQ_RX) {
398                                 found = 1;
399                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
400                                                   &ctx->mxc_mxreq,
401                                                   &result);
402                                 if (mxret != MX_SUCCESS) {
403                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
404                                 }
405                                 if (result == 1) {
406                                         ctx->mxc_status.code = -ECONNABORTED;
407                                         ctx->mxc_state = MXLND_CTX_CANCELED;
408                                         /* NOTE this calls lnet_finalize() and
409                                          * we cannot hold any locks when calling it.
410                                          * It also calls mxlnd_conn_decref(conn) */
411                                         spin_unlock(&conn->mxk_lock);
412                                         mxlnd_handle_rx_completion(ctx);
413                                         spin_lock(&conn->mxk_lock);
414                                 }
415                                 break;
416                         }
417                 }
418                 spin_unlock(&conn->mxk_lock);
419         }
420         while (found);
421
422         return;
423 }
424
425 /**
426  * Shutdown a connection
427  * \param conn a kmx_conn pointer
428  * \param mx_dis call mx_disconnect()
429  * \param send_bye send peer a BYE msg
430  *
431  * This function sets the status to DISCONNECT, completes queued
432  * txs with failure, calls mx_disconnect, which will complete
433  * pending txs and matched rxs with failure.
434  */
435 void
436 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye)
437 {
438         mx_endpoint_addr_t      epa     = conn->mxk_epa;
439         struct list_head        *tmp    = NULL;
440         int                     valid   = !mxlnd_endpoint_addr_null(epa);
441
442         spin_lock(&conn->mxk_lock);
443         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
444                 spin_unlock(&conn->mxk_lock);
445                 return;
446         }
447         conn->mxk_status = MXLND_CONN_DISCONNECT;
448         conn->mxk_timeout = 0;
449
450         while (!list_empty(&conn->mxk_tx_free_queue) ||
451                !list_empty(&conn->mxk_tx_credit_queue)) {
452
453                 struct kmx_ctx          *tx     = NULL;
454
455                 if (!list_empty(&conn->mxk_tx_free_queue)) {
456                         tmp = &conn->mxk_tx_free_queue;
457                 } else {
458                         tmp = &conn->mxk_tx_credit_queue;
459                 }
460
461                 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
462                 list_del_init(&tx->mxc_list);
463                 tx->mxc_status.code = -ECONNABORTED;
464                 spin_unlock(&conn->mxk_lock);
465                 mxlnd_put_idle_tx(tx);
466                 mxlnd_conn_decref(conn); /* for this tx */
467                 spin_lock(&conn->mxk_lock);
468         }
469
470         spin_unlock(&conn->mxk_lock);
471
472         /* cancel pending rxs */
473         mxlnd_conn_cancel_pending_rxs(conn);
474
475         if (send_bye && valid) {
476                 u64 match = ((u64) MXLND_MSG_BYE) << MXLND_MSG_OFFSET;
477                 /* send a BYE to the peer */
478                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
479                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
480                 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
481                                 epa, match, NULL, NULL);
482                 /* wait to allow the peer to ack our message */
483                 mxlnd_sleep(msecs_to_jiffies(20));
484         }
485
486         if (kmxlnd_data.kmx_shutdown != 1) {
487                 unsigned long last_msg = 0;
488
489                 /* notify LNET that we are giving up on this peer */
490                 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
491                         last_msg = conn->mxk_last_rx;
492                 else
493                         last_msg = conn->mxk_last_tx;
494
495                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
496
497                 if (mx_dis && valid)
498                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
499         }
500         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
501
502         return;
503 }
504
505 /**
506  * Allocate and initialize a new conn struct
507  * \param connp address of a kmx_conn pointer
508  * \param peer owning kmx_peer
509  *
510  * Returns 0 on success and -ENOMEM on failure
511  */
512 int
513 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
514 {
515         struct kmx_conn *conn    = NULL;
516
517         LASSERT(peer != NULL);
518
519         MXLND_ALLOC(conn, sizeof (*conn));
520         if (conn == NULL) {
521                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
522                 return -ENOMEM;
523         }
524         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
525
526         memset(conn, 0, sizeof(*conn));
527
528         /* conn->mxk_incarnation = 0 - will be set by peer */
529         atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer 
530                                                    and one for the caller */
531         conn->mxk_peer = peer;
532         /* mxk_epa - to be set after mx_iconnect() */
533         INIT_LIST_HEAD(&conn->mxk_list);
534         spin_lock_init(&conn->mxk_lock);
535         /* conn->mxk_timeout = 0 */
536         conn->mxk_last_tx = jiffies;
537         conn->mxk_last_rx = conn->mxk_last_tx;
538         conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
539         /* mxk_outstanding = 0 */
540         conn->mxk_status = MXLND_CONN_INIT;
541         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
542         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
543         /* conn->mxk_ntx_msgs = 0 */
544         /* conn->mxk_ntx_data = 0 */
545         /* conn->mxk_ntx_posted = 0 */
546         /* conn->mxk_data_posted = 0 */
547         INIT_LIST_HEAD(&conn->mxk_pending);
548
549         *connp = conn;
550
551         mxlnd_peer_addref(peer);        /* add a ref for this conn */
552
553         /* add to front of peer's conns list */
554         list_add(&conn->mxk_list, &peer->mxp_conns);
555         peer->mxp_conn = conn;
556         return 0;
557 }
558
559 int
560 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
561 {
562         int ret = 0;
563         write_lock(&kmxlnd_data.kmx_global_lock);
564         ret = mxlnd_conn_alloc_locked(connp, peer);
565         write_unlock(&kmxlnd_data.kmx_global_lock);
566         return ret;
567 }
568
569 int
570 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
571 {
572         int             ret     = 0;
573         struct kmx_conn *conn   = ctx->mxc_conn;
574
575         ctx->mxc_state = MXLND_CTX_PENDING;
576         if (conn != NULL) {
577                 spin_lock(&conn->mxk_lock);
578                 if (conn->mxk_status >= MXLND_CONN_INIT) {
579                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
580                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
581                                 conn->mxk_timeout = ctx->mxc_deadline;
582                         }
583                 } else {
584                         ctx->mxc_state = MXLND_CTX_COMPLETED;
585                         ret = -1;
586                 }
587                 spin_unlock(&conn->mxk_lock);
588         }
589         return ret;
590 }
591
592 int
593 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
594 {
595         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
596                 ctx->mxc_state == MXLND_CTX_COMPLETED);
597         if (ctx->mxc_state != MXLND_CTX_PENDING &&
598             ctx->mxc_state != MXLND_CTX_COMPLETED) {
599                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", 
600                        mxlnd_ctxstate_to_str(ctx->mxc_state));
601         }
602         ctx->mxc_state = MXLND_CTX_COMPLETED;
603         if (!list_empty(&ctx->mxc_list)) {
604                 struct kmx_conn *conn = ctx->mxc_conn;
605                 struct kmx_ctx *next = NULL;
606                 LASSERT(conn != NULL);
607                 spin_lock(&conn->mxk_lock);
608                 list_del_init(&ctx->mxc_list);
609                 conn->mxk_timeout = 0;
610                 if (!list_empty(&conn->mxk_pending)) {
611                         next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
612                         conn->mxk_timeout = next->mxc_deadline;
613                 }
614                 spin_unlock(&conn->mxk_lock);
615         }
616         return 0;
617 }
618
619 /**
620  * Free the peer
621  * \param peer a kmx_peer pointer
622  *
623  * The calling function should decrement the rxs, drain the tx queues and
624  * remove the peer from the peers list first then destroy it.
625  */
626 void
627 mxlnd_peer_free(struct kmx_peer *peer)
628 {
629         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer,
630                         peer == kmxlnd_data.kmx_localhost ? "(*** localhost ***)" : "");
631
632         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
633
634         if (peer == kmxlnd_data.kmx_localhost)
635                 LASSERT(kmxlnd_data.kmx_shutdown);
636
637         if (!list_empty(&peer->mxp_peers)) {
638                 /* assume we are locked */
639                 list_del_init(&peer->mxp_peers);
640         }
641
642         MXLND_FREE (peer, sizeof (*peer));
643         atomic_dec(&kmxlnd_data.kmx_npeers);
644         return;
645 }
646
647 #define MXLND_LOOKUP_COUNT 10
648
649 /* We only want the MAC address of the peer's Myricom NIC. We
650  * require that each node has the IPoMX interface (myriN) up.
651  * We will not pass any traffic over IPoMX, but it allows us
652  * to get the MAC address. */
653 static int
654 mxlnd_ip2nic_id(u32 ip, u64 *nic_id)
655 {
656         int                     ret     = 0;
657         int                     try     = 1;
658         int                     fatal   = 0;
659         u64                     tmp_id  = 0ULL;
660         unsigned char           *haddr  = NULL;
661         struct net_device       *dev    = NULL;
662         struct neighbour        *n      = NULL;
663         cfs_socket_t            *sock   = NULL;
664         __be32                  dst_ip  = htonl(ip);
665
666         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
667         if (dev == NULL) {
668                 return -ENODEV;
669         }
670
671         haddr = (unsigned char *) &tmp_id + 2; /* MAC is only 6 bytes */
672
673         do {
674                 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
675                 if (n) {
676                         n->used = jiffies;
677                         if (n->nud_state & NUD_VALID) {
678                                 memcpy(haddr, n->ha, dev->addr_len);
679                                 neigh_release(n);
680                                 ret = 0;
681                                 break;
682                         }
683                 }
684                 /* not found, try to connect (force an arp) */
685                 libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
686                 if (!fatal)
687                         libcfs_sock_release(sock);
688                 schedule_timeout_interruptible(HZ/10 * try); /* add a little backoff */
689         } while (try++ < MXLND_LOOKUP_COUNT);
690
691         dev_put(dev);
692
693         if (tmp_id == 0ULL)
694                 ret = -EHOSTUNREACH;
695 #ifdef __LITTLE_ENDIAN
696         *nic_id = ___arch__swab64(tmp_id);
697 #else
698         *nic_id = tmp_id;
699 #endif
700         return ret;
701 }
702
703 /**
704  * Allocate and initialize a new peer struct
705  * \param peerp  address of a kmx_peer pointer
706  * \param nid  LNET node id
707  * Returns 0 on success and -ENOMEM on failure
708  */
709 int
710 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
711 {
712         int                     i       = 0;
713         int                     ret     = 0;
714         u32                     ip      = LNET_NIDADDR(nid);
715         struct kmx_peer        *peer    = NULL;
716
717         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
718
719         MXLND_ALLOC(peer, sizeof (*peer));
720         if (peer == NULL) {
721                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
722                 return -ENOMEM;
723         }
724         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
725
726         memset(peer, 0, sizeof(*peer));
727
728         peer->mxp_nid = nid;
729         /* peer->mxp_incarnation */
730         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
731
732         peer->mxp_ip = ip;
733         peer->mxp_ep_id = *kmxlnd_tunables.kmx_ep_id;
734         peer->mxp_board = board;
735         peer->mxp_nic_id = nic_id;
736
737         if (nic_id == 0ULL) {
738                 ret = mxlnd_ip2nic_id(ip, &nic_id);
739                 if (ret != 0)
740                         CERROR("%s: mxlnd_ip2nic_id() returned %d\n", __func__, ret);
741                 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
742         }
743
744         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
745
746         INIT_LIST_HEAD(&peer->mxp_peers);
747         INIT_LIST_HEAD(&peer->mxp_conns);
748         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
749         if (ret != 0) {
750                 mxlnd_peer_decref(peer);
751                 return ret;
752         }
753
754         for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
755                 struct kmx_ctx   *rx     = NULL;
756                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
757                 if (ret != 0) {
758                         mxlnd_reduce_idle_rxs(i);
759                         mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
760                         mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
761                         mxlnd_peer_decref(peer);
762                         return ret;
763                 }
764                 spin_lock(&kmxlnd_data.kmx_rxs_lock);
765                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
766                 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
767                 rx->mxc_put = -1;
768                 mxlnd_put_idle_rx(rx);
769         }
770         /* peer->mxp_reconnect_time = 0 */
771         /* peer->mxp_incompatible = 0 */
772
773         *peerp = peer;
774         return 0;
775 }
776
777 static inline struct kmx_peer *
778 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
779 {
780         int                     found   = 0;
781         int                     hash    = 0;
782         struct kmx_peer         *peer   = NULL;
783
784         hash = mxlnd_nid_to_hash(nid);
785
786         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
787                 if (peer->mxp_nid == nid) {
788                         found = 1;
789                         mxlnd_peer_addref(peer);
790                         break;
791                 }
792         }
793         return (found ? peer : NULL);
794 }
795
796 static inline struct kmx_peer *
797 mxlnd_find_peer_by_nid(lnet_nid_t nid)
798 {
799         struct kmx_peer *peer   = NULL;
800
801         read_lock(&kmxlnd_data.kmx_global_lock);
802         peer = mxlnd_find_peer_by_nid_locked(nid);
803         read_unlock(&kmxlnd_data.kmx_global_lock);
804         return peer;
805 }
806
807 static inline int
808 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
809 {
810         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
811                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
812                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
813                 tx->mxc_msg_type == MXLND_MSG_NOOP);
814 }
815
816 /**
817  * Set type and number of bytes
818  * \param msg  msg pointer
819  * \param type  of message
820  * \param body_nob  bytes in msg body
821  */
822 static inline void
823 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
824 {
825         msg->mxm_type = type;
826         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
827 }
828
829 static inline void
830 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
831 {
832         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
833         struct kmx_msg  *msg    = NULL;
834
835         LASSERT (tx != NULL);
836         LASSERT (nob <= MXLND_EAGER_SIZE);
837
838         tx->mxc_nid = nid;
839         /* tx->mxc_peer should have already been set if we know it */
840         tx->mxc_msg_type = type;
841         tx->mxc_nseg = 1;
842         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
843         tx->mxc_seg.segment_length = nob;
844         tx->mxc_pin_type = MX_PIN_PHYSICAL;
845         //tx->mxc_state = MXLND_CTX_PENDING;
846
847         msg = tx->mxc_msg;
848         msg->mxm_type = type;
849         msg->mxm_nob  = nob;
850
851         return;
852 }
853
854 static inline __u32
855 mxlnd_cksum (void *ptr, int nob)
856 {
857         char  *c  = ptr;
858         __u32  sum = 0;
859
860         while (nob-- > 0)
861                 sum = ((sum << 1) | (sum >> 31)) + *c++;
862
863         /* ensure I don't return 0 (== no checksum) */
864         return (sum == 0) ? 1 : sum;
865 }
866
867 /**
868  * Complete msg info
869  * \param tx  msg to send
870  */
871 static inline void
872 mxlnd_pack_msg(struct kmx_ctx *tx)
873 {
874         struct kmx_msg  *msg    = tx->mxc_msg;
875
876         /* type and nob should already be set in init_msg() */
877         msg->mxm_magic    = MXLND_MSG_MAGIC;
878         msg->mxm_version  = MXLND_MSG_VERSION;
879         /*   mxm_type */
880         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
881          * return credits as well */
882         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
883             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
884                 spin_lock(&tx->mxc_conn->mxk_lock);
885                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
886                 tx->mxc_conn->mxk_outstanding = 0;
887                 spin_unlock(&tx->mxc_conn->mxk_lock);
888         } else {
889                 msg->mxm_credits  = 0;
890         }
891         /*   mxm_nob */
892         msg->mxm_cksum    = 0;
893         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
894         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
895         msg->mxm_dstnid   = tx->mxc_nid;
896         /* if it is a new peer, the dststamp will be 0 */
897         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
898         msg->mxm_seq      = tx->mxc_cookie;
899
900         if (*kmxlnd_tunables.kmx_cksum) {
901                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
902         }
903 }
904
905 int
906 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
907 {
908         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
909         __u32     msg_cksum     = 0;
910         int       flip          = 0;
911         int       msg_nob       = 0;
912
913         /* 6 bytes are enough to have received magic + version */
914         if (nob < 6) {
915                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
916                 return -EPROTO;
917         }
918
919         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
920                 flip = 0;
921         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
922                 flip = 1;
923         } else {
924                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
925                 return -EPROTO;
926         }
927
928         if (msg->mxm_version !=
929             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
930                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
931                 return -EPROTO;
932         }
933
934         if (nob < hdr_size) {
935                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
936                 return -EPROTO;
937         }
938
939         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
940         if (msg_nob > nob) {
941                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
942                 return -EPROTO;
943         }
944
945         /* checksum must be computed with mxm_cksum zero and BEFORE anything
946          * gets flipped */
947         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
948         msg->mxm_cksum = 0;
949         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
950                 CDEBUG(D_NETERROR, "Bad checksum\n");
951                 return -EPROTO;
952         }
953         msg->mxm_cksum = msg_cksum;
954
955         if (flip) {
956                 /* leave magic unflipped as a clue to peer endianness */
957                 __swab16s(&msg->mxm_version);
958                 CLASSERT (sizeof(msg->mxm_type) == 1);
959                 CLASSERT (sizeof(msg->mxm_credits) == 1);
960                 msg->mxm_nob = msg_nob;
961                 __swab64s(&msg->mxm_srcnid);
962                 __swab64s(&msg->mxm_srcstamp);
963                 __swab64s(&msg->mxm_dstnid);
964                 __swab64s(&msg->mxm_dststamp);
965                 __swab64s(&msg->mxm_seq);
966         }
967
968         if (msg->mxm_srcnid == LNET_NID_ANY) {
969                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
970                 return -EPROTO;
971         }
972
973         switch (msg->mxm_type) {
974         default:
975                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
976                 return -EPROTO;
977
978         case MXLND_MSG_NOOP:
979                 break;
980
981         case MXLND_MSG_EAGER:
982                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
983                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
984                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
985                         return -EPROTO;
986                 }
987                 break;
988
989         case MXLND_MSG_PUT_REQ:
990                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
991                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
992                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
993                         return -EPROTO;
994                 }
995                 if (flip)
996                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
997                 break;
998
999         case MXLND_MSG_PUT_ACK:
1000                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1001                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1002                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1003                         return -EPROTO;
1004                 }
1005                 if (flip) {
1006                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1007                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1008                 }
1009                 break;
1010
1011         case MXLND_MSG_GET_REQ:
1012                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1013                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1014                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1015                         return -EPROTO;
1016                 }
1017                 if (flip) {
1018                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1019                 }
1020                 break;
1021
1022         case MXLND_MSG_CONN_REQ:
1023         case MXLND_MSG_CONN_ACK:
1024                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1025                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1026                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1027                         return -EPROTO;
1028                 }
1029                 if (flip) {
1030                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1031                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1032                 }
1033                 break;
1034         }
1035         return 0;
1036 }
1037
1038 /**
1039  * Receive a message
1040  * \param lntmsg  the LNET msg that this is continuing. If EAGER, then NULL.
1041  * \param length  length of incoming message
1042  *
1043  * The caller gets the rx and sets nid, peer and conn if known.
1044  *
1045  * Returns 0 on success and -1 on failure
1046  */
1047 int
1048 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1049 {
1050         int             ret     = 0;
1051         mx_return_t     mxret   = MX_SUCCESS;
1052         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1053
1054         rx->mxc_msg_type = msg_type;
1055         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1056         rx->mxc_cookie = cookie;
1057         /* rx->mxc_match may already be set */
1058         /* rx->mxc_seg.segment_ptr is already set */
1059         rx->mxc_seg.segment_length = length;
1060         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1061         ret = mxlnd_q_pending_ctx(rx);
1062         if (ret == -1) {
1063                 /* the caller is responsible for calling conn_decref() if needed */
1064                 return -1;
1065         }
1066         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1067                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1068         if (mxret != MX_SUCCESS) {
1069                 mxlnd_deq_pending_ctx(rx);
1070                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1071                                    mx_strerror(mxret), (int) mxret);
1072                 return -1;
1073         }
1074         return 0;
1075 }
1076
1077
1078 /**
1079  * This is the callback function that will handle unexpected receives
1080  * \param context  NULL, ignore
1081  * \param source  the peer's mx_endpoint_addr_t
1082  * \param match_value  the msg's bit, should be MXLND_MSG_EAGER
1083  * \param length  length of incoming message
1084  * \param data_if_available  ignore
1085  *
1086  * If it is an eager-sized msg, we will call recv_msg() with the actual
1087  * length. If it is a large message, we will call recv_msg() with a
1088  * length of 0 bytes to drop it because we should never have a large,
1089  * unexpected message.
1090  *
1091  * NOTE - The MX library blocks until this function completes. Make it as fast as
1092  * possible. DO NOT allocate memory which can block!
1093  *
1094  * If we cannot get a rx or the conn is closed, drop the message on the floor
1095  * (i.e. recv 0 bytes and ignore).
1096  */
1097 mx_unexp_handler_action_t
1098 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1099                  uint64_t match_value, uint32_t length, void *data_if_available)
1100 {
1101         int             ret             = 0;
1102         struct kmx_ctx  *rx             = NULL;
1103         mx_ksegment_t   seg;
1104         u8              msg_type        = 0;
1105         u8              error           = 0;
1106         u64             cookie          = 0ULL;
1107
1108         if (context != NULL) {
1109                 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1110         }
1111
1112 #if MXLND_DEBUG
1113         CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1114 #endif
1115
1116         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1117         if (msg_type == MXLND_MSG_BYE) {
1118                 struct kmx_peer *peer   = NULL;
1119
1120                 mx_get_endpoint_addr_context(source, (void **) &peer);
1121                 if (peer && peer->mxp_conn) {
1122                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1123                                         libcfs_nid2str(peer->mxp_nid));
1124                         mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1125                 }
1126
1127                 return MX_RECV_FINISHED;
1128         }
1129
1130         rx = mxlnd_get_idle_rx();
1131         if (rx != NULL) {
1132                 if (length <= MXLND_EAGER_SIZE) {
1133                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1134                 } else {
1135                         CDEBUG(D_NETERROR, "unexpected large receive with "
1136                                            "match_value=0x%llx length=%d\n",
1137                                            match_value, length);
1138                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1139                 }
1140
1141                 if (ret == 0) {
1142                         struct kmx_peer *peer   = NULL;
1143                         struct kmx_conn *conn   = NULL;
1144
1145                         /* NOTE to avoid a peer disappearing out from under us,
1146                          *      read lock the peers lock first */
1147                         read_lock(&kmxlnd_data.kmx_global_lock);
1148                         mx_get_endpoint_addr_context(source, (void **) &peer);
1149                         if (peer != NULL) {
1150                                 mxlnd_peer_addref(peer); /* add a ref... */
1151                                 conn = peer->mxp_conn;
1152                                 if (conn) {
1153                                         mxlnd_conn_addref(conn); /* add ref until rx completed */
1154                                         mxlnd_peer_decref(peer); /* and drop peer ref */
1155                                         rx->mxc_conn = conn;
1156                                 }
1157                                 rx->mxc_peer = peer;
1158                                 rx->mxc_nid = peer->mxp_nid;
1159                         }
1160                         read_unlock(&kmxlnd_data.kmx_global_lock);
1161                 } else {
1162                         CDEBUG(D_NETERROR, "could not post receive\n");
1163                         mxlnd_put_idle_rx(rx);
1164                 }
1165         }
1166
1167         if (rx == NULL || ret != 0) {
1168                 if (rx == NULL) {
1169                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1170                 } else {
1171                         /* ret != 0 */
1172                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1173                 }
1174                 seg.segment_ptr = 0ULL;
1175                 seg.segment_length = 0;
1176                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1177                           match_value, ~0ULL, NULL, NULL);
1178         }
1179
1180         return MX_RECV_CONTINUE;
1181 }
1182
1183
1184 int
1185 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1186 {
1187         int                      i      = 0;
1188         int                      ret    = -ENOENT;
1189         struct kmx_peer         *peer   = NULL;
1190
1191         read_lock(&kmxlnd_data.kmx_global_lock);
1192         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1193                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1194                         if (index-- == 0) {
1195                                 *nidp = peer->mxp_nid;
1196                                 *count = atomic_read(&peer->mxp_refcount);
1197                                 ret = 0;
1198                                 break;
1199                         }
1200                 }
1201         }
1202         read_unlock(&kmxlnd_data.kmx_global_lock);
1203
1204         return ret;
1205 }
1206
1207 void
1208 mxlnd_del_peer_locked(struct kmx_peer *peer)
1209 {
1210         list_del_init(&peer->mxp_peers); /* remove from the global list */
1211         if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1212         mxlnd_peer_decref(peer); /* drop global list ref */
1213         return;
1214 }
1215
1216 int
1217 mxlnd_del_peer(lnet_nid_t nid)
1218 {
1219         int             i       = 0;
1220         int             ret     = 0;
1221         struct kmx_peer *peer   = NULL;
1222         struct kmx_peer *next   = NULL;
1223
1224         if (nid != LNET_NID_ANY) {
1225                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1226         }
1227         write_lock(&kmxlnd_data.kmx_global_lock);
1228         if (nid != LNET_NID_ANY) {
1229                 if (peer == NULL) {
1230                         ret = -ENOENT;
1231                 } if (peer == kmxlnd_data.kmx_localhost) {
1232                         mxlnd_peer_decref(peer); /* and drops it */
1233                         CERROR("cannot free this host's NID 0x%llx\n", nid);
1234                 } else {
1235                         mxlnd_peer_decref(peer); /* and drops it */
1236                         mxlnd_del_peer_locked(peer);
1237                 }
1238         } else { /* LNET_NID_ANY */
1239                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1240                         list_for_each_entry_safe(peer, next,
1241                                                  &kmxlnd_data.kmx_peers[i], mxp_peers) {
1242                                 if (peer != kmxlnd_data.kmx_localhost)
1243                                         mxlnd_del_peer_locked(peer);
1244                         }
1245                 }
1246         }
1247         write_unlock(&kmxlnd_data.kmx_global_lock);
1248
1249         return ret;
1250 }
1251
1252 struct kmx_conn *
1253 mxlnd_get_conn_by_idx(int index)
1254 {
1255         int                      i      = 0;
1256         struct kmx_peer         *peer   = NULL;
1257         struct kmx_conn         *conn   = NULL;
1258
1259         read_lock(&kmxlnd_data.kmx_global_lock);
1260         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1261                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1262                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1263                                 if (index-- > 0) {
1264                                         continue;
1265                                 }
1266
1267                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1268                                 read_unlock(&kmxlnd_data.kmx_global_lock);
1269                                 return conn;
1270                         }
1271                 }
1272         }
1273         read_unlock(&kmxlnd_data.kmx_global_lock);
1274
1275         return NULL;
1276 }
1277
1278 void
1279 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1280 {
1281         struct kmx_conn *conn   = NULL;
1282         struct kmx_conn *next   = NULL;
1283
1284         if (peer == kmxlnd_data.kmx_localhost) return;
1285
1286         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1287                 mxlnd_conn_disconnect(conn, 0, 1);
1288
1289         return;
1290 }
1291
1292 int
1293 mxlnd_close_matching_conns(lnet_nid_t nid)
1294 {
1295         int             i       = 0;
1296         int             ret     = 0;
1297         struct kmx_peer *peer   = NULL;
1298
1299         read_lock(&kmxlnd_data.kmx_global_lock);
1300         if (nid != LNET_NID_ANY) {
1301                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1302                 if (peer == NULL) {
1303                         ret = -ENOENT;
1304                 } else {
1305                         mxlnd_close_matching_conns_locked(peer);
1306                         mxlnd_peer_decref(peer); /* and drops it here */
1307                 }
1308         } else { /* LNET_NID_ANY */
1309                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1310                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1311                                 mxlnd_close_matching_conns_locked(peer);
1312                 }
1313         }
1314         read_unlock(&kmxlnd_data.kmx_global_lock);
1315
1316         return ret;
1317 }
1318
1319 /**
1320  * Modify MXLND parameters
1321  * \param ni  LNET interface handle
1322  * \param cmd command to change
1323  * \param arg the ioctl data
1324  *
1325  * Not implemented yet.
1326  */
1327 int
1328 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1329 {
1330         struct libcfs_ioctl_data *data  = arg;
1331         int                       ret   = -EINVAL;
1332
1333         LASSERT (ni == kmxlnd_data.kmx_ni);
1334
1335         switch (cmd) {
1336         case IOC_LIBCFS_GET_PEER: {
1337                 lnet_nid_t      nid     = 0;
1338                 int             count   = 0;
1339
1340                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1341                 data->ioc_nid    = nid;
1342                 data->ioc_count  = count;
1343                 break;
1344         }
1345         case IOC_LIBCFS_DEL_PEER: {
1346                 ret = mxlnd_del_peer(data->ioc_nid);
1347                 break;
1348         }
1349         case IOC_LIBCFS_GET_CONN: {
1350                 struct kmx_conn *conn = NULL;
1351
1352                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1353                 if (conn == NULL) {
1354                         ret = -ENOENT;
1355                 } else {
1356                         ret = 0;
1357                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1358                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1359                 }
1360                 break;
1361         }
1362         case IOC_LIBCFS_CLOSE_CONNECTION: {
1363                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1364                 break;
1365         }
1366         default:
1367                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1368                 break;
1369         }
1370
1371         return ret;
1372 }
1373
1374 /**
1375  * Add the tx to the global tx queue
1376  *
1377  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1378  */
1379 void
1380 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1381 {
1382         u8                      msg_type        = tx->mxc_msg_type;
1383         //struct kmx_peer         *peer           = tx->mxc_peer;
1384         struct kmx_conn         *conn           = tx->mxc_conn;
1385
1386         LASSERT (msg_type != 0);
1387         LASSERT (tx->mxc_nid != 0);
1388         LASSERT (tx->mxc_peer != NULL);
1389         LASSERT (tx->mxc_conn != NULL);
1390
1391         tx->mxc_incarnation = conn->mxk_incarnation;
1392
1393         if (msg_type != MXLND_MSG_PUT_DATA &&
1394             msg_type != MXLND_MSG_GET_DATA) {
1395                 /* msg style tx */
1396                 if (mxlnd_tx_requires_credit(tx)) {
1397                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1398                         conn->mxk_ntx_msgs++;
1399                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1400                            msg_type == MXLND_MSG_CONN_ACK) {
1401                         /* put conn msgs at the front of the queue */
1402                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1403                 } else {
1404                         /* PUT_ACK, PUT_NAK */
1405                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1406                         conn->mxk_ntx_msgs++;
1407                 }
1408         } else {
1409                 /* data style tx */
1410                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1411                 conn->mxk_ntx_data++;
1412         }
1413
1414         return;
1415 }
1416
1417 /**
1418  * Add the tx to the global tx queue
1419  *
1420  * Add the tx to the peer's msg or data queue
1421  */
1422 static inline void
1423 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1424 {
1425         LASSERT(tx->mxc_peer != NULL);
1426         LASSERT(tx->mxc_conn != NULL);
1427         spin_lock(&tx->mxc_conn->mxk_lock);
1428         mxlnd_peer_queue_tx_locked(tx);
1429         spin_unlock(&tx->mxc_conn->mxk_lock);
1430
1431         return;
1432 }
1433
1434 /**
1435  * Add the tx to the global tx queue
1436  *
1437  * Add the tx to the global queue and up the tx_queue_sem
1438  */
1439 void
1440 mxlnd_queue_tx(struct kmx_ctx *tx)
1441 {
1442         struct kmx_peer *peer   = tx->mxc_peer;
1443         LASSERT (tx->mxc_nid != 0);
1444
1445         if (peer != NULL) {
1446                 if (peer->mxp_incompatible &&
1447                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1448                         /* let this fail now */
1449                         tx->mxc_status.code = -ECONNABORTED;
1450                         mxlnd_conn_decref(peer->mxp_conn);
1451                         mxlnd_put_idle_tx(tx);
1452                         return;
1453                 }
1454                 if (tx->mxc_conn == NULL) {
1455                         int             ret     = 0;
1456                         struct kmx_conn *conn   = NULL;
1457
1458                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1459                         if (ret != 0) {
1460                                 tx->mxc_status.code = ret;
1461                                 mxlnd_put_idle_tx(tx);
1462                                 goto done;
1463                         }
1464                         tx->mxc_conn = conn;
1465                         mxlnd_peer_decref(peer); /* and takes it from peer */
1466                 }
1467                 LASSERT(tx->mxc_conn != NULL);
1468                 mxlnd_peer_queue_tx(tx);
1469                 mxlnd_check_sends(peer);
1470         } else {
1471                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1472                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1473                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1474                 up(&kmxlnd_data.kmx_tx_queue_sem);
1475         }
1476 done:
1477         return;
1478 }
1479
1480 int
1481 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1482 {
1483         int             i                       = 0;
1484         int             sum                     = 0;
1485         int             old_sum                 = 0;
1486         int             nseg                    = 0;
1487         int             first_iov               = -1;
1488         int             first_iov_offset        = 0;
1489         int             first_found             = 0;
1490         int             last_iov                = -1;
1491         int             last_iov_length         = 0;
1492         mx_ksegment_t  *seg                     = NULL;
1493
1494         if (niov == 0) return 0;
1495         LASSERT(iov != NULL);
1496
1497         for (i = 0; i < niov; i++) {
1498                 sum = old_sum + (u32) iov[i].iov_len;
1499                 if (!first_found && (sum > offset)) {
1500                         first_iov = i;
1501                         first_iov_offset = offset - old_sum;
1502                         first_found = 1;
1503                         sum = (u32) iov[i].iov_len - first_iov_offset;
1504                         old_sum = 0;
1505                 }
1506                 if (sum >= nob) {
1507                         last_iov = i;
1508                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1509                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1510                         break;
1511                 }
1512                 old_sum = sum;
1513         }
1514         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1515         nseg = last_iov - first_iov + 1;
1516         LASSERT(nseg > 0);
1517
1518         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1519         if (seg == NULL) {
1520                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1521                 return -1;
1522         }
1523         memset(seg, 0, nseg * sizeof(*seg));
1524         ctx->mxc_nseg = nseg;
1525         sum = 0;
1526         for (i = 0; i < nseg; i++) {
1527                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1528                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1529                 if (i == 0) {
1530                         seg[i].segment_ptr += (u64) first_iov_offset;
1531                         seg[i].segment_length -= (u32) first_iov_offset;
1532                 }
1533                 if (i == (nseg - 1)) {
1534                         seg[i].segment_length = (u32) last_iov_length;
1535                 }
1536                 sum += seg[i].segment_length;
1537         }
1538         ctx->mxc_seg_list = seg;
1539         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1540 #ifdef MX_PIN_FULLPAGES
1541         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1542 #endif
1543         LASSERT(nob == sum);
1544         return 0;
1545 }
1546
1547 int
1548 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1549 {
1550         int             i                       = 0;
1551         int             sum                     = 0;
1552         int             old_sum                 = 0;
1553         int             nseg                    = 0;
1554         int             first_kiov              = -1;
1555         int             first_kiov_offset       = 0;
1556         int             first_found             = 0;
1557         int             last_kiov               = -1;
1558         int             last_kiov_length        = 0;
1559         mx_ksegment_t  *seg                     = NULL;
1560
1561         if (niov == 0) return 0;
1562         LASSERT(kiov != NULL);
1563
1564         for (i = 0; i < niov; i++) {
1565                 sum = old_sum + kiov[i].kiov_len;
1566                 if (i == 0) sum -= kiov[i].kiov_offset;
1567                 if (!first_found && (sum > offset)) {
1568                         first_kiov = i;
1569                         first_kiov_offset = offset - old_sum;
1570                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1571                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1572                         first_found = 1;
1573                         sum = kiov[i].kiov_len - first_kiov_offset;
1574                         old_sum = 0;
1575                 }
1576                 if (sum >= nob) {
1577                         last_kiov = i;
1578                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1579                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1580                         break;
1581                 }
1582                 old_sum = sum;
1583         }
1584         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1585         nseg = last_kiov - first_kiov + 1;
1586         LASSERT(nseg > 0);
1587
1588         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1589         if (seg == NULL) {
1590                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1591                 return -1;
1592         }
1593         memset(seg, 0, niov * sizeof(*seg));
1594         ctx->mxc_nseg = niov;
1595         sum = 0;
1596         for (i = 0; i < niov; i++) {
1597                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1598                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1599                 if (i == 0) {
1600                         seg[i].segment_ptr += (u64) first_kiov_offset;
1601                         /* we have to add back the original kiov_offset */
1602                         seg[i].segment_length -= first_kiov_offset +
1603                                                  kiov[first_kiov].kiov_offset;
1604                 }
1605                 if (i == (nseg - 1)) {
1606                         seg[i].segment_length = last_kiov_length;
1607                 }
1608                 sum += seg[i].segment_length;
1609         }
1610         ctx->mxc_seg_list = seg;
1611         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1612 #ifdef MX_PIN_FULLPAGES
1613         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1614 #endif
1615         LASSERT(nob == sum);
1616         return 0;
1617 }
1618
1619 void
1620 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1621 {
1622         LASSERT(type == MXLND_MSG_PUT_ACK);
1623         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1624         tx->mxc_cookie = cookie;
1625         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1626         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1627         tx->mxc_match = mxlnd_create_match(tx, status);
1628
1629         mxlnd_queue_tx(tx);
1630 }
1631
1632
1633 /**
1634  * Get tx, map [k]iov, queue tx
1635  *
1636  * This setups the DATA send for PUT or GET.
1637  *
1638  * On success, it queues the tx, on failure it calls lnet_finalize()
1639  */
1640 void
1641 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1642 {
1643         int                     ret             = 0;
1644         lnet_process_id_t       target          = lntmsg->msg_target;
1645         unsigned int            niov            = lntmsg->msg_niov;
1646         struct iovec           *iov             = lntmsg->msg_iov;
1647         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1648         unsigned int            offset          = lntmsg->msg_offset;
1649         unsigned int            nob             = lntmsg->msg_len;
1650         struct kmx_ctx         *tx              = NULL;
1651
1652         LASSERT(lntmsg != NULL);
1653         LASSERT(peer != NULL);
1654         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1655         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1656
1657         tx = mxlnd_get_idle_tx();
1658         if (tx == NULL) {
1659                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1660                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1661                         libcfs_nid2str(target.nid));
1662                 goto failed_0;
1663         }
1664         tx->mxc_nid = target.nid;
1665         /* NOTE called when we have a ref on the conn, get one for this tx */
1666         mxlnd_conn_addref(peer->mxp_conn);
1667         tx->mxc_peer = peer;
1668         tx->mxc_conn = peer->mxp_conn;
1669         tx->mxc_msg_type = msg_type;
1670         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1671         tx->mxc_state = MXLND_CTX_PENDING;
1672         tx->mxc_lntmsg[0] = lntmsg;
1673         tx->mxc_cookie = cookie;
1674         tx->mxc_match = mxlnd_create_match(tx, 0);
1675
1676         /* This setups up the mx_ksegment_t to send the DATA payload  */
1677         if (nob == 0) {
1678                 /* do not setup the segments */
1679                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1680                                    "to %s?\n", libcfs_nid2str(target.nid));
1681                 ret = 0;
1682         } else if (kiov == NULL) {
1683                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1684         } else {
1685                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1686         }
1687         if (ret != 0) {
1688                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", 
1689                                    libcfs_nid2str(target.nid));
1690                 tx->mxc_status.code = -EIO;
1691                 goto failed_1;
1692         }
1693         mxlnd_queue_tx(tx);
1694         return;
1695
1696 failed_1:
1697         mxlnd_conn_decref(peer->mxp_conn);
1698         mxlnd_put_idle_tx(tx);
1699         return;
1700
1701 failed_0:
1702         CDEBUG(D_NETERROR, "no tx avail\n");
1703         lnet_finalize(ni, lntmsg, -EIO);
1704         return;
1705 }
1706
1707 /**
1708  * Map [k]iov, post rx
1709  *
1710  * This setups the DATA receive for PUT or GET.
1711  *
1712  * On success, it returns 0, on failure it returns -1
1713  */
1714 int
1715 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1716 {
1717         int                     ret     = 0;
1718         lnet_process_id_t       target  = lntmsg->msg_target;
1719         unsigned int            niov    = lntmsg->msg_niov;
1720         struct iovec           *iov     = lntmsg->msg_iov;
1721         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1722         unsigned int            offset  = lntmsg->msg_offset;
1723         unsigned int            nob     = lntmsg->msg_len;
1724         mx_return_t             mxret   = MX_SUCCESS;
1725         u64                     mask    = ~(MXLND_ERROR_MASK);
1726
1727         /* above assumes MXLND_MSG_PUT_DATA */
1728         if (msg_type == MXLND_MSG_GET_DATA) {
1729                 niov = lntmsg->msg_md->md_niov;
1730                 iov = lntmsg->msg_md->md_iov.iov;
1731                 kiov = lntmsg->msg_md->md_iov.kiov;
1732                 offset = 0;
1733                 nob = lntmsg->msg_md->md_length;
1734         }
1735
1736         LASSERT(lntmsg != NULL);
1737         LASSERT(rx != NULL);
1738         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1739         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
1740
1741         rx->mxc_msg_type = msg_type;
1742         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1743         rx->mxc_state = MXLND_CTX_PENDING;
1744         rx->mxc_nid = target.nid;
1745         /* if posting a GET_DATA, we may not yet know the peer */
1746         if (rx->mxc_peer != NULL) {
1747                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1748         }
1749         rx->mxc_lntmsg[0] = lntmsg;
1750         rx->mxc_cookie = cookie;
1751         rx->mxc_match = mxlnd_create_match(rx, 0);
1752         /* This setups up the mx_ksegment_t to receive the DATA payload  */
1753         if (kiov == NULL) {
1754                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1755         } else {
1756                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1757         }
1758         if (msg_type == MXLND_MSG_GET_DATA) {
1759                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1760                 if (rx->mxc_lntmsg[1] == NULL) {
1761                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1762                                            libcfs_nid2str(target.nid));
1763                         ret = -1;
1764                 }
1765         }
1766         if (ret != 0) {
1767                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1768                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1769                        libcfs_nid2str(target.nid));
1770                 return -1;
1771         }
1772         ret = mxlnd_q_pending_ctx(rx);
1773         if (ret == -1) {
1774                 return -1;
1775         }
1776         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1777         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1778                           rx->mxc_seg_list, rx->mxc_nseg,
1779                           rx->mxc_pin_type, rx->mxc_match,
1780                           mask, (void *) rx,
1781                           &rx->mxc_mxreq);
1782         if (mxret != MX_SUCCESS) {
1783                 if (rx->mxc_conn != NULL) {
1784                         mxlnd_deq_pending_ctx(rx);
1785                 }
1786                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1787                                    (int) mxret, libcfs_nid2str(target.nid));
1788                 return -1;
1789         }
1790
1791         return 0;
1792 }
1793
1794 /**
1795  * The LND required send function
1796  *
1797  * This must not block. Since we may not have a peer struct for the receiver,
1798  * it will append send messages on a global tx list. We will then up the
1799  * tx_queued's semaphore to notify it of the new send. 
1800  */
1801 int
1802 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1803 {
1804         int                     ret             = 0;
1805         int                     type            = lntmsg->msg_type;
1806         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
1807         lnet_process_id_t       target          = lntmsg->msg_target;
1808         lnet_nid_t              nid             = target.nid;
1809         int                     target_is_router = lntmsg->msg_target_is_router;
1810         int                     routing         = lntmsg->msg_routing;
1811         unsigned int            payload_niov    = lntmsg->msg_niov;
1812         struct iovec           *payload_iov     = lntmsg->msg_iov;
1813         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
1814         unsigned int            payload_offset  = lntmsg->msg_offset;
1815         unsigned int            payload_nob     = lntmsg->msg_len;
1816         struct kmx_ctx         *tx              = NULL;
1817         struct kmx_msg         *txmsg           = NULL;
1818         struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */
1819         struct kmx_ctx         *rx_data         = NULL;
1820         struct kmx_conn        *conn            = NULL;
1821         int                     nob             = 0;
1822         uint32_t                length          = 0;
1823         struct kmx_peer         *peer           = NULL;
1824
1825         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1826                        payload_nob, payload_niov, libcfs_id2str(target));
1827
1828         LASSERT (payload_nob == 0 || payload_niov > 0);
1829         LASSERT (payload_niov <= LNET_MAX_IOV);
1830         /* payload is either all vaddrs or all pages */
1831         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1832
1833         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1834
1835         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1836          * to a new peer, use the nid */
1837         peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1838         if (peer != NULL) {
1839                 if (unlikely(peer->mxp_incompatible)) {
1840                         mxlnd_peer_decref(peer); /* drop ref taken above */
1841                 } else {
1842                         read_lock(&kmxlnd_data.kmx_global_lock);
1843                         conn = peer->mxp_conn;
1844                         if (conn) {
1845                                 mxlnd_conn_addref(conn);
1846                                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1847                         }
1848                         read_unlock(&kmxlnd_data.kmx_global_lock);
1849                 }
1850         }
1851         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
1852         if (conn == NULL && peer != NULL) {
1853                 CDEBUG(D_NET, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1854                        peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1855         }
1856
1857         switch (type) {
1858         case LNET_MSG_ACK:
1859                 LASSERT (payload_nob == 0);
1860                 break;
1861
1862         case LNET_MSG_REPLY:
1863         case LNET_MSG_PUT:
1864                 /* Is the payload small enough not to need DATA? */
1865                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1866                 if (nob <= MXLND_EAGER_SIZE)
1867                         break;                  /* send EAGER */
1868
1869                 tx = mxlnd_get_idle_tx();
1870                 if (unlikely(tx == NULL)) {
1871                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1872                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1873                                libcfs_nid2str(nid));
1874                         if (conn) mxlnd_conn_decref(conn);
1875                         return -ENOMEM;
1876                 }
1877
1878                 /* the peer may be NULL */
1879                 tx->mxc_peer = peer;
1880                 tx->mxc_conn = conn; /* may be NULL */
1881                 /* we added a conn ref above */
1882                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1883                 txmsg = tx->mxc_msg;
1884                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1885                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1886                 tx->mxc_match = mxlnd_create_match(tx, 0);
1887
1888                 /* we must post a receive _before_ sending the request.
1889                  * we need to determine how much to receive, it will be either
1890                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1891
1892                 rx = mxlnd_get_idle_rx();
1893                 if (unlikely(rx == NULL)) {
1894                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1895                                            libcfs_nid2str(nid));
1896                         mxlnd_put_idle_tx(tx);
1897                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1898                         return -ENOMEM;
1899                 }
1900                 rx->mxc_nid = nid;
1901                 rx->mxc_peer = peer;
1902                 /* conn may be NULL but unlikely since the first msg is always small */
1903                 /* NOTE no need to lock peer before adding conn ref since we took
1904                  * a conn ref for the tx (it cannot be freed between there and here ) */
1905                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1906                 rx->mxc_conn = conn;
1907                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1908                 rx->mxc_cookie = tx->mxc_cookie;
1909                 rx->mxc_match = mxlnd_create_match(rx, 0);
1910
1911                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1912                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1913                 if (unlikely(ret != 0)) {
1914                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1915                                            libcfs_nid2str(nid));
1916                         rx->mxc_lntmsg[0] = NULL;
1917                         mxlnd_put_idle_rx(rx);
1918                         mxlnd_put_idle_tx(tx);
1919                         if (conn) {
1920                                 mxlnd_conn_decref(conn); /* for the rx... */
1921                                 mxlnd_conn_decref(conn); /* and for the tx */
1922                         }
1923                         return -EHOSTUNREACH;
1924                 }
1925
1926                 mxlnd_queue_tx(tx);
1927                 return 0;
1928
1929         case LNET_MSG_GET:
1930                 if (routing || target_is_router)
1931                         break;                  /* send EAGER */
1932
1933                 /* is the REPLY message too small for DATA? */
1934                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1935                 if (nob <= MXLND_EAGER_SIZE)
1936                         break;                  /* send EAGER */
1937
1938                 /* get tx (we need the cookie) , post rx for incoming DATA, 
1939                  * then post GET_REQ tx */
1940                 tx = mxlnd_get_idle_tx();
1941                 if (unlikely(tx == NULL)) {
1942                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1943                                            libcfs_nid2str(nid));
1944                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1945                         return -ENOMEM;
1946                 }
1947                 rx_data = mxlnd_get_idle_rx();
1948                 if (unlikely(rx_data == NULL)) {
1949                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1950                                            libcfs_nid2str(nid));
1951                         mxlnd_put_idle_tx(tx);
1952                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1953                         return -ENOMEM;
1954                 }
1955                 rx_data->mxc_peer = peer;
1956                 /* NOTE no need to lock peer before adding conn ref since we took
1957                  * a conn ref for the tx (it cannot be freed between there and here ) */
1958                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1959                 rx_data->mxc_conn = conn; /* may be NULL */
1960
1961                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1962                 if (unlikely(ret != 0)) {
1963                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1964                                            libcfs_nid2str(nid));
1965                         mxlnd_put_idle_rx(rx_data);
1966                         mxlnd_put_idle_tx(tx);
1967                         if (conn) {
1968                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1969                                 mxlnd_conn_decref(conn); /* and for the tx */
1970                         }
1971                         return -EIO;
1972                 }
1973
1974                 tx->mxc_peer = peer;
1975                 tx->mxc_conn = conn; /* may be NULL */
1976                 /* conn ref taken above */
1977                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1978                 txmsg = tx->mxc_msg;
1979                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1980                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1981                 tx->mxc_match = mxlnd_create_match(tx, 0);
1982
1983                 mxlnd_queue_tx(tx);
1984                 return 0;
1985
1986         default:
1987                 LBUG();
1988                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1989                 return -EIO;
1990         }
1991
1992         /* send EAGER */
1993
1994         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1995                 <= MXLND_EAGER_SIZE);
1996
1997         tx = mxlnd_get_idle_tx();
1998         if (unlikely(tx == NULL)) {
1999                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2000                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2001                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
2002                 return -ENOMEM;
2003         }
2004
2005         tx->mxc_peer = peer;
2006         tx->mxc_conn = conn; /* may be NULL */
2007         /* conn ref taken above */
2008         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2009         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2010         tx->mxc_match = mxlnd_create_match(tx, 0);
2011
2012         txmsg = tx->mxc_msg;
2013         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2014
2015         if (payload_kiov != NULL)
2016                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
2017                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2018                             payload_niov, payload_kiov, payload_offset, payload_nob);
2019         else
2020                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
2021                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2022                             payload_niov, payload_iov, payload_offset, payload_nob);
2023
2024         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2025         mxlnd_queue_tx(tx);
2026         return 0;
2027 }
2028
2029 /**
2030  * The LND required recv function
2031  *
2032  * This must not block.
2033  */
2034 int
2035 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2036              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2037              unsigned int offset, unsigned int mlen, unsigned int rlen)
2038 {
2039         int                     ret             = 0;
2040         int                     nob             = 0;
2041         int                     len             = 0;
2042         struct kmx_ctx          *rx             = private;
2043         struct kmx_msg          *rxmsg          = rx->mxc_msg;
2044         lnet_nid_t               nid            = rx->mxc_nid;
2045         struct kmx_ctx          *tx             = NULL;
2046         struct kmx_msg          *txmsg          = NULL;
2047         struct kmx_peer         *peer           = rx->mxc_peer;
2048         struct kmx_conn         *conn           = peer->mxp_conn;
2049         u64                      cookie         = 0ULL;
2050         int                      msg_type       = rxmsg->mxm_type;
2051         int                      repost         = 1;
2052         int                      credit         = 0;
2053         int                      finalize       = 0;
2054
2055         LASSERT (mlen <= rlen);
2056         /* Either all pages or all vaddrs */
2057         LASSERT (!(kiov != NULL && iov != NULL));
2058         LASSERT (peer != NULL);
2059
2060         /* conn_addref(conn) already taken for the primary rx */
2061
2062         switch (msg_type) {
2063         case MXLND_MSG_EAGER:
2064                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2065                 len = rx->mxc_status.xfer_length;
2066                 if (unlikely(nob > len)) {
2067                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2068                                            libcfs_nid2str(nid), nob, len);
2069                         ret = -EPROTO;
2070                         break;
2071                 }
2072
2073                 if (kiov != NULL)
2074                         lnet_copy_flat2kiov(niov, kiov, offset,
2075                                 MXLND_EAGER_SIZE, rxmsg,
2076                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2077                                 mlen);
2078                 else
2079                         lnet_copy_flat2iov(niov, iov, offset,
2080                                 MXLND_EAGER_SIZE, rxmsg,
2081                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2082                                 mlen);
2083                 finalize = 1;
2084                 credit = 1;
2085                 break;
2086
2087         case MXLND_MSG_PUT_REQ:
2088                 /* we are going to reuse the rx, store the needed info */
2089                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2090
2091                 /* get tx, post rx, send PUT_ACK */
2092
2093                 tx = mxlnd_get_idle_tx();
2094                 if (unlikely(tx == NULL)) {
2095                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2096                         /* Not replying will break the connection */
2097                         ret = -ENOMEM;
2098                         break;
2099                 }
2100                 if (unlikely(mlen == 0)) {
2101                         finalize = 1;
2102                         tx->mxc_peer = peer;
2103                         tx->mxc_conn = conn;
2104                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2105                         /* repost = 1 */
2106                         break;
2107                 }
2108
2109                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2110                 tx->mxc_peer = peer;
2111                 tx->mxc_conn = conn;
2112                 /* no need to lock peer first since we already have a ref */
2113                 mxlnd_conn_addref(conn); /* for the tx */
2114                 txmsg = tx->mxc_msg;
2115                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2116                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2117                 tx->mxc_cookie = cookie;
2118                 tx->mxc_match = mxlnd_create_match(tx, 0);
2119
2120                 /* we must post a receive _before_ sending the PUT_ACK */
2121                 mxlnd_ctx_init(rx);
2122                 rx->mxc_state = MXLND_CTX_PREP;
2123                 rx->mxc_peer = peer;
2124                 rx->mxc_conn = conn;
2125                 /* do not take another ref for this rx, it is already taken */
2126                 rx->mxc_nid = peer->mxp_nid;
2127                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, 
2128                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2129
2130                 if (unlikely(ret != 0)) {
2131                         /* Notify peer that it's over */
2132                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", 
2133                                            libcfs_nid2str(nid), ret);
2134                         mxlnd_ctx_init(tx);
2135                         tx->mxc_state = MXLND_CTX_PREP;
2136                         tx->mxc_peer = peer;
2137                         tx->mxc_conn = conn;
2138                         /* finalize = 0, let the PUT_ACK tx finalize this */
2139                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2140                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2141                         /* conn ref already taken above */
2142                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2143                         /* repost = 1 */
2144                         break;
2145                 }
2146
2147                 mxlnd_queue_tx(tx);
2148                 /* do not return a credit until after PUT_DATA returns */
2149                 repost = 0;
2150                 break;
2151
2152         case MXLND_MSG_GET_REQ:
2153                 if (likely(lntmsg != NULL)) {
2154                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2155                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2156                 } else {
2157                         /* GET didn't match anything */
2158                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2159                          * We have to embed the error code in the match bits.
2160                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2161                         u64             cookie  = rxmsg->mxm_u.get_req.mxgrm_cookie;
2162
2163                         tx = mxlnd_get_idle_tx();
2164                         if (unlikely(tx == NULL)) {
2165                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2166                                                    libcfs_nid2str(nid));
2167                                 ret = -ENOMEM;
2168                                 break;
2169                         }
2170                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2171                         tx->mxc_state = MXLND_CTX_PENDING;
2172                         tx->mxc_nid = nid;
2173                         tx->mxc_peer = peer;
2174                         tx->mxc_conn = conn;
2175                         /* no need to lock peer first since we already have a ref */
2176                         mxlnd_conn_addref(conn); /* for this tx */
2177                         tx->mxc_cookie = cookie;
2178                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2179                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2180                         mxlnd_queue_tx(tx);
2181                 }
2182                 /* finalize lntmsg after tx completes */
2183                 break;
2184
2185         default:
2186                 LBUG();
2187         }
2188
2189         if (repost) {
2190                 /* we received a message, increment peer's outstanding credits */
2191                 if (credit == 1) {
2192                         spin_lock(&conn->mxk_lock);
2193                         conn->mxk_outstanding++;
2194                         spin_unlock(&conn->mxk_lock);
2195                 }
2196                 /* we are done with the rx */
2197                 mxlnd_put_idle_rx(rx);
2198                 mxlnd_conn_decref(conn);
2199         }
2200
2201         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); 
2202
2203         /* we received a credit, see if we can use it to send a msg */
2204         if (credit) mxlnd_check_sends(peer);
2205
2206         return ret;
2207 }
2208
2209 void
2210 mxlnd_sleep(unsigned long timeout)
2211 {
2212         set_current_state(TASK_INTERRUPTIBLE);
2213         schedule_timeout(timeout);
2214         return;
2215 }
2216
2217 /**
2218  * The generic send queue thread
2219  * \param arg  thread id (as a void *)
2220  *
2221  * This thread moves send messages from the global tx_queue to the owning
2222  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2223  * it to the global peer list.
2224  */
2225 int
2226 mxlnd_tx_queued(void *arg)
2227 {
2228         long                    id      = (long) arg;
2229         int                     ret     = 0;
2230         int                     found   = 0;
2231         struct kmx_ctx         *tx      = NULL;
2232         struct kmx_peer        *peer    = NULL;
2233         struct list_head       *tmp_tx  = NULL;
2234
2235         cfs_daemonize("mxlnd_tx_queued");
2236         //cfs_block_allsigs();
2237
2238         while (!kmxlnd_data.kmx_shutdown) {
2239                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2240                 if (kmxlnd_data.kmx_shutdown)
2241                         break;
2242                 if (ret != 0) // Should we check for -EINTR?
2243                         continue;
2244                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2245                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2246                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2247                         continue;
2248                 }
2249                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2250                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2251                 list_del_init(&tx->mxc_list);
2252                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2253
2254                 found = 0;
2255                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
2256                 if (peer != NULL) {
2257                         tx->mxc_peer = peer;
2258                         write_lock(&kmxlnd_data.kmx_global_lock);
2259                         if (peer->mxp_conn == NULL) {
2260                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2261                                 if (ret != 0) {
2262                                         /* out of memory, give up and fail tx */
2263                                         tx->mxc_status.code = -ENOMEM;
2264                                         write_unlock(&kmxlnd_data.kmx_global_lock);
2265                                         mxlnd_peer_decref(peer);
2266                                         mxlnd_put_idle_tx(tx);
2267                                         continue;
2268                                 }
2269                         }
2270                         tx->mxc_conn = peer->mxp_conn;
2271                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2272                         write_unlock(&kmxlnd_data.kmx_global_lock);
2273                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2274                         mxlnd_queue_tx(tx);
2275                         found = 1;
2276                 }
2277                 if (found == 0) {
2278                         int              hash   = 0;
2279                         struct kmx_peer *peer = NULL;
2280                         struct kmx_peer *old = NULL;
2281
2282                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2283
2284                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2285                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2286                         /* create peer */
2287                         /* adds conn ref for this function */
2288                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2289                                         *kmxlnd_tunables.kmx_board,
2290                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2291                         if (ret != 0) {
2292                                 /* finalize message */
2293                                 tx->mxc_status.code = ret;
2294                                 mxlnd_put_idle_tx(tx);
2295                                 continue;
2296                         }
2297                         tx->mxc_peer = peer;
2298                         tx->mxc_conn = peer->mxp_conn;
2299                         /* this tx will keep the conn ref taken in peer_alloc() */
2300
2301                         /* add peer to global peer list, but look to see
2302                          * if someone already created it after we released
2303                          * the read lock */
2304                         write_lock(&kmxlnd_data.kmx_global_lock);
2305                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2306                                 if (old->mxp_nid == peer->mxp_nid) {
2307                                         /* somebody beat us here, we created a duplicate */
2308                                         found = 1;
2309                                         break;
2310                                 }
2311                         }
2312
2313                         if (found == 0) {
2314                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2315                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2316                         } else {
2317                                 tx->mxc_peer = old;
2318                                 tx->mxc_conn = old->mxp_conn;
2319                                 /* FIXME can conn be NULL? */
2320                                 LASSERT(old->mxp_conn != NULL);
2321                                 mxlnd_conn_addref(old->mxp_conn);
2322                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2323                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2324                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2325                                 mxlnd_peer_decref(peer);
2326                         }
2327                         write_unlock(&kmxlnd_data.kmx_global_lock);
2328
2329                         mxlnd_queue_tx(tx);
2330                 }
2331         }
2332         mxlnd_thread_stop(id);
2333         return 0;
2334 }
2335
2336 /* When calling this, we must not have the peer lock. */
2337 void
2338 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2339 {
2340         mx_return_t     mxret           = MX_SUCCESS;
2341         mx_request_t    request;
2342         struct kmx_conn *conn           = peer->mxp_conn;
2343         u8              msg_type        = (u8) MXLND_MSG_TYPE(mask);
2344
2345         /* NOTE we are holding a conn ref every time we call this function,
2346          * we do not need to lock the peer before taking another ref */
2347         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2348
2349         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2350
2351         if (peer->mxp_reconnect_time == 0) {
2352                 peer->mxp_reconnect_time = jiffies;
2353         }
2354
2355         if (peer->mxp_nic_id == 0ULL) {
2356                 int             ret     = 0;
2357
2358                 ret = mxlnd_ip2nic_id(peer->mxp_ip, &peer->mxp_nic_id);
2359                 if (ret == 0) {
2360                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2361                 }
2362                 if (peer->mxp_nic_id == 0ULL) {
2363                         /* not mapped yet, return */
2364                         spin_lock(&conn->mxk_lock);
2365                         conn->mxk_status = MXLND_CONN_INIT;
2366                         spin_unlock(&conn->mxk_lock);
2367                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2368                                 /* give up and notify LNET */
2369                                 mxlnd_conn_disconnect(conn, 0, 0);
2370                                 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2371                                                                             function... */
2372                                 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2373                                                                       longer need */
2374                         }
2375                         mxlnd_conn_decref(conn);
2376                         return;
2377                 }
2378         }
2379
2380         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2381                             peer->mxp_ep_id, MXLND_MSG_MAGIC, mask,
2382                             (void *) peer, &request);
2383         if (unlikely(mxret != MX_SUCCESS)) {
2384                 spin_lock(&conn->mxk_lock);
2385                 conn->mxk_status = MXLND_CONN_FAIL;
2386                 spin_unlock(&conn->mxk_lock);
2387                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2388                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2389                 mxlnd_conn_decref(conn);
2390         }
2391         return;
2392 }
2393
2394 #define MXLND_STATS 0
2395
2396 int
2397 mxlnd_check_sends(struct kmx_peer *peer)
2398 {
2399         int                     ret             = 0;
2400         int                     found           = 0;
2401         mx_return_t             mxret           = MX_SUCCESS;
2402         struct kmx_ctx          *tx             = NULL;
2403         struct kmx_conn         *conn           = NULL;
2404         u8                      msg_type        = 0;
2405         int                     credit          = 0;
2406         int                     status          = 0;
2407         int                     ntx_posted      = 0;
2408         int                     credits         = 0;
2409 #if MXLND_STATS
2410         static unsigned long    last            = 0;
2411 #endif
2412
2413         if (unlikely(peer == NULL)) {
2414                 LASSERT(peer != NULL);
2415                 return -1;
2416         }
2417         write_lock(&kmxlnd_data.kmx_global_lock);
2418         conn = peer->mxp_conn;
2419         /* NOTE take a ref for the duration of this function since it is called
2420          * when there might not be any queued txs for this peer */
2421         if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2422         write_unlock(&kmxlnd_data.kmx_global_lock);
2423
2424         /* do not add another ref for this tx */
2425
2426         if (conn == NULL) {
2427                 /* we do not have any conns */
2428                 return -1;
2429         }
2430
2431 #if MXLND_STATS
2432         if (time_after(jiffies, last)) {
2433                 last = jiffies + HZ;
2434                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2435                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2436                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2437                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2438                               conn->mxk_ntx_data, conn->mxk_data_posted);
2439         }
2440 #endif
2441
2442         /* cache peer state for asserts */
2443         spin_lock(&conn->mxk_lock);
2444         ntx_posted = conn->mxk_ntx_posted;
2445         credits = conn->mxk_credits;
2446         spin_unlock(&conn->mxk_lock);
2447
2448         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2449         LASSERT(ntx_posted >= 0);
2450
2451         LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2452         LASSERT(credits >= 0);
2453
2454         /* check number of queued msgs, ignore data */
2455         spin_lock(&conn->mxk_lock);
2456         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2457                 /* check if any txs queued that could return credits... */
2458                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2459                         /* if not, send a NOOP */
2460                         tx = mxlnd_get_idle_tx();
2461                         if (likely(tx != NULL)) {
2462                                 tx->mxc_peer = peer;
2463                                 tx->mxc_conn = peer->mxp_conn;
2464                                 mxlnd_conn_addref(conn); /* for this tx */
2465                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2466                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2467                                 mxlnd_peer_queue_tx_locked(tx);
2468                                 found = 1;
2469                                 goto done_locked;
2470                         }
2471                 }
2472         }
2473         spin_unlock(&conn->mxk_lock);
2474
2475         /* if the peer is not ready, try to connect */
2476         spin_lock(&conn->mxk_lock);
2477         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2478             conn->mxk_status == MXLND_CONN_FAIL ||
2479             conn->mxk_status == MXLND_CONN_REQ)) {
2480                 u64 match = (u64) MXLND_MSG_ICON_REQ << MXLND_MSG_OFFSET;
2481                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2482                 conn->mxk_status = MXLND_CONN_WAIT;
2483                 spin_unlock(&conn->mxk_lock);
2484                 mxlnd_iconnect(peer, match);
2485                 goto done;
2486         }
2487         spin_unlock(&conn->mxk_lock);
2488
2489         spin_lock(&conn->mxk_lock);
2490         while (!list_empty(&conn->mxk_tx_free_queue) ||
2491                !list_empty(&conn->mxk_tx_credit_queue)) {
2492                 /* We have something to send. If we have a queued tx that does not
2493                  * require a credit (free), choose it since its completion will 
2494                  * return a credit (here or at the peer), complete a DATA or 
2495                  * CONN_REQ or CONN_ACK. */
2496                 struct list_head *tmp_tx = NULL;
2497                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2498                         tmp_tx = &conn->mxk_tx_free_queue;
2499                 } else {
2500                         tmp_tx = &conn->mxk_tx_credit_queue;
2501                 }
2502                 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2503
2504                 msg_type = tx->mxc_msg_type;
2505
2506                 /* don't try to send a rx */
2507                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2508
2509                 /* ensure that it is a valid msg type */
2510                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2511                         msg_type == MXLND_MSG_CONN_ACK ||
2512                         msg_type == MXLND_MSG_NOOP     ||
2513                         msg_type == MXLND_MSG_EAGER    ||
2514                         msg_type == MXLND_MSG_PUT_REQ  ||
2515                         msg_type == MXLND_MSG_PUT_ACK  ||
2516                         msg_type == MXLND_MSG_PUT_DATA ||
2517                         msg_type == MXLND_MSG_GET_REQ  ||
2518                         msg_type == MXLND_MSG_GET_DATA);
2519                 LASSERT(tx->mxc_peer == peer);
2520                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2521
2522                 credit = mxlnd_tx_requires_credit(tx);
2523                 if (credit) {
2524
2525                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2526                                 CDEBUG(D_NET, "%s: posted enough\n",
2527                                               libcfs_nid2str(peer->mxp_nid));
2528                                 goto done_locked;
2529                         }
2530
2531                         if (conn->mxk_credits == 0) {
2532                                 CDEBUG(D_NET, "%s: no credits\n",
2533                                               libcfs_nid2str(peer->mxp_nid));
2534                                 goto done_locked;
2535                         }
2536
2537                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2538                             conn->mxk_outstanding == 0) {  /* giving back credits */
2539                                 CDEBUG(D_NET, "%s: not using last credit\n",
2540                                               libcfs_nid2str(peer->mxp_nid));
2541                                 goto done_locked;
2542                         }
2543                 }
2544
2545                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2546                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2547                                 msg_type == MXLND_MSG_CONN_ACK)) {
2548                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2549                                              mxlnd_connstatus_to_str(conn->mxk_status),
2550                                              tx->mxc_cookie,
2551                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2552                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2553                                         list_del_init(&tx->mxc_list);
2554                                         tx->mxc_status.code = -ECONNABORTED;
2555                                         mxlnd_put_idle_tx(tx);
2556                                         mxlnd_conn_decref(conn);
2557                                 }
2558                                 goto done_locked;
2559                         }
2560                 }
2561
2562                 list_del_init(&tx->mxc_list);
2563
2564                 /* handle credits, etc now while we have the lock to avoid races */
2565                 if (credit) {
2566                         conn->mxk_credits--;
2567                         conn->mxk_ntx_posted++;
2568                 }
2569                 if (msg_type != MXLND_MSG_PUT_DATA &&
2570                     msg_type != MXLND_MSG_GET_DATA) {
2571                         if (msg_type != MXLND_MSG_CONN_REQ &&
2572                             msg_type != MXLND_MSG_CONN_ACK) {
2573                                 conn->mxk_ntx_msgs--;
2574                         }
2575                 }
2576                 if (tx->mxc_incarnation == 0 &&
2577                     conn->mxk_incarnation != 0) {
2578                         tx->mxc_incarnation = conn->mxk_incarnation;
2579                 }
2580                 spin_unlock(&conn->mxk_lock);
2581
2582                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER 
2583                  * or (2) there is a non-DATA msg that can return credits in the 
2584                  * queue, then drop this duplicate NOOP */
2585                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2586                         spin_lock(&conn->mxk_lock);
2587                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2588                             (conn->mxk_ntx_msgs >= 1)) {
2589                                 conn->mxk_credits++;
2590                                 conn->mxk_ntx_posted--;
2591                                 spin_unlock(&conn->mxk_lock);
2592                                 /* redundant NOOP */
2593                                 mxlnd_put_idle_tx(tx);
2594                                 mxlnd_conn_decref(conn);
2595                                 CDEBUG(D_NET, "%s: redundant noop\n",
2596                                               libcfs_nid2str(peer->mxp_nid));
2597                                 found = 1;
2598                                 goto done;
2599                         }
2600                         spin_unlock(&conn->mxk_lock);
2601                 }
2602
2603                 found = 1;
2604                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2605                     (msg_type != MXLND_MSG_GET_DATA))) {
2606                         mxlnd_pack_msg(tx);
2607                 }
2608
2609                 //ret = -ECONNABORTED;
2610                 mxret = MX_SUCCESS;
2611
2612                 spin_lock(&conn->mxk_lock);
2613                 status = conn->mxk_status;
2614                 spin_unlock(&conn->mxk_lock);
2615
2616                 if (likely((status == MXLND_CONN_READY) ||
2617                     (msg_type == MXLND_MSG_CONN_REQ) ||
2618                     (msg_type == MXLND_MSG_CONN_ACK))) {
2619                         ret = 0;
2620                         if (msg_type != MXLND_MSG_CONN_REQ &&
2621                             msg_type != MXLND_MSG_CONN_ACK) {
2622                                 /* add to the pending list */
2623                                 ret = mxlnd_q_pending_ctx(tx);
2624                                 if (ret == -1) {
2625                                         /* FIXME the conn is disconnected, now what? */
2626                                 }
2627                         } else {
2628                                 /* CONN_REQ/ACK */
2629                                 tx->mxc_state = MXLND_CTX_PENDING;
2630                         }
2631
2632                         if (ret == 0) {
2633                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2634                                     msg_type != MXLND_MSG_GET_DATA)) {
2635                                         /* send a msg style tx */
2636                                         LASSERT(tx->mxc_nseg == 1);
2637                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2638                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2639                                                mxlnd_msgtype_to_str(msg_type),
2640                                                tx->mxc_cookie);
2641                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2642                                                           &tx->mxc_seg,
2643                                                           tx->mxc_nseg,
2644                                                           tx->mxc_pin_type,
2645                                                           conn->mxk_epa,
2646                                                           tx->mxc_match,
2647                                                           (void *) tx,
2648                                                           &tx->mxc_mxreq);
2649                                 } else {
2650                                         /* send a DATA tx */
2651                                         spin_lock(&conn->mxk_lock);
2652                                         conn->mxk_ntx_data--;
2653                                         conn->mxk_data_posted++;
2654                                         spin_unlock(&conn->mxk_lock);
2655                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2656                                                mxlnd_msgtype_to_str(msg_type),
2657                                                tx->mxc_cookie);
2658                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2659                                                           tx->mxc_seg_list,
2660                                                           tx->mxc_nseg,
2661                                                           tx->mxc_pin_type,
2662                                                           conn->mxk_epa,
2663                                                           tx->mxc_match,
2664                                                           (void *) tx,
2665                                                           &tx->mxc_mxreq);
2666                                 }
2667                         } else {
2668                                 mxret = MX_CONNECTION_FAILED;
2669                         }
2670                         if (likely(mxret == MX_SUCCESS)) {
2671                                 ret = 0;
2672                         } else {
2673                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2674                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
2675                                        libcfs_nid2str(peer->mxp_nid));
2676                                 /* NOTE mx_kisend() only fails if there are not enough 
2677                                 * resources. Do not change the connection status. */
2678                                 if (mxret == MX_NO_RESOURCES) {
2679                                         tx->mxc_status.code = -ENOMEM;
2680                                 } else {
2681                                         tx->mxc_status.code = -ECONNABORTED;
2682                                 }
2683                                 if (credit) {
2684                                         spin_lock(&conn->mxk_lock);
2685                                         conn->mxk_ntx_posted--;
2686                                         conn->mxk_credits++;
2687                                         spin_unlock(&conn->mxk_lock);
2688                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2689                                         msg_type == MXLND_MSG_GET_DATA) {
2690                                         spin_lock(&conn->mxk_lock);
2691                                         conn->mxk_data_posted--;
2692                                         spin_unlock(&conn->mxk_lock);
2693                                 }
2694                                 if (msg_type != MXLND_MSG_PUT_DATA &&
2695                                     msg_type != MXLND_MSG_GET_DATA &&
2696                                     msg_type != MXLND_MSG_CONN_REQ &&
2697                                     msg_type != MXLND_MSG_CONN_ACK) {
2698                                         spin_lock(&conn->mxk_lock);
2699                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2700                                         spin_unlock(&conn->mxk_lock);
2701                                 }
2702                                 if (msg_type != MXLND_MSG_CONN_REQ &&
2703                                     msg_type != MXLND_MSG_CONN_ACK) {
2704                                         /* remove from the pending list */
2705                                         mxlnd_deq_pending_ctx(tx);
2706                                 }
2707                                 mxlnd_put_idle_tx(tx);
2708                                 mxlnd_conn_decref(conn);
2709                         }
2710                 }
2711                 spin_lock(&conn->mxk_lock);
2712         }
2713 done_locked:
2714         spin_unlock(&conn->mxk_lock);
2715 done:
2716         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2717         return found;
2718 }
2719
2720
2721 /**
2722  * A tx completed, progress or complete the msg
2723  * \param tx  the tx descriptor
2724  * 
2725  * Determine which type of send request it was and start the next step, if needed,
2726  * or, if done, signal completion to LNET. After we are done, put back on the
2727  * idle tx list.
2728  */
2729 void
2730 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2731 {
2732         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2733         struct kmx_msg  *msg    = tx->mxc_msg;
2734         struct kmx_peer *peer   = tx->mxc_peer;
2735         struct kmx_conn *conn   = tx->mxc_conn;
2736         u8              type    = tx->mxc_msg_type;
2737         int             credit  = mxlnd_tx_requires_credit(tx);
2738         u64             cookie  = tx->mxc_cookie;
2739
2740         CDEBUG(D_NET, "entering %s (0x%llx):\n",
2741                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2742
2743         if (unlikely(conn == NULL)) {
2744                 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2745                 conn = peer->mxp_conn;
2746                 if (conn != NULL) {
2747                         /* do not add a ref for the tx, it was set before sending */
2748                         tx->mxc_conn = conn;
2749                         tx->mxc_peer = conn->mxk_peer;
2750                 }
2751         }
2752         LASSERT (peer != NULL);
2753         LASSERT (conn != NULL);
2754
2755         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2756                 LASSERT (type == msg->mxm_type);
2757         }
2758
2759         if (failed) {
2760                 tx->mxc_status.code = -EIO;
2761         } else {
2762                 spin_lock(&conn->mxk_lock);
2763                 conn->mxk_last_tx = jiffies;
2764                 spin_unlock(&conn->mxk_lock);
2765         }
2766
2767         switch (type) {
2768
2769         case MXLND_MSG_GET_DATA:
2770                 spin_lock(&conn->mxk_lock);
2771                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2772                         conn->mxk_outstanding++;
2773                         conn->mxk_data_posted--;
2774                 }
2775                 spin_unlock(&conn->mxk_lock);
2776                 break;
2777
2778         case MXLND_MSG_PUT_DATA:
2779                 spin_lock(&conn->mxk_lock);
2780                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2781                         conn->mxk_data_posted--;
2782                 }
2783                 spin_unlock(&conn->mxk_lock);
2784                 break;
2785
2786         case MXLND_MSG_NOOP:
2787         case MXLND_MSG_PUT_REQ:
2788         case MXLND_MSG_PUT_ACK:
2789         case MXLND_MSG_GET_REQ:
2790         case MXLND_MSG_EAGER:
2791         //case MXLND_MSG_NAK:
2792                 break;
2793
2794         case MXLND_MSG_CONN_ACK:
2795                 if (peer->mxp_incompatible) {
2796                         /* we sent our params, now close this conn */
2797                         mxlnd_conn_disconnect(conn, 0, 1);
2798                 }
2799         case MXLND_MSG_CONN_REQ:
2800                 if (failed) {
2801                         CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2802                                "failed with %s (%d) to %s\n",
2803                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2804                                mx_strstatus(tx->mxc_status.code),
2805                                tx->mxc_status.code,
2806                                libcfs_nid2str(tx->mxc_nid));
2807                         if (!peer->mxp_incompatible) {
2808                                 spin_lock(&conn->mxk_lock);
2809                                 conn->mxk_status = MXLND_CONN_FAIL;
2810                                 spin_unlock(&conn->mxk_lock);
2811                         }
2812                 }
2813                 break;
2814
2815         default:
2816                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2817                 LBUG();
2818         }
2819
2820         if (credit) {
2821                 spin_lock(&conn->mxk_lock);
2822                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2823                         conn->mxk_ntx_posted--;
2824                 }
2825                 spin_unlock(&conn->mxk_lock);
2826         }
2827
2828         CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2829         mxlnd_put_idle_tx(tx);
2830         mxlnd_conn_decref(conn);
2831
2832         mxlnd_check_sends(peer);
2833
2834         return;
2835 }
2836
2837 void
2838 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2839 {
2840         int                     ret             = 0;
2841         int                     repost          = 1;
2842         int                     credit          = 1;
2843         u32                     nob             = rx->mxc_status.xfer_length;
2844         u64                     bits            = rx->mxc_status.match_info;
2845         struct kmx_msg         *msg             = rx->mxc_msg;
2846         struct kmx_peer        *peer            = rx->mxc_peer;
2847         struct kmx_conn        *conn            = rx->mxc_conn;
2848         u8                      type            = rx->mxc_msg_type;
2849         u64                     seq             = 0ULL;
2850         lnet_msg_t             *lntmsg[2];
2851         int                     result          = 0;
2852         u64                     nic_id          = 0ULL;
2853         u32                     ep_id           = 0;
2854         u32                     sid             = 0;
2855         int                     peer_ref        = 0;
2856         int                     conn_ref        = 0;
2857         int                     incompatible    = 0;
2858         u64                     match           = 0ULL;
2859
2860         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, 
2861          * failed GET reply, CONN_REQ, or a CONN_ACK */
2862
2863         /* NOTE peer may still be NULL if it is a new peer and
2864          *      conn may be NULL if this is a re-connect */
2865         if (likely(peer != NULL && conn != NULL)) {
2866                 /* we have a reference on the conn */
2867                 conn_ref = 1;
2868         } else if (peer != NULL && conn == NULL) {
2869                 /* we have a reference on the peer */
2870                 peer_ref = 1;
2871         } else if (peer == NULL && conn != NULL) {
2872                 /* fatal error */
2873                 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2874                 LBUG();
2875         } /* else peer and conn == NULL */
2876
2877 #if 0
2878         if (peer == NULL || conn == NULL) {
2879                 /* if the peer was disconnected, the peer may exist but
2880                  * not have any valid conns */
2881                 decref = 0; /* no peer means no ref was taken for this rx */
2882         }
2883 #endif
2884
2885         if (conn == NULL && peer != NULL) {
2886                 write_lock(&kmxlnd_data.kmx_global_lock);
2887                 conn = peer->mxp_conn;
2888                 if (conn) {
2889                         mxlnd_conn_addref(conn); /* conn takes ref... */
2890                         mxlnd_peer_decref(peer); /* from peer */
2891                         conn_ref = 1;
2892                         peer_ref = 0;
2893                 }
2894                 write_unlock(&kmxlnd_data.kmx_global_lock);
2895                 rx->mxc_conn = conn;
2896         }
2897
2898 #if MXLND_DEBUG
2899         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2900 #endif
2901
2902         lntmsg[0] = NULL;
2903         lntmsg[1] = NULL;
2904
2905         if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2906                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2907                                    libcfs_nid2str(rx->mxc_nid),
2908                                    mx_strstatus(rx->mxc_status.code),
2909                                    (int) rx->mxc_status.code);
2910                 credit = 0;
2911                 goto cleanup;
2912         }
2913
2914         if (nob == 0) {
2915                 /* this may be a failed GET reply */
2916                 if (type == MXLND_MSG_GET_DATA) {
2917                         /* get the error (52-59) bits from the match bits */
2918                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
2919                         lntmsg[0] = rx->mxc_lntmsg[0];
2920                         result = -ret;
2921                         goto cleanup;
2922                 } else {
2923                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
2924                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2925                                            libcfs_nid2str(rx->mxc_nid));
2926                         goto cleanup;
2927                 }
2928         }
2929
2930         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2931         if (type == MXLND_MSG_PUT_DATA) {
2932                 result = rx->mxc_status.code;
2933                 lntmsg[0] = rx->mxc_lntmsg[0];
2934                 goto cleanup;
2935         } else if (type == MXLND_MSG_GET_DATA) {
2936                 result = rx->mxc_status.code;
2937                 lntmsg[0] = rx->mxc_lntmsg[0];
2938                 lntmsg[1] = rx->mxc_lntmsg[1];
2939                 goto cleanup;
2940         }
2941
2942         ret = mxlnd_unpack_msg(msg, nob);
2943         if (ret != 0) {
2944                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2945                                    ret, libcfs_nid2str(rx->mxc_nid));
2946                 goto cleanup;
2947         }
2948         rx->mxc_nob = nob;
2949         type = msg->mxm_type;
2950         seq = msg->mxm_seq;
2951
2952         if (type != MXLND_MSG_CONN_REQ &&
2953             (rx->mxc_nid != msg->mxm_srcnid ||
2954              kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2955                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2956                        "0x%llx and rx msg dst is 0x%llx)\n",
2957                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2958                        msg->mxm_dstnid);
2959                 goto cleanup;
2960         }
2961
2962         if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2963                 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2964                     msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2965                         if (conn != NULL) {
2966                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2967                                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2968                                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2969                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2970                                        msg->mxm_srcstamp, conn->mxk_incarnation,
2971                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2972                         } else {
2973                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2974                                        "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2975                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2976                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2977                         }
2978                         credit = 0;
2979                         goto cleanup;
2980                 }
2981         }
2982
2983         CDEBUG(D_NET, "Received %s with %d credits\n",
2984                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
2985
2986         if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2987             msg->mxm_type != MXLND_MSG_CONN_ACK) {
2988                 LASSERT(peer != NULL);
2989                 LASSERT(conn != NULL);
2990                 if (msg->mxm_credits != 0) {
2991                         spin_lock(&conn->mxk_lock);
2992                         if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2993                                 if ((conn->mxk_credits + msg->mxm_credits) > 
2994                                      *kmxlnd_tunables.kmx_credits) {
2995                                         CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
2996                                                conn->mxk_credits, msg->mxm_credits);
2997                                 }
2998                                 conn->mxk_credits += msg->mxm_credits;
2999                                 LASSERT(conn->mxk_credits >= 0);
3000                                 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
3001                         }
3002                         spin_unlock(&conn->mxk_lock);
3003                 }
3004         }
3005
3006         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3007         switch (type) {
3008         case MXLND_MSG_NOOP:
3009                 break;
3010
3011         case MXLND_MSG_EAGER:
3012                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3013                                         msg->mxm_srcnid, rx, 0);
3014                 repost = ret < 0;
3015                 break;
3016
3017         case MXLND_MSG_PUT_REQ:
3018                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3019                                         msg->mxm_srcnid, rx, 1);
3020                 repost = ret < 0;
3021                 break;
3022
3023         case MXLND_MSG_PUT_ACK: {
3024                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3025                 if (cookie > MXLND_MAX_COOKIE) {
3026                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3027                                            libcfs_nid2str(rx->mxc_nid));
3028                         result = -((u32) MXLND_ERROR_VAL(cookie));
3029                         lntmsg[0] = rx->mxc_lntmsg[0];
3030                 } else {
3031                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3032                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3033                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3034                 }
3035                 /* repost == 1 */
3036                 break;
3037         }
3038         case MXLND_MSG_GET_REQ:
3039                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3040                                         msg->mxm_srcnid, rx, 1);
3041                 repost = ret < 0;
3042                 break;
3043
3044         case MXLND_MSG_CONN_REQ:
3045                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3046                         CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3047                                         libcfs_nid2str(msg->mxm_srcnid),
3048                                         libcfs_nid2str(msg->mxm_dstnid));
3049                         goto cleanup;
3050                 }
3051                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3052                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3053                                     "%d (%d wanted)\n",
3054                                         libcfs_nid2str(msg->mxm_srcnid),
3055                                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3056                                         *kmxlnd_tunables.kmx_credits);
3057                         incompatible = 1;
3058                 }
3059                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3060                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3061                                     "%d (%d wanted)\n",
3062                                         libcfs_nid2str(msg->mxm_srcnid),
3063                                         msg->mxm_u.conn_req.mxcrm_eager_size,
3064                                         (int) MXLND_EAGER_SIZE);
3065                         incompatible = 1;
3066                 }
3067                 mx_decompose_endpoint_addr2(rx->mxc_status.source, &nic_id, &ep_id, &sid);
3068                 if (peer == NULL) {
3069                         peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3070                         if (peer == NULL) {
3071                                 int             hash    = 0;
3072                                 struct kmx_peer *existing_peer    = NULL;
3073                                 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3074
3075                                 rx->mxc_nid = msg->mxm_srcnid;
3076
3077                                 /* adds conn ref for peer and one for this function */
3078                                 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3079                                                 *kmxlnd_tunables.kmx_board,
3080                                                 *kmxlnd_tunables.kmx_ep_id, 0ULL);
3081                                 if (ret != 0) {
3082                                         goto cleanup;
3083                                 }
3084                                 peer->mxp_sid = sid;
3085                                 LASSERT(peer->mxp_ep_id == ep_id);
3086                                 write_lock(&kmxlnd_data.kmx_global_lock);
3087                                 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3088                                 if (existing_peer) {
3089                                         mxlnd_conn_decref(peer->mxp_conn);
3090                                         mxlnd_peer_decref(peer);
3091                                         peer = existing_peer;
3092                                         mxlnd_conn_addref(peer->mxp_conn);
3093                                 } else {
3094                                         list_add_tail(&peer->mxp_peers,
3095                                                       &kmxlnd_data.kmx_peers[hash]);
3096                                         atomic_inc(&kmxlnd_data.kmx_npeers);
3097                                 }
3098                                 write_unlock(&kmxlnd_data.kmx_global_lock);
3099                         } else {
3100                                 /* FIXME should write lock here */
3101                                 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3102                                 mxlnd_peer_decref(peer); /* drop ref taken above */
3103                                 if (ret != 0) {
3104                                         CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3105                                         goto cleanup;
3106                                 }
3107                         }
3108                         conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3109                         conn = peer->mxp_conn;
3110                 } else { /* found peer */
3111                         struct kmx_conn *old_conn       = conn;
3112
3113                         if (sid != peer->mxp_sid) {
3114                                 /* do not call mx_disconnect() or send a BYE */
3115                                 mxlnd_conn_disconnect(old_conn, 0, 0);
3116
3117                                 /* the ref for this rx was taken on the old_conn */
3118                                 mxlnd_conn_decref(old_conn);
3119
3120                                 /* This allocs a conn, points peer->mxp_conn to this one.
3121                                 * The old conn is still on the peer->mxp_conns list.
3122                                 * As the pending requests complete, they will call
3123                                 * conn_decref() which will eventually free it. */
3124                                 ret = mxlnd_conn_alloc(&conn, peer);
3125                                 if (ret != 0) {
3126                                         CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3127                                         goto cleanup;
3128                                 }
3129                                 /* conn_alloc() adds one ref for the peer and one
3130                                  * for this function */
3131                                 conn_ref = 1;
3132
3133                                 peer->mxp_sid = sid;
3134                         }
3135                 }
3136                 write_lock(&kmxlnd_data.kmx_global_lock);
3137                 peer->mxp_incarnation = msg->mxm_srcstamp;
3138                 peer->mxp_incompatible = incompatible;
3139                 write_unlock(&kmxlnd_data.kmx_global_lock);
3140                 spin_lock(&conn->mxk_lock);
3141                 conn->mxk_incarnation = msg->mxm_srcstamp;
3142                 conn->mxk_status = MXLND_CONN_WAIT;
3143                 spin_unlock(&conn->mxk_lock);
3144
3145                 /* handle_conn_ack() will create the CONN_ACK msg */
3146                 match = (u64) MXLND_MSG_ICON_ACK << MXLND_MSG_OFFSET;
3147                 mxlnd_iconnect(peer, match);
3148
3149                 break;
3150
3151         case MXLND_MSG_CONN_ACK:
3152                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3153                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3154                                "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3155                                 libcfs_nid2str(msg->mxm_dstnid));
3156                         ret = -1;
3157                         goto failed;
3158                 }
3159                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3160                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3161                                "incompatible queue depth %d (%d wanted)\n",
3162                                 libcfs_nid2str(msg->mxm_srcnid),
3163                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3164                                 *kmxlnd_tunables.kmx_credits);
3165                         spin_lock(&conn->mxk_lock);
3166                         conn->mxk_status = MXLND_CONN_FAIL;
3167                         spin_unlock(&conn->mxk_lock);
3168                         incompatible = 1;
3169                         ret = -1;
3170                 }
3171                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3172                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3173                                "incompatible EAGER size %d (%d wanted)\n",
3174                                 libcfs_nid2str(msg->mxm_srcnid),
3175                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3176                                 (int) MXLND_EAGER_SIZE);
3177                         spin_lock(&conn->mxk_lock);
3178                         conn->mxk_status = MXLND_CONN_FAIL;
3179                         spin_unlock(&conn->mxk_lock);
3180                         incompatible = 1;
3181                         ret = -1;
3182                 }
3183                 write_lock(&kmxlnd_data.kmx_global_lock);
3184                 peer->mxp_incarnation = msg->mxm_srcstamp;
3185                 peer->mxp_incompatible = incompatible;
3186                 write_unlock(&kmxlnd_data.kmx_global_lock);
3187                 spin_lock(&conn->mxk_lock);
3188                 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3189                 conn->mxk_outstanding = 0;
3190                 conn->mxk_incarnation = msg->mxm_srcstamp;
3191                 conn->mxk_timeout = 0;
3192                 if (!incompatible) {
3193                         conn->mxk_status = MXLND_CONN_READY;
3194                 }
3195                 spin_unlock(&conn->mxk_lock);
3196                 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3197                 break;
3198
3199         default:
3200                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3201                                 libcfs_nid2str(rx->mxc_nid));
3202                 ret = -EPROTO;
3203                 break;
3204         }
3205
3206 failed:
3207         if (ret < 0) {
3208                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3209                 spin_lock(&conn->mxk_lock);
3210                 conn->mxk_status = MXLND_CONN_FAIL;
3211                 spin_unlock(&conn->mxk_lock);
3212         }
3213
3214 cleanup:
3215         if (conn != NULL) {
3216                 spin_lock(&conn->mxk_lock);
3217                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3218                 spin_unlock(&conn->mxk_lock);
3219         }
3220
3221         if (repost) {
3222                 /* lnet_parse() failed, etc., repost now */
3223                 mxlnd_put_idle_rx(rx);
3224                 if (conn != NULL && credit == 1) {
3225                         if (type == MXLND_MSG_PUT_DATA) {
3226                                 spin_lock(&conn->mxk_lock);
3227                                 conn->mxk_outstanding++;
3228                                 spin_unlock(&conn->mxk_lock);
3229                         } else if (type != MXLND_MSG_GET_DATA &&
3230                                   (type == MXLND_MSG_EAGER ||
3231                                    type == MXLND_MSG_PUT_REQ ||
3232                                    type == MXLND_MSG_NOOP)) {
3233                                 spin_lock(&conn->mxk_lock);
3234                                 conn->mxk_outstanding++;
3235                                 spin_unlock(&conn->mxk_lock);
3236                         }
3237                 }
3238                 if (conn_ref) mxlnd_conn_decref(conn);
3239                 LASSERT(peer_ref == 0);
3240         }
3241
3242         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3243                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3244         } else {
3245                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3246         }
3247
3248         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3249         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3250
3251         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3252
3253         return;
3254 }
3255
3256
3257
3258 void
3259 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3260 {
3261         struct kmx_ctx  *tx     = NULL;
3262         struct kmx_msg  *txmsg   = NULL;
3263         struct kmx_conn *conn   = peer->mxp_conn;
3264
3265         /* a conn ref was taken when calling mx_iconnect(), 
3266          * hold it until CONN_REQ or CONN_ACK completes */
3267
3268         CDEBUG(D_NET, "entering\n");
3269         if (status.code != MX_STATUS_SUCCESS) {
3270                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3271                         mx_strstatus(status.code), status.code,
3272                         libcfs_nid2str(peer->mxp_nid));
3273                 spin_lock(&conn->mxk_lock);
3274                 conn->mxk_status = MXLND_CONN_FAIL;
3275                 spin_unlock(&conn->mxk_lock);
3276
3277                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3278                         struct kmx_conn *new_conn       = NULL;
3279                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3280                         /* FIXME write lock here ? */
3281                         mxlnd_conn_disconnect(conn, 0, 0);
3282                         mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3283                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3284                         peer->mxp_reconnect_time = 0;
3285                 }
3286
3287                 mxlnd_conn_decref(conn);
3288                 return;
3289         }
3290
3291         spin_lock(&conn->mxk_lock);
3292         conn->mxk_epa = status.source;
3293         spin_unlock(&conn->mxk_lock);
3294         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3295          *      we should not need to lock the peer */
3296         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3297
3298         /* mx_iconnect() succeeded, reset delay to 0 */
3299         write_lock(&kmxlnd_data.kmx_global_lock);
3300         peer->mxp_reconnect_time = 0;
3301         write_unlock(&kmxlnd_data.kmx_global_lock);
3302
3303         /* marshal CONN_REQ msg */
3304         /* we are still using the conn ref from iconnect() - do not take another */
3305         tx = mxlnd_get_idle_tx();
3306         if (tx == NULL) {
3307                 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3308                                    libcfs_nid2str(peer->mxp_nid));
3309                 spin_lock(&conn->mxk_lock);
3310                 conn->mxk_status = MXLND_CONN_FAIL;
3311                 spin_unlock(&conn->mxk_lock);
3312                 mxlnd_conn_decref(conn);
3313                 return;
3314         }
3315
3316         tx->mxc_peer = peer;
3317         tx->mxc_conn = conn;
3318         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3319         txmsg = tx->mxc_msg;
3320         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3321         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3322         tx->mxc_match = mxlnd_create_match(tx, 0);
3323
3324         CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3325         mxlnd_queue_tx(tx);
3326         return;
3327 }
3328
3329 void
3330 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3331 {
3332         struct kmx_ctx  *tx     = NULL;
3333         struct kmx_msg  *txmsg   = NULL;
3334         struct kmx_conn *conn   = peer->mxp_conn;
3335         u64             nic_id  = 0ULL;
3336         u32             ep_id   = 0;
3337         u32             sid     = 0;
3338
3339         /* a conn ref was taken when calling mx_iconnect(), 
3340          * hold it until CONN_REQ or CONN_ACK completes */
3341
3342         CDEBUG(D_NET, "entering\n");
3343         if (status.code != MX_STATUS_SUCCESS) {
3344                 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3345                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3346                         mx_strstatus(status.code), status.code,
3347                         libcfs_nid2str(peer->mxp_nid),
3348                         peer->mxp_nid,
3349                         peer->mxp_nic_id,
3350                         peer->mxp_ep_id);
3351                 spin_lock(&conn->mxk_lock);
3352                 conn->mxk_status = MXLND_CONN_FAIL;
3353                 spin_unlock(&conn->mxk_lock);
3354
3355                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3356                         struct kmx_conn *new_conn       = NULL;
3357                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3358                         /* FIXME write lock here? */
3359                         mxlnd_conn_disconnect(conn, 0, 1);
3360                         mxlnd_conn_alloc(&new_conn, peer); /* adds ref for 
3361                                                               this function... */
3362                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3363                         peer->mxp_reconnect_time = 0;
3364                 }
3365
3366                 mxlnd_conn_decref(conn);
3367                 return;
3368         }
3369         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3370         spin_lock(&conn->mxk_lock);
3371         conn->mxk_epa = status.source;
3372         if (likely(!peer->mxp_incompatible)) {
3373                 conn->mxk_status = MXLND_CONN_READY;
3374         }
3375         spin_unlock(&conn->mxk_lock);
3376         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3377          *      we should not have to lock the peer */
3378         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3379
3380         /* mx_iconnect() succeeded, reset delay to 0 */
3381         write_lock(&kmxlnd_data.kmx_global_lock);
3382         peer->mxp_reconnect_time = 0;
3383         peer->mxp_sid = sid;
3384         write_unlock(&kmxlnd_data.kmx_global_lock);
3385
3386         /* marshal CONN_ACK msg */
3387         tx = mxlnd_get_idle_tx();
3388         if (tx == NULL) {
3389                 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3390                                    libcfs_nid2str(peer->mxp_nid));
3391                 spin_lock(&conn->mxk_lock);
3392                 conn->mxk_status = MXLND_CONN_FAIL;
3393                 spin_unlock(&conn->mxk_lock);
3394                 mxlnd_conn_decref(conn);
3395                 return;
3396         }
3397
3398         tx->mxc_peer = peer;
3399         tx->mxc_conn = conn;
3400         CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3401         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3402         txmsg = tx->mxc_msg;
3403         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3404         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3405         tx->mxc_match = mxlnd_create_match(tx, 0);
3406
3407         mxlnd_queue_tx(tx);
3408         return;
3409 }
3410
3411 /**
3412  * The MX request completion thread(s)
3413  * \param arg  thread id (as a void *)
3414  *
3415  * This thread waits for a MX completion and then completes the request.
3416  * We will create one thread per CPU.
3417  */
3418 int
3419 mxlnd_request_waitd(void *arg)
3420 {
3421         long                    id              = (long) arg;
3422         char                    name[24];
3423         __u32                   result          = 0;
3424         mx_return_t             mxret           = MX_SUCCESS;
3425         mx_status_t             status;
3426         struct kmx_ctx         *ctx             = NULL;
3427         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3428         struct kmx_peer        *peer            = NULL;
3429         struct kmx_conn        *conn            = NULL;
3430 #if MXLND_POLLING
3431         int                     count           = 0;
3432 #endif
3433
3434         memset(name, 0, sizeof(name));
3435         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3436         cfs_daemonize(name);
3437         //cfs_block_allsigs();
3438
3439         memset(&status, 0, sizeof(status));
3440
3441         CDEBUG(D_NET, "%s starting\n", name);
3442
3443         while (!kmxlnd_data.kmx_shutdown) {
3444                 u8      msg_type        = 0;
3445
3446                 mxret = MX_SUCCESS;
3447                 result = 0;
3448 #if MXLND_POLLING
3449                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3450                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3451                                             &status, &result);
3452                 } else {
3453                         count = 0;
3454                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3455                                             0ULL, 0ULL, &status, &result);
3456                 }
3457 #else
3458                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3459                                     0ULL, 0ULL, &status, &result);
3460 #endif
3461                 if (unlikely(kmxlnd_data.kmx_shutdown))
3462                         break;
3463
3464                 if (result != 1) {
3465                         /* nothing completed... */
3466                         continue;
3467                 }
3468
3469                 if (status.code != MX_STATUS_SUCCESS) {
3470                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3471                                "match_info 0x%llx and length %d\n",
3472                                mx_strstatus(status.code), status.code,
3473                                (u64) status.match_info, status.msg_length);
3474                 }
3475
3476                 msg_type = MXLND_MSG_TYPE(status.match_info);
3477
3478                 /* This may be a mx_iconnect() request completing,
3479                  * check the bit mask for CONN_REQ and CONN_ACK */
3480                 if (msg_type == MXLND_MSG_ICON_REQ ||
3481                     msg_type == MXLND_MSG_ICON_ACK) {
3482                         peer = (struct kmx_peer*) status.context;
3483                         if (msg_type == MXLND_MSG_ICON_REQ) {
3484                                 mxlnd_handle_conn_req(peer, status);
3485                         } else {
3486                                 mxlnd_handle_conn_ack(peer, status);
3487                         }
3488                         continue;
3489                 }
3490
3491                 /* This must be a tx or rx */
3492
3493                 /* NOTE: if this is a RX from the unexpected callback, it may
3494                  * have very little info. If we dropped it in unexpected_recv(),
3495                  * it will not have a context. If so, ignore it. */
3496                 ctx = (struct kmx_ctx *) status.context;
3497                 if (ctx != NULL) {
3498
3499                         req_type = ctx->mxc_type;
3500                         conn = ctx->mxc_conn; /* this may be NULL */
3501                         mxlnd_deq_pending_ctx(ctx);
3502
3503                         /* copy status to ctx->mxc_status */
3504                         memcpy(&ctx->mxc_status, &status, sizeof(status));
3505
3506                         switch (req_type) {
3507                         case MXLND_REQ_TX:
3508                                 mxlnd_handle_tx_completion(ctx);
3509                                 break;
3510                         case MXLND_REQ_RX:
3511                                 mxlnd_handle_rx_completion(ctx);
3512                                 break;
3513                         default:
3514                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3515                                 LBUG();
3516                                 break;
3517                         }
3518
3519                         /* FIXME may need to reconsider this */
3520                         /* conn is always set except for the first CONN_REQ rx
3521                          * from a new peer */
3522                         if (!(status.code == MX_STATUS_SUCCESS ||
3523                               status.code == MX_STATUS_TRUNCATED) &&
3524                               conn != NULL) {
3525                                 mxlnd_conn_disconnect(conn, 1, 1);
3526                         }
3527                 }
3528                 CDEBUG(D_NET, "waitd() completed task\n");
3529         }
3530         CDEBUG(D_NET, "%s stopping\n", name);
3531         mxlnd_thread_stop(id);
3532         return 0;
3533 }
3534
3535
3536 unsigned long
3537 mxlnd_check_timeouts(unsigned long now)
3538 {
3539         int                     i               = 0;
3540         int                     disconnect      = 0;
3541         unsigned long           next            = 0;
3542         struct  kmx_peer        *peer           = NULL;
3543         struct  kmx_conn        *conn           = NULL;
3544
3545         read_lock(&kmxlnd_data.kmx_global_lock);
3546         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3547                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3548
3549                         if (unlikely(kmxlnd_data.kmx_shutdown)) {
3550                                 read_unlock(&kmxlnd_data.kmx_global_lock);
3551                                 return next;
3552                         }
3553
3554                         conn = peer->mxp_conn;
3555                         if (conn) {
3556                                 mxlnd_conn_addref(conn);
3557                         } else {
3558                                 continue;
3559                         }
3560
3561                         /* FIXMEis this needed? */
3562                         spin_lock(&conn->mxk_lock);
3563
3564                         /* if nothing pending (timeout == 0) or
3565                          * if conn is already disconnected,
3566                          * skip this conn */
3567                         if (conn->mxk_timeout == 0 ||
3568                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3569                                 /* FIXME is this needed? */
3570                                 spin_unlock(&conn->mxk_lock);
3571                                 mxlnd_conn_decref(conn);
3572                                 continue;
3573                         }
3574
3575                         /* we want to find the timeout that will occur first.
3576                          * if it is in the future, we will sleep until then.
3577                          * if it is in the past, then we will sleep one
3578                          * second and repeat the process. */
3579                         if ((next == 0) || (conn->mxk_timeout < next)) {
3580                                 next = conn->mxk_timeout;
3581                         }
3582
3583                         disconnect = 0;
3584
3585                         if (time_after_eq(now, conn->mxk_timeout))  {
3586                                 disconnect = 1;
3587                         }
3588                         spin_unlock(&conn->mxk_lock);
3589
3590                         if (disconnect) {
3591                                 mxlnd_conn_disconnect(conn, 1, 1);
3592                         }
3593                         mxlnd_conn_decref(conn);
3594                 }
3595         }
3596         read_unlock(&kmxlnd_data.kmx_global_lock);
3597         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3598
3599         return next;
3600 }
3601
3602 /**
3603  * Enforces timeouts on messages
3604  * \param arg  thread id (as a void *)
3605  *
3606  * This thread queries each peer for its earliest timeout. If a peer has timed out,
3607  * it calls mxlnd_conn_disconnect().
3608  *
3609  * After checking for timeouts, try progressing sends (call check_sends()).
3610  */
3611 int
3612 mxlnd_timeoutd(void *arg)
3613 {
3614         int                     i       = 0;
3615         long                    id      = (long) arg;
3616         unsigned long           now     = 0;
3617         unsigned long           next    = 0;
3618         unsigned long           delay   = HZ;
3619         struct kmx_peer        *peer    = NULL;
3620         struct kmx_conn        *conn    = NULL;
3621
3622         cfs_daemonize("mxlnd_timeoutd");
3623         //cfs_block_allsigs();
3624
3625         CDEBUG(D_NET, "timeoutd starting\n");
3626
3627         while (!kmxlnd_data.kmx_shutdown) {
3628
3629                 now = jiffies;
3630                 /* if the next timeout has not arrived, go back to sleep */
3631                 if (time_after(now, next)) {
3632                         next = mxlnd_check_timeouts(now);
3633                 }
3634
3635                read_lock(&kmxlnd_data.kmx_global_lock);
3636                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3637                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3638                                 /* FIXME upgrade to write lock?
3639                                  * is any lock needed? */
3640                                 conn = peer->mxp_conn;
3641                                 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3642
3643                                 if (conn == NULL)
3644                                         continue;
3645
3646                                 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3647                                     time_after(now, conn->mxk_last_tx + HZ)) {
3648                                         /* FIXME drop lock or call check_sends_locked */
3649                                         read_unlock(&kmxlnd_data.kmx_global_lock);
3650                                         mxlnd_check_sends(peer);
3651                                         read_lock(&kmxlnd_data.kmx_global_lock);
3652                                 }
3653                                 mxlnd_conn_decref(conn); /* until here */
3654                         }
3655                 }
3656                 read_unlock(&kmxlnd_data.kmx_global_lock);
3657
3658                 mxlnd_sleep(delay);
3659         }
3660         CDEBUG(D_NET, "timeoutd stopping\n");
3661         mxlnd_thread_stop(id);
3662         return 0;
3663 }