Whamcloud - gitweb
convert uint64_t to __u64
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  * Copyright (C) 2006 Myricom, Inc.
7  *   Author: Myricom, Inc. <help at myri.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #include "mxlnd.h"
26
27 inline void mxlnd_noop(char *s, ...)
28 {
29         return;
30 }
31
32 char *
33 mxlnd_ctxstate_to_str(int mxc_state)
34 {
35         switch (mxc_state) {
36         case MXLND_CTX_INIT:
37                 return "MXLND_CTX_INIT";
38         case MXLND_CTX_IDLE:
39                 return "MXLND_CTX_IDLE";
40         case MXLND_CTX_PREP:
41                 return "MXLND_CTX_PREP";
42         case MXLND_CTX_PENDING:
43                 return "MXLND_CTX_PENDING";
44         case MXLND_CTX_COMPLETED:
45                 return "MXLND_CTX_COMPLETED";
46         case MXLND_CTX_CANCELED:
47                 return "MXLND_CTX_CANCELED";
48         default:
49                 return "*unknown*";
50         }
51 }
52
53 char *
54 mxlnd_connstatus_to_str(int mxk_status)
55 {
56         switch (mxk_status) {
57         case MXLND_CONN_READY:
58                 return "MXLND_CONN_READY";
59         case MXLND_CONN_INIT:
60                 return "MXLND_CONN_INIT";
61         case MXLND_CONN_REQ:
62                 return "MXLND_CONN_REQ";
63         case MXLND_CONN_ACK:
64                 return "MXLND_CONN_ACK";
65         case MXLND_CONN_WAIT:
66                 return "MXLND_CONN_WAIT";
67         case MXLND_CONN_DISCONNECT:
68                 return "MXLND_CONN_DISCONNECT";
69         case MXLND_CONN_FAIL:
70                 return "MXLND_CONN_FAIL";
71         default:
72                 return "unknown";
73         }
74 }
75
76 char *
77 mxlnd_msgtype_to_str(int type) {
78         switch (type) {
79         case MXLND_MSG_EAGER:
80                 return "MXLND_MSG_EAGER";
81         case MXLND_MSG_CONN_REQ:
82                 return "MXLND_MSG_CONN_REQ";
83         case MXLND_MSG_CONN_ACK:
84                 return "MXLND_MSG_CONN_ACK";
85         case MXLND_MSG_NOOP:
86                 return "MXLND_MSG_NOOP";
87         case MXLND_MSG_PUT_REQ:
88                 return "MXLND_MSG_PUT_REQ";
89         case MXLND_MSG_PUT_ACK:
90                 return "MXLND_MSG_PUT_ACK";
91         case MXLND_MSG_PUT_DATA:
92                 return "MXLND_MSG_PUT_DATA";
93         case MXLND_MSG_GET_REQ:
94                 return "MXLND_MSG_GET_REQ";
95         case MXLND_MSG_GET_DATA:
96                 return "MXLND_MSG_GET_DATA";
97         default:
98                 return "unknown";
99         }
100 }
101
102 char *
103 mxlnd_lnetmsg_to_str(int type)
104 {
105         switch (type) {
106         case LNET_MSG_ACK:
107                 return "LNET_MSG_ACK";
108         case LNET_MSG_PUT:
109                 return "LNET_MSG_PUT";
110         case LNET_MSG_GET:
111                 return "LNET_MSG_GET";
112         case LNET_MSG_REPLY:
113                 return "LNET_MSG_REPLY";
114         case LNET_MSG_HELLO:
115                 return "LNET_MSG_HELLO";
116         default:
117                 return "*unknown*";
118         }
119 }
120
121 static inline u64
122 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
123 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
124 {
125         u64 type        = (u64) ctx->mxc_msg_type;
126         u64 err         = (u64) error;
127         u64 match       = 0LL;
128
129         LASSERT(ctx->mxc_msg_type != 0);
130         LASSERT(ctx->mxc_cookie >> 52 == 0);
131         match = (type << 60) | (err << 52) | ctx->mxc_cookie;
132         return match;
133 }
134
135 static inline void
136 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
137 {
138         *msg_type = (u8) (match >> 60);
139         *error    = (u8) ((match >> 52) & 0xFF);
140         *cookie   = match & 0xFFFFFFFFFFFFFLL;
141         LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
142                 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
143                 *msg_type == MXLND_MSG_EAGER    ||
144                 *msg_type == MXLND_MSG_CONN_REQ ||
145                 *msg_type == MXLND_MSG_CONN_ACK ||
146                 *msg_type == MXLND_MSG_NOOP     ||
147                 *msg_type == MXLND_MSG_PUT_REQ  ||
148                 *msg_type == MXLND_MSG_PUT_ACK  ||
149                 *msg_type == MXLND_MSG_PUT_DATA ||
150                 *msg_type == MXLND_MSG_GET_REQ  ||
151                 *msg_type == MXLND_MSG_GET_DATA);
152         return;
153 }
154
155 struct kmx_ctx *
156 mxlnd_get_idle_rx(void)
157 {
158         struct list_head        *tmp    = NULL;
159         struct kmx_ctx          *rx     = NULL;
160
161         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
162
163         if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
164                 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
165                 return NULL;
166         }
167
168         tmp = &kmxlnd_data.kmx_rx_idle;
169         rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
170         list_del_init(&rx->mxc_list);
171         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
172
173 #if MXLND_DEBUG
174         if (rx->mxc_get != rx->mxc_put) {
175                 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
176                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
177                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
178                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
179                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
180                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
181                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
182                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
183                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
184                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
185         }
186 #endif
187         LASSERT (rx->mxc_get == rx->mxc_put);
188
189         rx->mxc_get++;
190
191         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
192         rx->mxc_state = MXLND_CTX_PREP;
193
194         return rx;
195 }
196
197 int
198 mxlnd_put_idle_rx(struct kmx_ctx *rx)
199 {
200         if (rx == NULL) {
201                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
202                 return -EINVAL;
203         } else if (rx->mxc_type != MXLND_REQ_RX) {
204                 CDEBUG(D_NETERROR, "called with tx\n");
205                 return -EINVAL;
206         }
207         LASSERT(rx->mxc_get == rx->mxc_put + 1);
208         mxlnd_ctx_init(rx);
209         rx->mxc_put++;
210         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
211         list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
212         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
213         return 0;
214 }
215
216 int
217 mxlnd_reduce_idle_rxs(__u32 count)
218 {
219         __u32                   i       = 0;
220         struct kmx_ctx          *rx     = NULL;
221
222         spin_lock(&kmxlnd_data.kmx_rxs_lock);
223         for (i = 0; i < count; i++) {
224                 rx = mxlnd_get_idle_rx();
225                 if (rx != NULL) {
226                         struct list_head *tmp = &rx->mxc_global_list;
227                         list_del_init(tmp);
228                         mxlnd_ctx_free(rx);
229                 } else {
230                         CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
231                         break;
232                 }
233         }
234         spin_unlock(&kmxlnd_data.kmx_rxs_lock);
235         return 0;
236 }
237
238 struct kmx_ctx *
239 mxlnd_get_idle_tx(void)
240 {
241         struct list_head        *tmp    = NULL;
242         struct kmx_ctx          *tx     = NULL;
243
244         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
245
246         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
247                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
248                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
249                 return NULL;
250         }
251
252         tmp = &kmxlnd_data.kmx_tx_idle;
253         tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
254         list_del_init(&tx->mxc_list);
255
256         /* Allocate a new completion cookie.  It might not be needed,
257          * but we've got a lock right now and we're unlikely to
258          * wrap... */
259         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
260         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
261                 tx->mxc_cookie = 1;
262         }
263         kmxlnd_data.kmx_tx_used++;
264         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
265
266         LASSERT (tx->mxc_get == tx->mxc_put);
267
268         tx->mxc_get++;
269
270         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
271         LASSERT (tx->mxc_lntmsg[0] == NULL);
272         LASSERT (tx->mxc_lntmsg[1] == NULL);
273
274         tx->mxc_state = MXLND_CTX_PREP;
275
276         return tx;
277 }
278
279 int
280 mxlnd_put_idle_tx(struct kmx_ctx *tx)
281 {
282         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
283         int             result  = failed ? -EIO : 0;
284         lnet_msg_t      *lntmsg[2];
285
286         if (tx == NULL) {
287                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
288                 return -EINVAL;
289         } else if (tx->mxc_type != MXLND_REQ_TX) {
290                 CDEBUG(D_NETERROR, "called with rx\n");
291                 return -EINVAL;
292         }
293
294         lntmsg[0] = tx->mxc_lntmsg[0];
295         lntmsg[1] = tx->mxc_lntmsg[1];
296
297         LASSERT(tx->mxc_get == tx->mxc_put + 1);
298         mxlnd_ctx_init(tx);
299         tx->mxc_put++;
300         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
301         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
302         kmxlnd_data.kmx_tx_used--;
303         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
304         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
305         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
306         return 0;
307 }
308
309 /**
310  * mxlnd_conn_free - free the conn
311  * @conn - a kmx_conn pointer
312  *
313  * The calling function should remove the conn from the conns list first
314  * then destroy it.
315  */
316 void
317 mxlnd_conn_free(struct kmx_conn *conn)
318 {
319         struct kmx_peer *peer   = conn->mxk_peer;
320
321         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
322         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
323                  list_empty (&conn->mxk_tx_free_queue) &&
324                  list_empty (&conn->mxk_pending));
325         if (!list_empty(&conn->mxk_list)) {
326                 spin_lock(&peer->mxp_lock);
327                 list_del_init(&conn->mxk_list);
328                 if (peer->mxp_conn == conn) {
329                         peer->mxp_conn = NULL;
330                         if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
331                                 mx_set_endpoint_addr_context(conn->mxk_epa,
332                                                              (void *) NULL);
333                         }
334                 }
335                 spin_unlock(&peer->mxp_lock);
336         }
337         mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
338         MXLND_FREE (conn, sizeof (*conn));
339         return;
340 }
341
342
343 void
344 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
345 {
346         int                     found   = 0;
347         struct kmx_ctx          *ctx    = NULL;
348         struct kmx_ctx          *next   = NULL;
349         mx_return_t             mxret   = MX_SUCCESS;
350         u32                     result  = 0;
351
352         do {
353                 found = 0;
354                 spin_lock(&conn->mxk_lock);
355                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
356                         /* we will delete all including txs */
357                         list_del_init(&ctx->mxc_list);
358                         if (ctx->mxc_type == MXLND_REQ_RX) {
359                                 found = 1;
360                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
361                                                   &ctx->mxc_mxreq,
362                                                   &result);
363                                 if (mxret != MX_SUCCESS) {
364                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
365                                 }
366                                 if (result == 1) {
367                                         ctx->mxc_status.code = -ECONNABORTED;
368                                         ctx->mxc_state = MXLND_CTX_CANCELED;
369                                         /* NOTE this calls lnet_finalize() and
370                                          * we cannot hold any locks when calling it.
371                                          * It also calls mxlnd_conn_decref(conn) */
372                                         spin_unlock(&conn->mxk_lock);
373                                         mxlnd_handle_rx_completion(ctx);
374                                         spin_lock(&conn->mxk_lock);
375                                 }
376                                 break;
377                         }
378                 }
379                 spin_unlock(&conn->mxk_lock);
380         }
381         while (found);
382
383         return;
384 }
385
386 /**
387  * mxlnd_conn_disconnect - shutdown a connection
388  * @conn - a kmx_conn pointer
389  *
390  * This function sets the status to DISCONNECT, completes queued
391  * txs with failure, calls mx_disconnect, which will complete
392  * pending txs and matched rxs with failure.
393  */
394 void
395 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
396 {
397         struct list_head        *tmp    = NULL;
398
399         spin_lock(&conn->mxk_lock);
400         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
401                 spin_unlock(&conn->mxk_lock);
402                 return;
403         }
404         conn->mxk_status = MXLND_CONN_DISCONNECT;
405         conn->mxk_timeout = 0;
406
407         while (!list_empty(&conn->mxk_tx_free_queue) ||
408                !list_empty(&conn->mxk_tx_credit_queue)) {
409
410                 struct kmx_ctx          *tx     = NULL;
411
412                 if (!list_empty(&conn->mxk_tx_free_queue)) {
413                         tmp = &conn->mxk_tx_free_queue;
414                 } else {
415                         tmp = &conn->mxk_tx_credit_queue;
416                 }
417
418                 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
419                 list_del_init(&tx->mxc_list);
420                 tx->mxc_status.code = -ECONNABORTED;
421                 spin_unlock(&conn->mxk_lock);
422                 mxlnd_put_idle_tx(tx);
423                 mxlnd_conn_decref(conn); /* for this tx */
424                 spin_lock(&conn->mxk_lock);
425         }
426
427         spin_unlock(&conn->mxk_lock);
428
429         /* cancel pending rxs */
430         mxlnd_conn_cancel_pending_rxs(conn);
431
432         if (kmxlnd_data.kmx_shutdown != 1) {
433
434                 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
435
436                 if (notify) {
437                         time_t          last_alive      = 0;
438                         unsigned long   last_msg        = 0;
439
440                         /* notify LNET that we are giving up on this peer */
441                         if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
442                                 last_msg = conn->mxk_last_rx;
443                         } else {
444                                 last_msg = conn->mxk_last_tx;
445                         }
446                         last_alive = cfs_time_current_sec() -
447                                      cfs_duration_sec(cfs_time_current() - last_msg);
448                         lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
449                 }
450         }
451         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
452         
453         return;
454 }
455
456 /**
457  * mxlnd_conn_alloc - allocate and initialize a new conn struct
458  * @connp - address of a kmx_conn pointer
459  * @peer - owning kmx_peer
460  *
461  * Returns 0 on success and -ENOMEM on failure
462  */
463 int
464 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
465 {
466         struct kmx_conn *conn    = NULL;
467
468         LASSERT(peer != NULL);
469
470         MXLND_ALLOC(conn, sizeof (*conn));
471         if (conn == NULL) {
472                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
473                 return -ENOMEM;
474         }
475         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
476
477         memset(conn, 0, sizeof(*conn));
478
479         /* conn->mxk_incarnation = 0 - will be set by peer */
480         atomic_set(&conn->mxk_refcount, 1);     /* ref for owning peer */
481         conn->mxk_peer = peer;
482         /* mxk_epa - to be set after mx_iconnect() */
483         INIT_LIST_HEAD(&conn->mxk_list);
484         spin_lock_init(&conn->mxk_lock);
485         /* conn->mxk_timeout = 0 */
486         conn->mxk_last_tx = jiffies;
487         conn->mxk_last_rx = conn->mxk_last_tx;
488         conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
489         /* mxk_outstanding = 0 */
490         conn->mxk_status = MXLND_CONN_INIT;
491         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
492         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
493         /* conn->mxk_ntx_msgs = 0 */
494         /* conn->mxk_ntx_data = 0 */
495         /* conn->mxk_ntx_posted = 0 */
496         /* conn->mxk_data_posted = 0 */
497         INIT_LIST_HEAD(&conn->mxk_pending);
498
499         *connp = conn;
500
501         mxlnd_peer_addref(peer);        /* add a ref for this conn */
502
503         /* add to front of peer's conns list */
504         spin_lock(&peer->mxp_lock);
505         list_add(&conn->mxk_list, &peer->mxp_conns);
506         peer->mxp_conn = conn;
507         spin_unlock(&peer->mxp_lock);
508         return 0;
509 }
510
511
512 int
513 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
514 {
515         int             ret     = 0;
516         struct kmx_conn *conn   = ctx->mxc_conn;
517
518         ctx->mxc_state = MXLND_CTX_PENDING;
519         if (conn != NULL) {
520                 spin_lock(&conn->mxk_lock);
521                 if (conn->mxk_status >= MXLND_CONN_INIT) {
522                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
523                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
524                                 conn->mxk_timeout = ctx->mxc_deadline;
525                         }
526                 } else {
527                         ctx->mxc_state = MXLND_CTX_COMPLETED;
528                         ret = -1;
529                 }
530                 spin_unlock(&conn->mxk_lock);
531         }
532         return ret;
533 }
534
535 int
536 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
537 {
538         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
539                 ctx->mxc_state == MXLND_CTX_COMPLETED);
540         if (ctx->mxc_state != MXLND_CTX_PENDING &&
541             ctx->mxc_state != MXLND_CTX_COMPLETED) {
542                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", 
543                        mxlnd_ctxstate_to_str(ctx->mxc_state));
544         }
545         ctx->mxc_state = MXLND_CTX_COMPLETED;
546         if (!list_empty(&ctx->mxc_list)) {
547                 struct kmx_conn *conn = ctx->mxc_conn;
548                 struct kmx_ctx *next = NULL;
549                 LASSERT(conn != NULL);
550                 spin_lock(&conn->mxk_lock);
551                 list_del_init(&ctx->mxc_list);
552                 conn->mxk_timeout = 0;
553                 if (!list_empty(&conn->mxk_pending)) {
554                         next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
555                         conn->mxk_timeout = next->mxc_deadline;
556                 }
557                 spin_unlock(&ctx->mxc_conn->mxk_lock);
558         }
559         return 0;
560 }
561
562 /**
563  * mxlnd_peer_free - free the peer
564  * @peer - a kmx_peer pointer
565  *
566  * The calling function should decrement the rxs, drain the tx queues and
567  * remove the peer from the peers list first then destroy it.
568  */
569 void
570 mxlnd_peer_free(struct kmx_peer *peer)
571 {
572         CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
573
574         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
575
576         if (peer->mxp_host != NULL) {
577                 spin_lock(&peer->mxp_host->mxh_lock);
578                 peer->mxp_host->mxh_peer = NULL;
579                 spin_unlock(&peer->mxp_host->mxh_lock);
580         }
581         if (!list_empty(&peer->mxp_peers)) {
582                 /* assume we are locked */
583                 list_del_init(&peer->mxp_peers);
584         }
585
586         MXLND_FREE (peer, sizeof (*peer));
587         atomic_dec(&kmxlnd_data.kmx_npeers);
588         return;
589 }
590
591 void
592 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
593 {
594         u64             nic_id  = 0LL;
595         char            name[MX_MAX_HOSTNAME_LEN + 1];
596         mx_return_t     mxret   = MX_SUCCESS;
597
598         memset(name, 0, sizeof(name));
599         snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
600         mxret = mx_hostname_to_nic_id(name, &nic_id);
601         if (mxret == MX_SUCCESS) {
602                 peer->mxp_nic_id = nic_id;
603         } else {
604                 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
605                                    "with %s\n", mx_strerror(mxret), name);
606                 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
607                 if (mxret == MX_SUCCESS) {
608                         peer->mxp_nic_id = nic_id;
609                 } else {
610                         CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
611                                            "with %s\n", mx_strerror(mxret), 
612                                            peer->mxp_host->mxh_hostname);
613                 }
614         }
615         return;
616 }
617
618 /**
619  * mxlnd_peer_alloc - allocate and initialize a new peer struct
620  * @peerp - address of a kmx_peer pointer
621  * @nid - LNET node id
622  *
623  * Returns 0 on success and -ENOMEM on failure
624  */
625 int
626 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
627 {
628         int                     i       = 0;
629         int                     ret     = 0;
630         u32                     addr    = LNET_NIDADDR(nid);
631         struct kmx_peer        *peer    = NULL;
632         struct kmx_host        *host    = NULL;
633
634         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
635
636         MXLND_ALLOC(peer, sizeof (*peer));
637         if (peer == NULL) {
638                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
639                 return -ENOMEM;
640         }
641         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
642
643         memset(peer, 0, sizeof(*peer));
644
645         list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
646                 if (addr == host->mxh_addr) {
647                         peer->mxp_host = host;
648                         spin_lock(&host->mxh_lock);
649                         host->mxh_peer = peer;
650                         spin_unlock(&host->mxh_lock);
651                         break;
652                 }
653         }
654         LASSERT(peer->mxp_host != NULL);
655
656         peer->mxp_nid = nid;
657         /* peer->mxp_incarnation */
658         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
659         mxlnd_peer_hostname_to_nic_id(peer);
660
661         INIT_LIST_HEAD(&peer->mxp_peers);
662         spin_lock_init(&peer->mxp_lock);
663         INIT_LIST_HEAD(&peer->mxp_conns);
664         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer);
665         if (ret != 0) {
666                 mxlnd_peer_decref(peer);
667                 return ret;
668         }
669
670         for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
671                 struct kmx_ctx   *rx     = NULL;
672                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
673                 if (ret != 0) {
674                         mxlnd_reduce_idle_rxs(i);
675                         mxlnd_peer_decref(peer);
676                         return ret;
677                 }
678                 spin_lock(&kmxlnd_data.kmx_rxs_lock);
679                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
680                 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
681                 rx->mxc_put = -1;
682                 mxlnd_put_idle_rx(rx);
683         }
684         /* peer->mxp_reconnect_time = 0 */
685         /* peer->mxp_incompatible = 0 */
686
687         *peerp = peer;
688         return 0;
689 }
690
691 /**
692  * mxlnd_nid_to_hash - hash the nid
693  * @nid - msg pointer
694  *
695  * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
696  */
697 static inline int
698 mxlnd_nid_to_hash(lnet_nid_t nid)
699 {
700         return (nid & MXLND_HASH_MASK) ^
701                ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
702 }
703
704 static inline struct kmx_peer *
705 mxlnd_find_peer_by_nid(lnet_nid_t nid)
706 {
707         int                     found   = 0;
708         int                     hash    = 0;
709         struct kmx_peer         *peer   = NULL;
710
711         hash = mxlnd_nid_to_hash(nid);
712
713         read_lock(&kmxlnd_data.kmx_peers_lock);
714         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
715                 if (peer->mxp_nid == nid) {
716                         found = 1;
717                         break;
718                 }
719         }
720         read_unlock(&kmxlnd_data.kmx_peers_lock);
721         return (found ? peer : NULL);
722 }
723
724 static inline int
725 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
726 {
727         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
728                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
729                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
730                 tx->mxc_msg_type == MXLND_MSG_NOOP);
731 }
732
733 /**
734  * mxlnd_init_msg - set type and number of bytes
735  * @msg - msg pointer
736  * @type - of message
737  * @body_nob - bytes in msg body
738  */
739 static inline void
740 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
741 {
742         msg->mxm_type = type;
743         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
744 }
745
746 static inline void
747 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
748 {
749         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
750         struct kmx_msg  *msg    = NULL;
751         
752         LASSERT (tx != NULL);
753         LASSERT (nob <= MXLND_EAGER_SIZE);
754
755         tx->mxc_nid = nid;
756         /* tx->mxc_peer should have already been set if we know it */
757         tx->mxc_msg_type = type;
758         tx->mxc_nseg = 1;
759         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
760         tx->mxc_seg.segment_length = nob;
761         tx->mxc_pin_type = MX_PIN_PHYSICAL;
762         //tx->mxc_state = MXLND_CTX_PENDING;
763
764         msg = tx->mxc_msg;
765         msg->mxm_type = type;
766         msg->mxm_nob  = nob;
767
768         return;
769 }
770
771 static inline __u32 
772 mxlnd_cksum (void *ptr, int nob)
773 {
774         char  *c  = ptr;
775         __u32  sum = 0;
776
777         while (nob-- > 0)
778                 sum = ((sum << 1) | (sum >> 31)) + *c++;
779
780         /* ensure I don't return 0 (== no checksum) */
781         return (sum == 0) ? 1 : sum;
782 }
783
784 /**
785  * mxlnd_pack_msg - complete msg info
786  * @tx - msg to send
787  */
788 static inline void
789 mxlnd_pack_msg(struct kmx_ctx *tx)
790 {
791         struct kmx_msg  *msg    = tx->mxc_msg;
792
793         /* type and nob should already be set in init_msg() */
794         msg->mxm_magic    = MXLND_MSG_MAGIC;
795         msg->mxm_version  = MXLND_MSG_VERSION;
796         /*   mxm_type */
797         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
798          * return credits as well */
799         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
800             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
801                 spin_lock(&tx->mxc_conn->mxk_lock);
802                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
803                 tx->mxc_conn->mxk_outstanding = 0;
804                 spin_unlock(&tx->mxc_conn->mxk_lock);
805         } else {
806                 msg->mxm_credits  = 0;
807         }
808         /*   mxm_nob */
809         msg->mxm_cksum    = 0;        
810         msg->mxm_srcnid   = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);
811         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
812         msg->mxm_dstnid   = tx->mxc_nid;
813         /* if it is a new peer, the dststamp will be 0 */
814         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;         
815         msg->mxm_seq      = tx->mxc_cookie;
816
817         if (*kmxlnd_tunables.kmx_cksum) {
818                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
819         }       
820 }
821
822 int
823 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
824 {
825         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
826         __u32     msg_cksum     = 0;
827         int       flip          = 0;
828         int       msg_nob       = 0;
829
830         /* 6 bytes are enough to have received magic + version */
831         if (nob < 6) {
832                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
833                 return -EPROTO;
834         }
835
836         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
837                 flip = 0;
838         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
839                 flip = 1;
840         } else {
841                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
842                 return -EPROTO;
843         }
844
845         if (msg->mxm_version !=
846             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
847                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
848                 return -EPROTO;
849         }
850
851         if (nob < hdr_size) {
852                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
853                 return -EPROTO;
854         }
855
856         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
857         if (msg_nob > nob) {
858                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
859                 return -EPROTO;
860         }
861
862         /* checksum must be computed with mxm_cksum zero and BEFORE anything
863          * gets flipped */
864         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
865         msg->mxm_cksum = 0;
866         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
867                 CDEBUG(D_NETERROR, "Bad checksum\n");
868                 return -EPROTO;
869         }
870         msg->mxm_cksum = msg_cksum;
871
872         if (flip) {
873                 /* leave magic unflipped as a clue to peer endianness */
874                 __swab16s(&msg->mxm_version);
875                 CLASSERT (sizeof(msg->mxm_type) == 1);
876                 CLASSERT (sizeof(msg->mxm_credits) == 1);
877                 msg->mxm_nob = msg_nob;
878                 __swab64s(&msg->mxm_srcnid);
879                 __swab64s(&msg->mxm_srcstamp);
880                 __swab64s(&msg->mxm_dstnid);
881                 __swab64s(&msg->mxm_dststamp);
882                 __swab64s(&msg->mxm_seq);
883         }
884
885         if (msg->mxm_srcnid == LNET_NID_ANY) {
886                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
887                 return -EPROTO;
888         }
889
890         switch (msg->mxm_type) {
891         default:
892                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
893                 return -EPROTO;
894
895         case MXLND_MSG_NOOP:
896                 break;
897
898         case MXLND_MSG_EAGER:
899                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
900                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
901                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
902                         return -EPROTO;
903                 }
904                 break;
905
906         case MXLND_MSG_PUT_REQ:
907                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
908                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
909                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
910                         return -EPROTO;
911                 }
912                 if (flip)
913                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
914                 break;
915
916         case MXLND_MSG_PUT_ACK:
917                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
918                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
919                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
920                         return -EPROTO;
921                 }
922                 if (flip) {
923                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
924                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
925                 }
926                 break;
927
928         case MXLND_MSG_GET_REQ:
929                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
930                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
931                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
932                         return -EPROTO;
933                 }
934                 if (flip) {
935                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
936                 }
937                 break;
938
939         case MXLND_MSG_CONN_REQ:
940         case MXLND_MSG_CONN_ACK:
941                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
942                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
943                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
944                         return -EPROTO;
945                 }
946                 if (flip) {
947                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
948                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
949                 }
950                 break;
951         }
952         return 0;
953 }
954
955 /**
956  * mxlnd_recv_msg
957  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
958  * @rx
959  * @msg_type
960  * @cookie
961  * @length - length of incoming message
962  * @pending - add to kmx_pending (0 is NO and 1 is YES)
963  *
964  * The caller gets the rx and sets nid, peer and conn if known.
965  *
966  * Returns 0 on success and -1 on failure
967  */
968 int
969 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
970 {
971         int             ret     = 0;
972         mx_return_t     mxret   = MX_SUCCESS;
973         __u64           mask    = 0xF00FFFFFFFFFFFFFLL;
974
975         rx->mxc_msg_type = msg_type;
976         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
977         rx->mxc_cookie = cookie;
978         /* rx->mxc_match may already be set */
979         /* rx->mxc_seg.segment_ptr is already set */
980         rx->mxc_seg.segment_length = length;
981         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
982         ret = mxlnd_q_pending_ctx(rx);
983         if (ret == -1) {
984                 /* FIXME the conn is disconnected, now what? */
985                 return -1;
986         }
987         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
988                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
989         if (mxret != MX_SUCCESS) {
990                 mxlnd_deq_pending_ctx(rx);
991                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n", 
992                                    mx_strerror(mxret), (int) mxret);
993                 return -1;
994         }
995         return 0;
996 }
997
998
999 /**
1000  * mxlnd_unexpected_recv - this is the callback function that will handle 
1001  *                         unexpected receives
1002  * @context - NULL, ignore
1003  * @source - the peer's mx_endpoint_addr_t
1004  * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1005  * @length - length of incoming message
1006  * @data_if_available - ignore
1007  *
1008  * If it is an eager-sized msg, we will call recv_msg() with the actual
1009  * length. If it is a large message, we will call recv_msg() with a
1010  * length of 0 bytes to drop it because we should never have a large,
1011  * unexpected message.
1012  *
1013  * NOTE - The MX library blocks until this function completes. Make it as fast as
1014  * possible. DO NOT allocate memory which can block!
1015  *
1016  * If we cannot get a rx or the conn is closed, drop the message on the floor
1017  * (i.e. recv 0 bytes and ignore).
1018  */
1019 mx_unexp_handler_action_t
1020 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1021                  __u64 match_value, __u32 length, void *data_if_available)
1022 {
1023         int             ret             = 0;
1024         struct kmx_ctx  *rx             = NULL;
1025         mx_ksegment_t   seg;
1026         u8              msg_type        = 0;
1027         u8              error           = 0;
1028         u64             cookie          = 0LL;
1029
1030         if (context != NULL) {
1031                 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1032         }
1033
1034 #if MXLND_DEBUG
1035         CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1036 #endif
1037
1038         rx = mxlnd_get_idle_rx();
1039         if (rx != NULL) {
1040                 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1041                 if (length <= MXLND_EAGER_SIZE) {
1042                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1043                 } else {
1044                         CDEBUG(D_NETERROR, "unexpected large receive with "
1045                                            "match_value=0x%llx length=%d\n", 
1046                                            match_value, length);
1047                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1048                 }
1049                 if (ret == 0) {
1050                         struct kmx_conn *conn   = NULL;
1051                         mx_get_endpoint_addr_context(source, (void **) &conn);
1052                         if (conn != NULL) {
1053                                 mxlnd_conn_addref(conn);
1054                                 rx->mxc_conn = conn;
1055                                 rx->mxc_peer = conn->mxk_peer;
1056                                 if (conn->mxk_peer != NULL) {
1057                                         rx->mxc_nid = conn->mxk_peer->mxp_nid;
1058                                 } else {
1059                                         CDEBUG(D_NETERROR, "conn is 0x%p and peer "
1060                                                            "is NULL\n", conn);
1061                                 }
1062                         }
1063                 } else {
1064                         CDEBUG(D_NETERROR, "could not post receive\n");
1065                         mxlnd_put_idle_rx(rx);
1066                 }
1067         }
1068
1069         if (rx == NULL || ret != 0) {
1070                 if (rx == NULL) {
1071                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1072                 } else {
1073                         /* ret != 0 */
1074                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1075                 }
1076                 seg.segment_ptr = 0LL;
1077                 seg.segment_length = 0;
1078                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1079                           match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1080         }
1081
1082         return MX_RECV_CONTINUE;
1083 }
1084
1085
1086 int
1087 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1088 {
1089         int                      i      = 0;
1090         int                      ret    = -ENOENT;
1091         struct kmx_peer         *peer   = NULL;
1092         struct kmx_conn         *conn   = NULL;
1093
1094         read_lock(&kmxlnd_data.kmx_peers_lock);
1095         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1096                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1097                         conn = peer->mxp_conn;
1098                         if (index-- > 0)
1099                                 continue;
1100
1101                         *nidp = peer->mxp_nid;
1102                         *count = atomic_read(&peer->mxp_refcount);
1103                         ret = 0;
1104                         break;
1105                 }
1106         }
1107         read_unlock(&kmxlnd_data.kmx_peers_lock);
1108         
1109         return ret;
1110 }
1111
1112 void
1113 mxlnd_del_peer_locked(struct kmx_peer *peer)
1114 {
1115         list_del_init(&peer->mxp_peers); /* remove from the global list */
1116         if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 0, 0);
1117         mxlnd_peer_decref(peer); /* drop global list ref */
1118         return;
1119 }
1120
1121 int
1122 mxlnd_del_peer(lnet_nid_t nid)
1123 {
1124         int             i       = 0;
1125         int             ret     = 0;
1126         struct kmx_peer *peer   = NULL;
1127         struct kmx_peer *next   = NULL;
1128
1129         if (nid != LNET_NID_ANY) {
1130                 peer = mxlnd_find_peer_by_nid(nid);
1131         }
1132         write_lock(&kmxlnd_data.kmx_peers_lock);
1133         if (nid != LNET_NID_ANY) {
1134                 if (peer == NULL) {
1135                         ret = -ENOENT;
1136                 } else {
1137                         mxlnd_del_peer_locked(peer);
1138                 }
1139         } else { /* LNET_NID_ANY */
1140                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1141                         list_for_each_entry_safe(peer, next, 
1142                                                  &kmxlnd_data.kmx_peers[i], mxp_peers) {
1143                                 mxlnd_del_peer_locked(peer);
1144                         }
1145                 }
1146         }
1147         write_unlock(&kmxlnd_data.kmx_peers_lock);
1148
1149         return ret;
1150 }
1151
1152 struct kmx_conn *
1153 mxlnd_get_conn_by_idx(int index)
1154 {
1155         int                      i      = 0;
1156         struct kmx_peer         *peer   = NULL;
1157         struct kmx_conn         *conn   = NULL;
1158
1159         read_lock(&kmxlnd_data.kmx_peers_lock);
1160         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1161                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1162                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1163                                 if (index-- > 0)
1164                                         continue;
1165
1166                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1167                                 read_unlock(&kmxlnd_data.kmx_peers_lock);
1168                                 return conn;
1169                         }
1170                 }
1171         }
1172         read_unlock(&kmxlnd_data.kmx_peers_lock);
1173         
1174         return NULL;
1175 }
1176
1177 void
1178 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1179 {
1180         struct kmx_conn *conn   = NULL;
1181         struct kmx_conn *next   = NULL;
1182
1183         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1184                 mxlnd_conn_disconnect(conn, 0 , 0);
1185         }
1186         return;
1187 }
1188
1189 int
1190 mxlnd_close_matching_conns(lnet_nid_t nid)
1191 {
1192         int             i       = 0;
1193         int             ret     = 0;
1194         struct kmx_peer *peer   = NULL;
1195
1196         read_lock(&kmxlnd_data.kmx_peers_lock);
1197         if (nid != LNET_NID_ANY) {
1198                 peer = mxlnd_find_peer_by_nid(nid);
1199                 if (peer == NULL) {
1200                         ret = -ENOENT;
1201                 } else {
1202                         mxlnd_close_matching_conns_locked(peer);
1203                 }
1204         } else { /* LNET_NID_ANY */
1205                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1206                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1207                                 mxlnd_close_matching_conns_locked(peer);
1208                 }
1209         }
1210         read_unlock(&kmxlnd_data.kmx_peers_lock);
1211
1212         return ret;
1213 }
1214
1215 /**
1216  * mxlnd_ctl - modify MXLND parameters
1217  * @ni - LNET interface handle
1218  * @cmd - command to change
1219  * @arg - the ioctl data
1220  *
1221  * Not implemented yet.
1222  */
1223 int
1224 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1225 {
1226         struct libcfs_ioctl_data *data  = arg;
1227         int                       ret   = -EINVAL;
1228
1229         LASSERT (ni == kmxlnd_data.kmx_ni);
1230
1231         switch (cmd) {
1232         case IOC_LIBCFS_GET_PEER: {
1233                 lnet_nid_t      nid     = 0;
1234                 int             count   = 0;
1235
1236                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1237                 data->ioc_nid    = nid;
1238                 data->ioc_count  = count;
1239                 break;
1240         }
1241         case IOC_LIBCFS_DEL_PEER: {
1242                 ret = mxlnd_del_peer(data->ioc_nid);
1243                 break;
1244         }
1245         case IOC_LIBCFS_GET_CONN: {
1246                 struct kmx_conn *conn = NULL;
1247
1248                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1249                 if (conn == NULL) {
1250                         ret = -ENOENT;
1251                 } else {
1252                         ret = 0;
1253                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1254                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1255                 }
1256                 break;
1257         }
1258         case IOC_LIBCFS_CLOSE_CONNECTION: {
1259                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1260                 break;
1261         }
1262         default:
1263                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1264                 break;
1265         }
1266         
1267         return ret;
1268 }
1269
1270 /**
1271  * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1272  * @tx
1273  *
1274  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1275  */
1276 void
1277 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1278 {
1279         u8                      msg_type        = tx->mxc_msg_type;
1280         //struct kmx_peer         *peer           = tx->mxc_peer;
1281         struct kmx_conn         *conn           = tx->mxc_conn;
1282
1283         LASSERT (msg_type != 0);
1284         LASSERT (tx->mxc_nid != 0);
1285         LASSERT (tx->mxc_peer != NULL);
1286         LASSERT (tx->mxc_conn != NULL);
1287
1288         tx->mxc_incarnation = conn->mxk_incarnation;
1289
1290         if (msg_type != MXLND_MSG_PUT_DATA &&
1291             msg_type != MXLND_MSG_GET_DATA) {
1292                 /* msg style tx */
1293                 if (mxlnd_tx_requires_credit(tx)) {
1294                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1295                         conn->mxk_ntx_msgs++;
1296                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1297                            msg_type == MXLND_MSG_CONN_ACK) {
1298                         /* put conn msgs at the front of the queue */
1299                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1300                 } else {
1301                         /* PUT_ACK, PUT_NAK */
1302                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1303                         conn->mxk_ntx_msgs++;
1304                 }
1305         } else {
1306                 /* data style tx */
1307                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1308                 conn->mxk_ntx_data++;
1309         }
1310
1311         return;
1312 }
1313
1314 /**
1315  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1316  * @tx
1317  *
1318  * Add the tx to the peer's msg or data queue
1319  */
1320 static inline void
1321 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1322 {
1323         LASSERT(tx->mxc_peer != NULL);
1324         LASSERT(tx->mxc_conn != NULL);
1325         spin_lock(&tx->mxc_conn->mxk_lock);
1326         mxlnd_peer_queue_tx_locked(tx);
1327         spin_unlock(&tx->mxc_conn->mxk_lock);
1328
1329         return;
1330 }
1331
1332 /**
1333  * mxlnd_queue_tx - add the tx to the global tx queue
1334  * @tx
1335  *
1336  * Add the tx to the global queue and up the tx_queue_sem
1337  */
1338 void
1339 mxlnd_queue_tx(struct kmx_ctx *tx)
1340 {
1341         int             ret     = 0;
1342         struct kmx_peer *peer   = tx->mxc_peer;
1343         LASSERT (tx->mxc_nid != 0);
1344
1345         if (peer != NULL) {
1346                 if (peer->mxp_incompatible &&
1347                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1348                         /* let this fail now */
1349                         tx->mxc_status.code = -ECONNABORTED;
1350                         mxlnd_put_idle_tx(tx);
1351                         return;
1352                 }
1353                 if (tx->mxc_conn == NULL) {
1354                         mxlnd_conn_alloc(&tx->mxc_conn, peer);
1355                 }
1356                 LASSERT(tx->mxc_conn != NULL);
1357                 mxlnd_peer_queue_tx(tx);
1358                 ret = mxlnd_check_sends(peer);
1359         } else {
1360                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1361                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1362                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1363                 up(&kmxlnd_data.kmx_tx_queue_sem);
1364         }
1365         return;
1366 }
1367
1368 int
1369 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1370 {
1371         int             i                       = 0;
1372         int             sum                     = 0;
1373         int             old_sum                 = 0;
1374         int             nseg                    = 0;
1375         int             first_iov               = -1;
1376         int             first_iov_offset        = 0;
1377         int             first_found             = 0;
1378         int             last_iov                = -1;
1379         int             last_iov_length         = 0;
1380         mx_ksegment_t  *seg                     = NULL;
1381
1382         if (niov == 0) return 0;
1383         LASSERT(iov != NULL);
1384
1385         for (i = 0; i < niov; i++) {
1386                 sum = old_sum + (u32) iov[i].iov_len;
1387                 if (!first_found && (sum > offset)) {
1388                         first_iov = i;
1389                         first_iov_offset = offset - old_sum;
1390                         first_found = 1;
1391                         sum = (u32) iov[i].iov_len - first_iov_offset;
1392                         old_sum = 0;
1393                 }
1394                 if (sum >= nob) {
1395                         last_iov = i;
1396                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1397                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1398                         break;
1399                 }
1400                 old_sum = sum;
1401         }
1402         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1403         nseg = last_iov - first_iov + 1;
1404         LASSERT(nseg > 0);
1405         
1406         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1407         if (seg == NULL) {
1408                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1409                 return -1;
1410         }
1411         memset(seg, 0, nseg * sizeof(*seg));
1412         ctx->mxc_nseg = nseg;
1413         sum = 0;
1414         for (i = 0; i < nseg; i++) {
1415                 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1416                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1417                 if (i == 0) {
1418                         seg[i].segment_ptr += (u64) first_iov_offset;
1419                         seg[i].segment_length -= (u32) first_iov_offset;
1420                 }
1421                 if (i == (nseg - 1)) {
1422                         seg[i].segment_length = (u32) last_iov_length;
1423                 }
1424                 sum += seg[i].segment_length;
1425         }
1426         ctx->mxc_seg_list = seg;
1427         ctx->mxc_pin_type = MX_PIN_KERNEL;
1428 #ifdef MX_PIN_FULLPAGES
1429         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1430 #endif
1431         LASSERT(nob == sum);
1432         return 0;
1433 }
1434
1435 int
1436 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1437 {
1438         int             i                       = 0;
1439         int             sum                     = 0;
1440         int             old_sum                 = 0;
1441         int             nseg                    = 0;
1442         int             first_kiov              = -1;
1443         int             first_kiov_offset       = 0;
1444         int             first_found             = 0;
1445         int             last_kiov               = -1;
1446         int             last_kiov_length        = 0;
1447         mx_ksegment_t  *seg                     = NULL;
1448
1449         if (niov == 0) return 0;
1450         LASSERT(kiov != NULL);
1451
1452         for (i = 0; i < niov; i++) {
1453                 sum = old_sum + kiov[i].kiov_len;
1454                 if (i == 0) sum -= kiov[i].kiov_offset;
1455                 if (!first_found && (sum > offset)) {
1456                         first_kiov = i;
1457                         first_kiov_offset = offset - old_sum;
1458                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1459                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1460                         first_found = 1;
1461                         sum = kiov[i].kiov_len - first_kiov_offset;
1462                         old_sum = 0;
1463                 }
1464                 if (sum >= nob) {
1465                         last_kiov = i;
1466                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1467                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1468                         break;
1469                 }
1470                 old_sum = sum;
1471         }
1472         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1473         nseg = last_kiov - first_kiov + 1;
1474         LASSERT(nseg > 0);
1475         
1476         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1477         if (seg == NULL) {
1478                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1479                 return -1;
1480         }
1481         memset(seg, 0, niov * sizeof(*seg));
1482         ctx->mxc_nseg = niov;
1483         sum = 0;
1484         for (i = 0; i < niov; i++) {
1485                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1486                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1487                 if (i == 0) {
1488                         seg[i].segment_ptr += (u64) first_kiov_offset;
1489                         /* we have to add back the original kiov_offset */
1490                         seg[i].segment_length -= first_kiov_offset +
1491                                                  kiov[first_kiov].kiov_offset;
1492                 }
1493                 if (i == (nseg - 1)) {
1494                         seg[i].segment_length = last_kiov_length;
1495                 }
1496                 sum += seg[i].segment_length;
1497         }
1498         ctx->mxc_seg_list = seg;
1499         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1500 #ifdef MX_PIN_FULLPAGES
1501         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1502 #endif
1503         LASSERT(nob == sum);
1504         return 0;
1505 }
1506
1507 void
1508 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1509 {
1510         LASSERT(type == MXLND_MSG_PUT_ACK);
1511         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1512         tx->mxc_cookie = cookie;
1513         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1514         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
1515         tx->mxc_match = mxlnd_create_match(tx, status);
1516
1517         mxlnd_queue_tx(tx);
1518 }
1519
1520
1521 /**
1522  * mxlnd_send_data - get tx, map [k]iov, queue tx
1523  * @ni
1524  * @lntmsg
1525  * @peer
1526  * @msg_type
1527  * @cookie
1528  *
1529  * This setups the DATA send for PUT or GET.
1530  *
1531  * On success, it queues the tx, on failure it calls lnet_finalize()
1532  */
1533 void
1534 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1535 {
1536         int                     ret             = 0;
1537         lnet_process_id_t       target          = lntmsg->msg_target;
1538         unsigned int            niov            = lntmsg->msg_niov;
1539         struct iovec           *iov             = lntmsg->msg_iov;
1540         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1541         unsigned int            offset          = lntmsg->msg_offset;
1542         unsigned int            nob             = lntmsg->msg_len;
1543         struct kmx_ctx         *tx              = NULL;
1544
1545         LASSERT(lntmsg != NULL);
1546         LASSERT(peer != NULL);
1547         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1548         LASSERT((cookie>>52) == 0);
1549
1550         tx = mxlnd_get_idle_tx();
1551         if (tx == NULL) {
1552                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1553                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1554                         libcfs_nid2str(target.nid));
1555                 goto failed_0;
1556         }
1557         tx->mxc_nid = target.nid;
1558         mxlnd_conn_addref(peer->mxp_conn);
1559         tx->mxc_peer = peer;
1560         tx->mxc_conn = peer->mxp_conn;
1561         tx->mxc_msg_type = msg_type;
1562         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1563         tx->mxc_state = MXLND_CTX_PENDING;
1564         tx->mxc_lntmsg[0] = lntmsg;
1565         tx->mxc_cookie = cookie;
1566         tx->mxc_match = mxlnd_create_match(tx, 0);
1567
1568         /* This setups up the mx_ksegment_t to send the DATA payload  */
1569         if (nob == 0) {
1570                 /* do not setup the segments */
1571                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1572                                    "to %s?\n", libcfs_nid2str(target.nid));
1573                 ret = 0;
1574         } else if (kiov == NULL) {
1575                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1576         } else {
1577                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1578         }
1579         if (ret != 0) {
1580                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", 
1581                                    libcfs_nid2str(target.nid));
1582                 tx->mxc_status.code = -EIO;
1583                 goto failed_1;
1584         }
1585         mxlnd_queue_tx(tx);
1586         return;
1587
1588 failed_1:
1589         mxlnd_conn_decref(peer->mxp_conn);
1590         mxlnd_put_idle_tx(tx);
1591         return;
1592
1593 failed_0:
1594         CDEBUG(D_NETERROR, "no tx avail\n");
1595         lnet_finalize(ni, lntmsg, -EIO);
1596         return;
1597 }
1598
1599 /**
1600  * mxlnd_recv_data - map [k]iov, post rx
1601  * @ni
1602  * @lntmsg
1603  * @rx
1604  * @msg_type
1605  * @cookie
1606  *
1607  * This setups the DATA receive for PUT or GET.
1608  *
1609  * On success, it returns 0, on failure it returns -1
1610  */
1611 int
1612 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1613 {
1614         int                     ret             = 0;
1615         lnet_process_id_t       target          = lntmsg->msg_target;
1616         unsigned int            niov            = lntmsg->msg_niov;
1617         struct iovec           *iov             = lntmsg->msg_iov;
1618         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1619         unsigned int            offset          = lntmsg->msg_offset;
1620         unsigned int            nob             = lntmsg->msg_len;
1621         mx_return_t             mxret           = MX_SUCCESS;
1622
1623         /* above assumes MXLND_MSG_PUT_DATA */
1624         if (msg_type == MXLND_MSG_GET_DATA) {
1625                 niov = lntmsg->msg_md->md_niov;
1626                 iov = lntmsg->msg_md->md_iov.iov;
1627                 kiov = lntmsg->msg_md->md_iov.kiov;
1628                 offset = 0;
1629                 nob = lntmsg->msg_md->md_length;
1630         }
1631
1632         LASSERT(lntmsg != NULL);
1633         LASSERT(rx != NULL);
1634         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1635         LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1636
1637         rx->mxc_msg_type = msg_type;
1638         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1639         rx->mxc_state = MXLND_CTX_PENDING;
1640         rx->mxc_nid = target.nid;
1641         /* if posting a GET_DATA, we may not yet know the peer */
1642         if (rx->mxc_peer != NULL) {
1643                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1644         }
1645         rx->mxc_lntmsg[0] = lntmsg;
1646         rx->mxc_cookie = cookie;
1647         rx->mxc_match = mxlnd_create_match(rx, 0);
1648         /* This setups up the mx_ksegment_t to receive the DATA payload  */
1649         if (kiov == NULL) {
1650                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1651         } else {
1652                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1653         }
1654         if (msg_type == MXLND_MSG_GET_DATA) {
1655                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1656                 if (rx->mxc_lntmsg[1] == NULL) {
1657                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1658                                            libcfs_nid2str(target.nid));
1659                         ret = -1;
1660                 }
1661         }
1662         if (ret != 0) {
1663                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1664                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1665                        libcfs_nid2str(target.nid));
1666                 return -1;
1667         }
1668         ret = mxlnd_q_pending_ctx(rx);
1669         if (ret == -1) {
1670                 return -1;
1671         }
1672         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1673         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, 
1674                           rx->mxc_seg_list, rx->mxc_nseg,
1675                           rx->mxc_pin_type, rx->mxc_match,
1676                           0xF00FFFFFFFFFFFFFLL, (void *) rx, 
1677                           &rx->mxc_mxreq);
1678         if (mxret != MX_SUCCESS) {
1679                 if (rx->mxc_conn != NULL) {
1680                         mxlnd_deq_pending_ctx(rx);
1681                 }
1682                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n", 
1683                                    (int) mxret, libcfs_nid2str(target.nid));
1684                 return -1;
1685         }
1686
1687         return 0;
1688 }
1689
1690 /**
1691  * mxlnd_send - the LND required send function
1692  * @ni
1693  * @private
1694  * @lntmsg
1695  *
1696  * This must not block. Since we may not have a peer struct for the receiver,
1697  * it will append send messages on a global tx list. We will then up the
1698  * tx_queued's semaphore to notify it of the new send. 
1699  */
1700 int
1701 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1702 {
1703         int                     ret             = 0;
1704         int                     type            = lntmsg->msg_type;
1705         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
1706         lnet_process_id_t       target          = lntmsg->msg_target;
1707         lnet_nid_t              nid             = target.nid;
1708         int                     target_is_router = lntmsg->msg_target_is_router;
1709         int                     routing         = lntmsg->msg_routing;
1710         unsigned int            payload_niov    = lntmsg->msg_niov;
1711         struct iovec           *payload_iov     = lntmsg->msg_iov;
1712         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
1713         unsigned int            payload_offset  = lntmsg->msg_offset;
1714         unsigned int            payload_nob     = lntmsg->msg_len;
1715         struct kmx_ctx         *tx              = NULL;
1716         struct kmx_msg         *txmsg           = NULL;
1717         struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */
1718         struct kmx_ctx         *rx_data         = NULL;
1719         struct kmx_conn        *conn            = NULL;
1720         int                     nob             = 0;
1721         __u32                   length          = 0;
1722         struct kmx_peer         *peer           = NULL;
1723
1724         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1725                        payload_nob, payload_niov, libcfs_id2str(target));
1726
1727         LASSERT (payload_nob == 0 || payload_niov > 0);
1728         LASSERT (payload_niov <= LNET_MAX_IOV);
1729         /* payload is either all vaddrs or all pages */
1730         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1731
1732         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1733
1734         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1735          * to a new peer, use the nid */
1736         peer = mxlnd_find_peer_by_nid(nid);
1737         if (peer != NULL) {
1738                 conn = peer->mxp_conn;
1739                 if (conn) mxlnd_conn_addref(conn);
1740         }
1741         if (conn == NULL && peer != NULL) {
1742                 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n", 
1743                        peer, nid, payload_nob, ((type==LNET_MSG_PUT) ? "PUT" : 
1744                        ((type==LNET_MSG_GET) ? "GET" : "Other")));
1745         }
1746
1747         switch (type) {
1748         case LNET_MSG_ACK:
1749                 LASSERT (payload_nob == 0);
1750                 break;
1751
1752         case LNET_MSG_REPLY:
1753         case LNET_MSG_PUT:
1754                 /* Is the payload small enough not to need DATA? */
1755                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1756                 if (nob <= MXLND_EAGER_SIZE)
1757                         break;                  /* send EAGER */
1758
1759                 tx = mxlnd_get_idle_tx();
1760                 if (unlikely(tx == NULL)) {
1761                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1762                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1763                                libcfs_nid2str(nid));
1764                         if (conn) mxlnd_conn_decref(conn);
1765                         return -ENOMEM;
1766                 }
1767
1768                 /* the peer may be NULL */
1769                 tx->mxc_peer = peer;
1770                 tx->mxc_conn = conn; /* may be NULL */
1771                 /* we added a conn ref above */
1772                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1773                 txmsg = tx->mxc_msg;
1774                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1775                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1776                 tx->mxc_match = mxlnd_create_match(tx, 0);
1777
1778                 /* we must post a receive _before_ sending the request.
1779                  * we need to determine how much to receive, it will be either
1780                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1781
1782                 rx = mxlnd_get_idle_rx();
1783                 if (unlikely(rx == NULL)) {
1784                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1785                                            libcfs_nid2str(nid));
1786                         mxlnd_put_idle_tx(tx);
1787                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1788                         return -ENOMEM;
1789                 }
1790                 rx->mxc_nid = nid;
1791                 rx->mxc_peer = peer;
1792                 /* conn may be NULL but unlikely since the first msg is always small */
1793                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1794                 rx->mxc_conn = conn;
1795                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1796                 rx->mxc_cookie = tx->mxc_cookie;
1797                 rx->mxc_match = mxlnd_create_match(rx, 0);
1798
1799                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1800                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1801                 if (unlikely(ret != 0)) {
1802                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1803                                            libcfs_nid2str(nid));
1804                         rx->mxc_lntmsg[0] = NULL;
1805                         mxlnd_put_idle_rx(rx);
1806                         mxlnd_put_idle_tx(tx);
1807                         if (conn) {
1808                                 mxlnd_conn_decref(conn); /* for the rx... */
1809                                 mxlnd_conn_decref(conn); /* and for the tx */
1810                         }
1811                         return -ENOMEM;
1812                 }
1813
1814                 mxlnd_queue_tx(tx);
1815                 return 0;
1816
1817         case LNET_MSG_GET:
1818                 if (routing || target_is_router)
1819                         break;                  /* send EAGER */
1820
1821                 /* is the REPLY message too small for DATA? */
1822                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1823                 if (nob <= MXLND_EAGER_SIZE)
1824                         break;                  /* send EAGER */
1825
1826                 /* get tx (we need the cookie) , post rx for incoming DATA, 
1827                  * then post GET_REQ tx */
1828                 tx = mxlnd_get_idle_tx();
1829                 if (unlikely(tx == NULL)) {
1830                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1831                                            libcfs_nid2str(nid));
1832                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1833                         return -ENOMEM;
1834                 }
1835                 rx_data = mxlnd_get_idle_rx();
1836                 if (unlikely(rx_data == NULL)) {
1837                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1838                                            libcfs_nid2str(nid));
1839                         mxlnd_put_idle_tx(tx);
1840                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1841                         return -ENOMEM;
1842                 }
1843                 rx_data->mxc_peer = peer;
1844                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1845                 rx_data->mxc_conn = conn; /* may be NULL */
1846
1847                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1848                 if (unlikely(ret != 0)) {
1849                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1850                                            libcfs_nid2str(nid));
1851                         mxlnd_put_idle_rx(rx_data);
1852                         mxlnd_put_idle_tx(tx);
1853                         if (conn) {
1854                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1855                                 mxlnd_conn_decref(conn); /* and for the tx */
1856                         }
1857                         return -EIO;
1858                 }
1859
1860                 tx->mxc_peer = peer;
1861                 tx->mxc_conn = conn; /* may be NULL */
1862                 /* conn ref taken above */
1863                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1864                 txmsg = tx->mxc_msg;
1865                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1866                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1867                 tx->mxc_match = mxlnd_create_match(tx, 0);
1868
1869                 mxlnd_queue_tx(tx);
1870                 return 0;
1871
1872         default:
1873                 LBUG();
1874                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1875                 return -EIO;
1876         }
1877
1878         /* send EAGER */
1879
1880         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1881                 <= MXLND_EAGER_SIZE);
1882
1883         tx = mxlnd_get_idle_tx();
1884         if (unlikely(tx == NULL)) {
1885                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1886                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1887                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1888                 return -ENOMEM;
1889         }
1890
1891         tx->mxc_peer = peer;
1892         tx->mxc_conn = conn; /* may be NULL */
1893         /* conn ref taken above */
1894         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1895         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1896         tx->mxc_match = mxlnd_create_match(tx, 0);
1897
1898         txmsg = tx->mxc_msg;
1899         txmsg->mxm_u.eager.mxem_hdr = *hdr;
1900
1901         if (payload_kiov != NULL)
1902                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1903                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1904                             payload_niov, payload_kiov, payload_offset, payload_nob);
1905         else
1906                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1907                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1908                             payload_niov, payload_iov, payload_offset, payload_nob);
1909
1910         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1911         mxlnd_queue_tx(tx);
1912         return 0;
1913 }
1914
1915 /**
1916  * mxlnd_recv - the LND required recv function
1917  * @ni
1918  * @private
1919  * @lntmsg
1920  * @delayed
1921  * @niov
1922  * @kiov
1923  * @offset
1924  * @mlen
1925  * @rlen
1926  *
1927  * This must not block.
1928  */
1929 int
1930 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1931              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1932              unsigned int offset, unsigned int mlen, unsigned int rlen)
1933 {
1934         int                     ret             = 0;
1935         int                     nob             = 0;
1936         int                     len             = 0;
1937         struct kmx_ctx          *rx             = private;
1938         struct kmx_msg          *rxmsg          = rx->mxc_msg;
1939         lnet_nid_t               nid            = rx->mxc_nid;
1940         struct kmx_ctx          *tx             = NULL;
1941         struct kmx_msg          *txmsg          = NULL;
1942         struct kmx_peer         *peer           = rx->mxc_peer;
1943         struct kmx_conn         *conn           = peer->mxp_conn;
1944         u64                      cookie         = 0LL;
1945         int                      msg_type       = rxmsg->mxm_type;
1946         int                      repost         = 1;
1947         int                      credit         = 0;
1948         int                      finalize       = 0;
1949
1950         LASSERT (mlen <= rlen);
1951         /* Either all pages or all vaddrs */
1952         LASSERT (!(kiov != NULL && iov != NULL));
1953         LASSERT (peer != NULL);
1954
1955         /* conn_addref(conn) already taken for the primary rx */
1956
1957         switch (msg_type) {
1958         case MXLND_MSG_EAGER:
1959                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
1960                 len = rx->mxc_status.xfer_length;
1961                 if (unlikely(nob > len)) {
1962                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
1963                                            libcfs_nid2str(nid), nob, len);
1964                         ret = -EPROTO;
1965                         break;
1966                 }
1967
1968                 if (kiov != NULL)
1969                         lnet_copy_flat2kiov(niov, kiov, offset,
1970                                 MXLND_EAGER_SIZE, rxmsg,
1971                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1972                                 mlen);
1973                 else
1974                         lnet_copy_flat2iov(niov, iov, offset,
1975                                 MXLND_EAGER_SIZE, rxmsg,
1976                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1977                                 mlen);
1978                 finalize = 1;
1979                 credit = 1;
1980                 break;
1981
1982         case MXLND_MSG_PUT_REQ:
1983                 /* we are going to reuse the rx, store the needed info */
1984                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
1985
1986                 /* get tx, post rx, send PUT_ACK */
1987
1988                 tx = mxlnd_get_idle_tx();
1989                 if (unlikely(tx == NULL)) {
1990                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
1991                         /* Not replying will break the connection */
1992                         ret = -ENOMEM;
1993                         break;
1994                 }
1995                 if (unlikely(mlen == 0)) {
1996                         finalize = 1;
1997                         tx->mxc_peer = peer;
1998                         tx->mxc_conn = conn;
1999                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2000                         /* repost = 1 */
2001                         break;
2002                 }
2003
2004                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2005                 tx->mxc_peer = peer;
2006                 tx->mxc_conn = conn;
2007                 mxlnd_conn_addref(conn); /* for the tx */
2008                 txmsg = tx->mxc_msg;
2009                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2010                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2011                 tx->mxc_cookie = cookie;
2012                 tx->mxc_match = mxlnd_create_match(tx, 0);
2013
2014                 /* we must post a receive _before_ sending the PUT_ACK */
2015                 mxlnd_ctx_init(rx);
2016                 rx->mxc_state = MXLND_CTX_PREP;
2017                 rx->mxc_peer = peer;
2018                 rx->mxc_conn = conn;
2019                 /* do not take another ref for this rx, it is already taken */
2020                 rx->mxc_nid = peer->mxp_nid;
2021                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, 
2022                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2023
2024                 if (unlikely(ret != 0)) {
2025                         /* Notify peer that it's over */
2026                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", 
2027                                            libcfs_nid2str(nid), ret);
2028                         mxlnd_ctx_init(tx);
2029                         tx->mxc_state = MXLND_CTX_PREP;
2030                         tx->mxc_peer = peer;
2031                         tx->mxc_conn = conn;
2032                         /* finalize = 0, let the PUT_ACK tx finalize this */
2033                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2034                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2035                         /* conn ref already taken above */
2036                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2037                         /* repost = 1 */
2038                         break;
2039                 }
2040
2041                 mxlnd_queue_tx(tx);
2042                 /* do not return a credit until after PUT_DATA returns */
2043                 repost = 0;
2044                 break;
2045
2046         case MXLND_MSG_GET_REQ:
2047                 if (likely(lntmsg != NULL)) {
2048                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2049                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2050                 } else {
2051                         /* GET didn't match anything */
2052                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2053                          * We have to embed the error code in the match bits.
2054                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2055                         u64             cookie  = rxmsg->mxm_u.get_req.mxgrm_cookie;
2056
2057                         tx = mxlnd_get_idle_tx();
2058                         if (unlikely(tx == NULL)) {
2059                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2060                                                    libcfs_nid2str(nid));
2061                                 ret = -ENOMEM;
2062                                 break;
2063                         }
2064                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2065                         tx->mxc_state = MXLND_CTX_PENDING;
2066                         tx->mxc_nid = nid;
2067                         tx->mxc_peer = peer;
2068                         tx->mxc_conn = conn;
2069                         mxlnd_conn_addref(conn); /* for this tx */
2070                         tx->mxc_cookie = cookie;
2071                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2072                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2073                         mxlnd_queue_tx(tx);
2074                 }
2075                 /* finalize lntmsg after tx completes */
2076                 break;
2077
2078         default:
2079                 LBUG();
2080         }
2081
2082         if (repost) {
2083                 /* we received a message, increment peer's outstanding credits */
2084                 if (credit == 1) {
2085                         spin_lock(&conn->mxk_lock);
2086                         conn->mxk_outstanding++;
2087                         spin_unlock(&conn->mxk_lock);
2088                 }
2089                 /* we are done with the rx */
2090                 mxlnd_put_idle_rx(rx);
2091                 mxlnd_conn_decref(conn);
2092         }
2093
2094         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); 
2095
2096         /* we received a credit, see if we can use it to send a msg */
2097         if (credit) mxlnd_check_sends(peer);
2098         
2099         return ret;
2100 }
2101
2102 void
2103 mxlnd_sleep(unsigned long timeout)
2104 {
2105         set_current_state(TASK_INTERRUPTIBLE);
2106         schedule_timeout(timeout);
2107         return;
2108 }
2109
2110 /**
2111  * mxlnd_tx_queued - the generic send queue thread
2112  * @arg - thread id (as a void *)
2113  *
2114  * This thread moves send messages from the global tx_queue to the owning
2115  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2116  * it to the global peer list.
2117  */
2118 int
2119 mxlnd_tx_queued(void *arg)
2120 {
2121         long                    id      = (long) arg;
2122         int                     ret     = 0;
2123         int                     found   = 0;
2124         struct kmx_ctx         *tx      = NULL;
2125         struct kmx_peer        *peer    = NULL;
2126         struct list_head       *tmp_tx  = NULL;
2127
2128         cfs_daemonize("mxlnd_tx_queued");
2129         //cfs_block_allsigs();
2130
2131         while (!kmxlnd_data.kmx_shutdown) {
2132                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2133                 if (kmxlnd_data.kmx_shutdown)
2134                         break;
2135                 if (ret != 0) // Should we check for -EINTR?
2136                         continue;
2137                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2138                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2139                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2140                         continue;
2141                 }
2142                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2143                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2144                 list_del_init(&tx->mxc_list);
2145                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2146
2147                 found = 0;
2148                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid);
2149                 if (peer != NULL) {
2150                         tx->mxc_peer = peer;
2151                         tx->mxc_conn = peer->mxp_conn;
2152                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2153                         mxlnd_queue_tx(tx);
2154                         found = 1;
2155                 }
2156                 if (found == 0) {
2157                         int              hash   = 0;
2158                         struct kmx_peer *peer = NULL;
2159                         struct kmx_peer *old = NULL;
2160
2161                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2162
2163                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2164                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2165                         /* create peer */
2166                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2167                         if (ret != 0) {
2168                                 /* finalize message */
2169                                 tx->mxc_status.code = -ECONNABORTED;
2170                                 mxlnd_put_idle_tx(tx);
2171                                 continue;
2172                         }
2173                         tx->mxc_peer = peer;
2174                         tx->mxc_conn = peer->mxp_conn;
2175
2176                         /* add peer to global peer list, but look to see
2177                          * if someone already created it after we released
2178                          * the read lock */
2179                         write_lock(&kmxlnd_data.kmx_peers_lock);
2180                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2181                                 if (old->mxp_nid == peer->mxp_nid) {
2182                                         /* somebody beat us here, we created a duplicate */
2183                                         found = 1;
2184                                         break;
2185                                 }
2186                         }
2187
2188                         if (found == 0) {
2189                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2190                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2191                         } else {
2192                                 tx->mxc_peer = old;
2193                                 tx->mxc_conn = old->mxp_conn;
2194                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2195                                 mxlnd_peer_decref(peer);
2196                         }
2197                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2198                         write_unlock(&kmxlnd_data.kmx_peers_lock);
2199
2200                         mxlnd_queue_tx(tx);
2201                 }
2202         }
2203         mxlnd_thread_stop(id);
2204         return 0;
2205 }
2206
2207 /* When calling this, we must not have the peer lock. */
2208 void
2209 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2210 {
2211         mx_return_t             mxret   = MX_SUCCESS;
2212         mx_request_t            request;
2213         struct kmx_conn         *conn   = peer->mxp_conn;
2214
2215         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2216
2217         LASSERT(mask == MXLND_MASK_ICON_REQ ||
2218                 mask == MXLND_MASK_ICON_ACK);
2219
2220         if (peer->mxp_reconnect_time == 0) {
2221                 peer->mxp_reconnect_time = jiffies;
2222         }
2223
2224         if (peer->mxp_nic_id == 0LL) {
2225                 mxlnd_peer_hostname_to_nic_id(peer);
2226                 if (peer->mxp_nic_id == 0LL) {
2227                         /* not mapped yet, return */
2228                         spin_lock(&conn->mxk_lock);
2229                         conn->mxk_status = MXLND_CONN_INIT;
2230                         spin_unlock(&conn->mxk_lock);
2231                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2232                                 /* give up and notify LNET */
2233                                 mxlnd_conn_disconnect(conn, 0, 1);
2234                                 mxlnd_conn_alloc(&peer->mxp_conn, peer);
2235                         }
2236                         mxlnd_conn_decref(conn);
2237                         return;
2238                 }
2239         }
2240
2241         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id, 
2242                             peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask, 
2243                             (void *) peer, &request);
2244         if (unlikely(mxret != MX_SUCCESS)) {
2245                 spin_lock(&conn->mxk_lock);
2246                 conn->mxk_status = MXLND_CONN_FAIL;
2247                 spin_unlock(&conn->mxk_lock);
2248                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2249                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2250                 mxlnd_conn_decref(conn);
2251         }
2252         return;
2253 }
2254
2255 #define MXLND_STATS 0
2256
2257 int
2258 mxlnd_check_sends(struct kmx_peer *peer)
2259 {
2260         int                     ret             = 0;
2261         int                     found           = 0;
2262         mx_return_t             mxret           = MX_SUCCESS;
2263         struct kmx_ctx          *tx             = NULL;
2264         struct kmx_conn         *conn           = NULL;
2265         u8                      msg_type        = 0;
2266         int                     credit          = 0;
2267         int                     status          = 0;
2268         int                     ntx_posted      = 0;
2269         int                     credits         = 0;
2270 #if MXLND_STATS
2271         static unsigned long    last            = 0;
2272 #endif
2273
2274         if (unlikely(peer == NULL)) {
2275                 LASSERT(peer != NULL);
2276                 return -1;
2277         }
2278         conn = peer->mxp_conn;
2279         /* do not add another ref for this tx */
2280
2281         if (conn == NULL) {
2282                 /* we do not have any conns */
2283                 return -1;
2284         }
2285
2286 #if MXLND_STATS
2287         if (time_after(jiffies, last)) {
2288                 last = jiffies + HZ;
2289                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2290                               "ntx_posted= %d ntx_data= %d data_posted= %d\n", 
2291                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits, 
2292                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2293                               conn->mxk_ntx_data, conn->mxk_data_posted);
2294         }
2295 #endif
2296
2297         /* cache peer state for asserts */
2298         spin_lock(&conn->mxk_lock);
2299         ntx_posted = conn->mxk_ntx_posted;
2300         credits = conn->mxk_credits;
2301         spin_unlock(&conn->mxk_lock);
2302
2303         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2304         LASSERT(ntx_posted >= 0);
2305
2306         LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2307         LASSERT(credits >= 0);
2308
2309         /* check number of queued msgs, ignore data */
2310         spin_lock(&conn->mxk_lock);
2311         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2312                 /* check if any txs queued that could return credits... */
2313                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2314                         /* if not, send a NOOP */
2315                         tx = mxlnd_get_idle_tx();
2316                         if (likely(tx != NULL)) {
2317                                 tx->mxc_peer = peer;
2318                                 tx->mxc_conn = peer->mxp_conn;
2319                                 mxlnd_conn_addref(conn); /* for this tx */
2320                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2321                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2322                                 mxlnd_peer_queue_tx_locked(tx);
2323                                 found = 1;
2324                                 goto done_locked;
2325                         }
2326                 }
2327         }
2328         spin_unlock(&conn->mxk_lock);
2329
2330         /* if the peer is not ready, try to connect */
2331         spin_lock(&conn->mxk_lock);
2332         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2333             conn->mxk_status == MXLND_CONN_FAIL ||
2334             conn->mxk_status == MXLND_CONN_REQ)) {
2335                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2336                 conn->mxk_status = MXLND_CONN_WAIT;
2337                 spin_unlock(&conn->mxk_lock);
2338                 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2339                 goto done;
2340         }
2341         spin_unlock(&conn->mxk_lock);
2342
2343         spin_lock(&conn->mxk_lock);
2344         while (!list_empty(&conn->mxk_tx_free_queue) ||
2345                !list_empty(&conn->mxk_tx_credit_queue)) {
2346                 /* We have something to send. If we have a queued tx that does not
2347                  * require a credit (free), choose it since its completion will 
2348                  * return a credit (here or at the peer), complete a DATA or 
2349                  * CONN_REQ or CONN_ACK. */
2350                 struct list_head *tmp_tx = NULL;
2351                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2352                         tmp_tx = &conn->mxk_tx_free_queue;
2353                 } else {
2354                         tmp_tx = &conn->mxk_tx_credit_queue;
2355                 }
2356                 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2357
2358                 msg_type = tx->mxc_msg_type;
2359
2360                 /* don't try to send a rx */
2361                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2362
2363                 /* ensure that it is a valid msg type */
2364                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2365                         msg_type == MXLND_MSG_CONN_ACK ||
2366                         msg_type == MXLND_MSG_NOOP     ||
2367                         msg_type == MXLND_MSG_EAGER    ||
2368                         msg_type == MXLND_MSG_PUT_REQ  ||
2369                         msg_type == MXLND_MSG_PUT_ACK  ||
2370                         msg_type == MXLND_MSG_PUT_DATA ||
2371                         msg_type == MXLND_MSG_GET_REQ  ||
2372                         msg_type == MXLND_MSG_GET_DATA);
2373                 LASSERT(tx->mxc_peer == peer);
2374                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2375
2376                 credit = mxlnd_tx_requires_credit(tx);
2377                 if (credit) {
2378
2379                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2380                                 CDEBUG(D_NET, "%s: posted enough\n", 
2381                                               libcfs_nid2str(peer->mxp_nid));
2382                                 goto done_locked;
2383                         }
2384         
2385                         if (conn->mxk_credits == 0) {
2386                                 CDEBUG(D_NET, "%s: no credits\n", 
2387                                               libcfs_nid2str(peer->mxp_nid));
2388                                 goto done_locked;
2389                         }
2390
2391                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2392                             conn->mxk_outstanding == 0) {  /* giving back credits */
2393                                 CDEBUG(D_NET, "%s: not using last credit\n",
2394                                               libcfs_nid2str(peer->mxp_nid));
2395                                 goto done_locked;
2396                         }
2397                 }
2398
2399                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2400                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2401                                 msg_type == MXLND_MSG_CONN_ACK)) {
2402                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2403                                              mxlnd_connstatus_to_str(conn->mxk_status), 
2404                                              tx->mxc_cookie, 
2405                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2406                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2407                                         list_del_init(&tx->mxc_list);
2408                                         tx->mxc_status.code = -ECONNABORTED;
2409                                         mxlnd_put_idle_tx(tx);
2410                                         mxlnd_conn_decref(conn);
2411                                 }
2412                                 goto done_locked;
2413                         }
2414                 }
2415
2416                 list_del_init(&tx->mxc_list);
2417
2418                 /* handle credits, etc now while we have the lock to avoid races */
2419                 if (credit) {
2420                         conn->mxk_credits--;
2421                         conn->mxk_ntx_posted++;
2422                 }
2423                 if (msg_type != MXLND_MSG_PUT_DATA &&
2424                     msg_type != MXLND_MSG_GET_DATA) {
2425                         if (msg_type != MXLND_MSG_CONN_REQ &&
2426                             msg_type != MXLND_MSG_CONN_ACK) {
2427                                 conn->mxk_ntx_msgs--;
2428                         }
2429                 }
2430                 if (tx->mxc_incarnation == 0 &&
2431                     conn->mxk_incarnation != 0) {
2432                         tx->mxc_incarnation = conn->mxk_incarnation;
2433                 }
2434                 spin_unlock(&conn->mxk_lock);
2435
2436                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER 
2437                  * or (2) there is a non-DATA msg that can return credits in the 
2438                  * queue, then drop this duplicate NOOP */
2439                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2440                         spin_lock(&conn->mxk_lock);
2441                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2442                             (conn->mxk_ntx_msgs >= 1)) {
2443                                 conn->mxk_credits++;
2444                                 conn->mxk_ntx_posted--;
2445                                 spin_unlock(&conn->mxk_lock);
2446                                 /* redundant NOOP */
2447                                 mxlnd_put_idle_tx(tx);
2448                                 mxlnd_conn_decref(conn);
2449                                 CDEBUG(D_NET, "%s: redundant noop\n",
2450                                               libcfs_nid2str(peer->mxp_nid));
2451                                 found = 1;
2452                                 goto done;
2453                         }
2454                         spin_unlock(&conn->mxk_lock);
2455                 }
2456
2457                 found = 1;
2458                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2459                     (msg_type != MXLND_MSG_GET_DATA))) {
2460                         mxlnd_pack_msg(tx);
2461                 }
2462
2463                 //ret = -ECONNABORTED;
2464                 mxret = MX_SUCCESS;
2465
2466                 spin_lock(&conn->mxk_lock);
2467                 status = conn->mxk_status;
2468                 spin_unlock(&conn->mxk_lock);
2469
2470                 if (likely((status == MXLND_CONN_READY) ||
2471                     (msg_type == MXLND_MSG_CONN_REQ) ||
2472                     (msg_type == MXLND_MSG_CONN_ACK))) {
2473                         ret = 0;
2474                         if (msg_type != MXLND_MSG_CONN_REQ &&
2475                             msg_type != MXLND_MSG_CONN_ACK) {
2476                                 /* add to the pending list */
2477                                 ret = mxlnd_q_pending_ctx(tx);
2478                                 if (ret == -1) {
2479                                         /* FIXME the conn is disconnected, now what? */
2480                                 }
2481                         } else {
2482                                 /* CONN_REQ/ACK */
2483                                 tx->mxc_state = MXLND_CTX_PENDING;
2484                         }
2485
2486                         if (ret == 0) {
2487                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2488                                     msg_type != MXLND_MSG_GET_DATA)) {
2489                                         /* send a msg style tx */
2490                                         LASSERT(tx->mxc_nseg == 1);
2491                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2492                                         CDEBUG(D_NET, "sending %s 0x%llx\n", 
2493                                                mxlnd_msgtype_to_str(msg_type),
2494                                                tx->mxc_cookie);
2495                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt, 
2496                                                           &tx->mxc_seg, 
2497                                                           tx->mxc_nseg,
2498                                                           tx->mxc_pin_type,
2499                                                           conn->mxk_epa, 
2500                                                           tx->mxc_match, 
2501                                                           (void *) tx,
2502                                                           &tx->mxc_mxreq);
2503                                 } else {
2504                                         /* send a DATA tx */
2505                                         spin_lock(&conn->mxk_lock);
2506                                         conn->mxk_ntx_data--;
2507                                         conn->mxk_data_posted++;
2508                                         spin_unlock(&conn->mxk_lock);
2509                                         CDEBUG(D_NET, "sending %s 0x%llx\n", 
2510                                                mxlnd_msgtype_to_str(msg_type), 
2511                                                tx->mxc_cookie);
2512                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt, 
2513                                                           tx->mxc_seg_list, 
2514                                                           tx->mxc_nseg,
2515                                                           tx->mxc_pin_type,
2516                                                           conn->mxk_epa, 
2517                                                           tx->mxc_match, 
2518                                                           (void *) tx,
2519                                                           &tx->mxc_mxreq);
2520                                 }
2521                         } else {
2522                                 mxret = MX_CONNECTION_FAILED;
2523                         }
2524                         if (likely(mxret == MX_SUCCESS)) {
2525                                 ret = 0;
2526                         } else {
2527                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2528                                        "sending to %s\n", mx_strerror(mxret), (int) mxret, 
2529                                        libcfs_nid2str(peer->mxp_nid));
2530                                 /* NOTE mx_kisend() only fails if there are not enough 
2531                                 * resources. Do not change the connection status. */
2532                                 if (mxret == MX_NO_RESOURCES) {
2533                                         tx->mxc_status.code = -ENOMEM;
2534                                 } else {
2535                                         tx->mxc_status.code = -ECONNABORTED;
2536                                 }
2537                                 if (credit) {
2538                                         spin_lock(&conn->mxk_lock);
2539                                         conn->mxk_ntx_posted--;
2540                                         conn->mxk_credits++;
2541                                         spin_unlock(&conn->mxk_lock);
2542                                 } else if (msg_type == MXLND_MSG_PUT_DATA || 
2543                                         msg_type == MXLND_MSG_GET_DATA) {
2544                                         spin_lock(&conn->mxk_lock);
2545                                         conn->mxk_data_posted--;
2546                                         spin_unlock(&conn->mxk_lock);
2547                                 }
2548                                 if (msg_type != MXLND_MSG_PUT_DATA &&
2549                                     msg_type != MXLND_MSG_GET_DATA &&
2550                                     msg_type != MXLND_MSG_CONN_REQ &&
2551                                     msg_type != MXLND_MSG_CONN_ACK) {
2552                                         spin_lock(&conn->mxk_lock);
2553                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2554                                         spin_unlock(&conn->mxk_lock);
2555                                 }
2556                                 if (msg_type != MXLND_MSG_CONN_REQ &&
2557                                     msg_type != MXLND_MSG_CONN_ACK) {
2558                                         /* remove from the pending list */
2559                                         mxlnd_deq_pending_ctx(tx);
2560                                 }
2561                                 mxlnd_put_idle_tx(tx);
2562                                 mxlnd_conn_decref(conn);
2563                         }
2564                 }
2565                 spin_lock(&conn->mxk_lock);
2566         }
2567 done_locked:
2568         spin_unlock(&conn->mxk_lock);
2569 done:
2570         return found;
2571 }
2572
2573
2574 /**
2575  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2576  * @ctx - the tx descriptor
2577  *
2578  * Determine which type of send request it was and start the next step, if needed,
2579  * or, if done, signal completion to LNET. After we are done, put back on the
2580  * idle tx list.
2581  */
2582 void
2583 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2584 {
2585         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2586         struct kmx_msg  *msg    = tx->mxc_msg;
2587         struct kmx_peer *peer   = tx->mxc_peer;
2588         struct kmx_conn *conn   = tx->mxc_conn;
2589         u8              type    = tx->mxc_msg_type;
2590         int             credit  = mxlnd_tx_requires_credit(tx);
2591         u64             cookie  = tx->mxc_cookie;
2592
2593         CDEBUG(D_NET, "entering %s (0x%llx):\n", 
2594                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2595
2596         if (unlikely(conn == NULL)) {
2597                 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &conn);
2598                 if (conn != NULL) {
2599                         /* do not add a ref for the tx, it was set before sending */
2600                         tx->mxc_conn = conn;
2601                         tx->mxc_peer = conn->mxk_peer;
2602                 }
2603         }
2604         LASSERT (peer != NULL);
2605         LASSERT (conn != NULL);
2606
2607         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2608                 LASSERT (type == msg->mxm_type);
2609         }
2610
2611         if (failed) {
2612                 tx->mxc_status.code = -EIO;
2613         } else {
2614                 spin_lock(&conn->mxk_lock);
2615                 conn->mxk_last_tx = jiffies;
2616                 spin_unlock(&conn->mxk_lock);
2617         }
2618
2619         switch (type) {
2620
2621         case MXLND_MSG_GET_DATA:
2622                 spin_lock(&conn->mxk_lock);
2623                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2624                         conn->mxk_outstanding++;
2625                         conn->mxk_data_posted--;
2626                 }
2627                 spin_unlock(&conn->mxk_lock);
2628                 break;
2629
2630         case MXLND_MSG_PUT_DATA:
2631                 spin_lock(&conn->mxk_lock);
2632                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2633                         conn->mxk_data_posted--;
2634                 }
2635                 spin_unlock(&conn->mxk_lock);
2636                 break;
2637
2638         case MXLND_MSG_NOOP:
2639         case MXLND_MSG_PUT_REQ:
2640         case MXLND_MSG_PUT_ACK:
2641         case MXLND_MSG_GET_REQ:
2642         case MXLND_MSG_EAGER:
2643         //case MXLND_MSG_NAK:
2644                 break;
2645
2646         case MXLND_MSG_CONN_ACK:
2647                 if (peer->mxp_incompatible) {
2648                         /* we sent our params, now close this conn */
2649                         mxlnd_conn_disconnect(conn, 0, 1);
2650                 }
2651         case MXLND_MSG_CONN_REQ:
2652                 if (failed) {
2653                         CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2654                                "failed with %s (%d) to %s\n",
2655                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2656                                mx_strstatus(tx->mxc_status.code), 
2657                                tx->mxc_status.code,
2658                                libcfs_nid2str(tx->mxc_nid));
2659                         if (!peer->mxp_incompatible) {
2660                                 spin_lock(&conn->mxk_lock);
2661                                 conn->mxk_status = MXLND_CONN_FAIL;
2662                                 spin_unlock(&conn->mxk_lock);
2663                         }
2664                 }
2665                 break;
2666
2667         default:
2668                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2669                 LBUG();
2670         }
2671
2672         if (credit) {
2673                 spin_lock(&conn->mxk_lock);
2674                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2675                         conn->mxk_ntx_posted--;
2676                 }
2677                 spin_unlock(&conn->mxk_lock);
2678         }
2679
2680         CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2681         mxlnd_put_idle_tx(tx);
2682         mxlnd_conn_decref(conn);
2683
2684         mxlnd_check_sends(peer);
2685
2686         return;
2687 }
2688
2689 void
2690 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2691 {
2692         int                     ret             = 0;
2693         int                     repost          = 1;
2694         int                     credit          = 1;
2695         u32                     nob             = rx->mxc_status.xfer_length;
2696         u64                     bits            = rx->mxc_status.match_info;
2697         struct kmx_msg         *msg             = rx->mxc_msg;
2698         struct kmx_peer        *peer            = rx->mxc_peer;
2699         struct kmx_conn        *conn            = rx->mxc_conn;
2700         u8                      type            = rx->mxc_msg_type;
2701         u64                     seq             = 0LL;
2702         lnet_msg_t             *lntmsg[2];
2703         int                     result          = 0;
2704         u64                     nic_id          = 0LL;
2705         u32                     ep_id           = 0;
2706         int                     decref          = 1;
2707         int                     incompatible    = 0;
2708
2709         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, 
2710          * failed GET reply, CONN_REQ, or a CONN_ACK */
2711
2712         /* NOTE peer may still be NULL if it is a new peer */
2713         if (peer == NULL || conn == NULL) {
2714                 /* if the peer was disconnected, the peer may exist but
2715                  * not have any valid conns */
2716                 decref = 0; /* no peer means no ref was taken for this rx */
2717         }
2718
2719         if (conn == NULL && peer != NULL) {
2720                 conn = peer->mxp_conn;
2721                 rx->mxc_conn = conn;
2722         }
2723
2724 #if MXLND_DEBUG
2725         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2726 #endif
2727
2728         lntmsg[0] = NULL;
2729         lntmsg[1] = NULL;
2730
2731         if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2732                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2733                                    libcfs_nid2str(rx->mxc_nid),
2734                                    mx_strstatus(rx->mxc_status.code),
2735                                    (int) rx->mxc_status.code);
2736                 credit = 0;
2737                 goto cleanup;
2738         }
2739
2740         if (nob == 0) {
2741                 /* this may be a failed GET reply */
2742                 if (type == MXLND_MSG_GET_DATA) {
2743                         bits = rx->mxc_status.match_info & 0x0FF0000000000000LL; 
2744                         ret = (u32) (bits>>52);
2745                         lntmsg[0] = rx->mxc_lntmsg[0];
2746                         result = -ret;
2747                         goto cleanup;
2748                 } else {
2749                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
2750                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2751                                            libcfs_nid2str(rx->mxc_nid));
2752                         goto cleanup;
2753                 }
2754         }
2755
2756         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2757         if (type == MXLND_MSG_PUT_DATA) {
2758                 result = rx->mxc_status.code;
2759                 lntmsg[0] = rx->mxc_lntmsg[0];
2760                 goto cleanup;
2761         } else if (type == MXLND_MSG_GET_DATA) {
2762                 result = rx->mxc_status.code;
2763                 lntmsg[0] = rx->mxc_lntmsg[0];
2764                 lntmsg[1] = rx->mxc_lntmsg[1];
2765                 goto cleanup;
2766         }
2767
2768         ret = mxlnd_unpack_msg(msg, nob);
2769         if (ret != 0) {
2770                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2771                                    ret, libcfs_nid2str(rx->mxc_nid));
2772                 goto cleanup;
2773         }
2774         rx->mxc_nob = nob;
2775         type = msg->mxm_type;
2776         seq = msg->mxm_seq;
2777
2778         if (type != MXLND_MSG_CONN_REQ &&
2779             (!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) ||
2780              !lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) {
2781                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2782                        "0x%llx and rx msg dst is 0x%llx)\n", 
2783                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid, 
2784                        msg->mxm_dstnid);
2785                 goto cleanup;
2786         }
2787
2788         if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2789                 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2790                     msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2791                         if (conn != NULL) {
2792                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2793                                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2794                                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", 
2795                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2796                                        msg->mxm_srcstamp, conn->mxk_incarnation, 
2797                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2798                         } else {
2799                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2800                                        "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n", 
2801                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2802                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2803                         }
2804                         credit = 0;
2805                         goto cleanup;
2806                 }
2807         }
2808
2809         CDEBUG(D_NET, "Received %s with %d credits\n", 
2810                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
2811
2812         if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2813             msg->mxm_type != MXLND_MSG_CONN_ACK) {
2814                 LASSERT(peer != NULL);
2815                 LASSERT(conn != NULL);
2816                 if (msg->mxm_credits != 0) {
2817                         spin_lock(&conn->mxk_lock);
2818                         if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2819                                 if ((conn->mxk_credits + msg->mxm_credits) > 
2820                                      *kmxlnd_tunables.kmx_credits) {
2821                                         CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
2822                                                conn->mxk_credits, msg->mxm_credits);
2823                                 }
2824                                 conn->mxk_credits += msg->mxm_credits;
2825                                 LASSERT(conn->mxk_credits >= 0);
2826                                 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2827                         }
2828                         spin_unlock(&conn->mxk_lock);
2829                 }
2830         }
2831
2832         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
2833         switch (type) {
2834         case MXLND_MSG_NOOP:
2835                 break;
2836
2837         case MXLND_MSG_EAGER:
2838                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2839                                         msg->mxm_srcnid, rx, 0);
2840                 repost = ret < 0;
2841                 break;
2842
2843         case MXLND_MSG_PUT_REQ:
2844                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2845                                         msg->mxm_srcnid, rx, 1);
2846                 repost = ret < 0;
2847                 break;
2848
2849         case MXLND_MSG_PUT_ACK: {
2850                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
2851                 if (cookie > MXLND_MAX_COOKIE) {
2852                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type, 
2853                                            libcfs_nid2str(rx->mxc_nid));
2854                         result = -((cookie >> 52) & 0xff);
2855                         lntmsg[0] = rx->mxc_lntmsg[0];
2856                 } else {
2857                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0], 
2858                                         rx->mxc_peer, MXLND_MSG_PUT_DATA, 
2859                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
2860                 }
2861                 /* repost == 1 */
2862                 break;
2863         }
2864         case MXLND_MSG_GET_REQ:
2865                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
2866                                         msg->mxm_srcnid, rx, 1);
2867                 repost = ret < 0;
2868                 break;
2869
2870         case MXLND_MSG_CONN_REQ:
2871                 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
2872                         CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
2873                                         libcfs_nid2str(msg->mxm_srcnid),
2874                                         libcfs_nid2str(msg->mxm_dstnid));
2875                         goto cleanup;
2876                 }
2877                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
2878                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
2879                                     "%d (%d wanted)\n",
2880                                         libcfs_nid2str(msg->mxm_srcnid),
2881                                         msg->mxm_u.conn_req.mxcrm_queue_depth,
2882                                         *kmxlnd_tunables.kmx_credits);
2883                         incompatible = 1;
2884                 }
2885                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
2886                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
2887                                     "%d (%d wanted)\n",
2888                                         libcfs_nid2str(msg->mxm_srcnid),
2889                                         msg->mxm_u.conn_req.mxcrm_eager_size,
2890                                         (int) MXLND_EAGER_SIZE);
2891                         incompatible = 1;
2892                 }
2893                 if (peer == NULL) {
2894                         peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid);
2895                         if (peer == NULL) {
2896                                 int hash        = 0;
2897                                 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
2898         
2899                                 mx_decompose_endpoint_addr(rx->mxc_status.source,
2900                                                            &nic_id, &ep_id);
2901                                 rx->mxc_nid = msg->mxm_srcnid;
2902         
2903                                 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
2904                                 if (ret != 0) {
2905                                         goto cleanup;
2906                                 }
2907                                 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
2908                                 write_lock(&kmxlnd_data.kmx_peers_lock);
2909                                 list_add_tail(&peer->mxp_peers,
2910                                               &kmxlnd_data.kmx_peers[hash]);
2911                                 write_unlock(&kmxlnd_data.kmx_peers_lock);
2912                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2913                         } else {
2914                                 ret = mxlnd_conn_alloc(&conn, peer);
2915                                 if (ret != 0) {
2916                                         CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
2917                                         goto cleanup;
2918                                 }
2919                         }
2920                         conn = peer->mxp_conn;
2921                 } else {
2922                         struct kmx_conn *old_conn       = conn;
2923
2924                         /* do not call mx_disconnect() */
2925                         mxlnd_conn_disconnect(old_conn, 0, 0);
2926
2927                         /* the ref for this rx was taken on the old_conn */
2928                         mxlnd_conn_decref(old_conn);
2929
2930                         /* do not decref this conn below */
2931                         decref = 0;
2932
2933                         /* This allocs a conn, points peer->mxp_conn to this one.
2934                          * The old conn is still on the peer->mxp_conns list.
2935                          * As the pending requests complete, they will call
2936                          * conn_decref() which will eventually free it. */
2937                         ret = mxlnd_conn_alloc(&conn, peer);
2938                         if (ret != 0) {
2939                                 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
2940                                 goto cleanup;
2941                         }
2942                 }
2943                 spin_lock(&peer->mxp_lock);
2944                 peer->mxp_incarnation = msg->mxm_srcstamp;
2945                 peer->mxp_incompatible = incompatible;
2946                 spin_unlock(&peer->mxp_lock);
2947                 spin_lock(&conn->mxk_lock);
2948                 conn->mxk_incarnation = msg->mxm_srcstamp;
2949                 conn->mxk_status = MXLND_CONN_WAIT;
2950                 spin_unlock(&conn->mxk_lock);
2951
2952                 /* handle_conn_ack() will create the CONN_ACK msg */
2953                 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
2954
2955                 break;
2956
2957         case MXLND_MSG_CONN_ACK:
2958                 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
2959                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
2960                                "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
2961                                 libcfs_nid2str(msg->mxm_dstnid));
2962                         ret = -1;
2963                         goto failed;
2964                 }
2965                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
2966                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
2967                                "incompatible queue depth %d (%d wanted)\n",
2968                                 libcfs_nid2str(msg->mxm_srcnid),
2969                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
2970                                 *kmxlnd_tunables.kmx_credits);
2971                         spin_lock(&conn->mxk_lock);
2972                         conn->mxk_status = MXLND_CONN_FAIL;
2973                         spin_unlock(&conn->mxk_lock);
2974                         incompatible = 1;
2975                         ret = -1;
2976                 }
2977                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
2978                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
2979                                "incompatible EAGER size %d (%d wanted)\n",
2980                                 libcfs_nid2str(msg->mxm_srcnid),
2981                                 msg->mxm_u.conn_req.mxcrm_eager_size,
2982                                 (int) MXLND_EAGER_SIZE);
2983                         spin_lock(&conn->mxk_lock);
2984                         conn->mxk_status = MXLND_CONN_FAIL;
2985                         spin_unlock(&conn->mxk_lock);
2986                         incompatible = 1;
2987                         ret = -1;
2988                 }
2989                 spin_lock(&peer->mxp_lock);
2990                 peer->mxp_incarnation = msg->mxm_srcstamp;
2991                 peer->mxp_incompatible = incompatible;
2992                 spin_unlock(&peer->mxp_lock);
2993                 spin_lock(&conn->mxk_lock);
2994                 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
2995                 conn->mxk_outstanding = 0;
2996                 conn->mxk_incarnation = msg->mxm_srcstamp;
2997                 conn->mxk_timeout = 0;
2998                 if (!incompatible) {
2999                         conn->mxk_status = MXLND_CONN_READY;
3000                 }
3001                 spin_unlock(&conn->mxk_lock);
3002                 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3003                 break;
3004
3005         default:
3006                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3007                                 libcfs_nid2str(rx->mxc_nid));
3008                 ret = -EPROTO;
3009                 break;
3010         }
3011
3012 failed:
3013         if (ret < 0) {
3014                 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3015                 spin_lock(&conn->mxk_lock);
3016                 conn->mxk_status = MXLND_CONN_FAIL;
3017                 spin_unlock(&conn->mxk_lock);
3018         }
3019
3020 cleanup:
3021         if (conn != NULL) {
3022                 spin_lock(&conn->mxk_lock);
3023                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3024                 spin_unlock(&conn->mxk_lock);
3025         }
3026
3027         if (repost) {
3028                 /* lnet_parse() failed, etc., repost now */
3029                 mxlnd_put_idle_rx(rx);
3030                 if (conn != NULL && credit == 1) {
3031                         if (type == MXLND_MSG_PUT_DATA) {
3032                                 spin_lock(&conn->mxk_lock);
3033                                 conn->mxk_outstanding++;
3034                                 spin_unlock(&conn->mxk_lock);
3035                         } else if (type != MXLND_MSG_GET_DATA &&
3036                                   (type == MXLND_MSG_EAGER ||
3037                                    type == MXLND_MSG_PUT_REQ ||
3038                                    type == MXLND_MSG_NOOP)) {
3039                                 spin_lock(&conn->mxk_lock);
3040                                 conn->mxk_outstanding++;
3041                                 spin_unlock(&conn->mxk_lock);
3042                         }
3043                 }
3044                 if (decref) mxlnd_conn_decref(conn);
3045         }
3046
3047         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3048                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3049         } else {
3050                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3051         }
3052
3053         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result); 
3054         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result); 
3055
3056         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3057
3058         return;
3059 }
3060
3061
3062
3063 void
3064 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3065 {
3066         struct kmx_ctx  *tx     = NULL;
3067         struct kmx_msg  *txmsg   = NULL;
3068         struct kmx_conn *conn   = peer->mxp_conn;
3069
3070         /* a conn ref was taken when calling mx_iconnect(), 
3071          * hold it until CONN_REQ or CONN_ACK completes */
3072
3073         CDEBUG(D_NET, "entering\n");
3074         if (status.code != MX_STATUS_SUCCESS) {
3075                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n", 
3076                         mx_strstatus(status.code), status.code, 
3077                         libcfs_nid2str(peer->mxp_nid));
3078                 spin_lock(&conn->mxk_lock);
3079                 conn->mxk_status = MXLND_CONN_FAIL;
3080                 spin_unlock(&conn->mxk_lock);
3081
3082                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3083                         struct kmx_conn *new_conn       = NULL;
3084                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3085                         mxlnd_conn_disconnect(conn, 0, 1);
3086                         mxlnd_conn_alloc(&new_conn, peer);
3087                         spin_lock(&peer->mxp_lock);
3088                         peer->mxp_reconnect_time = 0;
3089                         spin_unlock(&peer->mxp_lock);
3090                 }
3091
3092                 mxlnd_conn_decref(conn);
3093                 return;
3094         }
3095
3096         spin_lock(&conn->mxk_lock);
3097         conn->mxk_epa = status.source;
3098         spin_unlock(&conn->mxk_lock);
3099         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3100
3101         /* mx_iconnect() succeeded, reset delay to 0 */
3102         spin_lock(&peer->mxp_lock);
3103         peer->mxp_reconnect_time = 0;
3104         spin_unlock(&peer->mxp_lock);
3105
3106         /* marshal CONN_REQ msg */
3107         /* we are still using the conn ref from iconnect() - do not take another */
3108         tx = mxlnd_get_idle_tx();
3109         if (tx == NULL) {
3110                 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n", 
3111                                    libcfs_nid2str(peer->mxp_nid));
3112                 spin_lock(&conn->mxk_lock);
3113                 conn->mxk_status = MXLND_CONN_FAIL;
3114                 spin_unlock(&conn->mxk_lock);
3115                 mxlnd_conn_decref(conn);
3116                 return;
3117         }
3118
3119         tx->mxc_peer = peer;
3120         tx->mxc_conn = conn;
3121         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3122         txmsg = tx->mxc_msg;
3123         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3124         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3125         tx->mxc_match = mxlnd_create_match(tx, 0);
3126
3127         CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3128         mxlnd_queue_tx(tx);
3129         return;
3130 }
3131
3132 void
3133 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3134 {
3135         struct kmx_ctx  *tx     = NULL;
3136         struct kmx_msg  *txmsg   = NULL;
3137         struct kmx_conn *conn   = peer->mxp_conn;
3138
3139         /* a conn ref was taken when calling mx_iconnect(), 
3140          * hold it until CONN_REQ or CONN_ACK completes */
3141
3142         CDEBUG(D_NET, "entering\n");
3143         if (status.code != MX_STATUS_SUCCESS) {
3144                 struct kmx_conn *conn   = peer->mxp_conn;
3145                 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3146                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n", 
3147                         mx_strstatus(status.code), status.code, 
3148                         libcfs_nid2str(peer->mxp_nid),
3149                         peer->mxp_nid,
3150                         peer->mxp_nic_id,
3151                         peer->mxp_host->mxh_ep_id);
3152                 spin_lock(&conn->mxk_lock);
3153                 conn->mxk_status = MXLND_CONN_FAIL;
3154                 spin_unlock(&conn->mxk_lock);
3155
3156                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3157                         struct kmx_conn *new_conn       = NULL;
3158                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3159                         mxlnd_conn_disconnect(conn, 0, 1);
3160                         mxlnd_conn_alloc(&new_conn, peer);
3161                         spin_lock(&peer->mxp_lock);
3162                         peer->mxp_reconnect_time = 0;
3163                         spin_unlock(&peer->mxp_lock);
3164                 }
3165
3166                 mxlnd_conn_decref(conn);
3167                 return;
3168         }
3169         spin_lock(&conn->mxk_lock);
3170         conn->mxk_epa = status.source;
3171         if (likely(!peer->mxp_incompatible)) {
3172                 conn->mxk_status = MXLND_CONN_READY;
3173         }
3174         spin_unlock(&conn->mxk_lock);
3175         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3176
3177         /* mx_iconnect() succeeded, reset delay to 0 */
3178         spin_lock(&peer->mxp_lock);
3179         peer->mxp_reconnect_time = 0;
3180         spin_unlock(&peer->mxp_lock);
3181
3182         /* marshal CONN_ACK msg */
3183         tx = mxlnd_get_idle_tx();
3184         if (tx == NULL) {
3185                 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n", 
3186                                    libcfs_nid2str(peer->mxp_nid));
3187                 spin_lock(&conn->mxk_lock);
3188                 conn->mxk_status = MXLND_CONN_FAIL;
3189                 spin_unlock(&conn->mxk_lock);
3190                 mxlnd_conn_decref(conn);
3191                 return;
3192         }
3193
3194         tx->mxc_peer = peer;
3195         tx->mxc_conn = conn;
3196         CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3197         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3198         txmsg = tx->mxc_msg;
3199         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3200         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3201         tx->mxc_match = mxlnd_create_match(tx, 0);
3202
3203         mxlnd_queue_tx(tx);
3204         return;
3205 }
3206
3207 /**
3208  * mxlnd_request_waitd - the MX request completion thread(s)
3209  * @arg - thread id (as a void *)
3210  *
3211  * This thread waits for a MX completion and then completes the request.
3212  * We will create one thread per CPU.
3213  */
3214 int
3215 mxlnd_request_waitd(void *arg)
3216 {
3217         long                    id              = (long) arg;
3218         char                    name[24];
3219         __u32                   result          = 0;
3220         mx_return_t             mxret           = MX_SUCCESS;
3221         mx_status_t             status;
3222         struct kmx_ctx         *ctx             = NULL;
3223         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3224         struct kmx_peer        *peer            = NULL;
3225         struct kmx_conn        *conn            = NULL;
3226 #if MXLND_POLLING
3227         int                     count           = 0;
3228 #endif
3229
3230         memset(name, 0, sizeof(name));
3231         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3232         cfs_daemonize(name);
3233         //cfs_block_allsigs();
3234
3235         memset(&status, 0, sizeof(status));
3236
3237         CDEBUG(D_NET, "%s starting\n", name);
3238
3239         while (!kmxlnd_data.kmx_shutdown) {
3240                 mxret = MX_SUCCESS;
3241                 result = 0;
3242 #if MXLND_POLLING
3243                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3244                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL, 
3245                                             &status, &result);
3246                 } else {
3247                         count = 0;
3248                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT, 
3249                                             0LL, 0LL, &status, &result);
3250                 }
3251 #else
3252                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT, 
3253                                     0LL, 0LL, &status, &result);
3254 #endif
3255                 if (unlikely(kmxlnd_data.kmx_shutdown))
3256                         break;
3257
3258                 if (result != 1) {
3259                         /* nothing completed... */
3260                         continue;
3261                 }
3262
3263                 if (status.code != MX_STATUS_SUCCESS) {
3264                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3265                                "match_info 0x%llx and length %d\n", 
3266                                mx_strstatus(status.code), status.code, 
3267                                (u64) status.match_info, status.msg_length);
3268                 }
3269
3270                 /* This may be a mx_iconnect() request completing,
3271                  * check the bit mask for CONN_REQ and CONN_ACK */
3272                 if (status.match_info == MXLND_MASK_ICON_REQ ||
3273                     status.match_info == MXLND_MASK_ICON_ACK) {
3274                         peer = (struct kmx_peer*) status.context;
3275                         if (status.match_info == MXLND_MASK_ICON_REQ) {
3276                                 mxlnd_handle_conn_req(peer, status);
3277                         } else {
3278                                 mxlnd_handle_conn_ack(peer, status);
3279                         }
3280                         continue;
3281                 }
3282
3283                 /* This must be a tx or rx */
3284
3285                 /* NOTE: if this is a RX from the unexpected callback, it may
3286                  * have very little info. If we dropped it in unexpected_recv(),
3287                  * it will not have a context. If so, ignore it. */
3288                 ctx = (struct kmx_ctx *) status.context;
3289                 if (ctx != NULL) {
3290
3291                         req_type = ctx->mxc_type;
3292                         conn = ctx->mxc_conn; /* this may be NULL */
3293                         mxlnd_deq_pending_ctx(ctx);
3294         
3295                         /* copy status to ctx->mxc_status */
3296                         memcpy(&ctx->mxc_status, &status, sizeof(status));
3297         
3298                         switch (req_type) {
3299                         case MXLND_REQ_TX:
3300                                 mxlnd_handle_tx_completion(ctx);
3301                                 break;
3302                         case MXLND_REQ_RX:
3303                                 mxlnd_handle_rx_completion(ctx);
3304                                 break;
3305                         default:
3306                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3307                                 LBUG();
3308                                 break;
3309                         }
3310         
3311                         /* conn is always set except for the first CONN_REQ rx
3312                          * from a new peer */
3313                         if (!(status.code == MX_STATUS_SUCCESS || 
3314                               status.code == MX_STATUS_TRUNCATED) &&
3315                               conn != NULL) {
3316                                 mxlnd_conn_disconnect(conn, 1, 1);
3317                         }
3318                 }
3319                 CDEBUG(D_NET, "waitd() completed task\n");
3320         }
3321         CDEBUG(D_NET, "%s stopping\n", name);
3322         mxlnd_thread_stop(id);
3323         return 0;
3324 }
3325
3326
3327 unsigned long
3328 mxlnd_check_timeouts(unsigned long now)
3329 {
3330         int                     i               = 0;
3331         int                     disconnect      = 0;
3332         unsigned long           next            = 0;
3333         struct  kmx_peer        *peer           = NULL;
3334         struct  kmx_conn        *conn           = NULL;
3335
3336         read_lock(&kmxlnd_data.kmx_peers_lock);
3337         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3338                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3339
3340                         if (unlikely(kmxlnd_data.kmx_shutdown))
3341                                 return next;
3342         
3343                         conn = peer->mxp_conn;
3344                         if (conn == NULL)
3345                                 continue;
3346
3347                         mxlnd_conn_addref(conn);
3348                         spin_lock(&conn->mxk_lock);
3349         
3350                         /* if nothing pending (timeout == 0) or
3351                          * if conn is already disconnected,
3352                          * skip this conn */
3353                         if (conn->mxk_timeout == 0 ||
3354                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3355                                 spin_unlock(&conn->mxk_lock);
3356                                 mxlnd_conn_decref(conn);
3357                                 continue;
3358                         }
3359
3360                         /* we want to find the timeout that will occur first.
3361                          * if it is in the future, we will sleep until then.
3362                          * if it is in the past, then we will sleep one
3363                          * second and repeat the process. */
3364                         if ((next == 0) || (conn->mxk_timeout < next)) {
3365                                 next = conn->mxk_timeout;
3366                         }
3367         
3368                         disconnect = 0;
3369
3370                         if (time_after_eq(now, conn->mxk_timeout))  {
3371                                 disconnect = 1;
3372                         }
3373                         spin_unlock(&conn->mxk_lock);
3374
3375                         if (disconnect) {
3376                                 mxlnd_conn_disconnect(conn, 1, 1);
3377                         }
3378                         mxlnd_conn_decref(conn);
3379                 }
3380         }
3381         read_unlock(&kmxlnd_data.kmx_peers_lock);
3382         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3383
3384         return next;
3385 }
3386
3387 /**
3388  * mxlnd_timeoutd - enforces timeouts on messages
3389  * @arg - thread id (as a void *)
3390  *
3391  * This thread queries each peer for its earliest timeout. If a peer has timed out,
3392  * it calls mxlnd_conn_disconnect().
3393  *
3394  * After checking for timeouts, try progressing sends (call check_sends()).
3395  */
3396 int
3397 mxlnd_timeoutd(void *arg)
3398 {
3399         int                     i       = 0;
3400         long                    id      = (long) arg;
3401         unsigned long           now     = 0;
3402         unsigned long           next    = 0;
3403         unsigned long           delay   = HZ;
3404         struct kmx_peer        *peer    = NULL;
3405         struct kmx_conn        *conn    = NULL;
3406
3407         cfs_daemonize("mxlnd_timeoutd");
3408         //cfs_block_allsigs();
3409
3410         CDEBUG(D_NET, "timeoutd starting\n");
3411
3412         while (!kmxlnd_data.kmx_shutdown) {
3413
3414                 now = jiffies;
3415                 /* if the next timeout has not arrived, go back to sleep */
3416                 if (time_after(now, next)) {
3417                         next = mxlnd_check_timeouts(now);
3418                 }
3419
3420                read_lock(&kmxlnd_data.kmx_peers_lock);
3421                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3422                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3423                                 conn = peer->mxp_conn;
3424                                 if (conn == NULL)
3425                                         continue;
3426
3427                                 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3428                                     time_after(now, conn->mxk_last_tx + HZ)) {
3429                                         mxlnd_check_sends(peer);
3430                                 }
3431                         }
3432                 }
3433                 read_unlock(&kmxlnd_data.kmx_peers_lock);
3434
3435                 mxlnd_sleep(delay);
3436         }
3437         CDEBUG(D_NET, "timeoutd stopping\n");
3438         mxlnd_thread_stop(id);
3439         return 0;
3440 }