Whamcloud - gitweb
i=isaac,b=18654:
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
47
48 inline int
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
50 {
51         /* if memcmp() == 0, it is NULL */
52         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
53 }
54
55 inline void mxlnd_noop(char *s, ...)
56 {
57         return;
58 }
59
60 char *
61 mxlnd_ctxstate_to_str(int mxc_state)
62 {
63         switch (mxc_state) {
64         case MXLND_CTX_INIT:
65                 return "MXLND_CTX_INIT";
66         case MXLND_CTX_IDLE:
67                 return "MXLND_CTX_IDLE";
68         case MXLND_CTX_PREP:
69                 return "MXLND_CTX_PREP";
70         case MXLND_CTX_PENDING:
71                 return "MXLND_CTX_PENDING";
72         case MXLND_CTX_COMPLETED:
73                 return "MXLND_CTX_COMPLETED";
74         case MXLND_CTX_CANCELED:
75                 return "MXLND_CTX_CANCELED";
76         default:
77                 return "*unknown*";
78         }
79 }
80
81 char *
82 mxlnd_connstatus_to_str(int mxk_status)
83 {
84         switch (mxk_status) {
85         case MXLND_CONN_READY:
86                 return "MXLND_CONN_READY";
87         case MXLND_CONN_INIT:
88                 return "MXLND_CONN_INIT";
89         case MXLND_CONN_REQ:
90                 return "MXLND_CONN_REQ";
91         case MXLND_CONN_ACK:
92                 return "MXLND_CONN_ACK";
93         case MXLND_CONN_WAIT:
94                 return "MXLND_CONN_WAIT";
95         case MXLND_CONN_DISCONNECT:
96                 return "MXLND_CONN_DISCONNECT";
97         case MXLND_CONN_FAIL:
98                 return "MXLND_CONN_FAIL";
99         default:
100                 return "unknown";
101         }
102 }
103
104 char *
105 mxlnd_msgtype_to_str(int type) {
106         switch (type) {
107         case MXLND_MSG_EAGER:
108                 return "MXLND_MSG_EAGER";
109         case MXLND_MSG_CONN_REQ:
110                 return "MXLND_MSG_CONN_REQ";
111         case MXLND_MSG_CONN_ACK:
112                 return "MXLND_MSG_CONN_ACK";
113         case MXLND_MSG_BYE:
114                 return "MXLND_MSG_BYE";
115         case MXLND_MSG_NOOP:
116                 return "MXLND_MSG_NOOP";
117         case MXLND_MSG_PUT_REQ:
118                 return "MXLND_MSG_PUT_REQ";
119         case MXLND_MSG_PUT_ACK:
120                 return "MXLND_MSG_PUT_ACK";
121         case MXLND_MSG_PUT_DATA:
122                 return "MXLND_MSG_PUT_DATA";
123         case MXLND_MSG_GET_REQ:
124                 return "MXLND_MSG_GET_REQ";
125         case MXLND_MSG_GET_DATA:
126                 return "MXLND_MSG_GET_DATA";
127         default:
128                 return "unknown";
129         }
130 }
131
132 char *
133 mxlnd_lnetmsg_to_str(int type)
134 {
135         switch (type) {
136         case LNET_MSG_ACK:
137                 return "LNET_MSG_ACK";
138         case LNET_MSG_PUT:
139                 return "LNET_MSG_PUT";
140         case LNET_MSG_GET:
141                 return "LNET_MSG_GET";
142         case LNET_MSG_REPLY:
143                 return "LNET_MSG_REPLY";
144         case LNET_MSG_HELLO:
145                 return "LNET_MSG_HELLO";
146         default:
147                 LBUG();
148                 return "*unknown*";
149         }
150 }
151
152 static inline u64
153 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
154 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
155 {
156         u64 type        = (u64) ctx->mxc_msg_type;
157         u64 err         = (u64) error;
158         u64 match       = 0ULL;
159
160         LASSERT(ctx->mxc_msg_type != 0);
161         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
162         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
163         return match;
164 }
165
166 static inline void
167 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
168 {
169         *msg_type = (u8) MXLND_MSG_TYPE(match);
170         *error    = (u8) MXLND_ERROR_VAL(match);
171         *cookie   = match & MXLND_MAX_COOKIE;
172         LASSERT(*msg_type == MXLND_MSG_EAGER    ||
173                 *msg_type == MXLND_MSG_ICON_REQ ||
174                 *msg_type == MXLND_MSG_CONN_REQ ||
175                 *msg_type == MXLND_MSG_ICON_ACK ||
176                 *msg_type == MXLND_MSG_CONN_ACK ||
177                 *msg_type == MXLND_MSG_BYE      ||
178                 *msg_type == MXLND_MSG_NOOP     ||
179                 *msg_type == MXLND_MSG_PUT_REQ  ||
180                 *msg_type == MXLND_MSG_PUT_ACK  ||
181                 *msg_type == MXLND_MSG_PUT_DATA ||
182                 *msg_type == MXLND_MSG_GET_REQ  ||
183                 *msg_type == MXLND_MSG_GET_DATA);
184         return;
185 }
186
187 struct kmx_ctx *
188 mxlnd_get_idle_rx(void)
189 {
190         struct list_head        *tmp    = NULL;
191         struct kmx_ctx          *rx     = NULL;
192
193         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
194
195         if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
196                 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
197                 return NULL;
198         }
199
200         tmp = &kmxlnd_data.kmx_rx_idle;
201         rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
202         list_del_init(&rx->mxc_list);
203         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
204
205 #if MXLND_DEBUG
206         if (rx->mxc_get != rx->mxc_put) {
207                 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
208                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
209                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
210                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
211                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
212                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
213                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
214                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
215                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
216                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
217         }
218 #endif
219         LASSERT (rx->mxc_get == rx->mxc_put);
220
221         rx->mxc_get++;
222
223         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
224         rx->mxc_state = MXLND_CTX_PREP;
225
226         return rx;
227 }
228
229 int
230 mxlnd_put_idle_rx(struct kmx_ctx *rx)
231 {
232         if (rx == NULL) {
233                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
234                 return -EINVAL;
235         } else if (rx->mxc_type != MXLND_REQ_RX) {
236                 CDEBUG(D_NETERROR, "called with tx\n");
237                 return -EINVAL;
238         }
239         LASSERT(rx->mxc_get == rx->mxc_put + 1);
240         mxlnd_ctx_init(rx);
241         rx->mxc_put++;
242         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
243         list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
244         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
245         return 0;
246 }
247
248 int
249 mxlnd_reduce_idle_rxs(__u32 count)
250 {
251         __u32                   i       = 0;
252         struct kmx_ctx          *rx     = NULL;
253
254         spin_lock(&kmxlnd_data.kmx_rxs_lock);
255         for (i = 0; i < count; i++) {
256                 rx = mxlnd_get_idle_rx();
257                 if (rx != NULL) {
258                         struct list_head *tmp = &rx->mxc_global_list;
259                         list_del_init(tmp);
260                         mxlnd_ctx_free(rx);
261                 } else {
262                         CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
263                         break;
264                 }
265         }
266         spin_unlock(&kmxlnd_data.kmx_rxs_lock);
267         return 0;
268 }
269
270 struct kmx_ctx *
271 mxlnd_get_idle_tx(void)
272 {
273         struct list_head        *tmp    = NULL;
274         struct kmx_ctx          *tx     = NULL;
275
276         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
277
278         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
279                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
280                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
281                 return NULL;
282         }
283
284         tmp = &kmxlnd_data.kmx_tx_idle;
285         tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
286         list_del_init(&tx->mxc_list);
287
288         /* Allocate a new completion cookie.  It might not be needed,
289          * but we've got a lock right now and we're unlikely to
290          * wrap... */
291         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
292         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
293                 kmxlnd_data.kmx_tx_next_cookie = 1;
294         }
295         kmxlnd_data.kmx_tx_used++;
296         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
297
298         LASSERT (tx->mxc_get == tx->mxc_put);
299
300         tx->mxc_get++;
301
302         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
303         LASSERT (tx->mxc_lntmsg[0] == NULL);
304         LASSERT (tx->mxc_lntmsg[1] == NULL);
305
306         tx->mxc_state = MXLND_CTX_PREP;
307
308         return tx;
309 }
310
311 void
312 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye);
313
314 int
315 mxlnd_put_idle_tx(struct kmx_ctx *tx)
316 {
317         //int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
318         int             result  = 0;
319         lnet_msg_t      *lntmsg[2];
320
321         if (tx == NULL) {
322                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
323                 return -EINVAL;
324         } else if (tx->mxc_type != MXLND_REQ_TX) {
325                 CDEBUG(D_NETERROR, "called with rx\n");
326                 return -EINVAL;
327         }
328         if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
329               tx->mxc_status.code == MX_STATUS_TRUNCATED)) {
330                 struct kmx_conn *conn   = tx->mxc_conn;
331
332                 result = -EIO;
333                 mxlnd_conn_disconnect(conn, 0, 1);
334         }
335
336         lntmsg[0] = tx->mxc_lntmsg[0];
337         lntmsg[1] = tx->mxc_lntmsg[1];
338
339         LASSERT(tx->mxc_get == tx->mxc_put + 1);
340         mxlnd_ctx_init(tx);
341         tx->mxc_put++;
342         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
343         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
344         kmxlnd_data.kmx_tx_used--;
345         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
346         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
347         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
348         return 0;
349 }
350
351 /**
352  * mxlnd_conn_free - free the conn
353  * @conn - a kmx_conn pointer
354  *
355  * The calling function should remove the conn from the conns list first
356  * then destroy it.
357  */
358 void
359 mxlnd_conn_free(struct kmx_conn *conn)
360 {
361         struct kmx_peer *peer   = conn->mxk_peer;
362
363         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
365                  list_empty (&conn->mxk_tx_free_queue) &&
366                  list_empty (&conn->mxk_pending));
367         if (!list_empty(&conn->mxk_list)) {
368                 list_del_init(&conn->mxk_list);
369                 if (peer->mxp_conn == conn) {
370                         peer->mxp_conn = NULL;
371                         if (!mxlnd_endpoint_addr_null(conn->mxk_epa))
372                                 mx_set_endpoint_addr_context(conn->mxk_epa,
373                                                              (void *) NULL);
374                 }
375         }
376         mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
377         MXLND_FREE (conn, sizeof (*conn));
378         return;
379 }
380
381
382 void
383 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
384 {
385         int                     found   = 0;
386         struct kmx_ctx          *ctx    = NULL;
387         struct kmx_ctx          *next   = NULL;
388         mx_return_t             mxret   = MX_SUCCESS;
389         u32                     result  = 0;
390
391         do {
392                 found = 0;
393                 spin_lock(&conn->mxk_lock);
394                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
395                         /* we will delete all including txs */
396                         list_del_init(&ctx->mxc_list);
397                         if (ctx->mxc_type == MXLND_REQ_RX) {
398                                 found = 1;
399                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
400                                                   &ctx->mxc_mxreq,
401                                                   &result);
402                                 if (mxret != MX_SUCCESS) {
403                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
404                                 }
405                                 if (result == 1) {
406                                         ctx->mxc_status.code = -ECONNABORTED;
407                                         ctx->mxc_state = MXLND_CTX_CANCELED;
408                                         /* NOTE this calls lnet_finalize() and
409                                          * we cannot hold any locks when calling it.
410                                          * It also calls mxlnd_conn_decref(conn) */
411                                         spin_unlock(&conn->mxk_lock);
412                                         mxlnd_handle_rx_completion(ctx);
413                                         spin_lock(&conn->mxk_lock);
414                                 }
415                                 break;
416                         }
417                 }
418                 spin_unlock(&conn->mxk_lock);
419         }
420         while (found);
421
422         return;
423 }
424
425 /**
426  * mxlnd_conn_disconnect - shutdown a connection
427  * @conn - a kmx_conn pointer
428  * @mx_dis - call mx_disconnect()
429  * @send_bye - send peer a BYE msg
430  *
431  * This function sets the status to DISCONNECT, completes queued
432  * txs with failure, calls mx_disconnect, which will complete
433  * pending txs and matched rxs with failure.
434  */
435 void
436 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int send_bye)
437 {
438         mx_endpoint_addr_t      epa     = conn->mxk_epa;
439         struct list_head        *tmp    = NULL;
440         int                     valid   = !mxlnd_endpoint_addr_null(epa);
441
442         spin_lock(&conn->mxk_lock);
443         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
444                 spin_unlock(&conn->mxk_lock);
445                 return;
446         }
447         conn->mxk_status = MXLND_CONN_DISCONNECT;
448         conn->mxk_timeout = 0;
449
450         while (!list_empty(&conn->mxk_tx_free_queue) ||
451                !list_empty(&conn->mxk_tx_credit_queue)) {
452
453                 struct kmx_ctx          *tx     = NULL;
454
455                 if (!list_empty(&conn->mxk_tx_free_queue)) {
456                         tmp = &conn->mxk_tx_free_queue;
457                 } else {
458                         tmp = &conn->mxk_tx_credit_queue;
459                 }
460
461                 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
462                 list_del_init(&tx->mxc_list);
463                 tx->mxc_status.code = -ECONNABORTED;
464                 spin_unlock(&conn->mxk_lock);
465                 mxlnd_put_idle_tx(tx);
466                 mxlnd_conn_decref(conn); /* for this tx */
467                 spin_lock(&conn->mxk_lock);
468         }
469
470         spin_unlock(&conn->mxk_lock);
471
472         /* cancel pending rxs */
473         mxlnd_conn_cancel_pending_rxs(conn);
474
475         if (send_bye && valid) {
476                 u64 match = ((u64) MXLND_MSG_BYE) << MXLND_MSG_OFFSET;
477                 /* send a BYE to the peer */
478                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
479                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
480                 mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
481                                 epa, match, NULL, NULL);
482                 /* wait to allow the peer to ack our message */
483                 mxlnd_sleep(msecs_to_jiffies(20));
484         }
485
486         if (kmxlnd_data.kmx_shutdown != 1) {
487                 time_t          last_alive      = 0;
488                 unsigned long   last_msg        = 0;
489
490                 /* notify LNET that we are giving up on this peer */
491                 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
492                         last_msg = conn->mxk_last_rx;
493                 else
494                         last_msg = conn->mxk_last_tx;
495
496                 last_alive = cfs_time_current_sec() -
497                              cfs_duration_sec(cfs_time_current() - last_msg);
498                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
499
500                 if (mx_dis && valid)
501                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
502         }
503         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
504
505         return;
506 }
507
508 /**
509  * mxlnd_conn_alloc - allocate and initialize a new conn struct
510  * @connp - address of a kmx_conn pointer
511  * @peer - owning kmx_peer
512  *
513  * Returns 0 on success and -ENOMEM on failure
514  */
515 int
516 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
517 {
518         struct kmx_conn *conn    = NULL;
519
520         LASSERT(peer != NULL);
521
522         MXLND_ALLOC(conn, sizeof (*conn));
523         if (conn == NULL) {
524                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
525                 return -ENOMEM;
526         }
527         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
528
529         memset(conn, 0, sizeof(*conn));
530
531         /* conn->mxk_incarnation = 0 - will be set by peer */
532         atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer 
533                                                    and one for the caller */
534         conn->mxk_peer = peer;
535         /* mxk_epa - to be set after mx_iconnect() */
536         INIT_LIST_HEAD(&conn->mxk_list);
537         spin_lock_init(&conn->mxk_lock);
538         /* conn->mxk_timeout = 0 */
539         conn->mxk_last_tx = jiffies;
540         conn->mxk_last_rx = conn->mxk_last_tx;
541         conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
542         /* mxk_outstanding = 0 */
543         conn->mxk_status = MXLND_CONN_INIT;
544         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
545         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
546         /* conn->mxk_ntx_msgs = 0 */
547         /* conn->mxk_ntx_data = 0 */
548         /* conn->mxk_ntx_posted = 0 */
549         /* conn->mxk_data_posted = 0 */
550         INIT_LIST_HEAD(&conn->mxk_pending);
551
552         *connp = conn;
553
554         mxlnd_peer_addref(peer);        /* add a ref for this conn */
555
556         /* add to front of peer's conns list */
557         list_add(&conn->mxk_list, &peer->mxp_conns);
558         peer->mxp_conn = conn;
559         return 0;
560 }
561
562 int
563 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
564 {
565         int ret = 0;
566         write_lock(&kmxlnd_data.kmx_global_lock);
567         ret = mxlnd_conn_alloc_locked(connp, peer);
568         write_unlock(&kmxlnd_data.kmx_global_lock);
569         return ret;
570 }
571
572 int
573 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
574 {
575         int             ret     = 0;
576         struct kmx_conn *conn   = ctx->mxc_conn;
577
578         ctx->mxc_state = MXLND_CTX_PENDING;
579         if (conn != NULL) {
580                 spin_lock(&conn->mxk_lock);
581                 if (conn->mxk_status >= MXLND_CONN_INIT) {
582                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
583                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
584                                 conn->mxk_timeout = ctx->mxc_deadline;
585                         }
586                 } else {
587                         ctx->mxc_state = MXLND_CTX_COMPLETED;
588                         ret = -1;
589                 }
590                 spin_unlock(&conn->mxk_lock);
591         }
592         return ret;
593 }
594
595 int
596 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
597 {
598         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
599                 ctx->mxc_state == MXLND_CTX_COMPLETED);
600         if (ctx->mxc_state != MXLND_CTX_PENDING &&
601             ctx->mxc_state != MXLND_CTX_COMPLETED) {
602                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", 
603                        mxlnd_ctxstate_to_str(ctx->mxc_state));
604         }
605         ctx->mxc_state = MXLND_CTX_COMPLETED;
606         if (!list_empty(&ctx->mxc_list)) {
607                 struct kmx_conn *conn = ctx->mxc_conn;
608                 struct kmx_ctx *next = NULL;
609                 LASSERT(conn != NULL);
610                 spin_lock(&conn->mxk_lock);
611                 list_del_init(&ctx->mxc_list);
612                 conn->mxk_timeout = 0;
613                 if (!list_empty(&conn->mxk_pending)) {
614                         next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
615                         conn->mxk_timeout = next->mxc_deadline;
616                 }
617                 spin_unlock(&conn->mxk_lock);
618         }
619         return 0;
620 }
621
622 /**
623  * mxlnd_peer_free - free the peer
624  * @peer - a kmx_peer pointer
625  *
626  * The calling function should decrement the rxs, drain the tx queues and
627  * remove the peer from the peers list first then destroy it.
628  */
629 void
630 mxlnd_peer_free(struct kmx_peer *peer)
631 {
632         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer,
633                         peer == kmxlnd_data.kmx_localhost ? "(*** localhost ***)" : "");
634
635         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
636
637         if (peer == kmxlnd_data.kmx_localhost)
638                 LASSERT(kmxlnd_data.kmx_shutdown);
639
640         if (!list_empty(&peer->mxp_peers)) {
641                 /* assume we are locked */
642                 list_del_init(&peer->mxp_peers);
643         }
644
645         MXLND_FREE (peer, sizeof (*peer));
646         atomic_dec(&kmxlnd_data.kmx_npeers);
647         return;
648 }
649
650 #define MXLND_LOOKUP_COUNT 10
651
652 /* We only want the MAC address of the peer's Myricom NIC. We
653  * require that each node has the IPoMX interface (myriN) up.
654  * We will not pass any traffic over IPoMX, but it allows us
655  * to get the MAC address. */
656 static int
657 mxlnd_ip2nic_id(u32 ip, u64 *nic_id)
658 {
659         int                     ret     = 0;
660         int                     try     = 1;
661         int                     fatal   = 0;
662         u64                     tmp_id  = 0ULL;
663         unsigned char           *haddr  = NULL;
664         struct net_device       *dev    = NULL;
665         struct neighbour        *n      = NULL;
666         cfs_socket_t            *sock   = NULL;
667         __be32                  dst_ip  = htonl(ip);
668
669         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
670         if (dev == NULL) {
671                 return -ENODEV;
672         }
673
674         haddr = (unsigned char *) &tmp_id + 2; /* MAC is only 6 bytes */
675
676         do {
677                 n = neigh_lookup(&arp_tbl, &dst_ip, dev);
678                 if (n) {
679                         n->used = jiffies;
680                         if (n->nud_state & NUD_VALID) {
681                                 memcpy(haddr, n->ha, dev->addr_len);
682                                 neigh_release(n);
683                                 ret = 0;
684                                 break;
685                         }
686                 }
687                 /* not found, try to connect (force an arp) */
688                 libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
689                 if (!fatal)
690                         libcfs_sock_release(sock);
691                 schedule_timeout_interruptible(HZ/10 * try); /* add a little backoff */
692         } while (try++ < MXLND_LOOKUP_COUNT);
693
694         dev_put(dev);
695
696         if (tmp_id == 0ULL)
697                 ret = -EHOSTUNREACH;
698 #ifdef __LITTLE_ENDIAN
699         *nic_id = ___arch__swab64(tmp_id);
700 #else
701         *nic_id = tmp_id;
702 #endif
703         return ret;
704 }
705
706 /**
707  * mxlnd_peer_alloc - allocate and initialize a new peer struct
708  * @peerp - address of a kmx_peer pointer
709  * @nid - LNET node id
710  *
711  * Returns 0 on success and -ENOMEM on failure
712  */
713 int
714 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
715 {
716         int                     i       = 0;
717         int                     ret     = 0;
718         u32                     ip      = LNET_NIDADDR(nid);
719         struct kmx_peer        *peer    = NULL;
720
721         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
722
723         MXLND_ALLOC(peer, sizeof (*peer));
724         if (peer == NULL) {
725                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
726                 return -ENOMEM;
727         }
728         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
729
730         memset(peer, 0, sizeof(*peer));
731
732         peer->mxp_nid = nid;
733         /* peer->mxp_incarnation */
734         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
735
736         peer->mxp_ip = ip;
737         peer->mxp_ep_id = *kmxlnd_tunables.kmx_ep_id;
738         peer->mxp_board = board;
739         peer->mxp_nic_id = nic_id;
740
741         if (nic_id == 0ULL) {
742                 ret = mxlnd_ip2nic_id(ip, &nic_id);
743                 if (ret != 0)
744                         CERROR("%s: mxlnd_ip2nic_id() returned %d\n", __func__, ret);
745                 mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
746         }
747
748         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
749
750         INIT_LIST_HEAD(&peer->mxp_peers);
751         INIT_LIST_HEAD(&peer->mxp_conns);
752         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
753         if (ret != 0) {
754                 mxlnd_peer_decref(peer);
755                 return ret;
756         }
757
758         for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
759                 struct kmx_ctx   *rx     = NULL;
760                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
761                 if (ret != 0) {
762                         mxlnd_reduce_idle_rxs(i);
763                         mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
764                         mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
765                         mxlnd_peer_decref(peer);
766                         return ret;
767                 }
768                 spin_lock(&kmxlnd_data.kmx_rxs_lock);
769                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
770                 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
771                 rx->mxc_put = -1;
772                 mxlnd_put_idle_rx(rx);
773         }
774         /* peer->mxp_reconnect_time = 0 */
775         /* peer->mxp_incompatible = 0 */
776
777         *peerp = peer;
778         return 0;
779 }
780
781 static inline struct kmx_peer *
782 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
783 {
784         int                     found   = 0;
785         int                     hash    = 0;
786         struct kmx_peer         *peer   = NULL;
787
788         hash = mxlnd_nid_to_hash(nid);
789
790         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
791                 if (peer->mxp_nid == nid) {
792                         found = 1;
793                         mxlnd_peer_addref(peer);
794                         break;
795                 }
796         }
797         return (found ? peer : NULL);
798 }
799
800 static inline struct kmx_peer *
801 mxlnd_find_peer_by_nid(lnet_nid_t nid)
802 {
803         struct kmx_peer *peer   = NULL;
804
805         read_lock(&kmxlnd_data.kmx_global_lock);
806         peer = mxlnd_find_peer_by_nid_locked(nid);
807         read_unlock(&kmxlnd_data.kmx_global_lock);
808         return peer;
809 }
810
811 static inline int
812 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
813 {
814         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
815                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
816                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
817                 tx->mxc_msg_type == MXLND_MSG_NOOP);
818 }
819
820 /**
821  * mxlnd_init_msg - set type and number of bytes
822  * @msg - msg pointer
823  * @type - of message
824  * @body_nob - bytes in msg body
825  */
826 static inline void
827 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
828 {
829         msg->mxm_type = type;
830         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
831 }
832
833 static inline void
834 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
835 {
836         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
837         struct kmx_msg  *msg    = NULL;
838
839         LASSERT (tx != NULL);
840         LASSERT (nob <= MXLND_EAGER_SIZE);
841
842         tx->mxc_nid = nid;
843         /* tx->mxc_peer should have already been set if we know it */
844         tx->mxc_msg_type = type;
845         tx->mxc_nseg = 1;
846         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
847         tx->mxc_seg.segment_length = nob;
848         tx->mxc_pin_type = MX_PIN_PHYSICAL;
849         //tx->mxc_state = MXLND_CTX_PENDING;
850
851         msg = tx->mxc_msg;
852         msg->mxm_type = type;
853         msg->mxm_nob  = nob;
854
855         return;
856 }
857
858 static inline __u32
859 mxlnd_cksum (void *ptr, int nob)
860 {
861         char  *c  = ptr;
862         __u32  sum = 0;
863
864         while (nob-- > 0)
865                 sum = ((sum << 1) | (sum >> 31)) + *c++;
866
867         /* ensure I don't return 0 (== no checksum) */
868         return (sum == 0) ? 1 : sum;
869 }
870
871 /**
872  * mxlnd_pack_msg - complete msg info
873  * @tx - msg to send
874  */
875 static inline void
876 mxlnd_pack_msg(struct kmx_ctx *tx)
877 {
878         struct kmx_msg  *msg    = tx->mxc_msg;
879
880         /* type and nob should already be set in init_msg() */
881         msg->mxm_magic    = MXLND_MSG_MAGIC;
882         msg->mxm_version  = MXLND_MSG_VERSION;
883         /*   mxm_type */
884         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
885          * return credits as well */
886         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
887             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
888                 spin_lock(&tx->mxc_conn->mxk_lock);
889                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
890                 tx->mxc_conn->mxk_outstanding = 0;
891                 spin_unlock(&tx->mxc_conn->mxk_lock);
892         } else {
893                 msg->mxm_credits  = 0;
894         }
895         /*   mxm_nob */
896         msg->mxm_cksum    = 0;
897         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
898         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
899         msg->mxm_dstnid   = tx->mxc_nid;
900         /* if it is a new peer, the dststamp will be 0 */
901         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
902         msg->mxm_seq      = tx->mxc_cookie;
903
904         if (*kmxlnd_tunables.kmx_cksum) {
905                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
906         }
907 }
908
909 int
910 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
911 {
912         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
913         __u32     msg_cksum     = 0;
914         int       flip          = 0;
915         int       msg_nob       = 0;
916
917         /* 6 bytes are enough to have received magic + version */
918         if (nob < 6) {
919                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
920                 return -EPROTO;
921         }
922
923         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
924                 flip = 0;
925         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
926                 flip = 1;
927         } else {
928                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
929                 return -EPROTO;
930         }
931
932         if (msg->mxm_version !=
933             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
934                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
935                 return -EPROTO;
936         }
937
938         if (nob < hdr_size) {
939                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
940                 return -EPROTO;
941         }
942
943         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
944         if (msg_nob > nob) {
945                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
946                 return -EPROTO;
947         }
948
949         /* checksum must be computed with mxm_cksum zero and BEFORE anything
950          * gets flipped */
951         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
952         msg->mxm_cksum = 0;
953         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
954                 CDEBUG(D_NETERROR, "Bad checksum\n");
955                 return -EPROTO;
956         }
957         msg->mxm_cksum = msg_cksum;
958
959         if (flip) {
960                 /* leave magic unflipped as a clue to peer endianness */
961                 __swab16s(&msg->mxm_version);
962                 CLASSERT (sizeof(msg->mxm_type) == 1);
963                 CLASSERT (sizeof(msg->mxm_credits) == 1);
964                 msg->mxm_nob = msg_nob;
965                 __swab64s(&msg->mxm_srcnid);
966                 __swab64s(&msg->mxm_srcstamp);
967                 __swab64s(&msg->mxm_dstnid);
968                 __swab64s(&msg->mxm_dststamp);
969                 __swab64s(&msg->mxm_seq);
970         }
971
972         if (msg->mxm_srcnid == LNET_NID_ANY) {
973                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
974                 return -EPROTO;
975         }
976
977         switch (msg->mxm_type) {
978         default:
979                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
980                 return -EPROTO;
981
982         case MXLND_MSG_NOOP:
983                 break;
984
985         case MXLND_MSG_EAGER:
986                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
987                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
988                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
989                         return -EPROTO;
990                 }
991                 break;
992
993         case MXLND_MSG_PUT_REQ:
994                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
995                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
996                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
997                         return -EPROTO;
998                 }
999                 if (flip)
1000                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1001                 break;
1002
1003         case MXLND_MSG_PUT_ACK:
1004                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1005                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1006                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1007                         return -EPROTO;
1008                 }
1009                 if (flip) {
1010                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1011                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1012                 }
1013                 break;
1014
1015         case MXLND_MSG_GET_REQ:
1016                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1017                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1018                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1019                         return -EPROTO;
1020                 }
1021                 if (flip) {
1022                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1023                 }
1024                 break;
1025
1026         case MXLND_MSG_CONN_REQ:
1027         case MXLND_MSG_CONN_ACK:
1028                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1029                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1030                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1031                         return -EPROTO;
1032                 }
1033                 if (flip) {
1034                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1035                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1036                 }
1037                 break;
1038         }
1039         return 0;
1040 }
1041
1042 /**
1043  * mxlnd_recv_msg
1044  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1045  * @rx
1046  * @msg_type
1047  * @cookie
1048  * @length - length of incoming message
1049  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1050  *
1051  * The caller gets the rx and sets nid, peer and conn if known.
1052  *
1053  * Returns 0 on success and -1 on failure
1054  */
1055 int
1056 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1057 {
1058         int             ret     = 0;
1059         mx_return_t     mxret   = MX_SUCCESS;
1060         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1061
1062         rx->mxc_msg_type = msg_type;
1063         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1064         rx->mxc_cookie = cookie;
1065         /* rx->mxc_match may already be set */
1066         /* rx->mxc_seg.segment_ptr is already set */
1067         rx->mxc_seg.segment_length = length;
1068         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1069         ret = mxlnd_q_pending_ctx(rx);
1070         if (ret == -1) {
1071                 /* the caller is responsible for calling conn_decref() if needed */
1072                 return -1;
1073         }
1074         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1075                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1076         if (mxret != MX_SUCCESS) {
1077                 mxlnd_deq_pending_ctx(rx);
1078                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1079                                    mx_strerror(mxret), (int) mxret);
1080                 return -1;
1081         }
1082         return 0;
1083 }
1084
1085
1086 /**
1087  * mxlnd_unexpected_recv - this is the callback function that will handle 
1088  *                         unexpected receives
1089  * @context - NULL, ignore
1090  * @source - the peer's mx_endpoint_addr_t
1091  * @match_value - the msg's bit, should be MXLND_MSG_EAGER
1092  * @length - length of incoming message
1093  * @data_if_available - ignore
1094  *
1095  * If it is an eager-sized msg, we will call recv_msg() with the actual
1096  * length. If it is a large message, we will call recv_msg() with a
1097  * length of 0 bytes to drop it because we should never have a large,
1098  * unexpected message.
1099  *
1100  * NOTE - The MX library blocks until this function completes. Make it as fast as
1101  * possible. DO NOT allocate memory which can block!
1102  *
1103  * If we cannot get a rx or the conn is closed, drop the message on the floor
1104  * (i.e. recv 0 bytes and ignore).
1105  */
1106 mx_unexp_handler_action_t
1107 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1108                  uint64_t match_value, uint32_t length, void *data_if_available)
1109 {
1110         int             ret             = 0;
1111         struct kmx_ctx  *rx             = NULL;
1112         mx_ksegment_t   seg;
1113         u8              msg_type        = 0;
1114         u8              error           = 0;
1115         u64             cookie          = 0ULL;
1116
1117         if (context != NULL) {
1118                 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1119         }
1120
1121 #if MXLND_DEBUG
1122         CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1123 #endif
1124
1125         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1126         if (msg_type == MXLND_MSG_BYE) {
1127                 struct kmx_peer *peer   = NULL;
1128
1129                 mx_get_endpoint_addr_context(source, (void **) &peer);
1130                 if (peer && peer->mxp_conn) {
1131                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1132                                         libcfs_nid2str(peer->mxp_nid));
1133                         mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1134                 }
1135
1136                 return MX_RECV_FINISHED;
1137         }
1138
1139         rx = mxlnd_get_idle_rx();
1140         if (rx != NULL) {
1141                 if (length <= MXLND_EAGER_SIZE) {
1142                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1143                 } else {
1144                         CDEBUG(D_NETERROR, "unexpected large receive with "
1145                                            "match_value=0x%llx length=%d\n",
1146                                            match_value, length);
1147                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1148                 }
1149
1150                 if (ret == 0) {
1151                         struct kmx_peer *peer   = NULL;
1152                         struct kmx_conn *conn   = NULL;
1153
1154                         /* NOTE to avoid a peer disappearing out from under us,
1155                          *      read lock the peers lock first */
1156                         read_lock(&kmxlnd_data.kmx_global_lock);
1157                         mx_get_endpoint_addr_context(source, (void **) &peer);
1158                         if (peer != NULL) {
1159                                 mxlnd_peer_addref(peer); /* add a ref... */
1160                                 conn = peer->mxp_conn;
1161                                 if (conn) {
1162                                         mxlnd_conn_addref(conn); /* add ref until rx completed */
1163                                         mxlnd_peer_decref(peer); /* and drop peer ref */
1164                                         rx->mxc_conn = conn;
1165                                 }
1166                                 rx->mxc_peer = peer;
1167                                 rx->mxc_nid = peer->mxp_nid;
1168                         }
1169                         read_unlock(&kmxlnd_data.kmx_global_lock);
1170                 } else {
1171                         CDEBUG(D_NETERROR, "could not post receive\n");
1172                         mxlnd_put_idle_rx(rx);
1173                 }
1174         }
1175
1176         if (rx == NULL || ret != 0) {
1177                 if (rx == NULL) {
1178                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1179                 } else {
1180                         /* ret != 0 */
1181                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1182                 }
1183                 seg.segment_ptr = 0ULL;
1184                 seg.segment_length = 0;
1185                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1186                           match_value, ~0ULL, NULL, NULL);
1187         }
1188
1189         return MX_RECV_CONTINUE;
1190 }
1191
1192
1193 int
1194 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1195 {
1196         int                      i      = 0;
1197         int                      ret    = -ENOENT;
1198         struct kmx_peer         *peer   = NULL;
1199
1200         read_lock(&kmxlnd_data.kmx_global_lock);
1201         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1202                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1203                         if (index-- == 0) {
1204                                 *nidp = peer->mxp_nid;
1205                                 *count = atomic_read(&peer->mxp_refcount);
1206                                 ret = 0;
1207                                 break;
1208                         }
1209                 }
1210         }
1211         read_unlock(&kmxlnd_data.kmx_global_lock);
1212
1213         return ret;
1214 }
1215
1216 void
1217 mxlnd_del_peer_locked(struct kmx_peer *peer)
1218 {
1219         list_del_init(&peer->mxp_peers); /* remove from the global list */
1220         if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1221         mxlnd_peer_decref(peer); /* drop global list ref */
1222         return;
1223 }
1224
1225 int
1226 mxlnd_del_peer(lnet_nid_t nid)
1227 {
1228         int             i       = 0;
1229         int             ret     = 0;
1230         struct kmx_peer *peer   = NULL;
1231         struct kmx_peer *next   = NULL;
1232
1233         if (nid != LNET_NID_ANY) {
1234                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1235         }
1236         write_lock(&kmxlnd_data.kmx_global_lock);
1237         if (nid != LNET_NID_ANY) {
1238                 if (peer == NULL) {
1239                         ret = -ENOENT;
1240                 } if (peer == kmxlnd_data.kmx_localhost) {
1241                         mxlnd_peer_decref(peer); /* and drops it */
1242                         CERROR("cannot free this host's NID 0x%llx\n", nid);
1243                 } else {
1244                         mxlnd_peer_decref(peer); /* and drops it */
1245                         mxlnd_del_peer_locked(peer);
1246                 }
1247         } else { /* LNET_NID_ANY */
1248                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1249                         list_for_each_entry_safe(peer, next,
1250                                                  &kmxlnd_data.kmx_peers[i], mxp_peers) {
1251                                 if (peer != kmxlnd_data.kmx_localhost)
1252                                         mxlnd_del_peer_locked(peer);
1253                         }
1254                 }
1255         }
1256         write_unlock(&kmxlnd_data.kmx_global_lock);
1257
1258         return ret;
1259 }
1260
1261 struct kmx_conn *
1262 mxlnd_get_conn_by_idx(int index)
1263 {
1264         int                      i      = 0;
1265         struct kmx_peer         *peer   = NULL;
1266         struct kmx_conn         *conn   = NULL;
1267
1268         read_lock(&kmxlnd_data.kmx_global_lock);
1269         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1270                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1271                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1272                                 if (index-- > 0) {
1273                                         continue;
1274                                 }
1275
1276                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1277                                 read_unlock(&kmxlnd_data.kmx_global_lock);
1278                                 return conn;
1279                         }
1280                 }
1281         }
1282         read_unlock(&kmxlnd_data.kmx_global_lock);
1283
1284         return NULL;
1285 }
1286
1287 void
1288 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1289 {
1290         struct kmx_conn *conn   = NULL;
1291         struct kmx_conn *next   = NULL;
1292
1293         if (peer == kmxlnd_data.kmx_localhost) return;
1294
1295         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1296                 mxlnd_conn_disconnect(conn, 0, 1);
1297
1298         return;
1299 }
1300
1301 int
1302 mxlnd_close_matching_conns(lnet_nid_t nid)
1303 {
1304         int             i       = 0;
1305         int             ret     = 0;
1306         struct kmx_peer *peer   = NULL;
1307
1308         read_lock(&kmxlnd_data.kmx_global_lock);
1309         if (nid != LNET_NID_ANY) {
1310                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1311                 if (peer == NULL) {
1312                         ret = -ENOENT;
1313                 } else {
1314                         mxlnd_close_matching_conns_locked(peer);
1315                         mxlnd_peer_decref(peer); /* and drops it here */
1316                 }
1317         } else { /* LNET_NID_ANY */
1318                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1319                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1320                                 mxlnd_close_matching_conns_locked(peer);
1321                 }
1322         }
1323         read_unlock(&kmxlnd_data.kmx_global_lock);
1324
1325         return ret;
1326 }
1327
1328 /**
1329  * mxlnd_ctl - modify MXLND parameters
1330  * @ni - LNET interface handle
1331  * @cmd - command to change
1332  * @arg - the ioctl data
1333  *
1334  * Not implemented yet.
1335  */
1336 int
1337 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1338 {
1339         struct libcfs_ioctl_data *data  = arg;
1340         int                       ret   = -EINVAL;
1341
1342         LASSERT (ni == kmxlnd_data.kmx_ni);
1343
1344         switch (cmd) {
1345         case IOC_LIBCFS_GET_PEER: {
1346                 lnet_nid_t      nid     = 0;
1347                 int             count   = 0;
1348
1349                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1350                 data->ioc_nid    = nid;
1351                 data->ioc_count  = count;
1352                 break;
1353         }
1354         case IOC_LIBCFS_DEL_PEER: {
1355                 ret = mxlnd_del_peer(data->ioc_nid);
1356                 break;
1357         }
1358         case IOC_LIBCFS_GET_CONN: {
1359                 struct kmx_conn *conn = NULL;
1360
1361                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1362                 if (conn == NULL) {
1363                         ret = -ENOENT;
1364                 } else {
1365                         ret = 0;
1366                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1367                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1368                 }
1369                 break;
1370         }
1371         case IOC_LIBCFS_CLOSE_CONNECTION: {
1372                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1373                 break;
1374         }
1375         default:
1376                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1377                 break;
1378         }
1379
1380         return ret;
1381 }
1382
1383 /**
1384  * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1385  * @tx
1386  *
1387  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1388  */
1389 void
1390 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1391 {
1392         u8                      msg_type        = tx->mxc_msg_type;
1393         //struct kmx_peer         *peer           = tx->mxc_peer;
1394         struct kmx_conn         *conn           = tx->mxc_conn;
1395
1396         LASSERT (msg_type != 0);
1397         LASSERT (tx->mxc_nid != 0);
1398         LASSERT (tx->mxc_peer != NULL);
1399         LASSERT (tx->mxc_conn != NULL);
1400
1401         tx->mxc_incarnation = conn->mxk_incarnation;
1402
1403         if (msg_type != MXLND_MSG_PUT_DATA &&
1404             msg_type != MXLND_MSG_GET_DATA) {
1405                 /* msg style tx */
1406                 if (mxlnd_tx_requires_credit(tx)) {
1407                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1408                         conn->mxk_ntx_msgs++;
1409                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1410                            msg_type == MXLND_MSG_CONN_ACK) {
1411                         /* put conn msgs at the front of the queue */
1412                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1413                 } else {
1414                         /* PUT_ACK, PUT_NAK */
1415                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1416                         conn->mxk_ntx_msgs++;
1417                 }
1418         } else {
1419                 /* data style tx */
1420                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1421                 conn->mxk_ntx_data++;
1422         }
1423
1424         return;
1425 }
1426
1427 /**
1428  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1429  * @tx
1430  *
1431  * Add the tx to the peer's msg or data queue
1432  */
1433 static inline void
1434 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1435 {
1436         LASSERT(tx->mxc_peer != NULL);
1437         LASSERT(tx->mxc_conn != NULL);
1438         spin_lock(&tx->mxc_conn->mxk_lock);
1439         mxlnd_peer_queue_tx_locked(tx);
1440         spin_unlock(&tx->mxc_conn->mxk_lock);
1441
1442         return;
1443 }
1444
1445 /**
1446  * mxlnd_queue_tx - add the tx to the global tx queue
1447  * @tx
1448  *
1449  * Add the tx to the global queue and up the tx_queue_sem
1450  */
1451 void
1452 mxlnd_queue_tx(struct kmx_ctx *tx)
1453 {
1454         struct kmx_peer *peer   = tx->mxc_peer;
1455         LASSERT (tx->mxc_nid != 0);
1456
1457         if (peer != NULL) {
1458                 if (peer->mxp_incompatible &&
1459                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1460                         /* let this fail now */
1461                         tx->mxc_status.code = -ECONNABORTED;
1462                         mxlnd_conn_decref(peer->mxp_conn);
1463                         mxlnd_put_idle_tx(tx);
1464                         return;
1465                 }
1466                 if (tx->mxc_conn == NULL) {
1467                         int             ret     = 0;
1468                         struct kmx_conn *conn   = NULL;
1469
1470                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1471                         if (ret != 0) {
1472                                 tx->mxc_status.code = ret;
1473                                 mxlnd_put_idle_tx(tx);
1474                                 goto done;
1475                         }
1476                         tx->mxc_conn = conn;
1477                         mxlnd_peer_decref(peer); /* and takes it from peer */
1478                 }
1479                 LASSERT(tx->mxc_conn != NULL);
1480                 mxlnd_peer_queue_tx(tx);
1481                 mxlnd_check_sends(peer);
1482         } else {
1483                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1484                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1485                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1486                 up(&kmxlnd_data.kmx_tx_queue_sem);
1487         }
1488 done:
1489         return;
1490 }
1491
1492 int
1493 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1494 {
1495         int             i                       = 0;
1496         int             sum                     = 0;
1497         int             old_sum                 = 0;
1498         int             nseg                    = 0;
1499         int             first_iov               = -1;
1500         int             first_iov_offset        = 0;
1501         int             first_found             = 0;
1502         int             last_iov                = -1;
1503         int             last_iov_length         = 0;
1504         mx_ksegment_t  *seg                     = NULL;
1505
1506         if (niov == 0) return 0;
1507         LASSERT(iov != NULL);
1508
1509         for (i = 0; i < niov; i++) {
1510                 sum = old_sum + (u32) iov[i].iov_len;
1511                 if (!first_found && (sum > offset)) {
1512                         first_iov = i;
1513                         first_iov_offset = offset - old_sum;
1514                         first_found = 1;
1515                         sum = (u32) iov[i].iov_len - first_iov_offset;
1516                         old_sum = 0;
1517                 }
1518                 if (sum >= nob) {
1519                         last_iov = i;
1520                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1521                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1522                         break;
1523                 }
1524                 old_sum = sum;
1525         }
1526         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1527         nseg = last_iov - first_iov + 1;
1528         LASSERT(nseg > 0);
1529
1530         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1531         if (seg == NULL) {
1532                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1533                 return -1;
1534         }
1535         memset(seg, 0, nseg * sizeof(*seg));
1536         ctx->mxc_nseg = nseg;
1537         sum = 0;
1538         for (i = 0; i < nseg; i++) {
1539                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1540                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1541                 if (i == 0) {
1542                         seg[i].segment_ptr += (u64) first_iov_offset;
1543                         seg[i].segment_length -= (u32) first_iov_offset;
1544                 }
1545                 if (i == (nseg - 1)) {
1546                         seg[i].segment_length = (u32) last_iov_length;
1547                 }
1548                 sum += seg[i].segment_length;
1549         }
1550         ctx->mxc_seg_list = seg;
1551         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1552 #ifdef MX_PIN_FULLPAGES
1553         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1554 #endif
1555         LASSERT(nob == sum);
1556         return 0;
1557 }
1558
1559 int
1560 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1561 {
1562         int             i                       = 0;
1563         int             sum                     = 0;
1564         int             old_sum                 = 0;
1565         int             nseg                    = 0;
1566         int             first_kiov              = -1;
1567         int             first_kiov_offset       = 0;
1568         int             first_found             = 0;
1569         int             last_kiov               = -1;
1570         int             last_kiov_length        = 0;
1571         mx_ksegment_t  *seg                     = NULL;
1572
1573         if (niov == 0) return 0;
1574         LASSERT(kiov != NULL);
1575
1576         for (i = 0; i < niov; i++) {
1577                 sum = old_sum + kiov[i].kiov_len;
1578                 if (i == 0) sum -= kiov[i].kiov_offset;
1579                 if (!first_found && (sum > offset)) {
1580                         first_kiov = i;
1581                         first_kiov_offset = offset - old_sum;
1582                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1583                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1584                         first_found = 1;
1585                         sum = kiov[i].kiov_len - first_kiov_offset;
1586                         old_sum = 0;
1587                 }
1588                 if (sum >= nob) {
1589                         last_kiov = i;
1590                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1591                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1592                         break;
1593                 }
1594                 old_sum = sum;
1595         }
1596         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1597         nseg = last_kiov - first_kiov + 1;
1598         LASSERT(nseg > 0);
1599
1600         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1601         if (seg == NULL) {
1602                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1603                 return -1;
1604         }
1605         memset(seg, 0, niov * sizeof(*seg));
1606         ctx->mxc_nseg = niov;
1607         sum = 0;
1608         for (i = 0; i < niov; i++) {
1609                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1610                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1611                 if (i == 0) {
1612                         seg[i].segment_ptr += (u64) first_kiov_offset;
1613                         /* we have to add back the original kiov_offset */
1614                         seg[i].segment_length -= first_kiov_offset +
1615                                                  kiov[first_kiov].kiov_offset;
1616                 }
1617                 if (i == (nseg - 1)) {
1618                         seg[i].segment_length = last_kiov_length;
1619                 }
1620                 sum += seg[i].segment_length;
1621         }
1622         ctx->mxc_seg_list = seg;
1623         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1624 #ifdef MX_PIN_FULLPAGES
1625         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1626 #endif
1627         LASSERT(nob == sum);
1628         return 0;
1629 }
1630
1631 void
1632 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1633 {
1634         LASSERT(type == MXLND_MSG_PUT_ACK);
1635         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1636         tx->mxc_cookie = cookie;
1637         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1638         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1639         tx->mxc_match = mxlnd_create_match(tx, status);
1640
1641         mxlnd_queue_tx(tx);
1642 }
1643
1644
1645 /**
1646  * mxlnd_send_data - get tx, map [k]iov, queue tx
1647  * @ni
1648  * @lntmsg
1649  * @peer
1650  * @msg_type
1651  * @cookie
1652  *
1653  * This setups the DATA send for PUT or GET.
1654  *
1655  * On success, it queues the tx, on failure it calls lnet_finalize()
1656  */
1657 void
1658 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1659 {
1660         int                     ret             = 0;
1661         lnet_process_id_t       target          = lntmsg->msg_target;
1662         unsigned int            niov            = lntmsg->msg_niov;
1663         struct iovec           *iov             = lntmsg->msg_iov;
1664         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1665         unsigned int            offset          = lntmsg->msg_offset;
1666         unsigned int            nob             = lntmsg->msg_len;
1667         struct kmx_ctx         *tx              = NULL;
1668
1669         LASSERT(lntmsg != NULL);
1670         LASSERT(peer != NULL);
1671         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1672         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1673
1674         tx = mxlnd_get_idle_tx();
1675         if (tx == NULL) {
1676                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1677                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1678                         libcfs_nid2str(target.nid));
1679                 goto failed_0;
1680         }
1681         tx->mxc_nid = target.nid;
1682         /* NOTE called when we have a ref on the conn, get one for this tx */
1683         mxlnd_conn_addref(peer->mxp_conn);
1684         tx->mxc_peer = peer;
1685         tx->mxc_conn = peer->mxp_conn;
1686         tx->mxc_msg_type = msg_type;
1687         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1688         tx->mxc_state = MXLND_CTX_PENDING;
1689         tx->mxc_lntmsg[0] = lntmsg;
1690         tx->mxc_cookie = cookie;
1691         tx->mxc_match = mxlnd_create_match(tx, 0);
1692
1693         /* This setups up the mx_ksegment_t to send the DATA payload  */
1694         if (nob == 0) {
1695                 /* do not setup the segments */
1696                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1697                                    "to %s?\n", libcfs_nid2str(target.nid));
1698                 ret = 0;
1699         } else if (kiov == NULL) {
1700                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1701         } else {
1702                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1703         }
1704         if (ret != 0) {
1705                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", 
1706                                    libcfs_nid2str(target.nid));
1707                 tx->mxc_status.code = -EIO;
1708                 goto failed_1;
1709         }
1710         mxlnd_queue_tx(tx);
1711         return;
1712
1713 failed_1:
1714         mxlnd_conn_decref(peer->mxp_conn);
1715         mxlnd_put_idle_tx(tx);
1716         return;
1717
1718 failed_0:
1719         CDEBUG(D_NETERROR, "no tx avail\n");
1720         lnet_finalize(ni, lntmsg, -EIO);
1721         return;
1722 }
1723
1724 /**
1725  * mxlnd_recv_data - map [k]iov, post rx
1726  * @ni
1727  * @lntmsg
1728  * @rx
1729  * @msg_type
1730  * @cookie
1731  *
1732  * This setups the DATA receive for PUT or GET.
1733  *
1734  * On success, it returns 0, on failure it returns -1
1735  */
1736 int
1737 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1738 {
1739         int                     ret     = 0;
1740         lnet_process_id_t       target  = lntmsg->msg_target;
1741         unsigned int            niov    = lntmsg->msg_niov;
1742         struct iovec           *iov     = lntmsg->msg_iov;
1743         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1744         unsigned int            offset  = lntmsg->msg_offset;
1745         unsigned int            nob     = lntmsg->msg_len;
1746         mx_return_t             mxret   = MX_SUCCESS;
1747         u64                     mask    = ~(MXLND_ERROR_MASK);
1748
1749         /* above assumes MXLND_MSG_PUT_DATA */
1750         if (msg_type == MXLND_MSG_GET_DATA) {
1751                 niov = lntmsg->msg_md->md_niov;
1752                 iov = lntmsg->msg_md->md_iov.iov;
1753                 kiov = lntmsg->msg_md->md_iov.kiov;
1754                 offset = 0;
1755                 nob = lntmsg->msg_md->md_length;
1756         }
1757
1758         LASSERT(lntmsg != NULL);
1759         LASSERT(rx != NULL);
1760         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1761         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
1762
1763         rx->mxc_msg_type = msg_type;
1764         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1765         rx->mxc_state = MXLND_CTX_PENDING;
1766         rx->mxc_nid = target.nid;
1767         /* if posting a GET_DATA, we may not yet know the peer */
1768         if (rx->mxc_peer != NULL) {
1769                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1770         }
1771         rx->mxc_lntmsg[0] = lntmsg;
1772         rx->mxc_cookie = cookie;
1773         rx->mxc_match = mxlnd_create_match(rx, 0);
1774         /* This setups up the mx_ksegment_t to receive the DATA payload  */
1775         if (kiov == NULL) {
1776                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1777         } else {
1778                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1779         }
1780         if (msg_type == MXLND_MSG_GET_DATA) {
1781                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1782                 if (rx->mxc_lntmsg[1] == NULL) {
1783                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1784                                            libcfs_nid2str(target.nid));
1785                         ret = -1;
1786                 }
1787         }
1788         if (ret != 0) {
1789                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1790                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1791                        libcfs_nid2str(target.nid));
1792                 return -1;
1793         }
1794         ret = mxlnd_q_pending_ctx(rx);
1795         if (ret == -1) {
1796                 return -1;
1797         }
1798         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1799         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1800                           rx->mxc_seg_list, rx->mxc_nseg,
1801                           rx->mxc_pin_type, rx->mxc_match,
1802                           mask, (void *) rx,
1803                           &rx->mxc_mxreq);
1804         if (mxret != MX_SUCCESS) {
1805                 if (rx->mxc_conn != NULL) {
1806                         mxlnd_deq_pending_ctx(rx);
1807                 }
1808                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1809                                    (int) mxret, libcfs_nid2str(target.nid));
1810                 return -1;
1811         }
1812
1813         return 0;
1814 }
1815
1816 /**
1817  * mxlnd_send - the LND required send function
1818  * @ni
1819  * @private
1820  * @lntmsg
1821  *
1822  * This must not block. Since we may not have a peer struct for the receiver,
1823  * it will append send messages on a global tx list. We will then up the
1824  * tx_queued's semaphore to notify it of the new send. 
1825  */
1826 int
1827 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1828 {
1829         int                     ret             = 0;
1830         int                     type            = lntmsg->msg_type;
1831         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
1832         lnet_process_id_t       target          = lntmsg->msg_target;
1833         lnet_nid_t              nid             = target.nid;
1834         int                     target_is_router = lntmsg->msg_target_is_router;
1835         int                     routing         = lntmsg->msg_routing;
1836         unsigned int            payload_niov    = lntmsg->msg_niov;
1837         struct iovec           *payload_iov     = lntmsg->msg_iov;
1838         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
1839         unsigned int            payload_offset  = lntmsg->msg_offset;
1840         unsigned int            payload_nob     = lntmsg->msg_len;
1841         struct kmx_ctx         *tx              = NULL;
1842         struct kmx_msg         *txmsg           = NULL;
1843         struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */
1844         struct kmx_ctx         *rx_data         = NULL;
1845         struct kmx_conn        *conn            = NULL;
1846         int                     nob             = 0;
1847         uint32_t                length          = 0;
1848         struct kmx_peer         *peer           = NULL;
1849
1850         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1851                        payload_nob, payload_niov, libcfs_id2str(target));
1852
1853         LASSERT (payload_nob == 0 || payload_niov > 0);
1854         LASSERT (payload_niov <= LNET_MAX_IOV);
1855         /* payload is either all vaddrs or all pages */
1856         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1857
1858         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1859
1860         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1861          * to a new peer, use the nid */
1862         peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1863         if (peer != NULL) {
1864                 if (unlikely(peer->mxp_incompatible)) {
1865                         mxlnd_peer_decref(peer); /* drop ref taken above */
1866                 } else {
1867                         read_lock(&kmxlnd_data.kmx_global_lock);
1868                         conn = peer->mxp_conn;
1869                         if (conn) {
1870                                 mxlnd_conn_addref(conn);
1871                                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1872                         }
1873                         read_unlock(&kmxlnd_data.kmx_global_lock);
1874                 }
1875         }
1876         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
1877         if (conn == NULL && peer != NULL) {
1878                 CDEBUG(D_NET, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1879                        peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1880         }
1881
1882         switch (type) {
1883         case LNET_MSG_ACK:
1884                 LASSERT (payload_nob == 0);
1885                 break;
1886
1887         case LNET_MSG_REPLY:
1888         case LNET_MSG_PUT:
1889                 /* Is the payload small enough not to need DATA? */
1890                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1891                 if (nob <= MXLND_EAGER_SIZE)
1892                         break;                  /* send EAGER */
1893
1894                 tx = mxlnd_get_idle_tx();
1895                 if (unlikely(tx == NULL)) {
1896                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1897                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1898                                libcfs_nid2str(nid));
1899                         if (conn) mxlnd_conn_decref(conn);
1900                         return -ENOMEM;
1901                 }
1902
1903                 /* the peer may be NULL */
1904                 tx->mxc_peer = peer;
1905                 tx->mxc_conn = conn; /* may be NULL */
1906                 /* we added a conn ref above */
1907                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1908                 txmsg = tx->mxc_msg;
1909                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1910                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1911                 tx->mxc_match = mxlnd_create_match(tx, 0);
1912
1913                 /* we must post a receive _before_ sending the request.
1914                  * we need to determine how much to receive, it will be either
1915                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1916
1917                 rx = mxlnd_get_idle_rx();
1918                 if (unlikely(rx == NULL)) {
1919                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1920                                            libcfs_nid2str(nid));
1921                         mxlnd_put_idle_tx(tx);
1922                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1923                         return -ENOMEM;
1924                 }
1925                 rx->mxc_nid = nid;
1926                 rx->mxc_peer = peer;
1927                 /* conn may be NULL but unlikely since the first msg is always small */
1928                 /* NOTE no need to lock peer before adding conn ref since we took
1929                  * a conn ref for the tx (it cannot be freed between there and here ) */
1930                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1931                 rx->mxc_conn = conn;
1932                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1933                 rx->mxc_cookie = tx->mxc_cookie;
1934                 rx->mxc_match = mxlnd_create_match(rx, 0);
1935
1936                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1937                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1938                 if (unlikely(ret != 0)) {
1939                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1940                                            libcfs_nid2str(nid));
1941                         rx->mxc_lntmsg[0] = NULL;
1942                         mxlnd_put_idle_rx(rx);
1943                         mxlnd_put_idle_tx(tx);
1944                         if (conn) {
1945                                 mxlnd_conn_decref(conn); /* for the rx... */
1946                                 mxlnd_conn_decref(conn); /* and for the tx */
1947                         }
1948                         return -EHOSTUNREACH;
1949                 }
1950
1951                 mxlnd_queue_tx(tx);
1952                 return 0;
1953
1954         case LNET_MSG_GET:
1955                 if (routing || target_is_router)
1956                         break;                  /* send EAGER */
1957
1958                 /* is the REPLY message too small for DATA? */
1959                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1960                 if (nob <= MXLND_EAGER_SIZE)
1961                         break;                  /* send EAGER */
1962
1963                 /* get tx (we need the cookie) , post rx for incoming DATA, 
1964                  * then post GET_REQ tx */
1965                 tx = mxlnd_get_idle_tx();
1966                 if (unlikely(tx == NULL)) {
1967                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1968                                            libcfs_nid2str(nid));
1969                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1970                         return -ENOMEM;
1971                 }
1972                 rx_data = mxlnd_get_idle_rx();
1973                 if (unlikely(rx_data == NULL)) {
1974                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1975                                            libcfs_nid2str(nid));
1976                         mxlnd_put_idle_tx(tx);
1977                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1978                         return -ENOMEM;
1979                 }
1980                 rx_data->mxc_peer = peer;
1981                 /* NOTE no need to lock peer before adding conn ref since we took
1982                  * a conn ref for the tx (it cannot be freed between there and here ) */
1983                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1984                 rx_data->mxc_conn = conn; /* may be NULL */
1985
1986                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1987                 if (unlikely(ret != 0)) {
1988                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1989                                            libcfs_nid2str(nid));
1990                         mxlnd_put_idle_rx(rx_data);
1991                         mxlnd_put_idle_tx(tx);
1992                         if (conn) {
1993                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1994                                 mxlnd_conn_decref(conn); /* and for the tx */
1995                         }
1996                         return -EIO;
1997                 }
1998
1999                 tx->mxc_peer = peer;
2000                 tx->mxc_conn = conn; /* may be NULL */
2001                 /* conn ref taken above */
2002                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
2003                 txmsg = tx->mxc_msg;
2004                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
2005                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
2006                 tx->mxc_match = mxlnd_create_match(tx, 0);
2007
2008                 mxlnd_queue_tx(tx);
2009                 return 0;
2010
2011         default:
2012                 LBUG();
2013                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
2014                 return -EIO;
2015         }
2016
2017         /* send EAGER */
2018
2019         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
2020                 <= MXLND_EAGER_SIZE);
2021
2022         tx = mxlnd_get_idle_tx();
2023         if (unlikely(tx == NULL)) {
2024                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2025                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2026                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
2027                 return -ENOMEM;
2028         }
2029
2030         tx->mxc_peer = peer;
2031         tx->mxc_conn = conn; /* may be NULL */
2032         /* conn ref taken above */
2033         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2034         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2035         tx->mxc_match = mxlnd_create_match(tx, 0);
2036
2037         txmsg = tx->mxc_msg;
2038         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2039
2040         if (payload_kiov != NULL)
2041                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
2042                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2043                             payload_niov, payload_kiov, payload_offset, payload_nob);
2044         else
2045                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
2046                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2047                             payload_niov, payload_iov, payload_offset, payload_nob);
2048
2049         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2050         mxlnd_queue_tx(tx);
2051         return 0;
2052 }
2053
2054 /**
2055  * mxlnd_recv - the LND required recv function
2056  * @ni
2057  * @private
2058  * @lntmsg
2059  * @delayed
2060  * @niov
2061  * @kiov
2062  * @offset
2063  * @mlen
2064  * @rlen
2065  *
2066  * This must not block.
2067  */
2068 int
2069 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2070              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2071              unsigned int offset, unsigned int mlen, unsigned int rlen)
2072 {
2073         int                     ret             = 0;
2074         int                     nob             = 0;
2075         int                     len             = 0;
2076         struct kmx_ctx          *rx             = private;
2077         struct kmx_msg          *rxmsg          = rx->mxc_msg;
2078         lnet_nid_t               nid            = rx->mxc_nid;
2079         struct kmx_ctx          *tx             = NULL;
2080         struct kmx_msg          *txmsg          = NULL;
2081         struct kmx_peer         *peer           = rx->mxc_peer;
2082         struct kmx_conn         *conn           = peer->mxp_conn;
2083         u64                      cookie         = 0ULL;
2084         int                      msg_type       = rxmsg->mxm_type;
2085         int                      repost         = 1;
2086         int                      credit         = 0;
2087         int                      finalize       = 0;
2088
2089         LASSERT (mlen <= rlen);
2090         /* Either all pages or all vaddrs */
2091         LASSERT (!(kiov != NULL && iov != NULL));
2092         LASSERT (peer != NULL);
2093
2094         /* conn_addref(conn) already taken for the primary rx */
2095
2096         switch (msg_type) {
2097         case MXLND_MSG_EAGER:
2098                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2099                 len = rx->mxc_status.xfer_length;
2100                 if (unlikely(nob > len)) {
2101                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2102                                            libcfs_nid2str(nid), nob, len);
2103                         ret = -EPROTO;
2104                         break;
2105                 }
2106
2107                 if (kiov != NULL)
2108                         lnet_copy_flat2kiov(niov, kiov, offset,
2109                                 MXLND_EAGER_SIZE, rxmsg,
2110                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2111                                 mlen);
2112                 else
2113                         lnet_copy_flat2iov(niov, iov, offset,
2114                                 MXLND_EAGER_SIZE, rxmsg,
2115                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2116                                 mlen);
2117                 finalize = 1;
2118                 credit = 1;
2119                 break;
2120
2121         case MXLND_MSG_PUT_REQ:
2122                 /* we are going to reuse the rx, store the needed info */
2123                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2124
2125                 /* get tx, post rx, send PUT_ACK */
2126
2127                 tx = mxlnd_get_idle_tx();
2128                 if (unlikely(tx == NULL)) {
2129                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2130                         /* Not replying will break the connection */
2131                         ret = -ENOMEM;
2132                         break;
2133                 }
2134                 if (unlikely(mlen == 0)) {
2135                         finalize = 1;
2136                         tx->mxc_peer = peer;
2137                         tx->mxc_conn = conn;
2138                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2139                         /* repost = 1 */
2140                         break;
2141                 }
2142
2143                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2144                 tx->mxc_peer = peer;
2145                 tx->mxc_conn = conn;
2146                 /* no need to lock peer first since we already have a ref */
2147                 mxlnd_conn_addref(conn); /* for the tx */
2148                 txmsg = tx->mxc_msg;
2149                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2150                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2151                 tx->mxc_cookie = cookie;
2152                 tx->mxc_match = mxlnd_create_match(tx, 0);
2153
2154                 /* we must post a receive _before_ sending the PUT_ACK */
2155                 mxlnd_ctx_init(rx);
2156                 rx->mxc_state = MXLND_CTX_PREP;
2157                 rx->mxc_peer = peer;
2158                 rx->mxc_conn = conn;
2159                 /* do not take another ref for this rx, it is already taken */
2160                 rx->mxc_nid = peer->mxp_nid;
2161                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, 
2162                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2163
2164                 if (unlikely(ret != 0)) {
2165                         /* Notify peer that it's over */
2166                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", 
2167                                            libcfs_nid2str(nid), ret);
2168                         mxlnd_ctx_init(tx);
2169                         tx->mxc_state = MXLND_CTX_PREP;
2170                         tx->mxc_peer = peer;
2171                         tx->mxc_conn = conn;
2172                         /* finalize = 0, let the PUT_ACK tx finalize this */
2173                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2174                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2175                         /* conn ref already taken above */
2176                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2177                         /* repost = 1 */
2178                         break;
2179                 }
2180
2181                 mxlnd_queue_tx(tx);
2182                 /* do not return a credit until after PUT_DATA returns */
2183                 repost = 0;
2184                 break;
2185
2186         case MXLND_MSG_GET_REQ:
2187                 if (likely(lntmsg != NULL)) {
2188                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2189                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2190                 } else {
2191                         /* GET didn't match anything */
2192                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2193                          * We have to embed the error code in the match bits.
2194                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2195                         u64             cookie  = rxmsg->mxm_u.get_req.mxgrm_cookie;
2196
2197                         tx = mxlnd_get_idle_tx();
2198                         if (unlikely(tx == NULL)) {
2199                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2200                                                    libcfs_nid2str(nid));
2201                                 ret = -ENOMEM;
2202                                 break;
2203                         }
2204                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2205                         tx->mxc_state = MXLND_CTX_PENDING;
2206                         tx->mxc_nid = nid;
2207                         tx->mxc_peer = peer;
2208                         tx->mxc_conn = conn;
2209                         /* no need to lock peer first since we already have a ref */
2210                         mxlnd_conn_addref(conn); /* for this tx */
2211                         tx->mxc_cookie = cookie;
2212                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2213                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2214                         mxlnd_queue_tx(tx);
2215                 }
2216                 /* finalize lntmsg after tx completes */
2217                 break;
2218
2219         default:
2220                 LBUG();
2221         }
2222
2223         if (repost) {
2224                 /* we received a message, increment peer's outstanding credits */
2225                 if (credit == 1) {
2226                         spin_lock(&conn->mxk_lock);
2227                         conn->mxk_outstanding++;
2228                         spin_unlock(&conn->mxk_lock);
2229                 }
2230                 /* we are done with the rx */
2231                 mxlnd_put_idle_rx(rx);
2232                 mxlnd_conn_decref(conn);
2233         }
2234
2235         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); 
2236
2237         /* we received a credit, see if we can use it to send a msg */
2238         if (credit) mxlnd_check_sends(peer);
2239
2240         return ret;
2241 }
2242
2243 void
2244 mxlnd_sleep(unsigned long timeout)
2245 {
2246         set_current_state(TASK_INTERRUPTIBLE);
2247         schedule_timeout(timeout);
2248         return;
2249 }
2250
2251 /**
2252  * mxlnd_tx_queued - the generic send queue thread
2253  * @arg - thread id (as a void *)
2254  *
2255  * This thread moves send messages from the global tx_queue to the owning
2256  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2257  * it to the global peer list.
2258  */
2259 int
2260 mxlnd_tx_queued(void *arg)
2261 {
2262         long                    id      = (long) arg;
2263         int                     ret     = 0;
2264         int                     found   = 0;
2265         struct kmx_ctx         *tx      = NULL;
2266         struct kmx_peer        *peer    = NULL;
2267         struct list_head       *tmp_tx  = NULL;
2268
2269         cfs_daemonize("mxlnd_tx_queued");
2270         //cfs_block_allsigs();
2271
2272         while (!kmxlnd_data.kmx_shutdown) {
2273                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2274                 if (kmxlnd_data.kmx_shutdown)
2275                         break;
2276                 if (ret != 0) // Should we check for -EINTR?
2277                         continue;
2278                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2279                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2280                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2281                         continue;
2282                 }
2283                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2284                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2285                 list_del_init(&tx->mxc_list);
2286                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2287
2288                 found = 0;
2289                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
2290                 if (peer != NULL) {
2291                         tx->mxc_peer = peer;
2292                         write_lock(&kmxlnd_data.kmx_global_lock);
2293                         if (peer->mxp_conn == NULL) {
2294                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2295                                 if (ret != 0) {
2296                                         /* out of memory, give up and fail tx */
2297                                         tx->mxc_status.code = -ENOMEM;
2298                                         write_unlock(&kmxlnd_data.kmx_global_lock);
2299                                         mxlnd_peer_decref(peer);
2300                                         mxlnd_put_idle_tx(tx);
2301                                         continue;
2302                                 }
2303                         }
2304                         tx->mxc_conn = peer->mxp_conn;
2305                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2306                         write_unlock(&kmxlnd_data.kmx_global_lock);
2307                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2308                         mxlnd_queue_tx(tx);
2309                         found = 1;
2310                 }
2311                 if (found == 0) {
2312                         int              hash   = 0;
2313                         struct kmx_peer *peer = NULL;
2314                         struct kmx_peer *old = NULL;
2315
2316                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2317
2318                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2319                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2320                         /* create peer */
2321                         /* adds conn ref for this function */
2322                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2323                                         *kmxlnd_tunables.kmx_board,
2324                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2325                         if (ret != 0) {
2326                                 /* finalize message */
2327                                 tx->mxc_status.code = ret;
2328                                 mxlnd_put_idle_tx(tx);
2329                                 continue;
2330                         }
2331                         tx->mxc_peer = peer;
2332                         tx->mxc_conn = peer->mxp_conn;
2333                         /* this tx will keep the conn ref taken in peer_alloc() */
2334
2335                         /* add peer to global peer list, but look to see
2336                          * if someone already created it after we released
2337                          * the read lock */
2338                         write_lock(&kmxlnd_data.kmx_global_lock);
2339                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2340                                 if (old->mxp_nid == peer->mxp_nid) {
2341                                         /* somebody beat us here, we created a duplicate */
2342                                         found = 1;
2343                                         break;
2344                                 }
2345                         }
2346
2347                         if (found == 0) {
2348                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2349                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2350                         } else {
2351                                 tx->mxc_peer = old;
2352                                 tx->mxc_conn = old->mxp_conn;
2353                                 /* FIXME can conn be NULL? */
2354                                 LASSERT(old->mxp_conn != NULL);
2355                                 mxlnd_conn_addref(old->mxp_conn);
2356                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2357                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2358                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2359                                 mxlnd_peer_decref(peer);
2360                         }
2361                         write_unlock(&kmxlnd_data.kmx_global_lock);
2362
2363                         mxlnd_queue_tx(tx);
2364                 }
2365         }
2366         mxlnd_thread_stop(id);
2367         return 0;
2368 }
2369
2370 /* When calling this, we must not have the peer lock. */
2371 void
2372 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2373 {
2374         mx_return_t     mxret           = MX_SUCCESS;
2375         mx_request_t    request;
2376         struct kmx_conn *conn           = peer->mxp_conn;
2377         u8              msg_type        = (u8) MXLND_MSG_TYPE(mask);
2378
2379         /* NOTE we are holding a conn ref every time we call this function,
2380          * we do not need to lock the peer before taking another ref */
2381         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2382
2383         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2384
2385         if (peer->mxp_reconnect_time == 0) {
2386                 peer->mxp_reconnect_time = jiffies;
2387         }
2388
2389         if (peer->mxp_nic_id == 0ULL) {
2390                 int             ret     = 0;
2391
2392                 ret = mxlnd_ip2nic_id(peer->mxp_ip, &peer->mxp_nic_id);
2393                 if (ret == 0) {
2394                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2395                 }
2396                 if (peer->mxp_nic_id == 0ULL) {
2397                         /* not mapped yet, return */
2398                         spin_lock(&conn->mxk_lock);
2399                         conn->mxk_status = MXLND_CONN_INIT;
2400                         spin_unlock(&conn->mxk_lock);
2401                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2402                                 /* give up and notify LNET */
2403                                 mxlnd_conn_disconnect(conn, 0, 0);
2404                                 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2405                                                                             function... */
2406                                 mxlnd_conn_decref(peer->mxp_conn); /* which we no
2407                                                                       longer need */
2408                         }
2409                         mxlnd_conn_decref(conn);
2410                         return;
2411                 }
2412         }
2413
2414         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2415                             peer->mxp_ep_id, MXLND_MSG_MAGIC, mask,
2416                             (void *) peer, &request);
2417         if (unlikely(mxret != MX_SUCCESS)) {
2418                 spin_lock(&conn->mxk_lock);
2419                 conn->mxk_status = MXLND_CONN_FAIL;
2420                 spin_unlock(&conn->mxk_lock);
2421                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2422                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2423                 mxlnd_conn_decref(conn);
2424         }
2425         return;
2426 }
2427
2428 #define MXLND_STATS 0
2429
2430 int
2431 mxlnd_check_sends(struct kmx_peer *peer)
2432 {
2433         int                     ret             = 0;
2434         int                     found           = 0;
2435         mx_return_t             mxret           = MX_SUCCESS;
2436         struct kmx_ctx          *tx             = NULL;
2437         struct kmx_conn         *conn           = NULL;
2438         u8                      msg_type        = 0;
2439         int                     credit          = 0;
2440         int                     status          = 0;
2441         int                     ntx_posted      = 0;
2442         int                     credits         = 0;
2443 #if MXLND_STATS
2444         static unsigned long    last            = 0;
2445 #endif
2446
2447         if (unlikely(peer == NULL)) {
2448                 LASSERT(peer != NULL);
2449                 return -1;
2450         }
2451         write_lock(&kmxlnd_data.kmx_global_lock);
2452         conn = peer->mxp_conn;
2453         /* NOTE take a ref for the duration of this function since it is called
2454          * when there might not be any queued txs for this peer */
2455         if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2456         write_unlock(&kmxlnd_data.kmx_global_lock);
2457
2458         /* do not add another ref for this tx */
2459
2460         if (conn == NULL) {
2461                 /* we do not have any conns */
2462                 return -1;
2463         }
2464
2465 #if MXLND_STATS
2466         if (time_after(jiffies, last)) {
2467                 last = jiffies + HZ;
2468                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2469                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2470                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2471                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2472                               conn->mxk_ntx_data, conn->mxk_data_posted);
2473         }
2474 #endif
2475
2476         /* cache peer state for asserts */
2477         spin_lock(&conn->mxk_lock);
2478         ntx_posted = conn->mxk_ntx_posted;
2479         credits = conn->mxk_credits;
2480         spin_unlock(&conn->mxk_lock);
2481
2482         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2483         LASSERT(ntx_posted >= 0);
2484
2485         LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2486         LASSERT(credits >= 0);
2487
2488         /* check number of queued msgs, ignore data */
2489         spin_lock(&conn->mxk_lock);
2490         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2491                 /* check if any txs queued that could return credits... */
2492                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2493                         /* if not, send a NOOP */
2494                         tx = mxlnd_get_idle_tx();
2495                         if (likely(tx != NULL)) {
2496                                 tx->mxc_peer = peer;
2497                                 tx->mxc_conn = peer->mxp_conn;
2498                                 mxlnd_conn_addref(conn); /* for this tx */
2499                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2500                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2501                                 mxlnd_peer_queue_tx_locked(tx);
2502                                 found = 1;
2503                                 goto done_locked;
2504                         }
2505                 }
2506         }
2507         spin_unlock(&conn->mxk_lock);
2508
2509         /* if the peer is not ready, try to connect */
2510         spin_lock(&conn->mxk_lock);
2511         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2512             conn->mxk_status == MXLND_CONN_FAIL ||
2513             conn->mxk_status == MXLND_CONN_REQ)) {
2514                 u64 match = (u64) MXLND_MSG_ICON_REQ << MXLND_MSG_OFFSET;
2515                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2516                 conn->mxk_status = MXLND_CONN_WAIT;
2517                 spin_unlock(&conn->mxk_lock);
2518                 mxlnd_iconnect(peer, match);
2519                 goto done;
2520         }
2521         spin_unlock(&conn->mxk_lock);
2522
2523         spin_lock(&conn->mxk_lock);
2524         while (!list_empty(&conn->mxk_tx_free_queue) ||
2525                !list_empty(&conn->mxk_tx_credit_queue)) {
2526                 /* We have something to send. If we have a queued tx that does not
2527                  * require a credit (free), choose it since its completion will 
2528                  * return a credit (here or at the peer), complete a DATA or 
2529                  * CONN_REQ or CONN_ACK. */
2530                 struct list_head *tmp_tx = NULL;
2531                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2532                         tmp_tx = &conn->mxk_tx_free_queue;
2533                 } else {
2534                         tmp_tx = &conn->mxk_tx_credit_queue;
2535                 }
2536                 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2537
2538                 msg_type = tx->mxc_msg_type;
2539
2540                 /* don't try to send a rx */
2541                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2542
2543                 /* ensure that it is a valid msg type */
2544                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2545                         msg_type == MXLND_MSG_CONN_ACK ||
2546                         msg_type == MXLND_MSG_NOOP     ||
2547                         msg_type == MXLND_MSG_EAGER    ||
2548                         msg_type == MXLND_MSG_PUT_REQ  ||
2549                         msg_type == MXLND_MSG_PUT_ACK  ||
2550                         msg_type == MXLND_MSG_PUT_DATA ||
2551                         msg_type == MXLND_MSG_GET_REQ  ||
2552                         msg_type == MXLND_MSG_GET_DATA);
2553                 LASSERT(tx->mxc_peer == peer);
2554                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2555
2556                 credit = mxlnd_tx_requires_credit(tx);
2557                 if (credit) {
2558
2559                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2560                                 CDEBUG(D_NET, "%s: posted enough\n",
2561                                               libcfs_nid2str(peer->mxp_nid));
2562                                 goto done_locked;
2563                         }
2564
2565                         if (conn->mxk_credits == 0) {
2566                                 CDEBUG(D_NET, "%s: no credits\n",
2567                                               libcfs_nid2str(peer->mxp_nid));
2568                                 goto done_locked;
2569                         }
2570
2571                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2572                             conn->mxk_outstanding == 0) {  /* giving back credits */
2573                                 CDEBUG(D_NET, "%s: not using last credit\n",
2574                                               libcfs_nid2str(peer->mxp_nid));
2575                                 goto done_locked;
2576                         }
2577                 }
2578
2579                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2580                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2581                                 msg_type == MXLND_MSG_CONN_ACK)) {
2582                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2583                                              mxlnd_connstatus_to_str(conn->mxk_status),
2584                                              tx->mxc_cookie,
2585                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2586                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2587                                         list_del_init(&tx->mxc_list);
2588                                         tx->mxc_status.code = -ECONNABORTED;
2589                                         mxlnd_put_idle_tx(tx);
2590                                         mxlnd_conn_decref(conn);
2591                                 }
2592                                 goto done_locked;
2593                         }
2594                 }
2595
2596                 list_del_init(&tx->mxc_list);
2597
2598                 /* handle credits, etc now while we have the lock to avoid races */
2599                 if (credit) {
2600                         conn->mxk_credits--;
2601                         conn->mxk_ntx_posted++;
2602                 }
2603                 if (msg_type != MXLND_MSG_PUT_DATA &&
2604                     msg_type != MXLND_MSG_GET_DATA) {
2605                         if (msg_type != MXLND_MSG_CONN_REQ &&
2606                             msg_type != MXLND_MSG_CONN_ACK) {
2607                                 conn->mxk_ntx_msgs--;
2608                         }
2609                 }
2610                 if (tx->mxc_incarnation == 0 &&
2611                     conn->mxk_incarnation != 0) {
2612                         tx->mxc_incarnation = conn->mxk_incarnation;
2613                 }
2614                 spin_unlock(&conn->mxk_lock);
2615
2616                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER 
2617                  * or (2) there is a non-DATA msg that can return credits in the 
2618                  * queue, then drop this duplicate NOOP */
2619                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2620                         spin_lock(&conn->mxk_lock);
2621                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2622                             (conn->mxk_ntx_msgs >= 1)) {
2623                                 conn->mxk_credits++;
2624                                 conn->mxk_ntx_posted--;
2625                                 spin_unlock(&conn->mxk_lock);
2626                                 /* redundant NOOP */
2627                                 mxlnd_put_idle_tx(tx);
2628                                 mxlnd_conn_decref(conn);
2629                                 CDEBUG(D_NET, "%s: redundant noop\n",
2630                                               libcfs_nid2str(peer->mxp_nid));
2631                                 found = 1;
2632                                 goto done;
2633                         }
2634                         spin_unlock(&conn->mxk_lock);
2635                 }
2636
2637                 found = 1;
2638                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2639                     (msg_type != MXLND_MSG_GET_DATA))) {
2640                         mxlnd_pack_msg(tx);
2641                 }
2642
2643                 //ret = -ECONNABORTED;
2644                 mxret = MX_SUCCESS;
2645
2646                 spin_lock(&conn->mxk_lock);
2647                 status = conn->mxk_status;
2648                 spin_unlock(&conn->mxk_lock);
2649
2650                 if (likely((status == MXLND_CONN_READY) ||
2651                     (msg_type == MXLND_MSG_CONN_REQ) ||
2652                     (msg_type == MXLND_MSG_CONN_ACK))) {
2653                         ret = 0;
2654                         if (msg_type != MXLND_MSG_CONN_REQ &&
2655                             msg_type != MXLND_MSG_CONN_ACK) {
2656                                 /* add to the pending list */
2657                                 ret = mxlnd_q_pending_ctx(tx);
2658                                 if (ret == -1) {
2659                                         /* FIXME the conn is disconnected, now what? */
2660                                 }
2661                         } else {
2662                                 /* CONN_REQ/ACK */
2663                                 tx->mxc_state = MXLND_CTX_PENDING;
2664                         }
2665
2666                         if (ret == 0) {
2667                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2668                                     msg_type != MXLND_MSG_GET_DATA)) {
2669                                         /* send a msg style tx */
2670                                         LASSERT(tx->mxc_nseg == 1);
2671                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2672                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2673                                                mxlnd_msgtype_to_str(msg_type),
2674                                                tx->mxc_cookie);
2675                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2676                                                           &tx->mxc_seg,
2677                                                           tx->mxc_nseg,
2678                                                           tx->mxc_pin_type,
2679                                                           conn->mxk_epa,
2680                                                           tx->mxc_match,
2681                                                           (void *) tx,
2682                                                           &tx->mxc_mxreq);
2683                                 } else {
2684                                         /* send a DATA tx */
2685                                         spin_lock(&conn->mxk_lock);
2686                                         conn->mxk_ntx_data--;
2687                                         conn->mxk_data_posted++;
2688                                         spin_unlock(&conn->mxk_lock);
2689                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2690                                                mxlnd_msgtype_to_str(msg_type),
2691                                                tx->mxc_cookie);
2692                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2693                                                           tx->mxc_seg_list,
2694                                                           tx->mxc_nseg,
2695                                                           tx->mxc_pin_type,
2696                                                           conn->mxk_epa,
2697                                                           tx->mxc_match,
2698                                                           (void *) tx,
2699                                                           &tx->mxc_mxreq);
2700                                 }
2701                         } else {
2702                                 mxret = MX_CONNECTION_FAILED;
2703                         }
2704                         if (likely(mxret == MX_SUCCESS)) {
2705                                 ret = 0;
2706                         } else {
2707                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2708                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
2709                                        libcfs_nid2str(peer->mxp_nid));
2710                                 /* NOTE mx_kisend() only fails if there are not enough 
2711                                 * resources. Do not change the connection status. */
2712                                 if (mxret == MX_NO_RESOURCES) {
2713                                         tx->mxc_status.code = -ENOMEM;
2714                                 } else {
2715                                         tx->mxc_status.code = -ECONNABORTED;
2716                                 }
2717                                 if (credit) {
2718                                         spin_lock(&conn->mxk_lock);
2719                                         conn->mxk_ntx_posted--;
2720                                         conn->mxk_credits++;
2721                                         spin_unlock(&conn->mxk_lock);
2722                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2723                                         msg_type == MXLND_MSG_GET_DATA) {
2724                                         spin_lock(&conn->mxk_lock);
2725                                         conn->mxk_data_posted--;
2726                                         spin_unlock(&conn->mxk_lock);
2727                                 }
2728                                 if (msg_type != MXLND_MSG_PUT_DATA &&
2729                                     msg_type != MXLND_MSG_GET_DATA &&
2730                                     msg_type != MXLND_MSG_CONN_REQ &&
2731                                     msg_type != MXLND_MSG_CONN_ACK) {
2732                                         spin_lock(&conn->mxk_lock);
2733                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2734                                         spin_unlock(&conn->mxk_lock);
2735                                 }
2736                                 if (msg_type != MXLND_MSG_CONN_REQ &&
2737                                     msg_type != MXLND_MSG_CONN_ACK) {
2738                                         /* remove from the pending list */
2739                                         mxlnd_deq_pending_ctx(tx);
2740                                 }
2741                                 mxlnd_put_idle_tx(tx);
2742                                 mxlnd_conn_decref(conn);
2743                         }
2744                 }
2745                 spin_lock(&conn->mxk_lock);
2746         }
2747 done_locked:
2748         spin_unlock(&conn->mxk_lock);
2749 done:
2750         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2751         return found;
2752 }
2753
2754
2755 /**
2756  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2757  * @ctx - the tx descriptor
2758  *
2759  * Determine which type of send request it was and start the next step, if needed,
2760  * or, if done, signal completion to LNET. After we are done, put back on the
2761  * idle tx list.
2762  */
2763 void
2764 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2765 {
2766         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2767         struct kmx_msg  *msg    = tx->mxc_msg;
2768         struct kmx_peer *peer   = tx->mxc_peer;
2769         struct kmx_conn *conn   = tx->mxc_conn;
2770         u8              type    = tx->mxc_msg_type;
2771         int             credit  = mxlnd_tx_requires_credit(tx);
2772         u64             cookie  = tx->mxc_cookie;
2773
2774         CDEBUG(D_NET, "entering %s (0x%llx):\n",
2775                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2776
2777         if (unlikely(conn == NULL)) {
2778                 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2779                 conn = peer->mxp_conn;
2780                 if (conn != NULL) {
2781                         /* do not add a ref for the tx, it was set before sending */
2782                         tx->mxc_conn = conn;
2783                         tx->mxc_peer = conn->mxk_peer;
2784                 }
2785         }
2786         LASSERT (peer != NULL);
2787         LASSERT (conn != NULL);
2788
2789         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2790                 LASSERT (type == msg->mxm_type);
2791         }
2792
2793         if (failed) {
2794                 tx->mxc_status.code = -EIO;
2795         } else {
2796                 spin_lock(&conn->mxk_lock);
2797                 conn->mxk_last_tx = jiffies;
2798                 spin_unlock(&conn->mxk_lock);
2799         }
2800
2801         switch (type) {
2802
2803         case MXLND_MSG_GET_DATA:
2804                 spin_lock(&conn->mxk_lock);
2805                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2806                         conn->mxk_outstanding++;
2807                         conn->mxk_data_posted--;
2808                 }
2809                 spin_unlock(&conn->mxk_lock);
2810                 break;
2811
2812         case MXLND_MSG_PUT_DATA:
2813                 spin_lock(&conn->mxk_lock);
2814                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2815                         conn->mxk_data_posted--;
2816                 }
2817                 spin_unlock(&conn->mxk_lock);
2818                 break;
2819
2820         case MXLND_MSG_NOOP:
2821         case MXLND_MSG_PUT_REQ:
2822         case MXLND_MSG_PUT_ACK:
2823         case MXLND_MSG_GET_REQ:
2824         case MXLND_MSG_EAGER:
2825         //case MXLND_MSG_NAK:
2826                 break;
2827
2828         case MXLND_MSG_CONN_ACK:
2829                 if (peer->mxp_incompatible) {
2830                         /* we sent our params, now close this conn */
2831                         mxlnd_conn_disconnect(conn, 0, 1);
2832                 }
2833         case MXLND_MSG_CONN_REQ:
2834                 if (failed) {
2835                         CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2836                                "failed with %s (%d) to %s\n",
2837                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2838                                mx_strstatus(tx->mxc_status.code),
2839                                tx->mxc_status.code,
2840                                libcfs_nid2str(tx->mxc_nid));
2841                         if (!peer->mxp_incompatible) {
2842                                 spin_lock(&conn->mxk_lock);
2843                                 conn->mxk_status = MXLND_CONN_FAIL;
2844                                 spin_unlock(&conn->mxk_lock);
2845                         }
2846                 }
2847                 break;
2848
2849         default:
2850                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2851                 LBUG();
2852         }
2853
2854         if (credit) {
2855                 spin_lock(&conn->mxk_lock);
2856                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2857                         conn->mxk_ntx_posted--;
2858                 }
2859                 spin_unlock(&conn->mxk_lock);
2860         }
2861
2862         CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2863         mxlnd_put_idle_tx(tx);
2864         mxlnd_conn_decref(conn);
2865
2866         mxlnd_check_sends(peer);
2867
2868         return;
2869 }
2870
2871 void
2872 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2873 {
2874         int                     ret             = 0;
2875         int                     repost          = 1;
2876         int                     credit          = 1;
2877         u32                     nob             = rx->mxc_status.xfer_length;
2878         u64                     bits            = rx->mxc_status.match_info;
2879         struct kmx_msg         *msg             = rx->mxc_msg;
2880         struct kmx_peer        *peer            = rx->mxc_peer;
2881         struct kmx_conn        *conn            = rx->mxc_conn;
2882         u8                      type            = rx->mxc_msg_type;
2883         u64                     seq             = 0ULL;
2884         lnet_msg_t             *lntmsg[2];
2885         int                     result          = 0;
2886         u64                     nic_id          = 0ULL;
2887         u32                     ep_id           = 0;
2888         u32                     sid             = 0;
2889         int                     peer_ref        = 0;
2890         int                     conn_ref        = 0;
2891         int                     incompatible    = 0;
2892         u64                     match           = 0ULL;
2893
2894         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, 
2895          * failed GET reply, CONN_REQ, or a CONN_ACK */
2896
2897         /* NOTE peer may still be NULL if it is a new peer and
2898          *      conn may be NULL if this is a re-connect */
2899         if (likely(peer != NULL && conn != NULL)) {
2900                 /* we have a reference on the conn */
2901                 conn_ref = 1;
2902         } else if (peer != NULL && conn == NULL) {
2903                 /* we have a reference on the peer */
2904                 peer_ref = 1;
2905         } else if (peer == NULL && conn != NULL) {
2906                 /* fatal error */
2907                 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2908                 LBUG();
2909         } /* else peer and conn == NULL */
2910
2911 #if 0
2912         if (peer == NULL || conn == NULL) {
2913                 /* if the peer was disconnected, the peer may exist but
2914                  * not have any valid conns */
2915                 decref = 0; /* no peer means no ref was taken for this rx */
2916         }
2917 #endif
2918
2919         if (conn == NULL && peer != NULL) {
2920                 write_lock(&kmxlnd_data.kmx_global_lock);
2921                 conn = peer->mxp_conn;
2922                 if (conn) {
2923                         mxlnd_conn_addref(conn); /* conn takes ref... */
2924                         mxlnd_peer_decref(peer); /* from peer */
2925                         conn_ref = 1;
2926                         peer_ref = 0;
2927                 }
2928                 write_unlock(&kmxlnd_data.kmx_global_lock);
2929                 rx->mxc_conn = conn;
2930         }
2931
2932 #if MXLND_DEBUG
2933         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2934 #endif
2935
2936         lntmsg[0] = NULL;
2937         lntmsg[1] = NULL;
2938
2939         if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2940                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2941                                    libcfs_nid2str(rx->mxc_nid),
2942                                    mx_strstatus(rx->mxc_status.code),
2943                                    (int) rx->mxc_status.code);
2944                 credit = 0;
2945                 goto cleanup;
2946         }
2947
2948         if (nob == 0) {
2949                 /* this may be a failed GET reply */
2950                 if (type == MXLND_MSG_GET_DATA) {
2951                         /* get the error (52-59) bits from the match bits */
2952                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
2953                         lntmsg[0] = rx->mxc_lntmsg[0];
2954                         result = -ret;
2955                         goto cleanup;
2956                 } else {
2957                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
2958                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2959                                            libcfs_nid2str(rx->mxc_nid));
2960                         goto cleanup;
2961                 }
2962         }
2963
2964         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2965         if (type == MXLND_MSG_PUT_DATA) {
2966                 result = rx->mxc_status.code;
2967                 lntmsg[0] = rx->mxc_lntmsg[0];
2968                 goto cleanup;
2969         } else if (type == MXLND_MSG_GET_DATA) {
2970                 result = rx->mxc_status.code;
2971                 lntmsg[0] = rx->mxc_lntmsg[0];
2972                 lntmsg[1] = rx->mxc_lntmsg[1];
2973                 goto cleanup;
2974         }
2975
2976         ret = mxlnd_unpack_msg(msg, nob);
2977         if (ret != 0) {
2978                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2979                                    ret, libcfs_nid2str(rx->mxc_nid));
2980                 goto cleanup;
2981         }
2982         rx->mxc_nob = nob;
2983         type = msg->mxm_type;
2984         seq = msg->mxm_seq;
2985
2986         if (type != MXLND_MSG_CONN_REQ &&
2987             (rx->mxc_nid != msg->mxm_srcnid ||
2988              kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2989                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2990                        "0x%llx and rx msg dst is 0x%llx)\n",
2991                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2992                        msg->mxm_dstnid);
2993                 goto cleanup;
2994         }
2995
2996         if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2997                 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2998                     msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2999                         if (conn != NULL) {
3000                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
3001                                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3002                                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3003                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3004                                        msg->mxm_srcstamp, conn->mxk_incarnation,
3005                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3006                         } else {
3007                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
3008                                        "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3009                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3010                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3011                         }
3012                         credit = 0;
3013                         goto cleanup;
3014                 }
3015         }
3016
3017         CDEBUG(D_NET, "Received %s with %d credits\n",
3018                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
3019
3020         if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
3021             msg->mxm_type != MXLND_MSG_CONN_ACK) {
3022                 LASSERT(peer != NULL);
3023                 LASSERT(conn != NULL);
3024                 if (msg->mxm_credits != 0) {
3025                         spin_lock(&conn->mxk_lock);
3026                         if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3027                                 if ((conn->mxk_credits + msg->mxm_credits) > 
3028                                      *kmxlnd_tunables.kmx_credits) {
3029                                         CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
3030                                                conn->mxk_credits, msg->mxm_credits);
3031                                 }
3032                                 conn->mxk_credits += msg->mxm_credits;
3033                                 LASSERT(conn->mxk_credits >= 0);
3034                                 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
3035                         }
3036                         spin_unlock(&conn->mxk_lock);
3037                 }
3038         }
3039
3040         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3041         switch (type) {
3042         case MXLND_MSG_NOOP:
3043                 break;
3044
3045         case MXLND_MSG_EAGER:
3046                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3047                                         msg->mxm_srcnid, rx, 0);
3048                 repost = ret < 0;
3049                 break;
3050
3051         case MXLND_MSG_PUT_REQ:
3052                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3053                                         msg->mxm_srcnid, rx, 1);
3054                 repost = ret < 0;
3055                 break;
3056
3057         case MXLND_MSG_PUT_ACK: {
3058                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3059                 if (cookie > MXLND_MAX_COOKIE) {
3060                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3061                                            libcfs_nid2str(rx->mxc_nid));
3062                         result = -((u32) MXLND_ERROR_VAL(cookie));
3063                         lntmsg[0] = rx->mxc_lntmsg[0];
3064                 } else {
3065                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3066                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3067                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3068                 }
3069                 /* repost == 1 */
3070                 break;
3071         }
3072         case MXLND_MSG_GET_REQ:
3073                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3074                                         msg->mxm_srcnid, rx, 1);
3075                 repost = ret < 0;
3076                 break;
3077
3078         case MXLND_MSG_CONN_REQ:
3079                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3080                         CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3081                                         libcfs_nid2str(msg->mxm_srcnid),
3082                                         libcfs_nid2str(msg->mxm_dstnid));
3083                         goto cleanup;
3084                 }
3085                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3086                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3087                                     "%d (%d wanted)\n",
3088                                         libcfs_nid2str(msg->mxm_srcnid),
3089                                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3090                                         *kmxlnd_tunables.kmx_credits);
3091                         incompatible = 1;
3092                 }
3093                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3094                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3095                                     "%d (%d wanted)\n",
3096                                         libcfs_nid2str(msg->mxm_srcnid),
3097                                         msg->mxm_u.conn_req.mxcrm_eager_size,
3098                                         (int) MXLND_EAGER_SIZE);
3099                         incompatible = 1;
3100                 }
3101                 mx_decompose_endpoint_addr2(rx->mxc_status.source, &nic_id, &ep_id, &sid);
3102                 if (peer == NULL) {
3103                         peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3104                         if (peer == NULL) {
3105                                 int             hash    = 0;
3106                                 struct kmx_peer *existing_peer    = NULL;
3107                                 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3108
3109                                 rx->mxc_nid = msg->mxm_srcnid;
3110
3111                                 /* adds conn ref for peer and one for this function */
3112                                 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3113                                                 *kmxlnd_tunables.kmx_board,
3114                                                 *kmxlnd_tunables.kmx_ep_id, 0ULL);
3115                                 if (ret != 0) {
3116                                         goto cleanup;
3117                                 }
3118                                 peer->mxp_sid = sid;
3119                                 LASSERT(peer->mxp_ep_id == ep_id);
3120                                 write_lock(&kmxlnd_data.kmx_global_lock);
3121                                 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3122                                 if (existing_peer) {
3123                                         mxlnd_conn_decref(peer->mxp_conn);
3124                                         mxlnd_peer_decref(peer);
3125                                         peer = existing_peer;
3126                                         mxlnd_conn_addref(peer->mxp_conn);
3127                                 } else {
3128                                         list_add_tail(&peer->mxp_peers,
3129                                                       &kmxlnd_data.kmx_peers[hash]);
3130                                         atomic_inc(&kmxlnd_data.kmx_npeers);
3131                                 }
3132                                 write_unlock(&kmxlnd_data.kmx_global_lock);
3133                         } else {
3134                                 /* FIXME should write lock here */
3135                                 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3136                                 mxlnd_peer_decref(peer); /* drop ref taken above */
3137                                 if (ret != 0) {
3138                                         CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3139                                         goto cleanup;
3140                                 }
3141                         }
3142                         conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3143                         conn = peer->mxp_conn;
3144                 } else { /* found peer */
3145                         struct kmx_conn *old_conn       = conn;
3146
3147                         if (sid != peer->mxp_sid) {
3148                                 /* do not call mx_disconnect() or send a BYE */
3149                                 mxlnd_conn_disconnect(old_conn, 0, 0);
3150
3151                                 /* the ref for this rx was taken on the old_conn */
3152                                 mxlnd_conn_decref(old_conn);
3153
3154                                 /* This allocs a conn, points peer->mxp_conn to this one.
3155                                 * The old conn is still on the peer->mxp_conns list.
3156                                 * As the pending requests complete, they will call
3157                                 * conn_decref() which will eventually free it. */
3158                                 ret = mxlnd_conn_alloc(&conn, peer);
3159                                 if (ret != 0) {
3160                                         CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3161                                         goto cleanup;
3162                                 }
3163                                 /* conn_alloc() adds one ref for the peer and one
3164                                  * for this function */
3165                                 conn_ref = 1;
3166
3167                                 peer->mxp_sid = sid;
3168                         }
3169                 }
3170                 write_lock(&kmxlnd_data.kmx_global_lock);
3171                 peer->mxp_incarnation = msg->mxm_srcstamp;
3172                 peer->mxp_incompatible = incompatible;
3173                 write_unlock(&kmxlnd_data.kmx_global_lock);
3174                 spin_lock(&conn->mxk_lock);
3175                 conn->mxk_incarnation = msg->mxm_srcstamp;
3176                 conn->mxk_status = MXLND_CONN_WAIT;
3177                 spin_unlock(&conn->mxk_lock);
3178
3179                 /* handle_conn_ack() will create the CONN_ACK msg */
3180                 match = (u64) MXLND_MSG_ICON_ACK << MXLND_MSG_OFFSET;
3181                 mxlnd_iconnect(peer, match);
3182
3183                 break;
3184
3185         case MXLND_MSG_CONN_ACK:
3186                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3187                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3188                                "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3189                                 libcfs_nid2str(msg->mxm_dstnid));
3190                         ret = -1;
3191                         goto failed;
3192                 }
3193                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3194                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3195                                "incompatible queue depth %d (%d wanted)\n",
3196                                 libcfs_nid2str(msg->mxm_srcnid),
3197                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3198                                 *kmxlnd_tunables.kmx_credits);
3199                         spin_lock(&conn->mxk_lock);
3200                         conn->mxk_status = MXLND_CONN_FAIL;
3201                         spin_unlock(&conn->mxk_lock);
3202                         incompatible = 1;
3203                         ret = -1;
3204                 }
3205                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3206                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3207                                "incompatible EAGER size %d (%d wanted)\n",
3208                                 libcfs_nid2str(msg->mxm_srcnid),
3209                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3210                                 (int) MXLND_EAGER_SIZE);
3211                         spin_lock(&conn->mxk_lock);
3212                         conn->mxk_status = MXLND_CONN_FAIL;
3213                         spin_unlock(&conn->mxk_lock);
3214                         incompatible = 1;
3215                         ret = -1;
3216                 }
3217                 write_lock(&kmxlnd_data.kmx_global_lock);
3218                 peer->mxp_incarnation = msg->mxm_srcstamp;
3219                 peer->mxp_incompatible = incompatible;
3220                 write_unlock(&kmxlnd_data.kmx_global_lock);
3221                 spin_lock(&conn->mxk_lock);
3222                 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3223                 conn->mxk_outstanding = 0;
3224                 conn->mxk_incarnation = msg->mxm_srcstamp;
3225                 conn->mxk_timeout = 0;
3226                 if (!incompatible) {
3227                         conn->mxk_status = MXLND_CONN_READY;
3228                 }
3229                 spin_unlock(&conn->mxk_lock);
3230                 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3231                 break;
3232
3233         default:
3234                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3235                                 libcfs_nid2str(rx->mxc_nid));
3236                 ret = -EPROTO;
3237                 break;
3238         }
3239
3240 failed:
3241         if (ret < 0) {
3242                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3243                 spin_lock(&conn->mxk_lock);
3244                 conn->mxk_status = MXLND_CONN_FAIL;
3245                 spin_unlock(&conn->mxk_lock);
3246         }
3247
3248 cleanup:
3249         if (conn != NULL) {
3250                 spin_lock(&conn->mxk_lock);
3251                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3252                 spin_unlock(&conn->mxk_lock);
3253         }
3254
3255         if (repost) {
3256                 /* lnet_parse() failed, etc., repost now */
3257                 mxlnd_put_idle_rx(rx);
3258                 if (conn != NULL && credit == 1) {
3259                         if (type == MXLND_MSG_PUT_DATA) {
3260                                 spin_lock(&conn->mxk_lock);
3261                                 conn->mxk_outstanding++;
3262                                 spin_unlock(&conn->mxk_lock);
3263                         } else if (type != MXLND_MSG_GET_DATA &&
3264                                   (type == MXLND_MSG_EAGER ||
3265                                    type == MXLND_MSG_PUT_REQ ||
3266                                    type == MXLND_MSG_NOOP)) {
3267                                 spin_lock(&conn->mxk_lock);
3268                                 conn->mxk_outstanding++;
3269                                 spin_unlock(&conn->mxk_lock);
3270                         }
3271                 }
3272                 if (conn_ref) mxlnd_conn_decref(conn);
3273                 LASSERT(peer_ref == 0);
3274         }
3275
3276         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3277                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3278         } else {
3279                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3280         }
3281
3282         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3283         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3284
3285         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3286
3287         return;
3288 }
3289
3290
3291
3292 void
3293 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3294 {
3295         struct kmx_ctx  *tx     = NULL;
3296         struct kmx_msg  *txmsg   = NULL;
3297         struct kmx_conn *conn   = peer->mxp_conn;
3298
3299         /* a conn ref was taken when calling mx_iconnect(), 
3300          * hold it until CONN_REQ or CONN_ACK completes */
3301
3302         CDEBUG(D_NET, "entering\n");
3303         if (status.code != MX_STATUS_SUCCESS) {
3304                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3305                         mx_strstatus(status.code), status.code,
3306                         libcfs_nid2str(peer->mxp_nid));
3307                 spin_lock(&conn->mxk_lock);
3308                 conn->mxk_status = MXLND_CONN_FAIL;
3309                 spin_unlock(&conn->mxk_lock);
3310
3311                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3312                         struct kmx_conn *new_conn       = NULL;
3313                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3314                         /* FIXME write lock here ? */
3315                         mxlnd_conn_disconnect(conn, 0, 0);
3316                         mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3317                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3318                         peer->mxp_reconnect_time = 0;
3319                 }
3320
3321                 mxlnd_conn_decref(conn);
3322                 return;
3323         }
3324
3325         spin_lock(&conn->mxk_lock);
3326         conn->mxk_epa = status.source;
3327         spin_unlock(&conn->mxk_lock);
3328         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3329          *      we should not need to lock the peer */
3330         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3331
3332         /* mx_iconnect() succeeded, reset delay to 0 */
3333         write_lock(&kmxlnd_data.kmx_global_lock);
3334         peer->mxp_reconnect_time = 0;
3335         write_unlock(&kmxlnd_data.kmx_global_lock);
3336
3337         /* marshal CONN_REQ msg */
3338         /* we are still using the conn ref from iconnect() - do not take another */
3339         tx = mxlnd_get_idle_tx();
3340         if (tx == NULL) {
3341                 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3342                                    libcfs_nid2str(peer->mxp_nid));
3343                 spin_lock(&conn->mxk_lock);
3344                 conn->mxk_status = MXLND_CONN_FAIL;
3345                 spin_unlock(&conn->mxk_lock);
3346                 mxlnd_conn_decref(conn);
3347                 return;
3348         }
3349
3350         tx->mxc_peer = peer;
3351         tx->mxc_conn = conn;
3352         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3353         txmsg = tx->mxc_msg;
3354         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3355         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3356         tx->mxc_match = mxlnd_create_match(tx, 0);
3357
3358         CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3359         mxlnd_queue_tx(tx);
3360         return;
3361 }
3362
3363 void
3364 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3365 {
3366         struct kmx_ctx  *tx     = NULL;
3367         struct kmx_msg  *txmsg   = NULL;
3368         struct kmx_conn *conn   = peer->mxp_conn;
3369         u64             nic_id  = 0ULL;
3370         u32             ep_id   = 0;
3371         u32             sid     = 0;
3372
3373         /* a conn ref was taken when calling mx_iconnect(), 
3374          * hold it until CONN_REQ or CONN_ACK completes */
3375
3376         CDEBUG(D_NET, "entering\n");
3377         if (status.code != MX_STATUS_SUCCESS) {
3378                 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3379                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3380                         mx_strstatus(status.code), status.code,
3381                         libcfs_nid2str(peer->mxp_nid),
3382                         peer->mxp_nid,
3383                         peer->mxp_nic_id,
3384                         peer->mxp_ep_id);
3385                 spin_lock(&conn->mxk_lock);
3386                 conn->mxk_status = MXLND_CONN_FAIL;
3387                 spin_unlock(&conn->mxk_lock);
3388
3389                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3390                         struct kmx_conn *new_conn       = NULL;
3391                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3392                         /* FIXME write lock here? */
3393                         mxlnd_conn_disconnect(conn, 0, 1);
3394                         mxlnd_conn_alloc(&new_conn, peer); /* adds ref for 
3395                                                               this function... */
3396                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3397                         peer->mxp_reconnect_time = 0;
3398                 }
3399
3400                 mxlnd_conn_decref(conn);
3401                 return;
3402         }
3403         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3404         spin_lock(&conn->mxk_lock);
3405         conn->mxk_epa = status.source;
3406         if (likely(!peer->mxp_incompatible)) {
3407                 conn->mxk_status = MXLND_CONN_READY;
3408         }
3409         spin_unlock(&conn->mxk_lock);
3410         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3411          *      we should not have to lock the peer */
3412         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3413
3414         /* mx_iconnect() succeeded, reset delay to 0 */
3415         write_lock(&kmxlnd_data.kmx_global_lock);
3416         peer->mxp_reconnect_time = 0;
3417         peer->mxp_sid = sid;
3418         write_unlock(&kmxlnd_data.kmx_global_lock);
3419
3420         /* marshal CONN_ACK msg */
3421         tx = mxlnd_get_idle_tx();
3422         if (tx == NULL) {
3423                 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3424                                    libcfs_nid2str(peer->mxp_nid));
3425                 spin_lock(&conn->mxk_lock);
3426                 conn->mxk_status = MXLND_CONN_FAIL;
3427                 spin_unlock(&conn->mxk_lock);
3428                 mxlnd_conn_decref(conn);
3429                 return;
3430         }
3431
3432         tx->mxc_peer = peer;
3433         tx->mxc_conn = conn;
3434         CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3435         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3436         txmsg = tx->mxc_msg;
3437         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3438         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3439         tx->mxc_match = mxlnd_create_match(tx, 0);
3440
3441         mxlnd_queue_tx(tx);
3442         return;
3443 }
3444
3445 /**
3446  * mxlnd_request_waitd - the MX request completion thread(s)
3447  * @arg - thread id (as a void *)
3448  *
3449  * This thread waits for a MX completion and then completes the request.
3450  * We will create one thread per CPU.
3451  */
3452 int
3453 mxlnd_request_waitd(void *arg)
3454 {
3455         long                    id              = (long) arg;
3456         char                    name[24];
3457         __u32                   result          = 0;
3458         mx_return_t             mxret           = MX_SUCCESS;
3459         mx_status_t             status;
3460         struct kmx_ctx         *ctx             = NULL;
3461         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3462         struct kmx_peer        *peer            = NULL;
3463         struct kmx_conn        *conn            = NULL;
3464 #if MXLND_POLLING
3465         int                     count           = 0;
3466 #endif
3467
3468         memset(name, 0, sizeof(name));
3469         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3470         cfs_daemonize(name);
3471         //cfs_block_allsigs();
3472
3473         memset(&status, 0, sizeof(status));
3474
3475         CDEBUG(D_NET, "%s starting\n", name);
3476
3477         while (!kmxlnd_data.kmx_shutdown) {
3478                 u8      msg_type        = 0;
3479
3480                 mxret = MX_SUCCESS;
3481                 result = 0;
3482 #if MXLND_POLLING
3483                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3484                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3485                                             &status, &result);
3486                 } else {
3487                         count = 0;
3488                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3489                                             0ULL, 0ULL, &status, &result);
3490                 }
3491 #else
3492                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3493                                     0ULL, 0ULL, &status, &result);
3494 #endif
3495                 if (unlikely(kmxlnd_data.kmx_shutdown))
3496                         break;
3497
3498                 if (result != 1) {
3499                         /* nothing completed... */
3500                         continue;
3501                 }
3502
3503                 if (status.code != MX_STATUS_SUCCESS) {
3504                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3505                                "match_info 0x%llx and length %d\n",
3506                                mx_strstatus(status.code), status.code,
3507                                (u64) status.match_info, status.msg_length);
3508                 }
3509
3510                 msg_type = MXLND_MSG_TYPE(status.match_info);
3511
3512                 /* This may be a mx_iconnect() request completing,
3513                  * check the bit mask for CONN_REQ and CONN_ACK */
3514                 if (msg_type == MXLND_MSG_ICON_REQ ||
3515                     msg_type == MXLND_MSG_ICON_ACK) {
3516                         peer = (struct kmx_peer*) status.context;
3517                         if (msg_type == MXLND_MSG_ICON_REQ) {
3518                                 mxlnd_handle_conn_req(peer, status);
3519                         } else {
3520                                 mxlnd_handle_conn_ack(peer, status);
3521                         }
3522                         continue;
3523                 }
3524
3525                 /* This must be a tx or rx */
3526
3527                 /* NOTE: if this is a RX from the unexpected callback, it may
3528                  * have very little info. If we dropped it in unexpected_recv(),
3529                  * it will not have a context. If so, ignore it. */
3530                 ctx = (struct kmx_ctx *) status.context;
3531                 if (ctx != NULL) {
3532
3533                         req_type = ctx->mxc_type;
3534                         conn = ctx->mxc_conn; /* this may be NULL */
3535                         mxlnd_deq_pending_ctx(ctx);
3536
3537                         /* copy status to ctx->mxc_status */
3538                         memcpy(&ctx->mxc_status, &status, sizeof(status));
3539
3540                         switch (req_type) {
3541                         case MXLND_REQ_TX:
3542                                 mxlnd_handle_tx_completion(ctx);
3543                                 break;
3544                         case MXLND_REQ_RX:
3545                                 mxlnd_handle_rx_completion(ctx);
3546                                 break;
3547                         default:
3548                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3549                                 LBUG();
3550                                 break;
3551                         }
3552
3553                         /* FIXME may need to reconsider this */
3554                         /* conn is always set except for the first CONN_REQ rx
3555                          * from a new peer */
3556                         if (!(status.code == MX_STATUS_SUCCESS ||
3557                               status.code == MX_STATUS_TRUNCATED) &&
3558                               conn != NULL) {
3559                                 mxlnd_conn_disconnect(conn, 1, 1);
3560                         }
3561                 }
3562                 CDEBUG(D_NET, "waitd() completed task\n");
3563         }
3564         CDEBUG(D_NET, "%s stopping\n", name);
3565         mxlnd_thread_stop(id);
3566         return 0;
3567 }
3568
3569
3570 unsigned long
3571 mxlnd_check_timeouts(unsigned long now)
3572 {
3573         int                     i               = 0;
3574         int                     disconnect      = 0;
3575         unsigned long           next            = 0;
3576         struct  kmx_peer        *peer           = NULL;
3577         struct  kmx_conn        *conn           = NULL;
3578
3579         read_lock(&kmxlnd_data.kmx_global_lock);
3580         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3581                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3582
3583                         if (unlikely(kmxlnd_data.kmx_shutdown)) {
3584                                 read_unlock(&kmxlnd_data.kmx_global_lock);
3585                                 return next;
3586                         }
3587
3588                         conn = peer->mxp_conn;
3589                         if (conn) {
3590                                 mxlnd_conn_addref(conn);
3591                         } else {
3592                                 continue;
3593                         }
3594
3595                         /* FIXMEis this needed? */
3596                         spin_lock(&conn->mxk_lock);
3597
3598                         /* if nothing pending (timeout == 0) or
3599                          * if conn is already disconnected,
3600                          * skip this conn */
3601                         if (conn->mxk_timeout == 0 ||
3602                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3603                                 /* FIXME is this needed? */
3604                                 spin_unlock(&conn->mxk_lock);
3605                                 mxlnd_conn_decref(conn);
3606                                 continue;
3607                         }
3608
3609                         /* we want to find the timeout that will occur first.
3610                          * if it is in the future, we will sleep until then.
3611                          * if it is in the past, then we will sleep one
3612                          * second and repeat the process. */
3613                         if ((next == 0) || (conn->mxk_timeout < next)) {
3614                                 next = conn->mxk_timeout;
3615                         }
3616
3617                         disconnect = 0;
3618
3619                         if (time_after_eq(now, conn->mxk_timeout))  {
3620                                 disconnect = 1;
3621                         }
3622                         spin_unlock(&conn->mxk_lock);
3623
3624                         if (disconnect) {
3625                                 mxlnd_conn_disconnect(conn, 1, 1);
3626                         }
3627                         mxlnd_conn_decref(conn);
3628                 }
3629         }
3630         read_unlock(&kmxlnd_data.kmx_global_lock);
3631         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3632
3633         return next;
3634 }
3635
3636 /**
3637  * mxlnd_timeoutd - enforces timeouts on messages
3638  * @arg - thread id (as a void *)
3639  *
3640  * This thread queries each peer for its earliest timeout. If a peer has timed out,
3641  * it calls mxlnd_conn_disconnect().
3642  *
3643  * After checking for timeouts, try progressing sends (call check_sends()).
3644  */
3645 int
3646 mxlnd_timeoutd(void *arg)
3647 {
3648         int                     i       = 0;
3649         long                    id      = (long) arg;
3650         unsigned long           now     = 0;
3651         unsigned long           next    = 0;
3652         unsigned long           delay   = HZ;
3653         struct kmx_peer        *peer    = NULL;
3654         struct kmx_conn        *conn    = NULL;
3655
3656         cfs_daemonize("mxlnd_timeoutd");
3657         //cfs_block_allsigs();
3658
3659         CDEBUG(D_NET, "timeoutd starting\n");
3660
3661         while (!kmxlnd_data.kmx_shutdown) {
3662
3663                 now = jiffies;
3664                 /* if the next timeout has not arrived, go back to sleep */
3665                 if (time_after(now, next)) {
3666                         next = mxlnd_check_timeouts(now);
3667                 }
3668
3669                read_lock(&kmxlnd_data.kmx_global_lock);
3670                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3671                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3672                                 /* FIXME upgrade to write lock?
3673                                  * is any lock needed? */
3674                                 conn = peer->mxp_conn;
3675                                 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3676
3677                                 if (conn == NULL)
3678                                         continue;
3679
3680                                 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3681                                     time_after(now, conn->mxk_last_tx + HZ)) {
3682                                         /* FIXME drop lock or call check_sends_locked */
3683                                         read_unlock(&kmxlnd_data.kmx_global_lock);
3684                                         mxlnd_check_sends(peer);
3685                                         read_lock(&kmxlnd_data.kmx_global_lock);
3686                                 }
3687                                 mxlnd_conn_decref(conn); /* until here */
3688                         }
3689                 }
3690                 read_unlock(&kmxlnd_data.kmx_global_lock);
3691
3692                 mxlnd_sleep(delay);
3693         }
3694         CDEBUG(D_NET, "timeoutd stopping\n");
3695         mxlnd_thread_stop(id);
3696         return 0;
3697 }