Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 inline void mxlnd_noop(char *s, ...)
47 {
48         return;
49 }
50
51 char *
52 mxlnd_ctxstate_to_str(int mxc_state)
53 {
54         switch (mxc_state) {
55         case MXLND_CTX_INIT:
56                 return "MXLND_CTX_INIT";
57         case MXLND_CTX_IDLE:
58                 return "MXLND_CTX_IDLE";
59         case MXLND_CTX_PREP:
60                 return "MXLND_CTX_PREP";
61         case MXLND_CTX_PENDING:
62                 return "MXLND_CTX_PENDING";
63         case MXLND_CTX_COMPLETED:
64                 return "MXLND_CTX_COMPLETED";
65         case MXLND_CTX_CANCELED:
66                 return "MXLND_CTX_CANCELED";
67         default:
68                 return "*unknown*";
69         }
70 }
71
72 char *
73 mxlnd_connstatus_to_str(int mxk_status)
74 {
75         switch (mxk_status) {
76         case MXLND_CONN_READY:
77                 return "MXLND_CONN_READY";
78         case MXLND_CONN_INIT:
79                 return "MXLND_CONN_INIT";
80         case MXLND_CONN_REQ:
81                 return "MXLND_CONN_REQ";
82         case MXLND_CONN_ACK:
83                 return "MXLND_CONN_ACK";
84         case MXLND_CONN_WAIT:
85                 return "MXLND_CONN_WAIT";
86         case MXLND_CONN_DISCONNECT:
87                 return "MXLND_CONN_DISCONNECT";
88         case MXLND_CONN_FAIL:
89                 return "MXLND_CONN_FAIL";
90         default:
91                 return "unknown";
92         }
93 }
94
95 char *
96 mxlnd_msgtype_to_str(int type) {
97         switch (type) {
98         case MXLND_MSG_EAGER:
99                 return "MXLND_MSG_EAGER";
100         case MXLND_MSG_CONN_REQ:
101                 return "MXLND_MSG_CONN_REQ";
102         case MXLND_MSG_CONN_ACK:
103                 return "MXLND_MSG_CONN_ACK";
104         case MXLND_MSG_NOOP:
105                 return "MXLND_MSG_NOOP";
106         case MXLND_MSG_PUT_REQ:
107                 return "MXLND_MSG_PUT_REQ";
108         case MXLND_MSG_PUT_ACK:
109                 return "MXLND_MSG_PUT_ACK";
110         case MXLND_MSG_PUT_DATA:
111                 return "MXLND_MSG_PUT_DATA";
112         case MXLND_MSG_GET_REQ:
113                 return "MXLND_MSG_GET_REQ";
114         case MXLND_MSG_GET_DATA:
115                 return "MXLND_MSG_GET_DATA";
116         default:
117                 return "unknown";
118         }
119 }
120
121 char *
122 mxlnd_lnetmsg_to_str(int type)
123 {
124         switch (type) {
125         case LNET_MSG_ACK:
126                 return "LNET_MSG_ACK";
127         case LNET_MSG_PUT:
128                 return "LNET_MSG_PUT";
129         case LNET_MSG_GET:
130                 return "LNET_MSG_GET";
131         case LNET_MSG_REPLY:
132                 return "LNET_MSG_REPLY";
133         case LNET_MSG_HELLO:
134                 return "LNET_MSG_HELLO";
135         default:
136                 LBUG();
137                 return "*unknown*";
138         }
139 }
140
141 static inline u64
142 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
143 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
144 {
145         u64 type        = (u64) ctx->mxc_msg_type;
146         u64 err         = (u64) error;
147         u64 match       = 0LL;
148
149         LASSERT(ctx->mxc_msg_type != 0);
150         LASSERT(ctx->mxc_cookie >> 52 == 0);
151         match = (type << 60) | (err << 52) | ctx->mxc_cookie;
152         return match;
153 }
154
155 static inline void
156 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
157 {
158         *msg_type = (u8) (match >> 60);
159         *error    = (u8) ((match >> 52) & 0xFF);
160         *cookie   = match & 0xFFFFFFFFFFFFFLL;
161         LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
162                 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
163                 *msg_type == MXLND_MSG_EAGER    ||
164                 *msg_type == MXLND_MSG_CONN_REQ ||
165                 *msg_type == MXLND_MSG_CONN_ACK ||
166                 *msg_type == MXLND_MSG_NOOP     ||
167                 *msg_type == MXLND_MSG_PUT_REQ  ||
168                 *msg_type == MXLND_MSG_PUT_ACK  ||
169                 *msg_type == MXLND_MSG_PUT_DATA ||
170                 *msg_type == MXLND_MSG_GET_REQ  ||
171                 *msg_type == MXLND_MSG_GET_DATA);
172         return;
173 }
174
175 struct kmx_ctx *
176 mxlnd_get_idle_rx(void)
177 {
178         struct list_head        *tmp    = NULL;
179         struct kmx_ctx          *rx     = NULL;
180
181         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
182
183         if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
184                 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
185                 return NULL;
186         }
187
188         tmp = &kmxlnd_data.kmx_rx_idle;
189         rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
190         list_del_init(&rx->mxc_list);
191         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
192
193 #if MXLND_DEBUG
194         if (rx->mxc_get != rx->mxc_put) {
195                 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
196                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
197                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
198                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
199                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
200                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
201                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
202                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
203                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
204                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
205         }
206 #endif
207         LASSERT (rx->mxc_get == rx->mxc_put);
208
209         rx->mxc_get++;
210
211         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
212         rx->mxc_state = MXLND_CTX_PREP;
213
214         return rx;
215 }
216
217 int
218 mxlnd_put_idle_rx(struct kmx_ctx *rx)
219 {
220         if (rx == NULL) {
221                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
222                 return -EINVAL;
223         } else if (rx->mxc_type != MXLND_REQ_RX) {
224                 CDEBUG(D_NETERROR, "called with tx\n");
225                 return -EINVAL;
226         }
227         LASSERT(rx->mxc_get == rx->mxc_put + 1);
228         mxlnd_ctx_init(rx);
229         rx->mxc_put++;
230         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
231         list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
232         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
233         return 0;
234 }
235
236 int
237 mxlnd_reduce_idle_rxs(__u32 count)
238 {
239         __u32                   i       = 0;
240         struct kmx_ctx          *rx     = NULL;
241
242         spin_lock(&kmxlnd_data.kmx_rxs_lock);
243         for (i = 0; i < count; i++) {
244                 rx = mxlnd_get_idle_rx();
245                 if (rx != NULL) {
246                         struct list_head *tmp = &rx->mxc_global_list;
247                         list_del_init(tmp);
248                         mxlnd_ctx_free(rx);
249                 } else {
250                         CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
251                         break;
252                 }
253         }
254         spin_unlock(&kmxlnd_data.kmx_rxs_lock);
255         return 0;
256 }
257
258 struct kmx_ctx *
259 mxlnd_get_idle_tx(void)
260 {
261         struct list_head        *tmp    = NULL;
262         struct kmx_ctx          *tx     = NULL;
263
264         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
265
266         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
267                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
268                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
269                 return NULL;
270         }
271
272         tmp = &kmxlnd_data.kmx_tx_idle;
273         tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
274         list_del_init(&tx->mxc_list);
275
276         /* Allocate a new completion cookie.  It might not be needed,
277          * but we've got a lock right now and we're unlikely to
278          * wrap... */
279         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
280         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
281                 kmxlnd_data.kmx_tx_next_cookie = 1;
282         }
283         kmxlnd_data.kmx_tx_used++;
284         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
285
286         LASSERT (tx->mxc_get == tx->mxc_put);
287
288         tx->mxc_get++;
289
290         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
291         LASSERT (tx->mxc_lntmsg[0] == NULL);
292         LASSERT (tx->mxc_lntmsg[1] == NULL);
293
294         tx->mxc_state = MXLND_CTX_PREP;
295
296         return tx;
297 }
298
299 int
300 mxlnd_put_idle_tx(struct kmx_ctx *tx)
301 {
302         //int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
303         int             result  = 0;
304         lnet_msg_t      *lntmsg[2];
305
306         if (tx == NULL) {
307                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
308                 return -EINVAL;
309         } else if (tx->mxc_type != MXLND_REQ_TX) {
310                 CDEBUG(D_NETERROR, "called with rx\n");
311                 return -EINVAL;
312         }
313         if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
314               tx->mxc_status.code == MX_STATUS_TRUNCATED))
315                 result = -EIO;
316
317         lntmsg[0] = tx->mxc_lntmsg[0];
318         lntmsg[1] = tx->mxc_lntmsg[1];
319
320         LASSERT(tx->mxc_get == tx->mxc_put + 1);
321         mxlnd_ctx_init(tx);
322         tx->mxc_put++;
323         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
324         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
325         kmxlnd_data.kmx_tx_used--;
326         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
327         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
328         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
329         return 0;
330 }
331
332 /**
333  * mxlnd_conn_free - free the conn
334  * @conn - a kmx_conn pointer
335  *
336  * The calling function should remove the conn from the conns list first
337  * then destroy it.
338  */
339 void
340 mxlnd_conn_free(struct kmx_conn *conn)
341 {
342         struct kmx_peer *peer   = conn->mxk_peer;
343
344         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
345         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
346                  list_empty (&conn->mxk_tx_free_queue) &&
347                  list_empty (&conn->mxk_pending));
348         if (!list_empty(&conn->mxk_list)) {
349                 spin_lock(&peer->mxp_lock);
350                 list_del_init(&conn->mxk_list);
351                 if (peer->mxp_conn == conn) {
352                         peer->mxp_conn = NULL;
353                         if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
354                                 mx_set_endpoint_addr_context(conn->mxk_epa,
355                                                              (void *) NULL);
356                         }
357                 }
358                 spin_unlock(&peer->mxp_lock);
359         }
360         mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
361         MXLND_FREE (conn, sizeof (*conn));
362         return;
363 }
364
365
366 void
367 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
368 {
369         int                     found   = 0;
370         struct kmx_ctx          *ctx    = NULL;
371         struct kmx_ctx          *next   = NULL;
372         mx_return_t             mxret   = MX_SUCCESS;
373         u32                     result  = 0;
374
375         do {
376                 found = 0;
377                 spin_lock(&conn->mxk_lock);
378                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
379                         /* we will delete all including txs */
380                         list_del_init(&ctx->mxc_list);
381                         if (ctx->mxc_type == MXLND_REQ_RX) {
382                                 found = 1;
383                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
384                                                   &ctx->mxc_mxreq,
385                                                   &result);
386                                 if (mxret != MX_SUCCESS) {
387                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
388                                 }
389                                 if (result == 1) {
390                                         ctx->mxc_status.code = -ECONNABORTED;
391                                         ctx->mxc_state = MXLND_CTX_CANCELED;
392                                         /* NOTE this calls lnet_finalize() and
393                                          * we cannot hold any locks when calling it.
394                                          * It also calls mxlnd_conn_decref(conn) */
395                                         spin_unlock(&conn->mxk_lock);
396                                         mxlnd_handle_rx_completion(ctx);
397                                         spin_lock(&conn->mxk_lock);
398                                 }
399                                 break;
400                         }
401                 }
402                 spin_unlock(&conn->mxk_lock);
403         }
404         while (found);
405
406         return;
407 }
408
409 /**
410  * mxlnd_conn_disconnect - shutdown a connection
411  * @conn - a kmx_conn pointer
412  *
413  * This function sets the status to DISCONNECT, completes queued
414  * txs with failure, calls mx_disconnect, which will complete
415  * pending txs and matched rxs with failure.
416  */
417 void
418 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
419 {
420         struct list_head        *tmp    = NULL;
421
422         spin_lock(&conn->mxk_lock);
423         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
424                 spin_unlock(&conn->mxk_lock);
425                 return;
426         }
427         conn->mxk_status = MXLND_CONN_DISCONNECT;
428         conn->mxk_timeout = 0;
429
430         while (!list_empty(&conn->mxk_tx_free_queue) ||
431                !list_empty(&conn->mxk_tx_credit_queue)) {
432
433                 struct kmx_ctx          *tx     = NULL;
434
435                 if (!list_empty(&conn->mxk_tx_free_queue)) {
436                         tmp = &conn->mxk_tx_free_queue;
437                 } else {
438                         tmp = &conn->mxk_tx_credit_queue;
439                 }
440
441                 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
442                 list_del_init(&tx->mxc_list);
443                 tx->mxc_status.code = -ECONNABORTED;
444                 spin_unlock(&conn->mxk_lock);
445                 mxlnd_put_idle_tx(tx);
446                 mxlnd_conn_decref(conn); /* for this tx */
447                 spin_lock(&conn->mxk_lock);
448         }
449
450         spin_unlock(&conn->mxk_lock);
451
452         /* cancel pending rxs */
453         mxlnd_conn_cancel_pending_rxs(conn);
454
455         if (kmxlnd_data.kmx_shutdown != 1) {
456
457                 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
458
459                 if (notify) {
460                         time_t          last_alive      = 0;
461                         unsigned long   last_msg        = 0;
462
463                         /* notify LNET that we are giving up on this peer */
464                         if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
465                                 last_msg = conn->mxk_last_rx;
466                         } else {
467                                 last_msg = conn->mxk_last_tx;
468                         }
469                         last_alive = cfs_time_current_sec() -
470                                      cfs_duration_sec(cfs_time_current() - last_msg);
471                         lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
472                 }
473         }
474         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
475
476         return;
477 }
478
479 /**
480  * mxlnd_conn_alloc - allocate and initialize a new conn struct
481  * @connp - address of a kmx_conn pointer
482  * @peer - owning kmx_peer
483  *
484  * Returns 0 on success and -ENOMEM on failure
485  */
486 int
487 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
488 {
489         struct kmx_conn *conn    = NULL;
490
491         LASSERT(peer != NULL);
492
493         MXLND_ALLOC(conn, sizeof (*conn));
494         if (conn == NULL) {
495                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
496                 return -ENOMEM;
497         }
498         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
499
500         memset(conn, 0, sizeof(*conn));
501
502         /* conn->mxk_incarnation = 0 - will be set by peer */
503         atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer 
504                                                    and one for the caller */
505         conn->mxk_peer = peer;
506         /* mxk_epa - to be set after mx_iconnect() */
507         INIT_LIST_HEAD(&conn->mxk_list);
508         spin_lock_init(&conn->mxk_lock);
509         /* conn->mxk_timeout = 0 */
510         conn->mxk_last_tx = jiffies;
511         conn->mxk_last_rx = conn->mxk_last_tx;
512         conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
513         /* mxk_outstanding = 0 */
514         conn->mxk_status = MXLND_CONN_INIT;
515         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
516         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
517         /* conn->mxk_ntx_msgs = 0 */
518         /* conn->mxk_ntx_data = 0 */
519         /* conn->mxk_ntx_posted = 0 */
520         /* conn->mxk_data_posted = 0 */
521         INIT_LIST_HEAD(&conn->mxk_pending);
522
523         *connp = conn;
524
525         mxlnd_peer_addref(peer);        /* add a ref for this conn */
526
527         /* add to front of peer's conns list */
528         list_add(&conn->mxk_list, &peer->mxp_conns);
529         peer->mxp_conn = conn;
530         return 0;
531 }
532
533 int
534 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
535 {
536         int ret = 0;
537         spin_lock(&peer->mxp_lock);
538         ret = mxlnd_conn_alloc_locked(connp, peer);
539         spin_unlock(&peer->mxp_lock);
540         return ret;
541 }
542
543 int
544 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
545 {
546         int             ret     = 0;
547         struct kmx_conn *conn   = ctx->mxc_conn;
548
549         ctx->mxc_state = MXLND_CTX_PENDING;
550         if (conn != NULL) {
551                 spin_lock(&conn->mxk_lock);
552                 if (conn->mxk_status >= MXLND_CONN_INIT) {
553                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
554                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
555                                 conn->mxk_timeout = ctx->mxc_deadline;
556                         }
557                 } else {
558                         ctx->mxc_state = MXLND_CTX_COMPLETED;
559                         ret = -1;
560                 }
561                 spin_unlock(&conn->mxk_lock);
562         }
563         return ret;
564 }
565
566 int
567 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
568 {
569         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
570                 ctx->mxc_state == MXLND_CTX_COMPLETED);
571         if (ctx->mxc_state != MXLND_CTX_PENDING &&
572             ctx->mxc_state != MXLND_CTX_COMPLETED) {
573                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", 
574                        mxlnd_ctxstate_to_str(ctx->mxc_state));
575         }
576         ctx->mxc_state = MXLND_CTX_COMPLETED;
577         if (!list_empty(&ctx->mxc_list)) {
578                 struct kmx_conn *conn = ctx->mxc_conn;
579                 struct kmx_ctx *next = NULL;
580                 LASSERT(conn != NULL);
581                 spin_lock(&conn->mxk_lock);
582                 list_del_init(&ctx->mxc_list);
583                 conn->mxk_timeout = 0;
584                 if (!list_empty(&conn->mxk_pending)) {
585                         next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
586                         conn->mxk_timeout = next->mxc_deadline;
587                 }
588                 spin_unlock(&conn->mxk_lock);
589         }
590         return 0;
591 }
592
593 /**
594  * mxlnd_peer_free - free the peer
595  * @peer - a kmx_peer pointer
596  *
597  * The calling function should decrement the rxs, drain the tx queues and
598  * remove the peer from the peers list first then destroy it.
599  */
600 void
601 mxlnd_peer_free(struct kmx_peer *peer)
602 {
603         CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
604
605         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
606
607         if (peer->mxp_host != NULL) {
608                 spin_lock(&peer->mxp_host->mxh_lock);
609                 peer->mxp_host->mxh_peer = NULL;
610                 spin_unlock(&peer->mxp_host->mxh_lock);
611         }
612         if (!list_empty(&peer->mxp_peers)) {
613                 /* assume we are locked */
614                 list_del_init(&peer->mxp_peers);
615         }
616
617         MXLND_FREE (peer, sizeof (*peer));
618         atomic_dec(&kmxlnd_data.kmx_npeers);
619         return;
620 }
621
622 void
623 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
624 {
625         u64             nic_id  = 0LL;
626         char            name[MX_MAX_HOSTNAME_LEN + 1];
627         mx_return_t     mxret   = MX_SUCCESS;
628
629         memset(name, 0, sizeof(name));
630         snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
631         mxret = mx_hostname_to_nic_id(name, &nic_id);
632         if (mxret == MX_SUCCESS) {
633                 peer->mxp_nic_id = nic_id;
634         } else {
635                 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
636                                    "with %s\n", name, mx_strerror(mxret));
637                 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
638                 if (mxret == MX_SUCCESS) {
639                         peer->mxp_nic_id = nic_id;
640                 } else {
641                         CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
642                                            "with %s\n", peer->mxp_host->mxh_hostname,
643                                            mx_strerror(mxret));
644                 }
645         }
646         return;
647 }
648
649 /**
650  * mxlnd_peer_alloc - allocate and initialize a new peer struct
651  * @peerp - address of a kmx_peer pointer
652  * @nid - LNET node id
653  *
654  * Returns 0 on success and -ENOMEM on failure
655  */
656 int
657 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
658 {
659         int                     i       = 0;
660         int                     ret     = 0;
661         u32                     addr    = LNET_NIDADDR(nid);
662         struct kmx_peer        *peer    = NULL;
663         struct kmx_host        *host    = NULL;
664
665         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
666
667         MXLND_ALLOC(peer, sizeof (*peer));
668         if (peer == NULL) {
669                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
670                 return -ENOMEM;
671         }
672         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
673
674         memset(peer, 0, sizeof(*peer));
675
676         list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
677                 if (addr == host->mxh_addr) {
678                         peer->mxp_host = host;
679                         spin_lock(&host->mxh_lock);
680                         host->mxh_peer = peer;
681                         spin_unlock(&host->mxh_lock);
682                         break;
683                 }
684         }
685         if (peer->mxp_host == NULL) {
686                 CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
687                 MXLND_FREE(peer, sizeof(*peer));
688                 return -ENXIO;
689         }
690
691         peer->mxp_nid = nid;
692         /* peer->mxp_incarnation */
693         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
694         mxlnd_peer_hostname_to_nic_id(peer);
695
696         INIT_LIST_HEAD(&peer->mxp_peers);
697         spin_lock_init(&peer->mxp_lock);
698         INIT_LIST_HEAD(&peer->mxp_conns);
699         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
700         if (ret != 0) {
701                 mxlnd_peer_decref(peer);
702                 return ret;
703         }
704
705         for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
706                 struct kmx_ctx   *rx     = NULL;
707                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
708                 if (ret != 0) {
709                         mxlnd_reduce_idle_rxs(i);
710                         mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
711                         mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
712                         mxlnd_peer_decref(peer);
713                         return ret;
714                 }
715                 spin_lock(&kmxlnd_data.kmx_rxs_lock);
716                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
717                 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
718                 rx->mxc_put = -1;
719                 mxlnd_put_idle_rx(rx);
720         }
721         /* peer->mxp_reconnect_time = 0 */
722         /* peer->mxp_incompatible = 0 */
723
724         *peerp = peer;
725         return 0;
726 }
727
728 /**
729  * mxlnd_nid_to_hash - hash the nid
730  * @nid - msg pointer
731  *
732  * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
733  */
734 static inline int
735 mxlnd_nid_to_hash(lnet_nid_t nid)
736 {
737         return (nid & MXLND_HASH_MASK) ^
738                ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
739 }
740
741 static inline struct kmx_peer *
742 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
743 {
744         int                     found   = 0;
745         int                     hash    = 0;
746         struct kmx_peer         *peer   = NULL;
747
748         hash = mxlnd_nid_to_hash(nid);
749
750         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
751                 if (peer->mxp_nid == nid) {
752                         found = 1;
753                         mxlnd_peer_addref(peer);
754                         break;
755                 }
756         }
757         return (found ? peer : NULL);
758 }
759
760 static inline struct kmx_peer *
761 mxlnd_find_peer_by_nid(lnet_nid_t nid)
762 {
763         struct kmx_peer *peer   = NULL;
764
765         read_lock(&kmxlnd_data.kmx_peers_lock);
766         peer = mxlnd_find_peer_by_nid_locked(nid);
767         read_unlock(&kmxlnd_data.kmx_peers_lock);
768         return peer;
769 }
770
771 static inline int
772 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
773 {
774         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
775                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
776                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
777                 tx->mxc_msg_type == MXLND_MSG_NOOP);
778 }
779
780 /**
781  * mxlnd_init_msg - set type and number of bytes
782  * @msg - msg pointer
783  * @type - of message
784  * @body_nob - bytes in msg body
785  */
786 static inline void
787 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
788 {
789         msg->mxm_type = type;
790         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
791 }
792
793 static inline void
794 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
795 {
796         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
797         struct kmx_msg  *msg    = NULL;
798
799         LASSERT (tx != NULL);
800         LASSERT (nob <= MXLND_EAGER_SIZE);
801
802         tx->mxc_nid = nid;
803         /* tx->mxc_peer should have already been set if we know it */
804         tx->mxc_msg_type = type;
805         tx->mxc_nseg = 1;
806         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
807         tx->mxc_seg.segment_length = nob;
808         tx->mxc_pin_type = MX_PIN_PHYSICAL;
809         //tx->mxc_state = MXLND_CTX_PENDING;
810
811         msg = tx->mxc_msg;
812         msg->mxm_type = type;
813         msg->mxm_nob  = nob;
814
815         return;
816 }
817
818 static inline __u32
819 mxlnd_cksum (void *ptr, int nob)
820 {
821         char  *c  = ptr;
822         __u32  sum = 0;
823
824         while (nob-- > 0)
825                 sum = ((sum << 1) | (sum >> 31)) + *c++;
826
827         /* ensure I don't return 0 (== no checksum) */
828         return (sum == 0) ? 1 : sum;
829 }
830
831 /**
832  * mxlnd_pack_msg - complete msg info
833  * @tx - msg to send
834  */
835 static inline void
836 mxlnd_pack_msg(struct kmx_ctx *tx)
837 {
838         struct kmx_msg  *msg    = tx->mxc_msg;
839
840         /* type and nob should already be set in init_msg() */
841         msg->mxm_magic    = MXLND_MSG_MAGIC;
842         msg->mxm_version  = MXLND_MSG_VERSION;
843         /*   mxm_type */
844         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
845          * return credits as well */
846         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
847             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
848                 spin_lock(&tx->mxc_conn->mxk_lock);
849                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
850                 tx->mxc_conn->mxk_outstanding = 0;
851                 spin_unlock(&tx->mxc_conn->mxk_lock);
852         } else {
853                 msg->mxm_credits  = 0;
854         }
855         /*   mxm_nob */
856         msg->mxm_cksum    = 0;
857         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
858         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
859         msg->mxm_dstnid   = tx->mxc_nid;
860         /* if it is a new peer, the dststamp will be 0 */
861         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
862         msg->mxm_seq      = tx->mxc_cookie;
863
864         if (*kmxlnd_tunables.kmx_cksum) {
865                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
866         }
867 }
868
869 int
870 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
871 {
872         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
873         __u32     msg_cksum     = 0;
874         int       flip          = 0;
875         int       msg_nob       = 0;
876
877         /* 6 bytes are enough to have received magic + version */
878         if (nob < 6) {
879                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
880                 return -EPROTO;
881         }
882
883         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
884                 flip = 0;
885         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
886                 flip = 1;
887         } else {
888                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
889                 return -EPROTO;
890         }
891
892         if (msg->mxm_version !=
893             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
894                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
895                 return -EPROTO;
896         }
897
898         if (nob < hdr_size) {
899                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
900                 return -EPROTO;
901         }
902
903         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
904         if (msg_nob > nob) {
905                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
906                 return -EPROTO;
907         }
908
909         /* checksum must be computed with mxm_cksum zero and BEFORE anything
910          * gets flipped */
911         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
912         msg->mxm_cksum = 0;
913         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
914                 CDEBUG(D_NETERROR, "Bad checksum\n");
915                 return -EPROTO;
916         }
917         msg->mxm_cksum = msg_cksum;
918
919         if (flip) {
920                 /* leave magic unflipped as a clue to peer endianness */
921                 __swab16s(&msg->mxm_version);
922                 CLASSERT (sizeof(msg->mxm_type) == 1);
923                 CLASSERT (sizeof(msg->mxm_credits) == 1);
924                 msg->mxm_nob = msg_nob;
925                 __swab64s(&msg->mxm_srcnid);
926                 __swab64s(&msg->mxm_srcstamp);
927                 __swab64s(&msg->mxm_dstnid);
928                 __swab64s(&msg->mxm_dststamp);
929                 __swab64s(&msg->mxm_seq);
930         }
931
932         if (msg->mxm_srcnid == LNET_NID_ANY) {
933                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
934                 return -EPROTO;
935         }
936
937         switch (msg->mxm_type) {
938         default:
939                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
940                 return -EPROTO;
941
942         case MXLND_MSG_NOOP:
943                 break;
944
945         case MXLND_MSG_EAGER:
946                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
947                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
948                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
949                         return -EPROTO;
950                 }
951                 break;
952
953         case MXLND_MSG_PUT_REQ:
954                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
955                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
956                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
957                         return -EPROTO;
958                 }
959                 if (flip)
960                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
961                 break;
962
963         case MXLND_MSG_PUT_ACK:
964                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
965                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
966                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
967                         return -EPROTO;
968                 }
969                 if (flip) {
970                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
971                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
972                 }
973                 break;
974
975         case MXLND_MSG_GET_REQ:
976                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
977                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
978                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
979                         return -EPROTO;
980                 }
981                 if (flip) {
982                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
983                 }
984                 break;
985
986         case MXLND_MSG_CONN_REQ:
987         case MXLND_MSG_CONN_ACK:
988                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
989                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
990                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
991                         return -EPROTO;
992                 }
993                 if (flip) {
994                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
995                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
996                 }
997                 break;
998         }
999         return 0;
1000 }
1001
1002 /**
1003  * mxlnd_recv_msg
1004  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1005  * @rx
1006  * @msg_type
1007  * @cookie
1008  * @length - length of incoming message
1009  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1010  *
1011  * The caller gets the rx and sets nid, peer and conn if known.
1012  *
1013  * Returns 0 on success and -1 on failure
1014  */
1015 int
1016 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1017 {
1018         int             ret     = 0;
1019         mx_return_t     mxret   = MX_SUCCESS;
1020         uint64_t        mask    = 0xF00FFFFFFFFFFFFFLL;
1021
1022         rx->mxc_msg_type = msg_type;
1023         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1024         rx->mxc_cookie = cookie;
1025         /* rx->mxc_match may already be set */
1026         /* rx->mxc_seg.segment_ptr is already set */
1027         rx->mxc_seg.segment_length = length;
1028         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1029         ret = mxlnd_q_pending_ctx(rx);
1030         if (ret == -1) {
1031                 /* the caller is responsible for calling conn_decref() if needed */
1032                 return -1;
1033         }
1034         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1035                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1036         if (mxret != MX_SUCCESS) {
1037                 mxlnd_deq_pending_ctx(rx);
1038                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n", 
1039                                    mx_strerror(mxret), (int) mxret);
1040                 return -1;
1041         }
1042         return 0;
1043 }
1044
1045
1046 /**
1047  * mxlnd_unexpected_recv - this is the callback function that will handle 
1048  *                         unexpected receives
1049  * @context - NULL, ignore
1050  * @source - the peer's mx_endpoint_addr_t
1051  * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1052  * @length - length of incoming message
1053  * @data_if_available - ignore
1054  *
1055  * If it is an eager-sized msg, we will call recv_msg() with the actual
1056  * length. If it is a large message, we will call recv_msg() with a
1057  * length of 0 bytes to drop it because we should never have a large,
1058  * unexpected message.
1059  *
1060  * NOTE - The MX library blocks until this function completes. Make it as fast as
1061  * possible. DO NOT allocate memory which can block!
1062  *
1063  * If we cannot get a rx or the conn is closed, drop the message on the floor
1064  * (i.e. recv 0 bytes and ignore).
1065  */
1066 mx_unexp_handler_action_t
1067 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1068                  uint64_t match_value, uint32_t length, void *data_if_available)
1069 {
1070         int             ret             = 0;
1071         struct kmx_ctx  *rx             = NULL;
1072         mx_ksegment_t   seg;
1073         u8              msg_type        = 0;
1074         u8              error           = 0;
1075         u64             cookie          = 0LL;
1076
1077         if (context != NULL) {
1078                 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1079         }
1080
1081 #if MXLND_DEBUG
1082         CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1083 #endif
1084
1085         rx = mxlnd_get_idle_rx();
1086         if (rx != NULL) {
1087                 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1088                 if (length <= MXLND_EAGER_SIZE) {
1089                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1090                 } else {
1091                         CDEBUG(D_NETERROR, "unexpected large receive with "
1092                                            "match_value=0x%llx length=%d\n",
1093                                            match_value, length);
1094                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1095                 }
1096
1097                 if (ret == 0) {
1098                         struct kmx_peer *peer   = NULL;
1099                         struct kmx_conn *conn   = NULL;
1100
1101                         /* NOTE to avoid a peer disappearing out from under us,
1102                          *      read lock the peers lock first */
1103                         read_lock(&kmxlnd_data.kmx_peers_lock);
1104                         mx_get_endpoint_addr_context(source, (void **) &peer);
1105                         if (peer != NULL) {
1106                                 mxlnd_peer_addref(peer); /* add a ref... */
1107                                 spin_lock(&peer->mxp_lock);
1108                                 conn = peer->mxp_conn;
1109                                 if (conn) {
1110                                         mxlnd_conn_addref(conn); /* add ref until rx completed */
1111                                         mxlnd_peer_decref(peer); /* and drop peer ref */
1112                                         rx->mxc_conn = conn;
1113                                 }
1114                                 spin_unlock(&peer->mxp_lock);
1115                                 rx->mxc_peer = peer;
1116                                 rx->mxc_nid = peer->mxp_nid;
1117                         }
1118                         read_unlock(&kmxlnd_data.kmx_peers_lock);
1119                 } else {
1120                         CDEBUG(D_NETERROR, "could not post receive\n");
1121                         mxlnd_put_idle_rx(rx);
1122                 }
1123         }
1124
1125         if (rx == NULL || ret != 0) {
1126                 if (rx == NULL) {
1127                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1128                 } else {
1129                         /* ret != 0 */
1130                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1131                 }
1132                 seg.segment_ptr = 0LL;
1133                 seg.segment_length = 0;
1134                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1135                           match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1136         }
1137
1138         return MX_RECV_CONTINUE;
1139 }
1140
1141
1142 int
1143 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1144 {
1145         int                      i      = 0;
1146         int                      ret    = -ENOENT;
1147         struct kmx_peer         *peer   = NULL;
1148
1149         read_lock(&kmxlnd_data.kmx_peers_lock);
1150         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1151                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1152                         if (index-- > 0)
1153                                 continue;
1154
1155                         *nidp = peer->mxp_nid;
1156                         *count = atomic_read(&peer->mxp_refcount);
1157                         ret = 0;
1158                         break;
1159                 }
1160         }
1161         read_unlock(&kmxlnd_data.kmx_peers_lock);
1162
1163         return ret;
1164 }
1165
1166 void
1167 mxlnd_del_peer_locked(struct kmx_peer *peer)
1168 {
1169         list_del_init(&peer->mxp_peers); /* remove from the global list */
1170         if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1171         mxlnd_peer_decref(peer); /* drop global list ref */
1172         return;
1173 }
1174
1175 int
1176 mxlnd_del_peer(lnet_nid_t nid)
1177 {
1178         int             i       = 0;
1179         int             ret     = 0;
1180         struct kmx_peer *peer   = NULL;
1181         struct kmx_peer *next   = NULL;
1182
1183         if (nid != LNET_NID_ANY) {
1184                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1185         }
1186         write_lock(&kmxlnd_data.kmx_peers_lock);
1187         if (nid != LNET_NID_ANY) {
1188                 if (peer == NULL) {
1189                         ret = -ENOENT;
1190                 } else {
1191                         mxlnd_peer_decref(peer); /* and drops it */
1192                         mxlnd_del_peer_locked(peer);
1193                 }
1194         } else { /* LNET_NID_ANY */
1195                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1196                         list_for_each_entry_safe(peer, next,
1197                                                  &kmxlnd_data.kmx_peers[i], mxp_peers) {
1198                                 mxlnd_del_peer_locked(peer);
1199                         }
1200                 }
1201         }
1202         write_unlock(&kmxlnd_data.kmx_peers_lock);
1203
1204         return ret;
1205 }
1206
1207 struct kmx_conn *
1208 mxlnd_get_conn_by_idx(int index)
1209 {
1210         int                      i      = 0;
1211         struct kmx_peer         *peer   = NULL;
1212         struct kmx_conn         *conn   = NULL;
1213
1214         read_lock(&kmxlnd_data.kmx_peers_lock);
1215         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1216                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1217                         spin_lock(&peer->mxp_lock);
1218                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1219                                 if (index-- > 0) {
1220                                         continue;
1221                                 }
1222
1223                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1224                                 spin_unlock(&peer->mxp_lock);
1225                                 read_unlock(&kmxlnd_data.kmx_peers_lock);
1226                                 return conn;
1227                         }
1228                         spin_unlock(&peer->mxp_lock);
1229                 }
1230         }
1231         read_unlock(&kmxlnd_data.kmx_peers_lock);
1232
1233         return NULL;
1234 }
1235
1236 void
1237 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1238 {
1239         struct kmx_conn *conn   = NULL;
1240         struct kmx_conn *next   = NULL;
1241
1242         spin_lock(&peer->mxp_lock);
1243         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1244                 mxlnd_conn_disconnect(conn, 0 , 0);
1245         }
1246         spin_unlock(&peer->mxp_lock);
1247         return;
1248 }
1249
1250 int
1251 mxlnd_close_matching_conns(lnet_nid_t nid)
1252 {
1253         int             i       = 0;
1254         int             ret     = 0;
1255         struct kmx_peer *peer   = NULL;
1256
1257         read_lock(&kmxlnd_data.kmx_peers_lock);
1258         if (nid != LNET_NID_ANY) {
1259                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1260                 if (peer == NULL) {
1261                         ret = -ENOENT;
1262                 } else {
1263                         mxlnd_close_matching_conns_locked(peer);
1264                         mxlnd_peer_decref(peer); /* and drops it here */
1265                 }
1266         } else { /* LNET_NID_ANY */
1267                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1268                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1269                                 mxlnd_close_matching_conns_locked(peer);
1270                 }
1271         }
1272         read_unlock(&kmxlnd_data.kmx_peers_lock);
1273
1274         return ret;
1275 }
1276
1277 /**
1278  * mxlnd_ctl - modify MXLND parameters
1279  * @ni - LNET interface handle
1280  * @cmd - command to change
1281  * @arg - the ioctl data
1282  *
1283  * Not implemented yet.
1284  */
1285 int
1286 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1287 {
1288         struct libcfs_ioctl_data *data  = arg;
1289         int                       ret   = -EINVAL;
1290
1291         LASSERT (ni == kmxlnd_data.kmx_ni);
1292
1293         switch (cmd) {
1294         case IOC_LIBCFS_GET_PEER: {
1295                 lnet_nid_t      nid     = 0;
1296                 int             count   = 0;
1297
1298                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1299                 data->ioc_nid    = nid;
1300                 data->ioc_count  = count;
1301                 break;
1302         }
1303         case IOC_LIBCFS_DEL_PEER: {
1304                 ret = mxlnd_del_peer(data->ioc_nid);
1305                 break;
1306         }
1307         case IOC_LIBCFS_GET_CONN: {
1308                 struct kmx_conn *conn = NULL;
1309
1310                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1311                 if (conn == NULL) {
1312                         ret = -ENOENT;
1313                 } else {
1314                         ret = 0;
1315                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1316                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1317                 }
1318                 break;
1319         }
1320         case IOC_LIBCFS_CLOSE_CONNECTION: {
1321                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1322                 break;
1323         }
1324         default:
1325                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1326                 break;
1327         }
1328
1329         return ret;
1330 }
1331
1332 /**
1333  * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1334  * @tx
1335  *
1336  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1337  */
1338 void
1339 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1340 {
1341         u8                      msg_type        = tx->mxc_msg_type;
1342         //struct kmx_peer         *peer           = tx->mxc_peer;
1343         struct kmx_conn         *conn           = tx->mxc_conn;
1344
1345         LASSERT (msg_type != 0);
1346         LASSERT (tx->mxc_nid != 0);
1347         LASSERT (tx->mxc_peer != NULL);
1348         LASSERT (tx->mxc_conn != NULL);
1349
1350         tx->mxc_incarnation = conn->mxk_incarnation;
1351
1352         if (msg_type != MXLND_MSG_PUT_DATA &&
1353             msg_type != MXLND_MSG_GET_DATA) {
1354                 /* msg style tx */
1355                 if (mxlnd_tx_requires_credit(tx)) {
1356                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1357                         conn->mxk_ntx_msgs++;
1358                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1359                            msg_type == MXLND_MSG_CONN_ACK) {
1360                         /* put conn msgs at the front of the queue */
1361                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1362                 } else {
1363                         /* PUT_ACK, PUT_NAK */
1364                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1365                         conn->mxk_ntx_msgs++;
1366                 }
1367         } else {
1368                 /* data style tx */
1369                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1370                 conn->mxk_ntx_data++;
1371         }
1372
1373         return;
1374 }
1375
1376 /**
1377  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1378  * @tx
1379  *
1380  * Add the tx to the peer's msg or data queue
1381  */
1382 static inline void
1383 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1384 {
1385         LASSERT(tx->mxc_peer != NULL);
1386         LASSERT(tx->mxc_conn != NULL);
1387         spin_lock(&tx->mxc_conn->mxk_lock);
1388         mxlnd_peer_queue_tx_locked(tx);
1389         spin_unlock(&tx->mxc_conn->mxk_lock);
1390
1391         return;
1392 }
1393
1394 /**
1395  * mxlnd_queue_tx - add the tx to the global tx queue
1396  * @tx
1397  *
1398  * Add the tx to the global queue and up the tx_queue_sem
1399  */
1400 void
1401 mxlnd_queue_tx(struct kmx_ctx *tx)
1402 {
1403         struct kmx_peer *peer   = tx->mxc_peer;
1404         LASSERT (tx->mxc_nid != 0);
1405
1406         if (peer != NULL) {
1407                 if (peer->mxp_incompatible &&
1408                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1409                         /* let this fail now */
1410                         tx->mxc_status.code = -ECONNABORTED;
1411                         mxlnd_conn_decref(peer->mxp_conn);
1412                         mxlnd_put_idle_tx(tx);
1413                         return;
1414                 }
1415                 if (tx->mxc_conn == NULL) {
1416                         int             ret     = 0;
1417                         struct kmx_conn *conn   = NULL;
1418
1419                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1420                         if (ret != 0) {
1421                                 tx->mxc_status.code = ret;
1422                                 mxlnd_put_idle_tx(tx);
1423                                 goto done;
1424                         }
1425                         tx->mxc_conn = conn;
1426                         mxlnd_peer_decref(peer); /* and takes it from peer */
1427                 }
1428                 LASSERT(tx->mxc_conn != NULL);
1429                 mxlnd_peer_queue_tx(tx);
1430                 mxlnd_check_sends(peer);
1431         } else {
1432                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1433                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1434                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1435                 up(&kmxlnd_data.kmx_tx_queue_sem);
1436         }
1437 done:
1438         return;
1439 }
1440
1441 int
1442 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1443 {
1444         int             i                       = 0;
1445         int             sum                     = 0;
1446         int             old_sum                 = 0;
1447         int             nseg                    = 0;
1448         int             first_iov               = -1;
1449         int             first_iov_offset        = 0;
1450         int             first_found             = 0;
1451         int             last_iov                = -1;
1452         int             last_iov_length         = 0;
1453         mx_ksegment_t  *seg                     = NULL;
1454
1455         if (niov == 0) return 0;
1456         LASSERT(iov != NULL);
1457
1458         for (i = 0; i < niov; i++) {
1459                 sum = old_sum + (u32) iov[i].iov_len;
1460                 if (!first_found && (sum > offset)) {
1461                         first_iov = i;
1462                         first_iov_offset = offset - old_sum;
1463                         first_found = 1;
1464                         sum = (u32) iov[i].iov_len - first_iov_offset;
1465                         old_sum = 0;
1466                 }
1467                 if (sum >= nob) {
1468                         last_iov = i;
1469                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1470                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1471                         break;
1472                 }
1473                 old_sum = sum;
1474         }
1475         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1476         nseg = last_iov - first_iov + 1;
1477         LASSERT(nseg > 0);
1478
1479         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1480         if (seg == NULL) {
1481                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1482                 return -1;
1483         }
1484         memset(seg, 0, nseg * sizeof(*seg));
1485         ctx->mxc_nseg = nseg;
1486         sum = 0;
1487         for (i = 0; i < nseg; i++) {
1488                 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1489                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1490                 if (i == 0) {
1491                         seg[i].segment_ptr += (u64) first_iov_offset;
1492                         seg[i].segment_length -= (u32) first_iov_offset;
1493                 }
1494                 if (i == (nseg - 1)) {
1495                         seg[i].segment_length = (u32) last_iov_length;
1496                 }
1497                 sum += seg[i].segment_length;
1498         }
1499         ctx->mxc_seg_list = seg;
1500         ctx->mxc_pin_type = MX_PIN_KERNEL;
1501 #ifdef MX_PIN_FULLPAGES
1502         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1503 #endif
1504         LASSERT(nob == sum);
1505         return 0;
1506 }
1507
1508 int
1509 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1510 {
1511         int             i                       = 0;
1512         int             sum                     = 0;
1513         int             old_sum                 = 0;
1514         int             nseg                    = 0;
1515         int             first_kiov              = -1;
1516         int             first_kiov_offset       = 0;
1517         int             first_found             = 0;
1518         int             last_kiov               = -1;
1519         int             last_kiov_length        = 0;
1520         mx_ksegment_t  *seg                     = NULL;
1521
1522         if (niov == 0) return 0;
1523         LASSERT(kiov != NULL);
1524
1525         for (i = 0; i < niov; i++) {
1526                 sum = old_sum + kiov[i].kiov_len;
1527                 if (i == 0) sum -= kiov[i].kiov_offset;
1528                 if (!first_found && (sum > offset)) {
1529                         first_kiov = i;
1530                         first_kiov_offset = offset - old_sum;
1531                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1532                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1533                         first_found = 1;
1534                         sum = kiov[i].kiov_len - first_kiov_offset;
1535                         old_sum = 0;
1536                 }
1537                 if (sum >= nob) {
1538                         last_kiov = i;
1539                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1540                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1541                         break;
1542                 }
1543                 old_sum = sum;
1544         }
1545         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1546         nseg = last_kiov - first_kiov + 1;
1547         LASSERT(nseg > 0);
1548
1549         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1550         if (seg == NULL) {
1551                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1552                 return -1;
1553         }
1554         memset(seg, 0, niov * sizeof(*seg));
1555         ctx->mxc_nseg = niov;
1556         sum = 0;
1557         for (i = 0; i < niov; i++) {
1558                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1559                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1560                 if (i == 0) {
1561                         seg[i].segment_ptr += (u64) first_kiov_offset;
1562                         /* we have to add back the original kiov_offset */
1563                         seg[i].segment_length -= first_kiov_offset +
1564                                                  kiov[first_kiov].kiov_offset;
1565                 }
1566                 if (i == (nseg - 1)) {
1567                         seg[i].segment_length = last_kiov_length;
1568                 }
1569                 sum += seg[i].segment_length;
1570         }
1571         ctx->mxc_seg_list = seg;
1572         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1573 #ifdef MX_PIN_FULLPAGES
1574         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1575 #endif
1576         LASSERT(nob == sum);
1577         return 0;
1578 }
1579
1580 void
1581 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1582 {
1583         LASSERT(type == MXLND_MSG_PUT_ACK);
1584         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1585         tx->mxc_cookie = cookie;
1586         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1587         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
1588         tx->mxc_match = mxlnd_create_match(tx, status);
1589
1590         mxlnd_queue_tx(tx);
1591 }
1592
1593
1594 /**
1595  * mxlnd_send_data - get tx, map [k]iov, queue tx
1596  * @ni
1597  * @lntmsg
1598  * @peer
1599  * @msg_type
1600  * @cookie
1601  *
1602  * This setups the DATA send for PUT or GET.
1603  *
1604  * On success, it queues the tx, on failure it calls lnet_finalize()
1605  */
1606 void
1607 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1608 {
1609         int                     ret             = 0;
1610         lnet_process_id_t       target          = lntmsg->msg_target;
1611         unsigned int            niov            = lntmsg->msg_niov;
1612         struct iovec           *iov             = lntmsg->msg_iov;
1613         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1614         unsigned int            offset          = lntmsg->msg_offset;
1615         unsigned int            nob             = lntmsg->msg_len;
1616         struct kmx_ctx         *tx              = NULL;
1617
1618         LASSERT(lntmsg != NULL);
1619         LASSERT(peer != NULL);
1620         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1621         LASSERT((cookie>>52) == 0);
1622
1623         tx = mxlnd_get_idle_tx();
1624         if (tx == NULL) {
1625                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1626                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1627                         libcfs_nid2str(target.nid));
1628                 goto failed_0;
1629         }
1630         tx->mxc_nid = target.nid;
1631         /* NOTE called when we have a ref on the conn, get one for this tx */
1632         mxlnd_conn_addref(peer->mxp_conn);
1633         tx->mxc_peer = peer;
1634         tx->mxc_conn = peer->mxp_conn;
1635         tx->mxc_msg_type = msg_type;
1636         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1637         tx->mxc_state = MXLND_CTX_PENDING;
1638         tx->mxc_lntmsg[0] = lntmsg;
1639         tx->mxc_cookie = cookie;
1640         tx->mxc_match = mxlnd_create_match(tx, 0);
1641
1642         /* This setups up the mx_ksegment_t to send the DATA payload  */
1643         if (nob == 0) {
1644                 /* do not setup the segments */
1645                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1646                                    "to %s?\n", libcfs_nid2str(target.nid));
1647                 ret = 0;
1648         } else if (kiov == NULL) {
1649                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1650         } else {
1651                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1652         }
1653         if (ret != 0) {
1654                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", 
1655                                    libcfs_nid2str(target.nid));
1656                 tx->mxc_status.code = -EIO;
1657                 goto failed_1;
1658         }
1659         mxlnd_queue_tx(tx);
1660         return;
1661
1662 failed_1:
1663         mxlnd_conn_decref(peer->mxp_conn);
1664         mxlnd_put_idle_tx(tx);
1665         return;
1666
1667 failed_0:
1668         CDEBUG(D_NETERROR, "no tx avail\n");
1669         lnet_finalize(ni, lntmsg, -EIO);
1670         return;
1671 }
1672
1673 /**
1674  * mxlnd_recv_data - map [k]iov, post rx
1675  * @ni
1676  * @lntmsg
1677  * @rx
1678  * @msg_type
1679  * @cookie
1680  *
1681  * This setups the DATA receive for PUT or GET.
1682  *
1683  * On success, it returns 0, on failure it returns -1
1684  */
1685 int
1686 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1687 {
1688         int                     ret             = 0;
1689         lnet_process_id_t       target          = lntmsg->msg_target;
1690         unsigned int            niov            = lntmsg->msg_niov;
1691         struct iovec           *iov             = lntmsg->msg_iov;
1692         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1693         unsigned int            offset          = lntmsg->msg_offset;
1694         unsigned int            nob             = lntmsg->msg_len;
1695         mx_return_t             mxret           = MX_SUCCESS;
1696
1697         /* above assumes MXLND_MSG_PUT_DATA */
1698         if (msg_type == MXLND_MSG_GET_DATA) {
1699                 niov = lntmsg->msg_md->md_niov;
1700                 iov = lntmsg->msg_md->md_iov.iov;
1701                 kiov = lntmsg->msg_md->md_iov.kiov;
1702                 offset = 0;
1703                 nob = lntmsg->msg_md->md_length;
1704         }
1705
1706         LASSERT(lntmsg != NULL);
1707         LASSERT(rx != NULL);
1708         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1709         LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1710
1711         rx->mxc_msg_type = msg_type;
1712         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1713         rx->mxc_state = MXLND_CTX_PENDING;
1714         rx->mxc_nid = target.nid;
1715         /* if posting a GET_DATA, we may not yet know the peer */
1716         if (rx->mxc_peer != NULL) {
1717                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1718         }
1719         rx->mxc_lntmsg[0] = lntmsg;
1720         rx->mxc_cookie = cookie;
1721         rx->mxc_match = mxlnd_create_match(rx, 0);
1722         /* This setups up the mx_ksegment_t to receive the DATA payload  */
1723         if (kiov == NULL) {
1724                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1725         } else {
1726                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1727         }
1728         if (msg_type == MXLND_MSG_GET_DATA) {
1729                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1730                 if (rx->mxc_lntmsg[1] == NULL) {
1731                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1732                                            libcfs_nid2str(target.nid));
1733                         ret = -1;
1734                 }
1735         }
1736         if (ret != 0) {
1737                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1738                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1739                        libcfs_nid2str(target.nid));
1740                 return -1;
1741         }
1742         ret = mxlnd_q_pending_ctx(rx);
1743         if (ret == -1) {
1744                 return -1;
1745         }
1746         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1747         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1748                           rx->mxc_seg_list, rx->mxc_nseg,
1749                           rx->mxc_pin_type, rx->mxc_match,
1750                           0xF00FFFFFFFFFFFFFLL, (void *) rx,
1751                           &rx->mxc_mxreq);
1752         if (mxret != MX_SUCCESS) {
1753                 if (rx->mxc_conn != NULL) {
1754                         mxlnd_deq_pending_ctx(rx);
1755                 }
1756                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1757                                    (int) mxret, libcfs_nid2str(target.nid));
1758                 return -1;
1759         }
1760
1761         return 0;
1762 }
1763
1764 /**
1765  * mxlnd_send - the LND required send function
1766  * @ni
1767  * @private
1768  * @lntmsg
1769  *
1770  * This must not block. Since we may not have a peer struct for the receiver,
1771  * it will append send messages on a global tx list. We will then up the
1772  * tx_queued's semaphore to notify it of the new send. 
1773  */
1774 int
1775 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1776 {
1777         int                     ret             = 0;
1778         int                     type            = lntmsg->msg_type;
1779         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
1780         lnet_process_id_t       target          = lntmsg->msg_target;
1781         lnet_nid_t              nid             = target.nid;
1782         int                     target_is_router = lntmsg->msg_target_is_router;
1783         int                     routing         = lntmsg->msg_routing;
1784         unsigned int            payload_niov    = lntmsg->msg_niov;
1785         struct iovec           *payload_iov     = lntmsg->msg_iov;
1786         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
1787         unsigned int            payload_offset  = lntmsg->msg_offset;
1788         unsigned int            payload_nob     = lntmsg->msg_len;
1789         struct kmx_ctx         *tx              = NULL;
1790         struct kmx_msg         *txmsg           = NULL;
1791         struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */
1792         struct kmx_ctx         *rx_data         = NULL;
1793         struct kmx_conn        *conn            = NULL;
1794         int                     nob             = 0;
1795         uint32_t                length          = 0;
1796         struct kmx_peer         *peer           = NULL;
1797
1798         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1799                        payload_nob, payload_niov, libcfs_id2str(target));
1800
1801         LASSERT (payload_nob == 0 || payload_niov > 0);
1802         LASSERT (payload_niov <= LNET_MAX_IOV);
1803         /* payload is either all vaddrs or all pages */
1804         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1805
1806         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1807
1808         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1809          * to a new peer, use the nid */
1810         peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1811         if (peer != NULL) {
1812                 if (unlikely(peer->mxp_incompatible)) {
1813                         mxlnd_peer_decref(peer); /* drop ref taken above */
1814                 } else {
1815                         spin_lock(&peer->mxp_lock);
1816                         conn = peer->mxp_conn;
1817                         if (conn) {
1818                                 mxlnd_conn_addref(conn);
1819                                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1820                         }
1821                         spin_unlock(&peer->mxp_lock);
1822                 }
1823         }
1824         if (conn == NULL && peer != NULL) {
1825                 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1826                        peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1827         }
1828
1829         switch (type) {
1830         case LNET_MSG_ACK:
1831                 LASSERT (payload_nob == 0);
1832                 break;
1833
1834         case LNET_MSG_REPLY:
1835         case LNET_MSG_PUT:
1836                 /* Is the payload small enough not to need DATA? */
1837                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1838                 if (nob <= MXLND_EAGER_SIZE)
1839                         break;                  /* send EAGER */
1840
1841                 tx = mxlnd_get_idle_tx();
1842                 if (unlikely(tx == NULL)) {
1843                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1844                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1845                                libcfs_nid2str(nid));
1846                         if (conn) mxlnd_conn_decref(conn);
1847                         return -ENOMEM;
1848                 }
1849
1850                 /* the peer may be NULL */
1851                 tx->mxc_peer = peer;
1852                 tx->mxc_conn = conn; /* may be NULL */
1853                 /* we added a conn ref above */
1854                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1855                 txmsg = tx->mxc_msg;
1856                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1857                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1858                 tx->mxc_match = mxlnd_create_match(tx, 0);
1859
1860                 /* we must post a receive _before_ sending the request.
1861                  * we need to determine how much to receive, it will be either
1862                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1863
1864                 rx = mxlnd_get_idle_rx();
1865                 if (unlikely(rx == NULL)) {
1866                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1867                                            libcfs_nid2str(nid));
1868                         mxlnd_put_idle_tx(tx);
1869                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1870                         return -ENOMEM;
1871                 }
1872                 rx->mxc_nid = nid;
1873                 rx->mxc_peer = peer;
1874                 /* conn may be NULL but unlikely since the first msg is always small */
1875                 /* NOTE no need to lock peer before adding conn ref since we took
1876                  * a conn ref for the tx (it cannot be freed between there and here ) */
1877                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1878                 rx->mxc_conn = conn;
1879                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1880                 rx->mxc_cookie = tx->mxc_cookie;
1881                 rx->mxc_match = mxlnd_create_match(rx, 0);
1882
1883                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1884                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1885                 if (unlikely(ret != 0)) {
1886                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1887                                            libcfs_nid2str(nid));
1888                         rx->mxc_lntmsg[0] = NULL;
1889                         mxlnd_put_idle_rx(rx);
1890                         mxlnd_put_idle_tx(tx);
1891                         if (conn) {
1892                                 mxlnd_conn_decref(conn); /* for the rx... */
1893                                 mxlnd_conn_decref(conn); /* and for the tx */
1894                         }
1895                         return -EHOSTUNREACH;
1896                 }
1897
1898                 mxlnd_queue_tx(tx);
1899                 return 0;
1900
1901         case LNET_MSG_GET:
1902                 if (routing || target_is_router)
1903                         break;                  /* send EAGER */
1904
1905                 /* is the REPLY message too small for DATA? */
1906                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1907                 if (nob <= MXLND_EAGER_SIZE)
1908                         break;                  /* send EAGER */
1909
1910                 /* get tx (we need the cookie) , post rx for incoming DATA, 
1911                  * then post GET_REQ tx */
1912                 tx = mxlnd_get_idle_tx();
1913                 if (unlikely(tx == NULL)) {
1914                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1915                                            libcfs_nid2str(nid));
1916                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1917                         return -ENOMEM;
1918                 }
1919                 rx_data = mxlnd_get_idle_rx();
1920                 if (unlikely(rx_data == NULL)) {
1921                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1922                                            libcfs_nid2str(nid));
1923                         mxlnd_put_idle_tx(tx);
1924                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1925                         return -ENOMEM;
1926                 }
1927                 rx_data->mxc_peer = peer;
1928                 /* NOTE no need to lock peer before adding conn ref since we took
1929                  * a conn ref for the tx (it cannot be freed between there and here ) */
1930                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1931                 rx_data->mxc_conn = conn; /* may be NULL */
1932
1933                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1934                 if (unlikely(ret != 0)) {
1935                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1936                                            libcfs_nid2str(nid));
1937                         mxlnd_put_idle_rx(rx_data);
1938                         mxlnd_put_idle_tx(tx);
1939                         if (conn) {
1940                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1941                                 mxlnd_conn_decref(conn); /* and for the tx */
1942                         }
1943                         return -EIO;
1944                 }
1945
1946                 tx->mxc_peer = peer;
1947                 tx->mxc_conn = conn; /* may be NULL */
1948                 /* conn ref taken above */
1949                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1950                 txmsg = tx->mxc_msg;
1951                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1952                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1953                 tx->mxc_match = mxlnd_create_match(tx, 0);
1954
1955                 mxlnd_queue_tx(tx);
1956                 return 0;
1957
1958         default:
1959                 LBUG();
1960                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1961                 return -EIO;
1962         }
1963
1964         /* send EAGER */
1965
1966         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1967                 <= MXLND_EAGER_SIZE);
1968
1969         tx = mxlnd_get_idle_tx();
1970         if (unlikely(tx == NULL)) {
1971                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1972                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1973                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1974                 return -ENOMEM;
1975         }
1976
1977         tx->mxc_peer = peer;
1978         tx->mxc_conn = conn; /* may be NULL */
1979         /* conn ref taken above */
1980         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1981         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1982         tx->mxc_match = mxlnd_create_match(tx, 0);
1983
1984         txmsg = tx->mxc_msg;
1985         txmsg->mxm_u.eager.mxem_hdr = *hdr;
1986
1987         if (payload_kiov != NULL)
1988                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1989                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1990                             payload_niov, payload_kiov, payload_offset, payload_nob);
1991         else
1992                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1993                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1994                             payload_niov, payload_iov, payload_offset, payload_nob);
1995
1996         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1997         mxlnd_queue_tx(tx);
1998         return 0;
1999 }
2000
2001 /**
2002  * mxlnd_recv - the LND required recv function
2003  * @ni
2004  * @private
2005  * @lntmsg
2006  * @delayed
2007  * @niov
2008  * @kiov
2009  * @offset
2010  * @mlen
2011  * @rlen
2012  *
2013  * This must not block.
2014  */
2015 int
2016 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2017              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2018              unsigned int offset, unsigned int mlen, unsigned int rlen)
2019 {
2020         int                     ret             = 0;
2021         int                     nob             = 0;
2022         int                     len             = 0;
2023         struct kmx_ctx          *rx             = private;
2024         struct kmx_msg          *rxmsg          = rx->mxc_msg;
2025         lnet_nid_t               nid            = rx->mxc_nid;
2026         struct kmx_ctx          *tx             = NULL;
2027         struct kmx_msg          *txmsg          = NULL;
2028         struct kmx_peer         *peer           = rx->mxc_peer;
2029         struct kmx_conn         *conn           = peer->mxp_conn;
2030         u64                      cookie         = 0LL;
2031         int                      msg_type       = rxmsg->mxm_type;
2032         int                      repost         = 1;
2033         int                      credit         = 0;
2034         int                      finalize       = 0;
2035
2036         LASSERT (mlen <= rlen);
2037         /* Either all pages or all vaddrs */
2038         LASSERT (!(kiov != NULL && iov != NULL));
2039         LASSERT (peer != NULL);
2040
2041         /* conn_addref(conn) already taken for the primary rx */
2042
2043         switch (msg_type) {
2044         case MXLND_MSG_EAGER:
2045                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2046                 len = rx->mxc_status.xfer_length;
2047                 if (unlikely(nob > len)) {
2048                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2049                                            libcfs_nid2str(nid), nob, len);
2050                         ret = -EPROTO;
2051                         break;
2052                 }
2053
2054                 if (kiov != NULL)
2055                         lnet_copy_flat2kiov(niov, kiov, offset,
2056                                 MXLND_EAGER_SIZE, rxmsg,
2057                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2058                                 mlen);
2059                 else
2060                         lnet_copy_flat2iov(niov, iov, offset,
2061                                 MXLND_EAGER_SIZE, rxmsg,
2062                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2063                                 mlen);
2064                 finalize = 1;
2065                 credit = 1;
2066                 break;
2067
2068         case MXLND_MSG_PUT_REQ:
2069                 /* we are going to reuse the rx, store the needed info */
2070                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2071
2072                 /* get tx, post rx, send PUT_ACK */
2073
2074                 tx = mxlnd_get_idle_tx();
2075                 if (unlikely(tx == NULL)) {
2076                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2077                         /* Not replying will break the connection */
2078                         ret = -ENOMEM;
2079                         break;
2080                 }
2081                 if (unlikely(mlen == 0)) {
2082                         finalize = 1;
2083                         tx->mxc_peer = peer;
2084                         tx->mxc_conn = conn;
2085                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2086                         /* repost = 1 */
2087                         break;
2088                 }
2089
2090                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2091                 tx->mxc_peer = peer;
2092                 tx->mxc_conn = conn;
2093                 /* no need to lock peer first since we already have a ref */
2094                 mxlnd_conn_addref(conn); /* for the tx */
2095                 txmsg = tx->mxc_msg;
2096                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2097                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2098                 tx->mxc_cookie = cookie;
2099                 tx->mxc_match = mxlnd_create_match(tx, 0);
2100
2101                 /* we must post a receive _before_ sending the PUT_ACK */
2102                 mxlnd_ctx_init(rx);
2103                 rx->mxc_state = MXLND_CTX_PREP;
2104                 rx->mxc_peer = peer;
2105                 rx->mxc_conn = conn;
2106                 /* do not take another ref for this rx, it is already taken */
2107                 rx->mxc_nid = peer->mxp_nid;
2108                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, 
2109                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2110
2111                 if (unlikely(ret != 0)) {
2112                         /* Notify peer that it's over */
2113                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", 
2114                                            libcfs_nid2str(nid), ret);
2115                         mxlnd_ctx_init(tx);
2116                         tx->mxc_state = MXLND_CTX_PREP;
2117                         tx->mxc_peer = peer;
2118                         tx->mxc_conn = conn;
2119                         /* finalize = 0, let the PUT_ACK tx finalize this */
2120                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2121                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2122                         /* conn ref already taken above */
2123                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2124                         /* repost = 1 */
2125                         break;
2126                 }
2127
2128                 mxlnd_queue_tx(tx);
2129                 /* do not return a credit until after PUT_DATA returns */
2130                 repost = 0;
2131                 break;
2132
2133         case MXLND_MSG_GET_REQ:
2134                 if (likely(lntmsg != NULL)) {
2135                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2136                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2137                 } else {
2138                         /* GET didn't match anything */
2139                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2140                          * We have to embed the error code in the match bits.
2141                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2142                         u64             cookie  = rxmsg->mxm_u.get_req.mxgrm_cookie;
2143
2144                         tx = mxlnd_get_idle_tx();
2145                         if (unlikely(tx == NULL)) {
2146                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2147                                                    libcfs_nid2str(nid));
2148                                 ret = -ENOMEM;
2149                                 break;
2150                         }
2151                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2152                         tx->mxc_state = MXLND_CTX_PENDING;
2153                         tx->mxc_nid = nid;
2154                         tx->mxc_peer = peer;
2155                         tx->mxc_conn = conn;
2156                         /* no need to lock peer first since we already have a ref */
2157                         mxlnd_conn_addref(conn); /* for this tx */
2158                         tx->mxc_cookie = cookie;
2159                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2160                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2161                         mxlnd_queue_tx(tx);
2162                 }
2163                 /* finalize lntmsg after tx completes */
2164                 break;
2165
2166         default:
2167                 LBUG();
2168         }
2169
2170         if (repost) {
2171                 /* we received a message, increment peer's outstanding credits */
2172                 if (credit == 1) {
2173                         spin_lock(&conn->mxk_lock);
2174                         conn->mxk_outstanding++;
2175                         spin_unlock(&conn->mxk_lock);
2176                 }
2177                 /* we are done with the rx */
2178                 mxlnd_put_idle_rx(rx);
2179                 mxlnd_conn_decref(conn);
2180         }
2181
2182         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); 
2183
2184         /* we received a credit, see if we can use it to send a msg */
2185         if (credit) mxlnd_check_sends(peer);
2186
2187         return ret;
2188 }
2189
2190 void
2191 mxlnd_sleep(unsigned long timeout)
2192 {
2193         set_current_state(TASK_INTERRUPTIBLE);
2194         schedule_timeout(timeout);
2195         return;
2196 }
2197
2198 /**
2199  * mxlnd_tx_queued - the generic send queue thread
2200  * @arg - thread id (as a void *)
2201  *
2202  * This thread moves send messages from the global tx_queue to the owning
2203  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2204  * it to the global peer list.
2205  */
2206 int
2207 mxlnd_tx_queued(void *arg)
2208 {
2209         long                    id      = (long) arg;
2210         int                     ret     = 0;
2211         int                     found   = 0;
2212         struct kmx_ctx         *tx      = NULL;
2213         struct kmx_peer        *peer    = NULL;
2214         struct list_head       *tmp_tx  = NULL;
2215
2216         cfs_daemonize("mxlnd_tx_queued");
2217         //cfs_block_allsigs();
2218
2219         while (!kmxlnd_data.kmx_shutdown) {
2220                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2221                 if (kmxlnd_data.kmx_shutdown)
2222                         break;
2223                 if (ret != 0) // Should we check for -EINTR?
2224                         continue;
2225                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2226                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2227                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2228                         continue;
2229                 }
2230                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2231                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2232                 list_del_init(&tx->mxc_list);
2233                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2234
2235                 found = 0;
2236                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
2237                 if (peer != NULL) {
2238                         tx->mxc_peer = peer;
2239                         spin_lock(&peer->mxp_lock);
2240                         if (peer->mxp_conn == NULL) {
2241                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2242                                 if (ret != 0) {
2243                                         /* out of memory, give up and fail tx */
2244                                         tx->mxc_status.code = -ENOMEM;
2245                                         spin_unlock(&peer->mxp_lock);
2246                                         mxlnd_peer_decref(peer);
2247                                         mxlnd_put_idle_tx(tx);
2248                                         continue;
2249                                 }
2250                         }
2251                         tx->mxc_conn = peer->mxp_conn;
2252                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2253                         spin_unlock(&peer->mxp_lock);
2254                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2255                         mxlnd_queue_tx(tx);
2256                         found = 1;
2257                 }
2258                 if (found == 0) {
2259                         int              hash   = 0;
2260                         struct kmx_peer *peer = NULL;
2261                         struct kmx_peer *old = NULL;
2262
2263                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2264
2265                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2266                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2267                         /* create peer */
2268                         /* adds conn ref for this function */
2269                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2270                         if (ret != 0) {
2271                                 /* finalize message */
2272                                 tx->mxc_status.code = ret;
2273                                 mxlnd_put_idle_tx(tx);
2274                                 continue;
2275                         }
2276                         tx->mxc_peer = peer;
2277                         tx->mxc_conn = peer->mxp_conn;
2278                         /* this tx will keep the conn ref taken in peer_alloc() */
2279
2280                         /* add peer to global peer list, but look to see
2281                          * if someone already created it after we released
2282                          * the read lock */
2283                         write_lock(&kmxlnd_data.kmx_peers_lock);
2284                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2285                                 if (old->mxp_nid == peer->mxp_nid) {
2286                                         /* somebody beat us here, we created a duplicate */
2287                                         found = 1;
2288                                         break;
2289                                 }
2290                         }
2291
2292                         if (found == 0) {
2293                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2294                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2295                         } else {
2296                                 tx->mxc_peer = old;
2297                                 spin_lock(&old->mxp_lock);
2298                                 tx->mxc_conn = old->mxp_conn;
2299                                 /* FIXME can conn be NULL? */
2300                                 LASSERT(old->mxp_conn != NULL);
2301                                 mxlnd_conn_addref(old->mxp_conn);
2302                                 spin_unlock(&old->mxp_lock);
2303                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2304                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2305                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2306                                 mxlnd_peer_decref(peer);
2307                         }
2308                         write_unlock(&kmxlnd_data.kmx_peers_lock);
2309
2310                         mxlnd_queue_tx(tx);
2311                 }
2312         }
2313         mxlnd_thread_stop(id);
2314         return 0;
2315 }
2316
2317 /* When calling this, we must not have the peer lock. */
2318 void
2319 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2320 {
2321         mx_return_t             mxret   = MX_SUCCESS;
2322         mx_request_t            request;
2323         struct kmx_conn         *conn   = peer->mxp_conn;
2324
2325         /* NOTE we are holding a conn ref every time we call this function,
2326          * we do not need to lock the peer before taking another ref */
2327         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2328
2329         LASSERT(mask == MXLND_MASK_ICON_REQ ||
2330                 mask == MXLND_MASK_ICON_ACK);
2331
2332         if (peer->mxp_reconnect_time == 0) {
2333                 peer->mxp_reconnect_time = jiffies;
2334         }
2335
2336         if (peer->mxp_nic_id == 0LL) {
2337                 mxlnd_peer_hostname_to_nic_id(peer);
2338                 if (peer->mxp_nic_id == 0LL) {
2339                         /* not mapped yet, return */
2340                         spin_lock(&conn->mxk_lock);
2341                         conn->mxk_status = MXLND_CONN_INIT;
2342                         spin_unlock(&conn->mxk_lock);
2343                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2344                                 /* give up and notify LNET */
2345                                 mxlnd_conn_disconnect(conn, 0, 1);
2346                                 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2347                                                                             function... */
2348                                 mxlnd_conn_decref(peer->mxp_conn); /* which we no 
2349                                                                       longer need */
2350                         }
2351                         mxlnd_conn_decref(conn);
2352                         return;
2353                 }
2354         }
2355
2356         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2357                             peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
2358                             (void *) peer, &request);
2359         if (unlikely(mxret != MX_SUCCESS)) {
2360                 spin_lock(&conn->mxk_lock);
2361                 conn->mxk_status = MXLND_CONN_FAIL;
2362                 spin_unlock(&conn->mxk_lock);
2363                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2364                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2365                 mxlnd_conn_decref(conn);
2366         }
2367         return;
2368 }
2369
2370 #define MXLND_STATS 0
2371
2372 int
2373 mxlnd_check_sends(struct kmx_peer *peer)
2374 {
2375         int                     ret             = 0;
2376         int                     found           = 0;
2377         mx_return_t             mxret           = MX_SUCCESS;
2378         struct kmx_ctx          *tx             = NULL;
2379         struct kmx_conn         *conn           = NULL;
2380         u8                      msg_type        = 0;
2381         int                     credit          = 0;
2382         int                     status          = 0;
2383         int                     ntx_posted      = 0;
2384         int                     credits         = 0;
2385 #if MXLND_STATS
2386         static unsigned long    last            = 0;
2387 #endif
2388
2389         if (unlikely(peer == NULL)) {
2390                 LASSERT(peer != NULL);
2391                 return -1;
2392         }
2393         spin_lock(&peer->mxp_lock);
2394         conn = peer->mxp_conn;
2395         /* NOTE take a ref for the duration of this function since it is called
2396          * when there might not be any queued txs for this peer */
2397         if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2398         spin_unlock(&peer->mxp_lock);
2399
2400         /* do not add another ref for this tx */
2401
2402         if (conn == NULL) {
2403                 /* we do not have any conns */
2404                 return -1;
2405         }
2406
2407 #if MXLND_STATS
2408         if (time_after(jiffies, last)) {
2409                 last = jiffies + HZ;
2410                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2411                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2412                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2413                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2414                               conn->mxk_ntx_data, conn->mxk_data_posted);
2415         }
2416 #endif
2417
2418         /* cache peer state for asserts */
2419         spin_lock(&conn->mxk_lock);
2420         ntx_posted = conn->mxk_ntx_posted;
2421         credits = conn->mxk_credits;
2422         spin_unlock(&conn->mxk_lock);
2423
2424         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2425         LASSERT(ntx_posted >= 0);
2426
2427         LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2428         LASSERT(credits >= 0);
2429
2430         /* check number of queued msgs, ignore data */
2431         spin_lock(&conn->mxk_lock);
2432         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2433                 /* check if any txs queued that could return credits... */
2434                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2435                         /* if not, send a NOOP */
2436                         tx = mxlnd_get_idle_tx();
2437                         if (likely(tx != NULL)) {
2438                                 tx->mxc_peer = peer;
2439                                 tx->mxc_conn = peer->mxp_conn;
2440                                 mxlnd_conn_addref(conn); /* for this tx */
2441                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2442                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2443                                 mxlnd_peer_queue_tx_locked(tx);
2444                                 found = 1;
2445                                 goto done_locked;
2446                         }
2447                 }
2448         }
2449         spin_unlock(&conn->mxk_lock);
2450
2451         /* if the peer is not ready, try to connect */
2452         spin_lock(&conn->mxk_lock);
2453         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2454             conn->mxk_status == MXLND_CONN_FAIL ||
2455             conn->mxk_status == MXLND_CONN_REQ)) {
2456                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2457                 conn->mxk_status = MXLND_CONN_WAIT;
2458                 spin_unlock(&conn->mxk_lock);
2459                 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2460                 goto done;
2461         }
2462         spin_unlock(&conn->mxk_lock);
2463
2464         spin_lock(&conn->mxk_lock);
2465         while (!list_empty(&conn->mxk_tx_free_queue) ||
2466                !list_empty(&conn->mxk_tx_credit_queue)) {
2467                 /* We have something to send. If we have a queued tx that does not
2468                  * require a credit (free), choose it since its completion will 
2469                  * return a credit (here or at the peer), complete a DATA or 
2470                  * CONN_REQ or CONN_ACK. */
2471                 struct list_head *tmp_tx = NULL;
2472                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2473                         tmp_tx = &conn->mxk_tx_free_queue;
2474                 } else {
2475                         tmp_tx = &conn->mxk_tx_credit_queue;
2476                 }
2477                 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2478
2479                 msg_type = tx->mxc_msg_type;
2480
2481                 /* don't try to send a rx */
2482                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2483
2484                 /* ensure that it is a valid msg type */
2485                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2486                         msg_type == MXLND_MSG_CONN_ACK ||
2487                         msg_type == MXLND_MSG_NOOP     ||
2488                         msg_type == MXLND_MSG_EAGER    ||
2489                         msg_type == MXLND_MSG_PUT_REQ  ||
2490                         msg_type == MXLND_MSG_PUT_ACK  ||
2491                         msg_type == MXLND_MSG_PUT_DATA ||
2492                         msg_type == MXLND_MSG_GET_REQ  ||
2493                         msg_type == MXLND_MSG_GET_DATA);
2494                 LASSERT(tx->mxc_peer == peer);
2495                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2496
2497                 credit = mxlnd_tx_requires_credit(tx);
2498                 if (credit) {
2499
2500                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2501                                 CDEBUG(D_NET, "%s: posted enough\n",
2502                                               libcfs_nid2str(peer->mxp_nid));
2503                                 goto done_locked;
2504                         }
2505
2506                         if (conn->mxk_credits == 0) {
2507                                 CDEBUG(D_NET, "%s: no credits\n",
2508                                               libcfs_nid2str(peer->mxp_nid));
2509                                 goto done_locked;
2510                         }
2511
2512                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2513                             conn->mxk_outstanding == 0) {  /* giving back credits */
2514                                 CDEBUG(D_NET, "%s: not using last credit\n",
2515                                               libcfs_nid2str(peer->mxp_nid));
2516                                 goto done_locked;
2517                         }
2518                 }
2519
2520                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2521                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2522                                 msg_type == MXLND_MSG_CONN_ACK)) {
2523                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2524                                              mxlnd_connstatus_to_str(conn->mxk_status),
2525                                              tx->mxc_cookie,
2526                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2527                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2528                                         list_del_init(&tx->mxc_list);
2529                                         tx->mxc_status.code = -ECONNABORTED;
2530                                         mxlnd_put_idle_tx(tx);
2531                                         mxlnd_conn_decref(conn);
2532                                 }
2533                                 goto done_locked;
2534                         }
2535                 }
2536
2537                 list_del_init(&tx->mxc_list);
2538
2539                 /* handle credits, etc now while we have the lock to avoid races */
2540                 if (credit) {
2541                         conn->mxk_credits--;
2542                         conn->mxk_ntx_posted++;
2543                 }
2544                 if (msg_type != MXLND_MSG_PUT_DATA &&
2545                     msg_type != MXLND_MSG_GET_DATA) {
2546                         if (msg_type != MXLND_MSG_CONN_REQ &&
2547                             msg_type != MXLND_MSG_CONN_ACK) {
2548                                 conn->mxk_ntx_msgs--;
2549                         }
2550                 }
2551                 if (tx->mxc_incarnation == 0 &&
2552                     conn->mxk_incarnation != 0) {
2553                         tx->mxc_incarnation = conn->mxk_incarnation;
2554                 }
2555                 spin_unlock(&conn->mxk_lock);
2556
2557                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER 
2558                  * or (2) there is a non-DATA msg that can return credits in the 
2559                  * queue, then drop this duplicate NOOP */
2560                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2561                         spin_lock(&conn->mxk_lock);
2562                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2563                             (conn->mxk_ntx_msgs >= 1)) {
2564                                 conn->mxk_credits++;
2565                                 conn->mxk_ntx_posted--;
2566                                 spin_unlock(&conn->mxk_lock);
2567                                 /* redundant NOOP */
2568                                 mxlnd_put_idle_tx(tx);
2569                                 mxlnd_conn_decref(conn);
2570                                 CDEBUG(D_NET, "%s: redundant noop\n",
2571                                               libcfs_nid2str(peer->mxp_nid));
2572                                 found = 1;
2573                                 goto done;
2574                         }
2575                         spin_unlock(&conn->mxk_lock);
2576                 }
2577
2578                 found = 1;
2579                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2580                     (msg_type != MXLND_MSG_GET_DATA))) {
2581                         mxlnd_pack_msg(tx);
2582                 }
2583
2584                 //ret = -ECONNABORTED;
2585                 mxret = MX_SUCCESS;
2586
2587                 spin_lock(&conn->mxk_lock);
2588                 status = conn->mxk_status;
2589                 spin_unlock(&conn->mxk_lock);
2590
2591                 if (likely((status == MXLND_CONN_READY) ||
2592                     (msg_type == MXLND_MSG_CONN_REQ) ||
2593                     (msg_type == MXLND_MSG_CONN_ACK))) {
2594                         ret = 0;
2595                         if (msg_type != MXLND_MSG_CONN_REQ &&
2596                             msg_type != MXLND_MSG_CONN_ACK) {
2597                                 /* add to the pending list */
2598                                 ret = mxlnd_q_pending_ctx(tx);
2599                                 if (ret == -1) {
2600                                         /* FIXME the conn is disconnected, now what? */
2601                                 }
2602                         } else {
2603                                 /* CONN_REQ/ACK */
2604                                 tx->mxc_state = MXLND_CTX_PENDING;
2605                         }
2606
2607                         if (ret == 0) {
2608                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2609                                     msg_type != MXLND_MSG_GET_DATA)) {
2610                                         /* send a msg style tx */
2611                                         LASSERT(tx->mxc_nseg == 1);
2612                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2613                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2614                                                mxlnd_msgtype_to_str(msg_type),
2615                                                tx->mxc_cookie);
2616                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2617                                                           &tx->mxc_seg,
2618                                                           tx->mxc_nseg,
2619                                                           tx->mxc_pin_type,
2620                                                           conn->mxk_epa,
2621                                                           tx->mxc_match,
2622                                                           (void *) tx,
2623                                                           &tx->mxc_mxreq);
2624                                 } else {
2625                                         /* send a DATA tx */
2626                                         spin_lock(&conn->mxk_lock);
2627                                         conn->mxk_ntx_data--;
2628                                         conn->mxk_data_posted++;
2629                                         spin_unlock(&conn->mxk_lock);
2630                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2631                                                mxlnd_msgtype_to_str(msg_type),
2632                                                tx->mxc_cookie);
2633                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2634                                                           tx->mxc_seg_list,
2635                                                           tx->mxc_nseg,
2636                                                           tx->mxc_pin_type,
2637                                                           conn->mxk_epa,
2638                                                           tx->mxc_match,
2639                                                           (void *) tx,
2640                                                           &tx->mxc_mxreq);
2641                                 }
2642                         } else {
2643                                 mxret = MX_CONNECTION_FAILED;
2644                         }
2645                         if (likely(mxret == MX_SUCCESS)) {
2646                                 ret = 0;
2647                         } else {
2648                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2649                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
2650                                        libcfs_nid2str(peer->mxp_nid));
2651                                 /* NOTE mx_kisend() only fails if there are not enough 
2652                                 * resources. Do not change the connection status. */
2653                                 if (mxret == MX_NO_RESOURCES) {
2654                                         tx->mxc_status.code = -ENOMEM;
2655                                 } else {
2656                                         tx->mxc_status.code = -ECONNABORTED;
2657                                 }
2658                                 if (credit) {
2659                                         spin_lock(&conn->mxk_lock);
2660                                         conn->mxk_ntx_posted--;
2661                                         conn->mxk_credits++;
2662                                         spin_unlock(&conn->mxk_lock);
2663                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2664                                         msg_type == MXLND_MSG_GET_DATA) {
2665                                         spin_lock(&conn->mxk_lock);
2666                                         conn->mxk_data_posted--;
2667                                         spin_unlock(&conn->mxk_lock);
2668                                 }
2669                                 if (msg_type != MXLND_MSG_PUT_DATA &&
2670                                     msg_type != MXLND_MSG_GET_DATA &&
2671                                     msg_type != MXLND_MSG_CONN_REQ &&
2672                                     msg_type != MXLND_MSG_CONN_ACK) {
2673                                         spin_lock(&conn->mxk_lock);
2674                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2675                                         spin_unlock(&conn->mxk_lock);
2676                                 }
2677                                 if (msg_type != MXLND_MSG_CONN_REQ &&
2678                                     msg_type != MXLND_MSG_CONN_ACK) {
2679                                         /* remove from the pending list */
2680                                         mxlnd_deq_pending_ctx(tx);
2681                                 }
2682                                 mxlnd_put_idle_tx(tx);
2683                                 mxlnd_conn_decref(conn);
2684                         }
2685                 }
2686                 spin_lock(&conn->mxk_lock);
2687         }
2688 done_locked:
2689         spin_unlock(&conn->mxk_lock);
2690 done:
2691         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2692         return found;
2693 }
2694
2695
2696 /**
2697  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2698  * @ctx - the tx descriptor
2699  *
2700  * Determine which type of send request it was and start the next step, if needed,
2701  * or, if done, signal completion to LNET. After we are done, put back on the
2702  * idle tx list.
2703  */
2704 void
2705 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2706 {
2707         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2708         struct kmx_msg  *msg    = tx->mxc_msg;
2709         struct kmx_peer *peer   = tx->mxc_peer;
2710         struct kmx_conn *conn   = tx->mxc_conn;
2711         u8              type    = tx->mxc_msg_type;
2712         int             credit  = mxlnd_tx_requires_credit(tx);
2713         u64             cookie  = tx->mxc_cookie;
2714
2715         CDEBUG(D_NET, "entering %s (0x%llx):\n",
2716                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2717
2718         if (unlikely(conn == NULL)) {
2719                 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2720                 conn = peer->mxp_conn;
2721                 if (conn != NULL) {
2722                         /* do not add a ref for the tx, it was set before sending */
2723                         tx->mxc_conn = conn;
2724                         tx->mxc_peer = conn->mxk_peer;
2725                 }
2726         }
2727         LASSERT (peer != NULL);
2728         LASSERT (conn != NULL);
2729
2730         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2731                 LASSERT (type == msg->mxm_type);
2732         }
2733
2734         if (failed) {
2735                 tx->mxc_status.code = -EIO;
2736         } else {
2737                 spin_lock(&conn->mxk_lock);
2738                 conn->mxk_last_tx = jiffies;
2739                 spin_unlock(&conn->mxk_lock);
2740         }
2741
2742         switch (type) {
2743
2744         case MXLND_MSG_GET_DATA:
2745                 spin_lock(&conn->mxk_lock);
2746                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2747                         conn->mxk_outstanding++;
2748                         conn->mxk_data_posted--;
2749                 }
2750                 spin_unlock(&conn->mxk_lock);
2751                 break;
2752
2753         case MXLND_MSG_PUT_DATA:
2754                 spin_lock(&conn->mxk_lock);
2755                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2756                         conn->mxk_data_posted--;
2757                 }
2758                 spin_unlock(&conn->mxk_lock);
2759                 break;
2760
2761         case MXLND_MSG_NOOP:
2762         case MXLND_MSG_PUT_REQ:
2763         case MXLND_MSG_PUT_ACK:
2764         case MXLND_MSG_GET_REQ:
2765         case MXLND_MSG_EAGER:
2766         //case MXLND_MSG_NAK:
2767                 break;
2768
2769         case MXLND_MSG_CONN_ACK:
2770                 if (peer->mxp_incompatible) {
2771                         /* we sent our params, now close this conn */
2772                         mxlnd_conn_disconnect(conn, 0, 1);
2773                 }
2774         case MXLND_MSG_CONN_REQ:
2775                 if (failed) {
2776                         CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2777                                "failed with %s (%d) to %s\n",
2778                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2779                                mx_strstatus(tx->mxc_status.code),
2780                                tx->mxc_status.code,
2781                                libcfs_nid2str(tx->mxc_nid));
2782                         if (!peer->mxp_incompatible) {
2783                                 spin_lock(&conn->mxk_lock);
2784                                 conn->mxk_status = MXLND_CONN_FAIL;
2785                                 spin_unlock(&conn->mxk_lock);
2786                         }
2787                 }
2788                 break;
2789
2790         default:
2791                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2792                 LBUG();
2793         }
2794
2795         if (credit) {
2796                 spin_lock(&conn->mxk_lock);
2797                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2798                         conn->mxk_ntx_posted--;
2799                 }
2800                 spin_unlock(&conn->mxk_lock);
2801         }
2802
2803         CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2804         mxlnd_put_idle_tx(tx);
2805         mxlnd_conn_decref(conn);
2806
2807         mxlnd_check_sends(peer);
2808
2809         return;
2810 }
2811
2812 void
2813 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2814 {
2815         int                     ret             = 0;
2816         int                     repost          = 1;
2817         int                     credit          = 1;
2818         u32                     nob             = rx->mxc_status.xfer_length;
2819         u64                     bits            = rx->mxc_status.match_info;
2820         struct kmx_msg         *msg             = rx->mxc_msg;
2821         struct kmx_peer        *peer            = rx->mxc_peer;
2822         struct kmx_conn        *conn            = rx->mxc_conn;
2823         u8                      type            = rx->mxc_msg_type;
2824         u64                     seq             = 0LL;
2825         lnet_msg_t             *lntmsg[2];
2826         int                     result          = 0;
2827         u64                     nic_id          = 0LL;
2828         u32                     ep_id           = 0;
2829         int                     peer_ref        = 0;
2830         int                     conn_ref        = 0;
2831         int                     incompatible    = 0;
2832
2833         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, 
2834          * failed GET reply, CONN_REQ, or a CONN_ACK */
2835
2836         /* NOTE peer may still be NULL if it is a new peer and
2837          *      conn may be NULL if this is a re-connect */
2838         if (likely(peer != NULL && conn != NULL)) {
2839                 /* we have a reference on the conn */
2840                 conn_ref = 1;
2841         } else if (peer != NULL && conn == NULL) {
2842                 /* we have a reference on the peer */
2843                 peer_ref = 1;
2844         } else if (peer == NULL && conn != NULL) {
2845                 /* fatal error */
2846                 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2847                 LBUG();
2848         } /* else peer and conn == NULL */
2849
2850 #if 0
2851         if (peer == NULL || conn == NULL) {
2852                 /* if the peer was disconnected, the peer may exist but
2853                  * not have any valid conns */
2854                 decref = 0; /* no peer means no ref was taken for this rx */
2855         }
2856 #endif
2857
2858         if (conn == NULL && peer != NULL) {
2859                 spin_lock(&peer->mxp_lock);
2860                 conn = peer->mxp_conn;
2861                 if (conn) {
2862                         mxlnd_conn_addref(conn); /* conn takes ref... */
2863                         mxlnd_peer_decref(peer); /* from peer */
2864                         conn_ref = 1;
2865                         peer_ref = 0;
2866                 }
2867                 spin_unlock(&peer->mxp_lock);
2868                 rx->mxc_conn = conn;
2869         }
2870
2871 #if MXLND_DEBUG
2872         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2873 #endif
2874
2875         lntmsg[0] = NULL;
2876         lntmsg[1] = NULL;
2877
2878         if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2879                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2880                                    libcfs_nid2str(rx->mxc_nid),
2881                                    mx_strstatus(rx->mxc_status.code),
2882                                    (int) rx->mxc_status.code);
2883                 credit = 0;
2884                 goto cleanup;
2885         }
2886
2887         if (nob == 0) {
2888                 /* this may be a failed GET reply */
2889                 if (type == MXLND_MSG_GET_DATA) {
2890                         bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
2891                         ret = (u32) (bits>>52);
2892                         lntmsg[0] = rx->mxc_lntmsg[0];
2893                         result = -ret;
2894                         goto cleanup;
2895                 } else {
2896                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
2897                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2898                                            libcfs_nid2str(rx->mxc_nid));
2899                         goto cleanup;
2900                 }
2901         }
2902
2903         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2904         if (type == MXLND_MSG_PUT_DATA) {
2905                 result = rx->mxc_status.code;
2906                 lntmsg[0] = rx->mxc_lntmsg[0];
2907                 goto cleanup;
2908         } else if (type == MXLND_MSG_GET_DATA) {
2909                 result = rx->mxc_status.code;
2910                 lntmsg[0] = rx->mxc_lntmsg[0];
2911                 lntmsg[1] = rx->mxc_lntmsg[1];
2912                 goto cleanup;
2913         }
2914
2915         ret = mxlnd_unpack_msg(msg, nob);
2916         if (ret != 0) {
2917                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2918                                    ret, libcfs_nid2str(rx->mxc_nid));
2919                 goto cleanup;
2920         }
2921         rx->mxc_nob = nob;
2922         type = msg->mxm_type;
2923         seq = msg->mxm_seq;
2924
2925         if (type != MXLND_MSG_CONN_REQ &&
2926             (rx->mxc_nid != msg->mxm_srcnid ||
2927              kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid)) {
2928                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2929                        "0x%llx and rx msg dst is 0x%llx)\n",
2930                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2931                        msg->mxm_dstnid);
2932                 goto cleanup;
2933         }
2934
2935         if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2936                 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2937                     msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2938                         if (conn != NULL) {
2939                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2940                                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2941                                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2942                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2943                                        msg->mxm_srcstamp, conn->mxk_incarnation,
2944                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2945                         } else {
2946                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2947                                        "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2948                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2949                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2950                         }
2951                         credit = 0;
2952                         goto cleanup;
2953                 }
2954         }
2955
2956         CDEBUG(D_NET, "Received %s with %d credits\n",
2957                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
2958
2959         if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2960             msg->mxm_type != MXLND_MSG_CONN_ACK) {
2961                 LASSERT(peer != NULL);
2962                 LASSERT(conn != NULL);
2963                 if (msg->mxm_credits != 0) {
2964                         spin_lock(&conn->mxk_lock);
2965                         if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2966                                 if ((conn->mxk_credits + msg->mxm_credits) > 
2967                                      *kmxlnd_tunables.kmx_credits) {
2968                                         CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
2969                                                conn->mxk_credits, msg->mxm_credits);
2970                                 }
2971                                 conn->mxk_credits += msg->mxm_credits;
2972                                 LASSERT(conn->mxk_credits >= 0);
2973                                 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2974                         }
2975                         spin_unlock(&conn->mxk_lock);
2976                 }
2977         }
2978
2979         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
2980         switch (type) {
2981         case MXLND_MSG_NOOP:
2982                 break;
2983
2984         case MXLND_MSG_EAGER:
2985                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2986                                         msg->mxm_srcnid, rx, 0);
2987                 repost = ret < 0;
2988                 break;
2989
2990         case MXLND_MSG_PUT_REQ:
2991                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2992                                         msg->mxm_srcnid, rx, 1);
2993                 repost = ret < 0;
2994                 break;
2995
2996         case MXLND_MSG_PUT_ACK: {
2997                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
2998                 if (cookie > MXLND_MAX_COOKIE) {
2999                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3000                                            libcfs_nid2str(rx->mxc_nid));
3001                         result = -((cookie >> 52) & 0xff);
3002                         lntmsg[0] = rx->mxc_lntmsg[0];
3003                 } else {
3004                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3005                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3006                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3007                 }
3008                 /* repost == 1 */
3009                 break;
3010         }
3011         case MXLND_MSG_GET_REQ:
3012                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3013                                         msg->mxm_srcnid, rx, 1);
3014                 repost = ret < 0;
3015                 break;
3016
3017         case MXLND_MSG_CONN_REQ:
3018                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3019                         CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3020                                         libcfs_nid2str(msg->mxm_srcnid),
3021                                         libcfs_nid2str(msg->mxm_dstnid));
3022                         goto cleanup;
3023                 }
3024                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3025                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3026                                     "%d (%d wanted)\n",
3027                                         libcfs_nid2str(msg->mxm_srcnid),
3028                                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3029                                         *kmxlnd_tunables.kmx_credits);
3030                         incompatible = 1;
3031                 }
3032                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3033                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3034                                     "%d (%d wanted)\n",
3035                                         libcfs_nid2str(msg->mxm_srcnid),
3036                                         msg->mxm_u.conn_req.mxcrm_eager_size,
3037                                         (int) MXLND_EAGER_SIZE);
3038                         incompatible = 1;
3039                 }
3040                 if (peer == NULL) {
3041                         peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3042                         if (peer == NULL) {
3043                                 int             hash    = 0;
3044                                 struct kmx_peer *existing_peer    = NULL;
3045                                 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3046
3047                                 mx_decompose_endpoint_addr(rx->mxc_status.source,
3048                                                            &nic_id, &ep_id);
3049                                 rx->mxc_nid = msg->mxm_srcnid;
3050
3051                                 /* adds conn ref for peer and one for this function */
3052                                 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
3053                                 if (ret != 0) {
3054                                         goto cleanup;
3055                                 }
3056                                 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
3057                                 write_lock(&kmxlnd_data.kmx_peers_lock);
3058                                 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3059                                 if (existing_peer) {
3060                                         mxlnd_conn_decref(peer->mxp_conn);
3061                                         mxlnd_peer_decref(peer);
3062                                         peer = existing_peer;
3063                                         mxlnd_conn_addref(peer->mxp_conn);
3064                                 } else {
3065                                         list_add_tail(&peer->mxp_peers,
3066                                                       &kmxlnd_data.kmx_peers[hash]);
3067                                         write_unlock(&kmxlnd_data.kmx_peers_lock);
3068                                         atomic_inc(&kmxlnd_data.kmx_npeers);
3069                                 }
3070                         } else {
3071                                 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3072                                 mxlnd_peer_decref(peer); /* drop ref taken above */
3073                                 if (ret != 0) {
3074                                         CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3075                                         goto cleanup;
3076                                 }
3077                         }
3078                         conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3079                         conn = peer->mxp_conn;
3080                 } else {
3081                         struct kmx_conn *old_conn       = conn;
3082
3083                         /* do not call mx_disconnect() */
3084                         mxlnd_conn_disconnect(old_conn, 0, 0);
3085
3086                         /* the ref for this rx was taken on the old_conn */
3087                         mxlnd_conn_decref(old_conn);
3088
3089                         /* This allocs a conn, points peer->mxp_conn to this one.
3090                          * The old conn is still on the peer->mxp_conns list.
3091                          * As the pending requests complete, they will call
3092                          * conn_decref() which will eventually free it. */
3093                         ret = mxlnd_conn_alloc(&conn, peer);
3094                         if (ret != 0) {
3095                                 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3096                                 goto cleanup;
3097                         }
3098                         /* conn_alloc() adds one ref for the peer and one for this function */
3099                         conn_ref = 1;
3100                 }
3101                 spin_lock(&peer->mxp_lock);
3102                 peer->mxp_incarnation = msg->mxm_srcstamp;
3103                 peer->mxp_incompatible = incompatible;
3104                 spin_unlock(&peer->mxp_lock);
3105                 spin_lock(&conn->mxk_lock);
3106                 conn->mxk_incarnation = msg->mxm_srcstamp;
3107                 conn->mxk_status = MXLND_CONN_WAIT;
3108                 spin_unlock(&conn->mxk_lock);
3109
3110                 /* handle_conn_ack() will create the CONN_ACK msg */
3111                 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
3112
3113                 break;
3114
3115         case MXLND_MSG_CONN_ACK:
3116                 if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3117                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3118                                "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3119                                 libcfs_nid2str(msg->mxm_dstnid));
3120                         ret = -1;
3121                         goto failed;
3122                 }
3123                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3124                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3125                                "incompatible queue depth %d (%d wanted)\n",
3126                                 libcfs_nid2str(msg->mxm_srcnid),
3127                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3128                                 *kmxlnd_tunables.kmx_credits);
3129                         spin_lock(&conn->mxk_lock);
3130                         conn->mxk_status = MXLND_CONN_FAIL;
3131                         spin_unlock(&conn->mxk_lock);
3132                         incompatible = 1;
3133                         ret = -1;
3134                 }
3135                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3136                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3137                                "incompatible EAGER size %d (%d wanted)\n",
3138                                 libcfs_nid2str(msg->mxm_srcnid),
3139                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3140                                 (int) MXLND_EAGER_SIZE);
3141                         spin_lock(&conn->mxk_lock);
3142                         conn->mxk_status = MXLND_CONN_FAIL;
3143                         spin_unlock(&conn->mxk_lock);
3144                         incompatible = 1;
3145                         ret = -1;
3146                 }
3147                 spin_lock(&peer->mxp_lock);
3148                 peer->mxp_incarnation = msg->mxm_srcstamp;
3149                 peer->mxp_incompatible = incompatible;
3150                 spin_unlock(&peer->mxp_lock);
3151                 spin_lock(&conn->mxk_lock);
3152                 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3153                 conn->mxk_outstanding = 0;
3154                 conn->mxk_incarnation = msg->mxm_srcstamp;
3155                 conn->mxk_timeout = 0;
3156                 if (!incompatible) {
3157                         conn->mxk_status = MXLND_CONN_READY;
3158                 }
3159                 spin_unlock(&conn->mxk_lock);
3160                 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3161                 break;
3162
3163         default:
3164                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3165                                 libcfs_nid2str(rx->mxc_nid));
3166                 ret = -EPROTO;
3167                 break;
3168         }
3169
3170 failed:
3171         if (ret < 0) {
3172                 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3173                 spin_lock(&conn->mxk_lock);
3174                 conn->mxk_status = MXLND_CONN_FAIL;
3175                 spin_unlock(&conn->mxk_lock);
3176         }
3177
3178 cleanup:
3179         if (conn != NULL) {
3180                 spin_lock(&conn->mxk_lock);
3181                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3182                 spin_unlock(&conn->mxk_lock);
3183         }
3184
3185         if (repost) {
3186                 /* lnet_parse() failed, etc., repost now */
3187                 mxlnd_put_idle_rx(rx);
3188                 if (conn != NULL && credit == 1) {
3189                         if (type == MXLND_MSG_PUT_DATA) {
3190                                 spin_lock(&conn->mxk_lock);
3191                                 conn->mxk_outstanding++;
3192                                 spin_unlock(&conn->mxk_lock);
3193                         } else if (type != MXLND_MSG_GET_DATA &&
3194                                   (type == MXLND_MSG_EAGER ||
3195                                    type == MXLND_MSG_PUT_REQ ||
3196                                    type == MXLND_MSG_NOOP)) {
3197                                 spin_lock(&conn->mxk_lock);
3198                                 conn->mxk_outstanding++;
3199                                 spin_unlock(&conn->mxk_lock);
3200                         }
3201                 }
3202                 if (conn_ref) mxlnd_conn_decref(conn);
3203                 LASSERT(peer_ref == 0);
3204         }
3205
3206         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3207                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3208         } else {
3209                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3210         }
3211
3212         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3213         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3214
3215         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3216
3217         return;
3218 }
3219
3220
3221
3222 void
3223 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3224 {
3225         struct kmx_ctx  *tx     = NULL;
3226         struct kmx_msg  *txmsg   = NULL;
3227         struct kmx_conn *conn   = peer->mxp_conn;
3228
3229         /* a conn ref was taken when calling mx_iconnect(), 
3230          * hold it until CONN_REQ or CONN_ACK completes */
3231
3232         CDEBUG(D_NET, "entering\n");
3233         if (status.code != MX_STATUS_SUCCESS) {
3234                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3235                         mx_strstatus(status.code), status.code,
3236                         libcfs_nid2str(peer->mxp_nid));
3237                 spin_lock(&conn->mxk_lock);
3238                 conn->mxk_status = MXLND_CONN_FAIL;
3239                 spin_unlock(&conn->mxk_lock);
3240
3241                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3242                         struct kmx_conn *new_conn       = NULL;
3243                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3244                         mxlnd_conn_disconnect(conn, 0, 1);
3245                         mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3246                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3247                         spin_lock(&peer->mxp_lock);
3248                         peer->mxp_reconnect_time = 0;
3249                         spin_unlock(&peer->mxp_lock);
3250                 }
3251
3252                 mxlnd_conn_decref(conn);
3253                 return;
3254         }
3255
3256         spin_lock(&conn->mxk_lock);
3257         conn->mxk_epa = status.source;
3258         spin_unlock(&conn->mxk_lock);
3259         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3260          *      we should not need to lock the peer */
3261         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3262
3263         /* mx_iconnect() succeeded, reset delay to 0 */
3264         spin_lock(&peer->mxp_lock);
3265         peer->mxp_reconnect_time = 0;
3266         spin_unlock(&peer->mxp_lock);
3267
3268         /* marshal CONN_REQ msg */
3269         /* we are still using the conn ref from iconnect() - do not take another */
3270         tx = mxlnd_get_idle_tx();
3271         if (tx == NULL) {
3272                 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3273                                    libcfs_nid2str(peer->mxp_nid));
3274                 spin_lock(&conn->mxk_lock);
3275                 conn->mxk_status = MXLND_CONN_FAIL;
3276                 spin_unlock(&conn->mxk_lock);
3277                 mxlnd_conn_decref(conn);
3278                 return;
3279         }
3280
3281         tx->mxc_peer = peer;
3282         tx->mxc_conn = conn;
3283         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3284         txmsg = tx->mxc_msg;
3285         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3286         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3287         tx->mxc_match = mxlnd_create_match(tx, 0);
3288
3289         CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3290         mxlnd_queue_tx(tx);
3291         return;
3292 }
3293
3294 void
3295 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3296 {
3297         struct kmx_ctx  *tx     = NULL;
3298         struct kmx_msg  *txmsg   = NULL;
3299         struct kmx_conn *conn   = peer->mxp_conn;
3300
3301         /* a conn ref was taken when calling mx_iconnect(), 
3302          * hold it until CONN_REQ or CONN_ACK completes */
3303
3304         CDEBUG(D_NET, "entering\n");
3305         if (status.code != MX_STATUS_SUCCESS) {
3306                 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3307                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
3308                         mx_strstatus(status.code), status.code,
3309                         libcfs_nid2str(peer->mxp_nid),
3310                         peer->mxp_nid,
3311                         peer->mxp_nic_id,
3312                         peer->mxp_host->mxh_ep_id);
3313                 spin_lock(&conn->mxk_lock);
3314                 conn->mxk_status = MXLND_CONN_FAIL;
3315                 spin_unlock(&conn->mxk_lock);
3316
3317                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3318                         struct kmx_conn *new_conn       = NULL;
3319                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3320                         mxlnd_conn_disconnect(conn, 0, 1);
3321                         mxlnd_conn_alloc(&new_conn, peer); /* adds ref for 
3322                                                               this function... */
3323                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3324                         spin_lock(&peer->mxp_lock);
3325                         peer->mxp_reconnect_time = 0;
3326                         spin_unlock(&peer->mxp_lock);
3327                 }
3328
3329                 mxlnd_conn_decref(conn);
3330                 return;
3331         }
3332         spin_lock(&conn->mxk_lock);
3333         conn->mxk_epa = status.source;
3334         if (likely(!peer->mxp_incompatible)) {
3335                 conn->mxk_status = MXLND_CONN_READY;
3336         }
3337         spin_unlock(&conn->mxk_lock);
3338         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3339          *      we should not have to lock the peer */
3340         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3341
3342         /* mx_iconnect() succeeded, reset delay to 0 */
3343         spin_lock(&peer->mxp_lock);
3344         peer->mxp_reconnect_time = 0;
3345         spin_unlock(&peer->mxp_lock);
3346
3347         /* marshal CONN_ACK msg */
3348         tx = mxlnd_get_idle_tx();
3349         if (tx == NULL) {
3350                 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3351                                    libcfs_nid2str(peer->mxp_nid));
3352                 spin_lock(&conn->mxk_lock);
3353                 conn->mxk_status = MXLND_CONN_FAIL;
3354                 spin_unlock(&conn->mxk_lock);
3355                 mxlnd_conn_decref(conn);
3356                 return;
3357         }
3358
3359         tx->mxc_peer = peer;
3360         tx->mxc_conn = conn;
3361         CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3362         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3363         txmsg = tx->mxc_msg;
3364         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3365         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3366         tx->mxc_match = mxlnd_create_match(tx, 0);
3367
3368         mxlnd_queue_tx(tx);
3369         return;
3370 }
3371
3372 /**
3373  * mxlnd_request_waitd - the MX request completion thread(s)
3374  * @arg - thread id (as a void *)
3375  *
3376  * This thread waits for a MX completion and then completes the request.
3377  * We will create one thread per CPU.
3378  */
3379 int
3380 mxlnd_request_waitd(void *arg)
3381 {
3382         long                    id              = (long) arg;
3383         char                    name[24];
3384         __u32                   result          = 0;
3385         mx_return_t             mxret           = MX_SUCCESS;
3386         mx_status_t             status;
3387         struct kmx_ctx         *ctx             = NULL;
3388         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3389         struct kmx_peer        *peer            = NULL;
3390         struct kmx_conn        *conn            = NULL;
3391 #if MXLND_POLLING
3392         int                     count           = 0;
3393 #endif
3394
3395         memset(name, 0, sizeof(name));
3396         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3397         cfs_daemonize(name);
3398         //cfs_block_allsigs();
3399
3400         memset(&status, 0, sizeof(status));
3401
3402         CDEBUG(D_NET, "%s starting\n", name);
3403
3404         while (!kmxlnd_data.kmx_shutdown) {
3405                 mxret = MX_SUCCESS;
3406                 result = 0;
3407 #if MXLND_POLLING
3408                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3409                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
3410                                             &status, &result);
3411                 } else {
3412                         count = 0;
3413                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3414                                             0LL, 0LL, &status, &result);
3415                 }
3416 #else
3417                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3418                                     0LL, 0LL, &status, &result);
3419 #endif
3420                 if (unlikely(kmxlnd_data.kmx_shutdown))
3421                         break;
3422
3423                 if (result != 1) {
3424                         /* nothing completed... */
3425                         continue;
3426                 }
3427
3428                 if (status.code != MX_STATUS_SUCCESS) {
3429                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3430                                "match_info 0x%llx and length %d\n",
3431                                mx_strstatus(status.code), status.code,
3432                                (u64) status.match_info, status.msg_length);
3433                 }
3434
3435                 /* This may be a mx_iconnect() request completing,
3436                  * check the bit mask for CONN_REQ and CONN_ACK */
3437                 if (status.match_info == MXLND_MASK_ICON_REQ ||
3438                     status.match_info == MXLND_MASK_ICON_ACK) {
3439                         peer = (struct kmx_peer*) status.context;
3440                         if (status.match_info == MXLND_MASK_ICON_REQ) {
3441                                 mxlnd_handle_conn_req(peer, status);
3442                         } else {
3443                                 mxlnd_handle_conn_ack(peer, status);
3444                         }
3445                         continue;
3446                 }
3447
3448                 /* This must be a tx or rx */
3449
3450                 /* NOTE: if this is a RX from the unexpected callback, it may
3451                  * have very little info. If we dropped it in unexpected_recv(),
3452                  * it will not have a context. If so, ignore it. */
3453                 ctx = (struct kmx_ctx *) status.context;
3454                 if (ctx != NULL) {
3455
3456                         req_type = ctx->mxc_type;
3457                         conn = ctx->mxc_conn; /* this may be NULL */
3458                         mxlnd_deq_pending_ctx(ctx);
3459
3460                         /* copy status to ctx->mxc_status */
3461                         memcpy(&ctx->mxc_status, &status, sizeof(status));
3462
3463                         switch (req_type) {
3464                         case MXLND_REQ_TX:
3465                                 mxlnd_handle_tx_completion(ctx);
3466                                 break;
3467                         case MXLND_REQ_RX:
3468                                 mxlnd_handle_rx_completion(ctx);
3469                                 break;
3470                         default:
3471                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3472                                 LBUG();
3473                                 break;
3474                         }
3475
3476                         /* FIXME may need to reconsider this */
3477                         /* conn is always set except for the first CONN_REQ rx
3478                          * from a new peer */
3479                         if (!(status.code == MX_STATUS_SUCCESS ||
3480                               status.code == MX_STATUS_TRUNCATED) &&
3481                               conn != NULL) {
3482                                 mxlnd_conn_disconnect(conn, 1, 1);
3483                         }
3484                 }
3485                 CDEBUG(D_NET, "waitd() completed task\n");
3486         }
3487         CDEBUG(D_NET, "%s stopping\n", name);
3488         mxlnd_thread_stop(id);
3489         return 0;
3490 }
3491
3492
3493 unsigned long
3494 mxlnd_check_timeouts(unsigned long now)
3495 {
3496         int                     i               = 0;
3497         int                     disconnect      = 0;
3498         unsigned long           next            = 0;
3499         struct  kmx_peer        *peer           = NULL;
3500         struct  kmx_conn        *conn           = NULL;
3501
3502         read_lock(&kmxlnd_data.kmx_peers_lock);
3503         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3504                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3505
3506                         if (unlikely(kmxlnd_data.kmx_shutdown)) {
3507                                 read_unlock(&kmxlnd_data.kmx_peers_lock);
3508                                 return next;
3509                         }
3510
3511                         spin_lock(&peer->mxp_lock);
3512                         conn = peer->mxp_conn;
3513                         if (conn) {
3514                                 mxlnd_conn_addref(conn);
3515                                 spin_unlock(&peer->mxp_lock);
3516                         } else {
3517                                 spin_unlock(&peer->mxp_lock);
3518                                 continue;
3519                         }
3520
3521                         spin_lock(&conn->mxk_lock);
3522
3523                         /* if nothing pending (timeout == 0) or
3524                          * if conn is already disconnected,
3525                          * skip this conn */
3526                         if (conn->mxk_timeout == 0 ||
3527                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3528                                 spin_unlock(&conn->mxk_lock);
3529                                 mxlnd_conn_decref(conn);
3530                                 continue;
3531                         }
3532
3533                         /* we want to find the timeout that will occur first.
3534                          * if it is in the future, we will sleep until then.
3535                          * if it is in the past, then we will sleep one
3536                          * second and repeat the process. */
3537                         if ((next == 0) || (conn->mxk_timeout < next)) {
3538                                 next = conn->mxk_timeout;
3539                         }
3540
3541                         disconnect = 0;
3542
3543                         if (time_after_eq(now, conn->mxk_timeout))  {
3544                                 disconnect = 1;
3545                         }
3546                         spin_unlock(&conn->mxk_lock);
3547
3548                         if (disconnect) {
3549                                 mxlnd_conn_disconnect(conn, 1, 1);
3550                         }
3551                         mxlnd_conn_decref(conn);
3552                 }
3553         }
3554         read_unlock(&kmxlnd_data.kmx_peers_lock);
3555         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3556
3557         return next;
3558 }
3559
3560 /**
3561  * mxlnd_timeoutd - enforces timeouts on messages
3562  * @arg - thread id (as a void *)
3563  *
3564  * This thread queries each peer for its earliest timeout. If a peer has timed out,
3565  * it calls mxlnd_conn_disconnect().
3566  *
3567  * After checking for timeouts, try progressing sends (call check_sends()).
3568  */
3569 int
3570 mxlnd_timeoutd(void *arg)
3571 {
3572         int                     i       = 0;
3573         long                    id      = (long) arg;
3574         unsigned long           now     = 0;
3575         unsigned long           next    = 0;
3576         unsigned long           delay   = HZ;
3577         struct kmx_peer        *peer    = NULL;
3578         struct kmx_conn        *conn    = NULL;
3579
3580         cfs_daemonize("mxlnd_timeoutd");
3581         //cfs_block_allsigs();
3582
3583         CDEBUG(D_NET, "timeoutd starting\n");
3584
3585         while (!kmxlnd_data.kmx_shutdown) {
3586
3587                 now = jiffies;
3588                 /* if the next timeout has not arrived, go back to sleep */
3589                 if (time_after(now, next)) {
3590                         next = mxlnd_check_timeouts(now);
3591                 }
3592
3593                read_lock(&kmxlnd_data.kmx_peers_lock);
3594                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3595                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3596                                 spin_lock(&peer->mxp_lock);
3597                                 conn = peer->mxp_conn;
3598                                 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3599                                 spin_unlock(&peer->mxp_lock);
3600
3601                                 if (conn == NULL)
3602                                         continue;
3603
3604                                 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3605                                     time_after(now, conn->mxk_last_tx + HZ)) {
3606                                         mxlnd_check_sends(peer);
3607                                 }
3608                                 mxlnd_conn_decref(conn); /* until here */
3609                         }
3610                 }
3611                 read_unlock(&kmxlnd_data.kmx_peers_lock);
3612
3613                 mxlnd_sleep(delay);
3614         }
3615         CDEBUG(D_NET, "timeoutd stopping\n");
3616         mxlnd_thread_stop(id);
3617         return 0;
3618 }