Whamcloud - gitweb
LU-812 kernel: remove smp_lock.h
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (C) 2006 Myricom, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/mxlnd/mxlnd.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  * Author: Scott Atchley <atchley at myri.com>
40  */
41
42 #include "mxlnd.h"
43
44 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
45
46 inline int
47 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
48 {
49         /* if memcmp() == 0, it is NULL */
50         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
51 }
52
53 char *
54 mxlnd_ctxstate_to_str(int mxc_state)
55 {
56         switch (mxc_state) {
57         case MXLND_CTX_INIT:
58                 return "MXLND_CTX_INIT";
59         case MXLND_CTX_IDLE:
60                 return "MXLND_CTX_IDLE";
61         case MXLND_CTX_PREP:
62                 return "MXLND_CTX_PREP";
63         case MXLND_CTX_PENDING:
64                 return "MXLND_CTX_PENDING";
65         case MXLND_CTX_COMPLETED:
66                 return "MXLND_CTX_COMPLETED";
67         case MXLND_CTX_CANCELED:
68                 return "MXLND_CTX_CANCELED";
69         default:
70                 return "*unknown*";
71         }
72 }
73
74 char *
75 mxlnd_connstatus_to_str(int mxk_status)
76 {
77         switch (mxk_status) {
78         case MXLND_CONN_READY:
79                 return "MXLND_CONN_READY";
80         case MXLND_CONN_INIT:
81                 return "MXLND_CONN_INIT";
82         case MXLND_CONN_WAIT:
83                 return "MXLND_CONN_WAIT";
84         case MXLND_CONN_DISCONNECT:
85                 return "MXLND_CONN_DISCONNECT";
86         case MXLND_CONN_FAIL:
87                 return "MXLND_CONN_FAIL";
88         default:
89                 return "unknown";
90         }
91 }
92
93 char *
94 mxlnd_msgtype_to_str(int type) {
95         switch (type) {
96         case MXLND_MSG_EAGER:
97                 return "MXLND_MSG_EAGER";
98         case MXLND_MSG_CONN_REQ:
99                 return "MXLND_MSG_CONN_REQ";
100         case MXLND_MSG_CONN_ACK:
101                 return "MXLND_MSG_CONN_ACK";
102         case MXLND_MSG_BYE:
103                 return "MXLND_MSG_BYE";
104         case MXLND_MSG_NOOP:
105                 return "MXLND_MSG_NOOP";
106         case MXLND_MSG_PUT_REQ:
107                 return "MXLND_MSG_PUT_REQ";
108         case MXLND_MSG_PUT_ACK:
109                 return "MXLND_MSG_PUT_ACK";
110         case MXLND_MSG_PUT_DATA:
111                 return "MXLND_MSG_PUT_DATA";
112         case MXLND_MSG_GET_REQ:
113                 return "MXLND_MSG_GET_REQ";
114         case MXLND_MSG_GET_DATA:
115                 return "MXLND_MSG_GET_DATA";
116         default:
117                 return "unknown";
118         }
119 }
120
121 char *
122 mxlnd_lnetmsg_to_str(int type)
123 {
124         switch (type) {
125         case LNET_MSG_ACK:
126                 return "LNET_MSG_ACK";
127         case LNET_MSG_PUT:
128                 return "LNET_MSG_PUT";
129         case LNET_MSG_GET:
130                 return "LNET_MSG_GET";
131         case LNET_MSG_REPLY:
132                 return "LNET_MSG_REPLY";
133         case LNET_MSG_HELLO:
134                 return "LNET_MSG_HELLO";
135         default:
136                 LBUG();
137                 return "*unknown*";
138         }
139 }
140
141 static inline u64
142 mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
143 {
144         u64 type        = (u64) ctx->mxc_msg_type;
145         u64 err         = (u64) error;
146         u64 match       = 0ULL;
147
148         mxlnd_valid_msg_type(ctx->mxc_msg_type);
149         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
150         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
151         return match;
152 }
153
154 static inline void
155 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
156 {
157         *msg_type = (u8) MXLND_MSG_TYPE(match);
158         *error    = (u8) MXLND_ERROR_VAL(match);
159         *cookie   = match & MXLND_MAX_COOKIE;
160         mxlnd_valid_msg_type(*msg_type);
161         return;
162 }
163
164 kmx_ctx_t *
165 mxlnd_get_idle_rx(kmx_conn_t *conn)
166 {
167         cfs_list_t              *rxs    = NULL;
168         kmx_ctx_t               *rx     = NULL;
169
170         LASSERT(conn != NULL);
171
172         rxs = &conn->mxk_rx_idle;
173
174         cfs_spin_lock(&conn->mxk_lock);
175
176         if (cfs_list_empty (rxs)) {
177                 cfs_spin_unlock(&conn->mxk_lock);
178                 return NULL;
179         }
180
181         rx = cfs_list_entry (rxs->next, kmx_ctx_t, mxc_list);
182         cfs_list_del_init(&rx->mxc_list);
183         cfs_spin_unlock(&conn->mxk_lock);
184
185 #if MXLND_DEBUG
186         if (rx->mxc_get != rx->mxc_put) {
187                 CNETERR("*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
188                 CNETERR("*** incarnation= %lld ***\n", rx->mxc_incarnation);
189                 CNETERR("*** deadline= %ld ***\n", rx->mxc_deadline);
190                 CNETERR("*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
191                 CNETERR("*** listed?= %d ***\n", !cfs_list_empty(&rx->mxc_list));
192                 CNETERR("*** nid= 0x%llx ***\n", rx->mxc_nid);
193                 CNETERR("*** peer= 0x%p ***\n", rx->mxc_peer);
194                 CNETERR("*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
195                 CNETERR("*** cookie= 0x%llx ***\n", rx->mxc_cookie);
196                 CNETERR("*** nob= %d ***\n", rx->mxc_nob);
197         }
198 #endif
199         LASSERT (rx->mxc_get == rx->mxc_put);
200
201         rx->mxc_get++;
202
203         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
204         rx->mxc_state = MXLND_CTX_PREP;
205         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
206
207         return rx;
208 }
209
210 int
211 mxlnd_put_idle_rx(kmx_ctx_t *rx)
212 {
213         kmx_conn_t              *conn   = rx->mxc_conn;
214         cfs_list_t              *rxs    = &conn->mxk_rx_idle;
215
216         LASSERT(rx->mxc_type == MXLND_REQ_RX);
217
218         mxlnd_ctx_init(rx);
219
220         rx->mxc_put++;
221         LASSERT(rx->mxc_get == rx->mxc_put);
222
223         cfs_spin_lock(&conn->mxk_lock);
224         cfs_list_add(&rx->mxc_list, rxs);
225         cfs_spin_unlock(&conn->mxk_lock);
226         return 0;
227 }
228
229 kmx_ctx_t *
230 mxlnd_get_idle_tx(void)
231 {
232         cfs_list_t              *tmp    = &kmxlnd_data.kmx_tx_idle;
233         kmx_ctx_t               *tx     = NULL;
234
235         cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
236
237         if (cfs_list_empty (&kmxlnd_data.kmx_tx_idle)) {
238                 CNETERR("%d txs in use\n", kmxlnd_data.kmx_tx_used);
239                 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
240                 return NULL;
241         }
242
243         tmp = &kmxlnd_data.kmx_tx_idle;
244         tx = cfs_list_entry (tmp->next, kmx_ctx_t, mxc_list);
245         cfs_list_del_init(&tx->mxc_list);
246
247         /* Allocate a new completion cookie.  It might not be needed,
248          * but we've got a lock right now and we're unlikely to
249          * wrap... */
250         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
251         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
252                 kmxlnd_data.kmx_tx_next_cookie = 1;
253         }
254         kmxlnd_data.kmx_tx_used++;
255         cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
256
257         LASSERT (tx->mxc_get == tx->mxc_put);
258
259         tx->mxc_get++;
260
261         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
262         LASSERT (tx->mxc_lntmsg[0] == NULL);
263         LASSERT (tx->mxc_lntmsg[1] == NULL);
264
265         tx->mxc_state = MXLND_CTX_PREP;
266         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
267
268         return tx;
269 }
270
271 void
272 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
273
274 int
275 mxlnd_put_idle_tx(kmx_ctx_t *tx)
276 {
277         int             result  = 0;
278         lnet_msg_t      *lntmsg[2];
279
280         LASSERT(tx->mxc_type == MXLND_REQ_TX);
281
282         if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
283                 kmx_conn_t      *conn   = tx->mxc_conn;
284
285                 result = -EIO;
286                 if (tx->mxc_errno != 0) result = tx->mxc_errno;
287                 /* FIXME should we set mx_dis? */
288                 mxlnd_conn_disconnect(conn, 0, 1);
289         }
290
291         lntmsg[0] = tx->mxc_lntmsg[0];
292         lntmsg[1] = tx->mxc_lntmsg[1];
293
294         mxlnd_ctx_init(tx);
295
296         tx->mxc_put++;
297         LASSERT(tx->mxc_get == tx->mxc_put);
298
299         cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
300         cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
301         kmxlnd_data.kmx_tx_used--;
302         cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
303
304         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
305         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
306         return 0;
307 }
308
309
310 void
311 mxlnd_connparams_free(kmx_connparams_t *cp)
312 {
313         LASSERT(cfs_list_empty(&cp->mxr_list));
314         MXLND_FREE(cp, sizeof(*cp));
315         return;
316 }
317
318 int
319 mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
320                             mx_endpoint_addr_t epa, u64 match, u32 length,
321                             kmx_conn_t *conn, kmx_peer_t *peer, void *data)
322 {
323         kmx_connparams_t *c = NULL;
324
325         MXLND_ALLOC(c, sizeof(*c));
326         if (!c) return -ENOMEM;
327
328         CFS_INIT_LIST_HEAD(&c->mxr_list);
329         c->mxr_context = context;
330         c->mxr_epa = epa;
331         c->mxr_match = match;
332         c->mxr_nob = length;
333         c->mxr_conn = conn;
334         c->mxr_peer = peer;
335         c->mxr_msg = *((kmx_msg_t *) data);
336
337         *cp = c;
338         return 0;
339 }
340
341 static inline void
342 mxlnd_set_conn_status(kmx_conn_t *conn, int status)
343 {
344         conn->mxk_status = status;
345         cfs_mb();
346 }
347
348 /**
349  * mxlnd_conn_free_locked - free the conn
350  * @conn - a kmx_conn pointer
351  *
352  * The calling function should remove the conn from the conns list first
353  * then destroy it. Caller should have write-locked kmx_global_lock.
354  */
355 void
356 mxlnd_conn_free_locked(kmx_conn_t *conn)
357 {
358         int             valid   = !mxlnd_endpoint_addr_null(conn->mxk_epa);
359         kmx_peer_t      *peer   = conn->mxk_peer;
360
361         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
362         LASSERT (cfs_list_empty (&conn->mxk_tx_credit_queue) &&
363                  cfs_list_empty (&conn->mxk_tx_free_queue) &&
364                  cfs_list_empty (&conn->mxk_pending));
365         if (!cfs_list_empty(&conn->mxk_list)) {
366                 cfs_list_del_init(&conn->mxk_list);
367                 if (peer->mxp_conn == conn) {
368                         peer->mxp_conn = NULL;
369                         if (valid) {
370                                 kmx_conn_t      *temp   = NULL;
371
372                                 mx_get_endpoint_addr_context(conn->mxk_epa,
373                                                              (void **) &temp);
374                                 if (conn == temp) {
375                                         mx_set_endpoint_addr_context(conn->mxk_epa,
376                                                                      (void *) NULL);
377                                 }
378                         }
379                         /* unlink from global list and drop its ref */
380                         cfs_list_del_init(&peer->mxp_list);
381                         mxlnd_peer_decref(peer);
382                 }
383         }
384         mxlnd_peer_decref(peer); /* drop conn's ref to peer */
385         if (conn->mxk_rx_pages) {
386                 LASSERT (conn->mxk_rxs != NULL);
387                 mxlnd_free_pages(conn->mxk_rx_pages);
388         }
389         if (conn->mxk_rxs) {
390                 int             i       = 0;
391                 kmx_ctx_t       *rx     = NULL;
392
393                 for (i = 0; i < MXLND_RX_MSGS(); i++) {
394                         rx = &conn->mxk_rxs[i];
395                         if (rx->mxc_seg_list != NULL) {
396                                 LASSERT(rx->mxc_nseg > 0);
397                                 MXLND_FREE(rx->mxc_seg_list,
398                                            rx->mxc_nseg *
399                                            sizeof(*rx->mxc_seg_list));
400                         }
401                 }
402                 MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
403         }
404
405         MXLND_FREE(conn, sizeof (*conn));
406         return;
407 }
408
409
410 int
411 mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
412 {
413         int             found   = 0;
414         int             count   = 0;
415         kmx_ctx_t       *ctx    = NULL;
416         kmx_ctx_t       *next   = NULL;
417         mx_return_t     mxret   = MX_SUCCESS;
418         u32             result  = 0;
419
420         do {
421                 found = 0;
422                 cfs_spin_lock(&conn->mxk_lock);
423                 cfs_list_for_each_entry_safe(ctx, next, &conn->mxk_pending,
424                                              mxc_list) {
425                         cfs_list_del_init(&ctx->mxc_list);
426                         if (ctx->mxc_type == MXLND_REQ_RX) {
427                                 found = 1;
428                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
429                                                   &ctx->mxc_mxreq,
430                                                   &result);
431                                 if (mxret != MX_SUCCESS) {
432                                         CNETERR("mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
433                                 }
434                                 if (result == 1) {
435                                         ctx->mxc_errno = -ECONNABORTED;
436                                         ctx->mxc_state = MXLND_CTX_CANCELED;
437                                         cfs_spin_unlock(&conn->mxk_lock);
438                                         cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
439                                         /* we may be holding the global lock,
440                                          * move to orphan list so that it can free it */
441                                         cfs_list_add_tail(&ctx->mxc_list,
442                                                           &kmxlnd_data.kmx_orphan_msgs);
443                                         count++;
444                                         cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
445                                         cfs_spin_lock(&conn->mxk_lock);
446                                 }
447                                 break;
448                         }
449                 }
450                 cfs_spin_unlock(&conn->mxk_lock);
451         }
452         while (found);
453
454         return count;
455 }
456
457 int
458 mxlnd_cancel_queued_txs(kmx_conn_t *conn)
459 {
460         int                     count   = 0;
461         cfs_list_t             *tmp    = NULL;
462
463         cfs_spin_lock(&conn->mxk_lock);
464         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
465                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
466
467                 kmx_ctx_t       *tx     = NULL;
468
469                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
470                         tmp = &conn->mxk_tx_free_queue;
471                 } else {
472                         tmp = &conn->mxk_tx_credit_queue;
473                 }
474
475                 tx = cfs_list_entry(tmp->next, kmx_ctx_t, mxc_list);
476                 cfs_list_del_init(&tx->mxc_list);
477                 cfs_spin_unlock(&conn->mxk_lock);
478                 tx->mxc_errno = -ECONNABORTED;
479                 tx->mxc_state = MXLND_CTX_CANCELED;
480                 /* move to orphan list and then abort */
481                 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
482                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
483                 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
484                 count++;
485                 cfs_spin_lock(&conn->mxk_lock);
486         }
487         cfs_spin_unlock(&conn->mxk_lock);
488
489         return count;
490 }
491
492 void
493 mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
494 {
495         u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
496                     (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
497
498         mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
499                   epa, match, NULL, NULL);
500         return;
501 }
502
503 /**
504  * mxlnd_conn_disconnect - shutdown a connection
505  * @conn - a kmx_conn pointer
506  * @mx_dis - call mx_disconnect()
507  * @send_bye - send peer a BYE msg
508  *
509  * This function sets the status to DISCONNECT, completes queued
510  * txs with failure, calls mx_disconnect, which will complete
511  * pending txs and matched rxs with failure.
512  */
513 void
514 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
515 {
516         mx_endpoint_addr_t      epa     = conn->mxk_epa;
517         int                     valid   = !mxlnd_endpoint_addr_null(epa);
518         int                     count   = 0;
519
520         cfs_spin_lock(&conn->mxk_lock);
521         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
522                 cfs_spin_unlock(&conn->mxk_lock);
523                 return;
524         }
525         mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
526         conn->mxk_timeout = 0;
527         cfs_spin_unlock(&conn->mxk_lock);
528
529         count = mxlnd_cancel_queued_txs(conn);
530         count += mxlnd_conn_cancel_pending_rxs(conn);
531
532         if (count)
533                 cfs_up(&kmxlnd_data.kmx_conn_sem); /* let connd call kmxlnd_abort_msgs() */
534
535         if (send_bye && valid &&
536             conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
537                 /* send a BYE to the peer */
538                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
539                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
540                 mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
541                 /* wait to allow the peer to ack our message */
542                 mxlnd_sleep(msecs_to_jiffies(20));
543         }
544
545         if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
546                 unsigned long   last_msg        = 0;
547
548                 /* notify LNET that we are giving up on this peer */
549                 if (cfs_time_after(conn->mxk_last_rx, conn->mxk_last_tx))
550                         last_msg = conn->mxk_last_rx;
551                 else
552                         last_msg = conn->mxk_last_tx;
553
554                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
555
556                 if (mx_dis && valid &&
557                     (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0)))
558                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
559         }
560         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
561
562         return;
563 }
564
565 /**
566  * mxlnd_conn_alloc - allocate and initialize a new conn struct
567  * @connp - address of a kmx_conn pointer
568  * @peer - owning kmx_peer
569  *
570  * Returns 0 on success and -ENOMEM on failure
571  */
572 int
573 mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
574 {
575         int             i       = 0;
576         int             ret     = 0;
577         int             ipage   = 0;
578         int             offset  = 0;
579         void           *addr    = NULL;
580         kmx_conn_t     *conn    = NULL;
581         kmx_pages_t    *pages   = NULL;
582         struct page    *page    = NULL;
583         kmx_ctx_t      *rx      = NULL;
584
585         LASSERT(peer != NULL);
586
587         MXLND_ALLOC(conn, sizeof (*conn));
588         if (conn == NULL) {
589                 CNETERR("Cannot allocate conn\n");
590                 return -ENOMEM;
591         }
592         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
593
594         memset(conn, 0, sizeof(*conn));
595
596         ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
597         if (ret != 0) {
598                 CERROR("Can't allocate rx pages\n");
599                 MXLND_FREE(conn, sizeof(*conn));
600                 return -ENOMEM;
601         }
602         conn->mxk_rx_pages = pages;
603
604         MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
605         if (conn->mxk_rxs == NULL) {
606                 CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
607                 mxlnd_free_pages(pages);
608                 MXLND_FREE(conn, sizeof(*conn));
609                 return -ENOMEM;
610         }
611
612         memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
613
614         conn->mxk_peer = peer;
615         CFS_INIT_LIST_HEAD(&conn->mxk_list);
616         CFS_INIT_LIST_HEAD(&conn->mxk_zombie);
617         cfs_atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
618                                                    and one for the caller */
619         if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
620                 u64     nic_id  = 0ULL;
621                 u32     ep_id   = 0;
622
623                 /* this is localhost, set the epa and status as up */
624                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
625                 conn->mxk_epa = kmxlnd_data.kmx_epa;
626                 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
627                 peer->mxp_reconnect_time = 0;
628                 mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
629                 peer->mxp_nic_id = nic_id;
630                 peer->mxp_ep_id = ep_id;
631                 conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
632                 conn->mxk_timeout = 0;
633         } else {
634                 /* conn->mxk_incarnation = 0 - will be set by peer */
635                 /* conn->mxk_sid = 0 - will be set by peer */
636                 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
637                 /* mxk_epa - to be set after mx_iconnect() */
638         }
639         cfs_spin_lock_init(&conn->mxk_lock);
640         /* conn->mxk_timeout = 0 */
641         /* conn->mxk_last_tx = 0 */
642         /* conn->mxk_last_rx = 0 */
643         CFS_INIT_LIST_HEAD(&conn->mxk_rx_idle);
644
645         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
646         /* mxk_outstanding = 0 */
647
648         CFS_INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
649         CFS_INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
650         /* conn->mxk_ntx_msgs = 0 */
651         /* conn->mxk_ntx_data = 0 */
652         /* conn->mxk_ntx_posted = 0 */
653         /* conn->mxk_data_posted = 0 */
654         CFS_INIT_LIST_HEAD(&conn->mxk_pending);
655
656         for (i = 0; i < MXLND_RX_MSGS(); i++) {
657
658                 rx = &conn->mxk_rxs[i];
659                 rx->mxc_type = MXLND_REQ_RX;
660                 CFS_INIT_LIST_HEAD(&rx->mxc_list);
661
662                 /* map mxc_msg to page */
663                 page = pages->mxg_pages[ipage];
664                 addr = page_address(page);
665                 LASSERT(addr != NULL);
666                 rx->mxc_msg = (kmx_msg_t *)(addr + offset);
667                 rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
668
669                 rx->mxc_conn = conn;
670                 rx->mxc_peer = peer;
671                 rx->mxc_nid = peer->mxp_nid;
672
673                 mxlnd_ctx_init(rx);
674
675                 offset += MXLND_MSG_SIZE;
676                 LASSERT (offset <= PAGE_SIZE);
677
678                 if (offset == PAGE_SIZE) {
679                         offset = 0;
680                         ipage++;
681                         LASSERT (ipage <= MXLND_TX_MSG_PAGES());
682                 }
683
684                 cfs_list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
685         }
686
687         *connp = conn;
688
689         mxlnd_peer_addref(peer);        /* add a ref for this conn */
690
691         /* add to front of peer's conns list */
692         cfs_list_add(&conn->mxk_list, &peer->mxp_conns);
693         peer->mxp_conn = conn;
694         return 0;
695 }
696
697 int
698 mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
699 {
700         int             ret     = 0;
701         cfs_rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
702
703         cfs_write_lock(g_lock);
704         ret = mxlnd_conn_alloc_locked(connp, peer);
705         cfs_write_unlock(g_lock);
706         return ret;
707 }
708
709 int
710 mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
711 {
712         int             ret     = 0;
713         kmx_conn_t      *conn   = ctx->mxc_conn;
714
715         ctx->mxc_state = MXLND_CTX_PENDING;
716         if (conn != NULL) {
717                 cfs_spin_lock(&conn->mxk_lock);
718                 if (conn->mxk_status >= MXLND_CONN_INIT) {
719                         cfs_list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
720                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
721                                 conn->mxk_timeout = ctx->mxc_deadline;
722                         }
723                 } else {
724                         ctx->mxc_state = MXLND_CTX_COMPLETED;
725                         ret = -1;
726                 }
727                 cfs_spin_unlock(&conn->mxk_lock);
728         }
729         return ret;
730 }
731
732 int
733 mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
734 {
735         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
736                 ctx->mxc_state == MXLND_CTX_COMPLETED);
737         if (ctx->mxc_state != MXLND_CTX_PENDING &&
738             ctx->mxc_state != MXLND_CTX_COMPLETED) {
739                 CNETERR("deq ctx->mxc_state = %s\n",
740                         mxlnd_ctxstate_to_str(ctx->mxc_state));
741         }
742         ctx->mxc_state = MXLND_CTX_COMPLETED;
743         if (!cfs_list_empty(&ctx->mxc_list)) {
744                 kmx_conn_t      *conn = ctx->mxc_conn;
745                 kmx_ctx_t       *next = NULL;
746
747                 LASSERT(conn != NULL);
748                 cfs_spin_lock(&conn->mxk_lock);
749                 cfs_list_del_init(&ctx->mxc_list);
750                 conn->mxk_timeout = 0;
751                 if (!cfs_list_empty(&conn->mxk_pending)) {
752                         next = cfs_list_entry(conn->mxk_pending.next,
753                                               kmx_ctx_t, mxc_list);
754                         conn->mxk_timeout = next->mxc_deadline;
755                 }
756                 cfs_spin_unlock(&conn->mxk_lock);
757         }
758         return 0;
759 }
760
761 /**
762  * mxlnd_peer_free - free the peer
763  * @peer - a kmx_peer pointer
764  *
765  * The calling function should decrement the rxs, drain the tx queues and
766  * remove the peer from the peers list first then destroy it.
767  */
768 void
769 mxlnd_peer_free(kmx_peer_t *peer)
770 {
771         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
772
773         LASSERT (cfs_atomic_read(&peer->mxp_refcount) == 0);
774
775         if (!cfs_list_empty(&peer->mxp_list)) {
776                 /* assume we are locked */
777                 cfs_list_del_init(&peer->mxp_list);
778         }
779
780         MXLND_FREE(peer, sizeof (*peer));
781         cfs_atomic_dec(&kmxlnd_data.kmx_npeers);
782         return;
783 }
784
785 static int
786 mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
787 {
788         int                     ret     = -EHOSTUNREACH;
789         unsigned char           *haddr  = NULL;
790         struct net_device       *dev    = NULL;
791         struct neighbour        *n      = NULL;
792         __be32                  dst_ip  = htonl(ip);
793
794         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
795         if (dev == NULL)
796                 return -ENODEV;
797
798         haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
799
800         n = neigh_lookup(&arp_tbl, &dst_ip, dev);
801         if (n) {
802                 n->used = jiffies;
803                 if (n->nud_state & NUD_VALID) {
804                         memcpy(haddr, n->ha, dev->addr_len);
805                         neigh_release(n);
806                         ret = 0;
807                 }
808         }
809
810         dev_put(dev);
811
812         return ret;
813 }
814
815
816 /* We only want the MAC address of the peer's Myricom NIC. We
817  * require that each node has the IPoMX interface (myriN) up.
818  * We will not pass any traffic over IPoMX, but it allows us
819  * to get the MAC address. */
820 static int
821 mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
822 {
823         int                     ret     = 0;
824         int                     try     = 1;
825         int                     fatal   = 0;
826         u64                     tmp_id  = 0ULL;
827         cfs_socket_t            *sock   = NULL;
828
829         do {
830                 CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
831                 ret = mxlnd_lookup_mac(ip, &tmp_id);
832                 if (ret == 0) {
833                         break;
834                 } else {
835                         /* not found, try to connect (force an arp) */
836                         ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
837                         if (ret == -ECONNREFUSED) {
838                                 /* peer is there, get the MAC address */
839                                 mxlnd_lookup_mac(ip, &tmp_id);
840                                 if (tmp_id != 0ULL)
841                                         ret = 0;
842                                 break;
843                         } else if (ret == -EHOSTUNREACH && try < tries) {
844                                 /* add a little backoff */
845                                 CDEBUG(D_NET, "sleeping for %d jiffies\n",
846                                        CFS_HZ/4);
847                                 mxlnd_sleep(CFS_HZ/4);
848                         }
849                 }
850         } while (try++ < tries);
851         CDEBUG(D_NET, "done trying. ret = %d\n", ret);
852
853         if (tmp_id == 0ULL)
854                 ret = -EHOSTUNREACH;
855 #ifdef __LITTLE_ENDIAN
856         *nic_id = ___arch__swab64(tmp_id);
857 #else
858         *nic_id = tmp_id;
859 #endif
860         return ret;
861 }
862
863 /**
864  * mxlnd_peer_alloc - allocate and initialize a new peer struct
865  * @peerp - address of a kmx_peer pointer
866  * @nid - LNET node id
867  *
868  * Returns 0 on success and -ENOMEM on failure
869  */
870 int
871 mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
872 {
873         int             ret     = 0;
874         u32             ip      = LNET_NIDADDR(nid);
875         kmx_peer_t     *peer    = NULL;
876
877         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
878
879         MXLND_ALLOC(peer, sizeof (*peer));
880         if (peer == NULL) {
881                 CNETERR("Cannot allocate peer for NID 0x%llx\n",
882                        nid);
883                 return -ENOMEM;
884         }
885         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
886
887         memset(peer, 0, sizeof(*peer));
888
889         CFS_INIT_LIST_HEAD(&peer->mxp_list);
890         peer->mxp_nid = nid;
891         /* peer->mxp_ni unused - may be used for multi-rail */
892         cfs_atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
893
894         peer->mxp_board = board;
895         peer->mxp_ep_id = ep_id;
896         peer->mxp_nic_id = nic_id;
897
898         CFS_INIT_LIST_HEAD(&peer->mxp_conns);
899         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
900         if (ret != 0) {
901                 mxlnd_peer_decref(peer);
902                 return ret;
903         }
904         CFS_INIT_LIST_HEAD(&peer->mxp_tx_queue);
905
906         if (peer->mxp_nic_id != 0ULL)
907                 nic_id = peer->mxp_nic_id;
908
909         if (nic_id == 0ULL) {
910                 ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
911                 if (ret == 0) {
912                         peer->mxp_nic_id = nic_id;
913                         mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
914                 }
915         }
916
917         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
918
919         /* peer->mxp_reconnect_time = 0 */
920         /* peer->mxp_incompatible = 0 */
921
922         *peerp = peer;
923         return 0;
924 }
925
926 static inline kmx_peer_t *
927 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
928 {
929         int             found   = 0;
930         int             hash    = 0;
931         kmx_peer_t      *peer   = NULL;
932
933         hash = mxlnd_nid_to_hash(nid);
934
935         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
936                 if (peer->mxp_nid == nid) {
937                         found = 1;
938                         mxlnd_peer_addref(peer);
939                         break;
940                 }
941         }
942         return (found ? peer : NULL);
943 }
944
945 static kmx_peer_t *
946 mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
947 {
948         int             ret     = 0;
949         int             hash    = 0;
950         kmx_peer_t      *peer   = NULL;
951         kmx_peer_t      *old    = NULL;
952         cfs_rwlock_t    *g_lock = &kmxlnd_data.kmx_global_lock;
953
954         cfs_read_lock(g_lock);
955         peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
956
957         if ((peer && peer->mxp_conn) || /* found peer with conn or */
958             (!peer && !create)) {       /* did not find peer and do not create one */
959                 cfs_read_unlock(g_lock);
960                 return peer;
961         }
962
963         cfs_read_unlock(g_lock);
964
965         /* if peer but _not_ conn */
966         if (peer && !peer->mxp_conn) {
967                 if (create) {
968                         cfs_write_lock(g_lock);
969                         if (!peer->mxp_conn) { /* check again */
970                                 /* create the conn */
971                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
972                                 if (ret != 0) {
973                                         /* we tried, return the peer only.
974                                          * the caller needs to see if the conn exists */
975                                         CNETERR("%s: %s could not alloc conn\n",
976                                         __func__, libcfs_nid2str(peer->mxp_nid));
977                                 } else {
978                                         /* drop extra conn ref */
979                                         mxlnd_conn_decref(peer->mxp_conn);
980                                 }
981                         }
982                         cfs_write_unlock(g_lock);
983                 }
984                 return peer;
985         }
986
987         /* peer not found and we need to create one */
988         hash = mxlnd_nid_to_hash(nid);
989
990         /* create peer (and conn) */
991         /* adds conn ref for peer and one for this function */
992         ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
993                                *kmxlnd_tunables.kmx_ep_id, 0ULL);
994         if (ret != 0) /* no memory, peer is NULL */
995                 return NULL;
996
997         cfs_write_lock(g_lock);
998
999         /* look again */
1000         old = mxlnd_find_peer_by_nid_locked(nid);
1001         if (old) {
1002                 /* someone already created one */
1003                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
1004                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
1005                 mxlnd_peer_decref(peer);
1006                 peer = old;
1007         } else {
1008                 /* no other peer, use this one */
1009                 cfs_list_add_tail(&peer->mxp_list,
1010                                   &kmxlnd_data.kmx_peers[hash]);
1011                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
1012                 mxlnd_peer_addref(peer);
1013                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
1014         }
1015
1016         cfs_write_unlock(g_lock);
1017
1018         return peer;
1019 }
1020
1021 static inline int
1022 mxlnd_tx_requires_credit(kmx_ctx_t *tx)
1023 {
1024         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
1025                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
1026                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
1027                 tx->mxc_msg_type == MXLND_MSG_NOOP);
1028 }
1029
1030 /**
1031  * mxlnd_init_msg - set type and number of bytes
1032  * @msg - msg pointer
1033  * @type - of message
1034  * @body_nob - bytes in msg body
1035  */
1036 static inline void
1037 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
1038 {
1039         msg->mxm_type = type;
1040         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
1041 }
1042
1043 static inline void
1044 mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
1045 {
1046         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
1047         kmx_msg_t       *msg    = NULL;
1048
1049         LASSERT (tx != NULL);
1050         LASSERT (nob <= MXLND_MSG_SIZE);
1051
1052         tx->mxc_nid = nid;
1053         /* tx->mxc_peer should have already been set if we know it */
1054         tx->mxc_msg_type = type;
1055         tx->mxc_nseg = 1;
1056         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
1057         tx->mxc_seg.segment_length = nob;
1058         tx->mxc_pin_type = MX_PIN_PHYSICAL;
1059
1060         msg = tx->mxc_msg;
1061         msg->mxm_type = type;
1062         msg->mxm_nob  = nob;
1063
1064         return;
1065 }
1066
1067 static inline __u32
1068 mxlnd_cksum (void *ptr, int nob)
1069 {
1070         char  *c  = ptr;
1071         __u32  sum = 0;
1072
1073         while (nob-- > 0)
1074                 sum = ((sum << 1) | (sum >> 31)) + *c++;
1075
1076         /* ensure I don't return 0 (== no checksum) */
1077         return (sum == 0) ? 1 : sum;
1078 }
1079
1080 /**
1081  * mxlnd_pack_msg_locked - complete msg info
1082  * @tx - msg to send
1083  */
1084 static inline void
1085 mxlnd_pack_msg_locked(kmx_ctx_t *tx)
1086 {
1087         kmx_msg_t       *msg    = tx->mxc_msg;
1088
1089         /* type and nob should already be set in init_msg() */
1090         msg->mxm_magic    = MXLND_MSG_MAGIC;
1091         msg->mxm_version  = MXLND_MSG_VERSION;
1092         /*   mxm_type */
1093         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
1094          * return credits as well */
1095         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
1096             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1097                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
1098                 tx->mxc_conn->mxk_outstanding = 0;
1099         } else {
1100                 msg->mxm_credits  = 0;
1101         }
1102         /*   mxm_nob */
1103         msg->mxm_cksum    = 0;
1104         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
1105         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
1106         msg->mxm_dstnid   = tx->mxc_nid;
1107         /* if it is a new peer, the dststamp will be 0 */
1108         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
1109
1110         if (*kmxlnd_tunables.kmx_cksum) {
1111                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
1112         }
1113 }
1114
1115 int
1116 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
1117 {
1118         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
1119         __u32     msg_cksum     = 0;
1120         int       flip          = 0;
1121         int       msg_nob       = 0;
1122
1123         /* 6 bytes are enough to have received magic + version */
1124         if (nob < 6) {
1125                 CNETERR("not enough bytes for magic + hdr: %d\n", nob);
1126                 return -EPROTO;
1127         }
1128
1129         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
1130                 flip = 0;
1131         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
1132                 flip = 1;
1133         } else {
1134                 CNETERR("Bad magic: %08x\n", msg->mxm_magic);
1135                 return -EPROTO;
1136         }
1137
1138         if (msg->mxm_version !=
1139             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
1140                 CNETERR("Bad version: %d\n", msg->mxm_version);
1141                 return -EPROTO;
1142         }
1143
1144         if (nob < hdr_size) {
1145                 CNETERR("not enough for a header: %d\n", nob);
1146                 return -EPROTO;
1147         }
1148
1149         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
1150         if (msg_nob > nob) {
1151                 CNETERR("Short message: got %d, wanted %d\n", nob, msg_nob);
1152                 return -EPROTO;
1153         }
1154
1155         /* checksum must be computed with mxm_cksum zero and BEFORE anything
1156          * gets flipped */
1157         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
1158         msg->mxm_cksum = 0;
1159         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
1160                 CNETERR("Bad checksum\n");
1161                 return -EPROTO;
1162         }
1163         msg->mxm_cksum = msg_cksum;
1164
1165         if (flip) {
1166                 /* leave magic unflipped as a clue to peer endianness */
1167                 __swab16s(&msg->mxm_version);
1168                 CLASSERT (sizeof(msg->mxm_type) == 1);
1169                 CLASSERT (sizeof(msg->mxm_credits) == 1);
1170                 msg->mxm_nob = msg_nob;
1171                 __swab64s(&msg->mxm_srcnid);
1172                 __swab64s(&msg->mxm_srcstamp);
1173                 __swab64s(&msg->mxm_dstnid);
1174                 __swab64s(&msg->mxm_dststamp);
1175         }
1176
1177         if (msg->mxm_srcnid == LNET_NID_ANY) {
1178                 CNETERR("Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
1179                 return -EPROTO;
1180         }
1181
1182         switch (msg->mxm_type) {
1183         default:
1184                 CNETERR("Unknown message type %x\n", msg->mxm_type);
1185                 return -EPROTO;
1186
1187         case MXLND_MSG_NOOP:
1188                 break;
1189
1190         case MXLND_MSG_EAGER:
1191                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
1192                         CNETERR("Short EAGER: %d(%d)\n", msg_nob,
1193                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
1194                         return -EPROTO;
1195                 }
1196                 break;
1197
1198         case MXLND_MSG_PUT_REQ:
1199                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
1200                         CNETERR("Short PUT_REQ: %d(%d)\n", msg_nob,
1201                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1202                         return -EPROTO;
1203                 }
1204                 if (flip)
1205                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1206                 break;
1207
1208         case MXLND_MSG_PUT_ACK:
1209                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1210                         CNETERR("Short PUT_ACK: %d(%d)\n", msg_nob,
1211                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1212                         return -EPROTO;
1213                 }
1214                 if (flip) {
1215                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1216                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1217                 }
1218                 break;
1219
1220         case MXLND_MSG_GET_REQ:
1221                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1222                         CNETERR("Short GET_REQ: %d(%d)\n", msg_nob,
1223                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1224                         return -EPROTO;
1225                 }
1226                 if (flip) {
1227                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1228                 }
1229                 break;
1230
1231         case MXLND_MSG_CONN_REQ:
1232         case MXLND_MSG_CONN_ACK:
1233                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1234                         CNETERR("Short connreq/ack: %d(%d)\n", msg_nob,
1235                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1236                         return -EPROTO;
1237                 }
1238                 if (flip) {
1239                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1240                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1241                 }
1242                 break;
1243         }
1244         return 0;
1245 }
1246
1247
1248 /**
1249  * mxlnd_recv_msg
1250  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1251  * @rx
1252  * @msg_type
1253  * @cookie
1254  * @length - length of incoming message
1255  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1256  *
1257  * The caller gets the rx and sets nid, peer and conn if known.
1258  *
1259  * Returns 0 on success and -1 on failure
1260  */
1261 int
1262 mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
1263 {
1264         int             ret     = 0;
1265         mx_return_t     mxret   = MX_SUCCESS;
1266         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1267
1268         rx->mxc_msg_type = msg_type;
1269         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1270         rx->mxc_cookie = cookie;
1271         /* rx->mxc_match may already be set */
1272         /* rx->mxc_seg.segment_ptr is already set */
1273         rx->mxc_seg.segment_length = length;
1274         ret = mxlnd_q_pending_ctx(rx);
1275         if (ret == -1) {
1276                 /* the caller is responsible for calling conn_decref() if needed */
1277                 return -1;
1278         }
1279         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1280                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1281         if (mxret != MX_SUCCESS) {
1282                 mxlnd_deq_pending_ctx(rx);
1283                 CNETERR("mx_kirecv() failed with %s (%d)\n",
1284                         mx_strerror(mxret), (int) mxret);
1285                 return -1;
1286         }
1287         return 0;
1288 }
1289
1290
1291 /**
1292  * mxlnd_unexpected_recv - this is the callback function that will handle
1293  *                         unexpected receives
1294  * @context - NULL, ignore
1295  * @source - the peer's mx_endpoint_addr_t
1296  * @match_value - the msg's bits, should be MXLND_MSG_EAGER
1297  * @length - length of incoming message
1298  * @data_if_available - used for CONN_[REQ|ACK]
1299  *
1300  * If it is an eager-sized msg, we will call recv_msg() with the actual
1301  * length. If it is a large message, we will call recv_msg() with a
1302  * length of 0 bytes to drop it because we should never have a large,
1303  * unexpected message.
1304  *
1305  * NOTE - The MX library blocks until this function completes. Make it as fast as
1306  * possible. DO NOT allocate memory which can block!
1307  *
1308  * If we cannot get a rx or the conn is closed, drop the message on the floor
1309  * (i.e. recv 0 bytes and ignore).
1310  */
1311 mx_unexp_handler_action_t
1312 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1313                  uint64_t match_value, uint32_t length, void *data_if_available)
1314 {
1315         int             ret             = 0;
1316         kmx_ctx_t       *rx             = NULL;
1317         mx_ksegment_t   seg;
1318         u8              msg_type        = 0;
1319         u8              error           = 0;
1320         u64             cookie          = 0ULL;
1321         kmx_conn_t      *conn           = NULL;
1322         kmx_peer_t      *peer           = NULL;
1323         u64             nic_id          = 0ULL;
1324         u32             ep_id           = 0;
1325         u32             sid             = 0;
1326
1327         /* TODO this will change to the net struct */
1328         if (context != NULL) {
1329                 CNETERR("non-NULL context\n");
1330         }
1331
1332 #if MXLND_DEBUG
1333         CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);
1334 #endif
1335
1336         mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
1337         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1338         cfs_read_lock(&kmxlnd_data.kmx_global_lock);
1339         mx_get_endpoint_addr_context(source, (void **) &conn);
1340         if (conn) {
1341                 mxlnd_conn_addref(conn); /* add ref for this function */
1342                 peer = conn->mxk_peer;
1343         }
1344         cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1345
1346         if (msg_type == MXLND_MSG_BYE) {
1347                 if (conn) {
1348                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1349                                         libcfs_nid2str(peer->mxp_nid));
1350                         mxlnd_conn_disconnect(conn, 1, 0);
1351                         mxlnd_conn_decref(conn); /* drop ref taken above */
1352                 }
1353                 return MX_RECV_FINISHED;
1354         }
1355
1356         if (msg_type == MXLND_MSG_CONN_REQ) {
1357                 kmx_connparams_t       *cp      = NULL;
1358                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1359                                                   sizeof(kmx_connreq_msg_t);
1360
1361                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1362                 if (unlikely(length != expected || !data_if_available)) {
1363                         CNETERR("received invalid CONN_REQ from %llx "
1364                                 "length=%d (expected %d)\n", nic_id, length, expected);
1365                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
1366                         return MX_RECV_FINISHED;
1367                 }
1368
1369                 ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1370                                          conn, peer, data_if_available);
1371                 if (unlikely(ret != 0)) {
1372                         CNETERR("unable to alloc CONN_REQ from %llx:%d\n",
1373                                 nic_id, ep_id);
1374                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
1375                         return MX_RECV_FINISHED;
1376                 }
1377                 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
1378                 cfs_list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
1379                 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
1380                 cfs_up(&kmxlnd_data.kmx_conn_sem);
1381                 return MX_RECV_FINISHED;
1382         }
1383         if (msg_type == MXLND_MSG_CONN_ACK) {
1384                 kmx_connparams_t  *cp           = NULL;
1385                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1386                                                   sizeof(kmx_connreq_msg_t);
1387
1388                 LASSERT(conn);
1389                 if (unlikely(error != 0)) {
1390                         CNETERR("received CONN_ACK from %s with error -%d\n",
1391                                libcfs_nid2str(peer->mxp_nid), (int) error);
1392                         mxlnd_conn_disconnect(conn, 1, 0);
1393                 } else if (unlikely(length != expected || !data_if_available)) {
1394                         CNETERR("received %s CONN_ACK from %s "
1395                                "length=%d (expected %d)\n",
1396                                data_if_available ? "short" : "missing",
1397                                libcfs_nid2str(peer->mxp_nid), length, expected);
1398                         mxlnd_conn_disconnect(conn, 1, 1);
1399                 } else {
1400                         /* peer is ready for messages */
1401                         ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1402                                          conn, peer, data_if_available);
1403                         if (unlikely(ret != 0)) {
1404                                 CNETERR("unable to alloc kmx_connparams_t"
1405                                                " from %llx:%d\n", nic_id, ep_id);
1406                                 mxlnd_conn_disconnect(conn, 1, 1);
1407                         } else {
1408                                 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
1409                                 cfs_list_add_tail(&cp->mxr_list,
1410                                                   &kmxlnd_data.kmx_conn_reqs);
1411                                 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
1412                                 cfs_up(&kmxlnd_data.kmx_conn_sem);
1413                         }
1414                 }
1415                 mxlnd_conn_decref(conn); /* drop ref taken above */
1416
1417                 return MX_RECV_FINISHED;
1418         }
1419
1420         /* Handle unexpected messages (PUT_REQ and GET_REQ) */
1421
1422         LASSERT(peer != NULL && conn != NULL);
1423
1424         rx = mxlnd_get_idle_rx(conn);
1425         if (rx != NULL) {
1426                 if (length <= MXLND_MSG_SIZE) {
1427                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1428                 } else {
1429                         CNETERR("unexpected large receive with "
1430                                 "match_value=0x%llx length=%d\n",
1431                                 match_value, length);
1432                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1433                 }
1434
1435                 if (ret == 0) {
1436                         /* hold conn ref until rx completes */
1437                         rx->mxc_conn = conn;
1438                         rx->mxc_peer = peer;
1439                         rx->mxc_nid = peer->mxp_nid;
1440                 } else {
1441                         CNETERR("could not post receive\n");
1442                         mxlnd_put_idle_rx(rx);
1443                 }
1444         }
1445
1446         /* Encountered error, drop incoming message on the floor */
1447         /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
1448          * uses the standard code path and acks the sender normally */
1449
1450         if (rx == NULL || ret != 0) {
1451                 mxlnd_conn_decref(conn); /* drop ref taken above */
1452                 if (rx == NULL) {
1453                         CNETERR("no idle rxs available - dropping rx"
1454                                 " 0x%llx from %s\n", match_value,
1455                                 libcfs_nid2str(peer->mxp_nid));
1456                 } else {
1457                         /* ret != 0 */
1458                         CNETERR("disconnected peer - dropping rx\n");
1459                 }
1460                 seg.segment_ptr = 0ULL;
1461                 seg.segment_length = 0;
1462                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1463                           match_value, ~0ULL, NULL, NULL);
1464         }
1465
1466         return MX_RECV_CONTINUE;
1467 }
1468
1469
1470 int
1471 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1472 {
1473         int              i      = 0;
1474         int              ret    = -ENOENT;
1475         kmx_peer_t      *peer   = NULL;
1476
1477         cfs_read_lock(&kmxlnd_data.kmx_global_lock);
1478         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1479                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1480                                         mxp_list) {
1481                         if (index-- == 0) {
1482                                 *nidp = peer->mxp_nid;
1483                                 *count = cfs_atomic_read(&peer->mxp_refcount);
1484                                 ret = 0;
1485                                 break;
1486                         }
1487                 }
1488         }
1489         cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1490
1491         return ret;
1492 }
1493
1494 void
1495 mxlnd_del_peer_locked(kmx_peer_t *peer)
1496 {
1497         if (peer->mxp_conn) {
1498                 mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1499         } else {
1500                 cfs_list_del_init(&peer->mxp_list); /* remove from the global list */
1501                 mxlnd_peer_decref(peer); /* drop global list ref */
1502         }
1503         return;
1504 }
1505
1506 int
1507 mxlnd_del_peer(lnet_nid_t nid)
1508 {
1509         int             i       = 0;
1510         int             ret     = 0;
1511         kmx_peer_t      *peer   = NULL;
1512         kmx_peer_t      *next   = NULL;
1513
1514         if (nid != LNET_NID_ANY) {
1515                 peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
1516         }
1517         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
1518         if (nid != LNET_NID_ANY) {
1519                 if (peer == NULL) {
1520                         ret = -ENOENT;
1521                 } else {
1522                         mxlnd_peer_decref(peer); /* and drops it */
1523                         mxlnd_del_peer_locked(peer);
1524                 }
1525         } else { /* LNET_NID_ANY */
1526                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1527                         cfs_list_for_each_entry_safe(peer, next,
1528                                                      &kmxlnd_data.kmx_peers[i],
1529                                                      mxp_list) {
1530                                 mxlnd_del_peer_locked(peer);
1531                         }
1532                 }
1533         }
1534         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
1535
1536         return ret;
1537 }
1538
1539 kmx_conn_t *
1540 mxlnd_get_conn_by_idx(int index)
1541 {
1542         int              i      = 0;
1543         kmx_peer_t      *peer   = NULL;
1544         kmx_conn_t      *conn   = NULL;
1545
1546         cfs_read_lock(&kmxlnd_data.kmx_global_lock);
1547         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1548                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1549                                         mxp_list) {
1550                         cfs_list_for_each_entry(conn, &peer->mxp_conns,
1551                                                 mxk_list) {
1552                                 if (index-- > 0) {
1553                                         continue;
1554                                 }
1555
1556                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1557                                 cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1558                                 return conn;
1559                         }
1560                 }
1561         }
1562         cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1563
1564         return NULL;
1565 }
1566
1567 void
1568 mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
1569 {
1570         kmx_conn_t      *conn   = NULL;
1571         kmx_conn_t      *next   = NULL;
1572
1573         cfs_list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1574                 mxlnd_conn_disconnect(conn, 0, 1);
1575
1576         return;
1577 }
1578
1579 int
1580 mxlnd_close_matching_conns(lnet_nid_t nid)
1581 {
1582         int             i       = 0;
1583         int             ret     = 0;
1584         kmx_peer_t      *peer   = NULL;
1585
1586         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
1587         if (nid != LNET_NID_ANY) {
1588                 peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
1589                 if (peer == NULL) {
1590                         ret = -ENOENT;
1591                 } else {
1592                         mxlnd_close_matching_conns_locked(peer);
1593                         mxlnd_peer_decref(peer); /* and drops it here */
1594                 }
1595         } else { /* LNET_NID_ANY */
1596                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1597                         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],                                                mxp_list)
1598                                 mxlnd_close_matching_conns_locked(peer);
1599                 }
1600         }
1601         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
1602
1603         return ret;
1604 }
1605
1606 /**
1607  * mxlnd_ctl - modify MXLND parameters
1608  * @ni - LNET interface handle
1609  * @cmd - command to change
1610  * @arg - the ioctl data
1611  */
1612 int
1613 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1614 {
1615         struct libcfs_ioctl_data *data  = arg;
1616         int                       ret   = -EINVAL;
1617
1618         LASSERT (ni == kmxlnd_data.kmx_ni);
1619
1620         switch (cmd) {
1621         case IOC_LIBCFS_GET_PEER: {
1622                 lnet_nid_t      nid     = 0;
1623                 int             count   = 0;
1624
1625                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1626                 data->ioc_nid    = nid;
1627                 data->ioc_count  = count;
1628                 break;
1629         }
1630         case IOC_LIBCFS_DEL_PEER: {
1631                 ret = mxlnd_del_peer(data->ioc_nid);
1632                 break;
1633         }
1634         case IOC_LIBCFS_GET_CONN: {
1635                 kmx_conn_t      *conn = NULL;
1636
1637                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1638                 if (conn == NULL) {
1639                         ret = -ENOENT;
1640                 } else {
1641                         ret = 0;
1642                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1643                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1644                 }
1645                 break;
1646         }
1647         case IOC_LIBCFS_CLOSE_CONNECTION: {
1648                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1649                 break;
1650         }
1651         default:
1652                 CNETERR("unknown ctl(%d)\n", cmd);
1653                 break;
1654         }
1655
1656         return ret;
1657 }
1658
1659 /**
1660  * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
1661  * @tx
1662  *
1663  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1664  */
1665 void
1666 mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
1667 {
1668         u8              msg_type        = tx->mxc_msg_type;
1669         kmx_conn_t      *conn           = tx->mxc_conn;
1670
1671         LASSERT (msg_type != 0);
1672         LASSERT (tx->mxc_nid != 0);
1673         LASSERT (tx->mxc_peer != NULL);
1674         LASSERT (tx->mxc_conn != NULL);
1675
1676         tx->mxc_incarnation = conn->mxk_incarnation;
1677
1678         if (msg_type != MXLND_MSG_PUT_DATA &&
1679             msg_type != MXLND_MSG_GET_DATA) {
1680                 /* msg style tx */
1681                 if (mxlnd_tx_requires_credit(tx)) {
1682                         cfs_list_add_tail(&tx->mxc_list,
1683                                           &conn->mxk_tx_credit_queue);
1684                         conn->mxk_ntx_msgs++;
1685                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1686                            msg_type == MXLND_MSG_CONN_ACK) {
1687                         /* put conn msgs at the front of the queue */
1688                         cfs_list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1689                 } else {
1690                         /* PUT_ACK, PUT_NAK */
1691                         cfs_list_add_tail(&tx->mxc_list,
1692                                           &conn->mxk_tx_free_queue);
1693                         conn->mxk_ntx_msgs++;
1694                 }
1695         } else {
1696                 /* data style tx */
1697                 cfs_list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1698                 conn->mxk_ntx_data++;
1699         }
1700
1701         return;
1702 }
1703
1704 /**
1705  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1706  * @tx
1707  *
1708  * Add the tx to the peer's msg or data queue
1709  */
1710 static inline void
1711 mxlnd_peer_queue_tx(kmx_ctx_t *tx)
1712 {
1713         LASSERT(tx->mxc_peer != NULL);
1714         LASSERT(tx->mxc_conn != NULL);
1715         cfs_spin_lock(&tx->mxc_conn->mxk_lock);
1716         mxlnd_peer_queue_tx_locked(tx);
1717         cfs_spin_unlock(&tx->mxc_conn->mxk_lock);
1718
1719         return;
1720 }
1721
1722 /**
1723  * mxlnd_queue_tx - add the tx to the global tx queue
1724  * @tx
1725  *
1726  * Add the tx to the global queue and up the tx_queue_sem
1727  */
1728 void
1729 mxlnd_queue_tx(kmx_ctx_t *tx)
1730 {
1731         kmx_peer_t *peer   = tx->mxc_peer;
1732         LASSERT (tx->mxc_nid != 0);
1733
1734         if (peer != NULL) {
1735                 if (peer->mxp_incompatible &&
1736                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1737                         /* let this fail now */
1738                         tx->mxc_errno = -ECONNABORTED;
1739                         mxlnd_conn_decref(peer->mxp_conn);
1740                         mxlnd_put_idle_tx(tx);
1741                         return;
1742                 }
1743                 if (tx->mxc_conn == NULL) {
1744                         int             ret     = 0;
1745                         kmx_conn_t      *conn   = NULL;
1746
1747                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1748                         if (ret != 0) {
1749                                 tx->mxc_errno = ret;
1750                                 mxlnd_put_idle_tx(tx);
1751                                 goto done;
1752                         }
1753                         tx->mxc_conn = conn;
1754                         mxlnd_peer_decref(peer); /* and takes it from peer */
1755                 }
1756                 LASSERT(tx->mxc_conn != NULL);
1757                 mxlnd_peer_queue_tx(tx);
1758                 mxlnd_check_sends(peer);
1759         } else {
1760                 cfs_spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1761                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1762                 cfs_spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1763                 cfs_up(&kmxlnd_data.kmx_tx_queue_sem);
1764         }
1765 done:
1766         return;
1767 }
1768
1769 int
1770 mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1771 {
1772         int             i                       = 0;
1773         int             sum                     = 0;
1774         int             old_sum                 = 0;
1775         int             nseg                    = 0;
1776         int             first_iov               = -1;
1777         int             first_iov_offset        = 0;
1778         int             first_found             = 0;
1779         int             last_iov                = -1;
1780         int             last_iov_length         = 0;
1781         mx_ksegment_t  *seg                     = NULL;
1782
1783         if (niov == 0) return 0;
1784         LASSERT(iov != NULL);
1785
1786         for (i = 0; i < niov; i++) {
1787                 sum = old_sum + (u32) iov[i].iov_len;
1788                 if (!first_found && (sum > offset)) {
1789                         first_iov = i;
1790                         first_iov_offset = offset - old_sum;
1791                         first_found = 1;
1792                         sum = (u32) iov[i].iov_len - first_iov_offset;
1793                         old_sum = 0;
1794                 }
1795                 if (sum >= nob) {
1796                         last_iov = i;
1797                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1798                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1799                         break;
1800                 }
1801                 old_sum = sum;
1802         }
1803         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1804         nseg = last_iov - first_iov + 1;
1805         LASSERT(nseg > 0);
1806
1807         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1808         if (seg == NULL) {
1809                 CNETERR("MXLND_ALLOC() failed\n");
1810                 return -1;
1811         }
1812         memset(seg, 0, nseg * sizeof(*seg));
1813         ctx->mxc_nseg = nseg;
1814         sum = 0;
1815         for (i = 0; i < nseg; i++) {
1816                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1817                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1818                 if (i == 0) {
1819                         seg[i].segment_ptr += (u64) first_iov_offset;
1820                         seg[i].segment_length -= (u32) first_iov_offset;
1821                 }
1822                 if (i == (nseg - 1)) {
1823                         seg[i].segment_length = (u32) last_iov_length;
1824                 }
1825                 sum += seg[i].segment_length;
1826         }
1827         ctx->mxc_seg_list = seg;
1828         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1829 #ifdef MX_PIN_FULLPAGES
1830         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1831 #endif
1832         LASSERT(nob == sum);
1833         return 0;
1834 }
1835
1836 int
1837 mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1838 {
1839         int             i                       = 0;
1840         int             sum                     = 0;
1841         int             old_sum                 = 0;
1842         int             nseg                    = 0;
1843         int             first_kiov              = -1;
1844         int             first_kiov_offset       = 0;
1845         int             first_found             = 0;
1846         int             last_kiov               = -1;
1847         int             last_kiov_length        = 0;
1848         mx_ksegment_t  *seg                     = NULL;
1849
1850         if (niov == 0) return 0;
1851         LASSERT(kiov != NULL);
1852
1853         for (i = 0; i < niov; i++) {
1854                 sum = old_sum + kiov[i].kiov_len;
1855                 if (i == 0) sum -= kiov[i].kiov_offset;
1856                 if (!first_found && (sum > offset)) {
1857                         first_kiov = i;
1858                         first_kiov_offset = offset - old_sum;
1859                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1860                         first_found = 1;
1861                         sum = kiov[i].kiov_len - first_kiov_offset;
1862                         old_sum = 0;
1863                 }
1864                 if (sum >= nob) {
1865                         last_kiov = i;
1866                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1867                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1868                         break;
1869                 }
1870                 old_sum = sum;
1871         }
1872         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1873         nseg = last_kiov - first_kiov + 1;
1874         LASSERT(nseg > 0);
1875
1876         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1877         if (seg == NULL) {
1878                 CNETERR("MXLND_ALLOC() failed\n");
1879                 return -1;
1880         }
1881         memset(seg, 0, niov * sizeof(*seg));
1882         ctx->mxc_nseg = niov;
1883         sum = 0;
1884         for (i = 0; i < niov; i++) {
1885                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1886                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1887                 if (i == 0) {
1888                         seg[i].segment_ptr += (u64) first_kiov_offset;
1889                         /* we have to add back the original kiov_offset */
1890                         seg[i].segment_length -= first_kiov_offset +
1891                                                  kiov[first_kiov].kiov_offset;
1892                 }
1893                 if (i == (nseg - 1)) {
1894                         seg[i].segment_length = last_kiov_length;
1895                 }
1896                 sum += seg[i].segment_length;
1897         }
1898         ctx->mxc_seg_list = seg;
1899         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1900 #ifdef MX_PIN_FULLPAGES
1901         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1902 #endif
1903         LASSERT(nob == sum);
1904         return 0;
1905 }
1906
1907 void
1908 mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1909 {
1910         LASSERT(type == MXLND_MSG_PUT_ACK);
1911         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1912         tx->mxc_cookie = cookie;
1913         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1914         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1915         tx->mxc_match = mxlnd_create_match(tx, status);
1916
1917         mxlnd_queue_tx(tx);
1918 }
1919
1920
1921 /**
1922  * mxlnd_send_data - get tx, map [k]iov, queue tx
1923  * @ni
1924  * @lntmsg
1925  * @peer
1926  * @msg_type
1927  * @cookie
1928  *
1929  * This setups the DATA send for PUT or GET.
1930  *
1931  * On success, it queues the tx, on failure it calls lnet_finalize()
1932  */
1933 void
1934 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
1935 {
1936         int                     ret     = 0;
1937         lnet_process_id_t       target  = lntmsg->msg_target;
1938         unsigned int            niov    = lntmsg->msg_niov;
1939         struct iovec           *iov     = lntmsg->msg_iov;
1940         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1941         unsigned int            offset  = lntmsg->msg_offset;
1942         unsigned int            nob     = lntmsg->msg_len;
1943         kmx_ctx_t              *tx      = NULL;
1944
1945         LASSERT(lntmsg != NULL);
1946         LASSERT(peer != NULL);
1947         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1948         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1949
1950         tx = mxlnd_get_idle_tx();
1951         if (tx == NULL) {
1952                 CNETERR("Can't allocate %s tx for %s\n",
1953                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1954                         libcfs_nid2str(target.nid));
1955                 goto failed_0;
1956         }
1957         tx->mxc_nid = target.nid;
1958         /* NOTE called when we have a ref on the conn, get one for this tx */
1959         mxlnd_conn_addref(peer->mxp_conn);
1960         tx->mxc_peer = peer;
1961         tx->mxc_conn = peer->mxp_conn;
1962         tx->mxc_msg_type = msg_type;
1963         tx->mxc_lntmsg[0] = lntmsg;
1964         tx->mxc_cookie = cookie;
1965         tx->mxc_match = mxlnd_create_match(tx, 0);
1966
1967         /* This setups up the mx_ksegment_t to send the DATA payload  */
1968         if (nob == 0) {
1969                 /* do not setup the segments */
1970                 CNETERR("nob = 0; why didn't we use an EAGER reply "
1971                         "to %s?\n", libcfs_nid2str(target.nid));
1972                 ret = 0;
1973         } else if (kiov == NULL) {
1974                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1975         } else {
1976                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1977         }
1978         if (ret != 0) {
1979                 CNETERR("Can't setup send DATA for %s\n",
1980                         libcfs_nid2str(target.nid));
1981                 tx->mxc_errno = -EIO;
1982                 goto failed_1;
1983         }
1984         mxlnd_queue_tx(tx);
1985         return;
1986
1987 failed_1:
1988         mxlnd_conn_decref(peer->mxp_conn);
1989         mxlnd_put_idle_tx(tx);
1990         return;
1991
1992 failed_0:
1993         CNETERR("no tx avail\n");
1994         lnet_finalize(ni, lntmsg, -EIO);
1995         return;
1996 }
1997
1998 /**
1999  * mxlnd_recv_data - map [k]iov, post rx
2000  * @ni
2001  * @lntmsg
2002  * @rx
2003  * @msg_type
2004  * @cookie
2005  *
2006  * This setups the DATA receive for PUT or GET.
2007  *
2008  * On success, it returns 0, on failure it returns -1
2009  */
2010 int
2011 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
2012 {
2013         int                     ret     = 0;
2014         lnet_process_id_t       target  = lntmsg->msg_target;
2015         unsigned int            niov    = lntmsg->msg_niov;
2016         struct iovec           *iov     = lntmsg->msg_iov;
2017         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
2018         unsigned int            offset  = lntmsg->msg_offset;
2019         unsigned int            nob     = lntmsg->msg_len;
2020         mx_return_t             mxret   = MX_SUCCESS;
2021         u64                     mask    = ~(MXLND_ERROR_MASK);
2022
2023         /* above assumes MXLND_MSG_PUT_DATA */
2024         if (msg_type == MXLND_MSG_GET_DATA) {
2025                 niov = lntmsg->msg_md->md_niov;
2026                 iov = lntmsg->msg_md->md_iov.iov;
2027                 kiov = lntmsg->msg_md->md_iov.kiov;
2028                 offset = 0;
2029                 nob = lntmsg->msg_md->md_length;
2030         }
2031
2032         LASSERT(lntmsg != NULL);
2033         LASSERT(rx != NULL);
2034         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
2035         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
2036
2037         rx->mxc_msg_type = msg_type;
2038         rx->mxc_state = MXLND_CTX_PENDING;
2039         rx->mxc_nid = target.nid;
2040         /* if posting a GET_DATA, we may not yet know the peer */
2041         if (rx->mxc_peer != NULL) {
2042                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
2043         }
2044         rx->mxc_lntmsg[0] = lntmsg;
2045         rx->mxc_cookie = cookie;
2046         rx->mxc_match = mxlnd_create_match(rx, 0);
2047         /* This setups up the mx_ksegment_t to receive the DATA payload  */
2048         if (kiov == NULL) {
2049                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
2050         } else {
2051                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
2052         }
2053         if (msg_type == MXLND_MSG_GET_DATA) {
2054                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
2055                 if (rx->mxc_lntmsg[1] == NULL) {
2056                         CNETERR("Can't create reply for GET -> %s\n",
2057                                 libcfs_nid2str(target.nid));
2058                         ret = -1;
2059                 }
2060         }
2061         if (ret != 0) {
2062                 CNETERR("Can't setup %s rx for %s\n",
2063                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
2064                        libcfs_nid2str(target.nid));
2065                 return -1;
2066         }
2067         ret = mxlnd_q_pending_ctx(rx);
2068         if (ret == -1) {
2069                 return -1;
2070         }
2071         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
2072         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
2073                           rx->mxc_seg_list, rx->mxc_nseg,
2074                           rx->mxc_pin_type, rx->mxc_match,
2075                           mask, (void *) rx,
2076                           &rx->mxc_mxreq);
2077         if (mxret != MX_SUCCESS) {
2078                 if (rx->mxc_conn != NULL) {
2079                         mxlnd_deq_pending_ctx(rx);
2080                 }
2081                 CNETERR("mx_kirecv() failed with %d for %s\n",
2082                         (int) mxret, libcfs_nid2str(target.nid));
2083                 return -1;
2084         }
2085
2086         return 0;
2087 }
2088
2089 /**
2090  * mxlnd_send - the LND required send function
2091  * @ni
2092  * @private
2093  * @lntmsg
2094  *
2095  * This must not block. Since we may not have a peer struct for the receiver,
2096  * it will append send messages on a global tx list. We will then up the
2097  * tx_queued's semaphore to notify it of the new send.
2098  */
2099 int
2100 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
2101 {
2102         int                     ret             = 0;
2103         int                     type            = lntmsg->msg_type;
2104         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
2105         lnet_process_id_t       target          = lntmsg->msg_target;
2106         lnet_nid_t              nid             = target.nid;
2107         int                     target_is_router = lntmsg->msg_target_is_router;
2108         int                     routing         = lntmsg->msg_routing;
2109         unsigned int            payload_niov    = lntmsg->msg_niov;
2110         struct iovec           *payload_iov     = lntmsg->msg_iov;
2111         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
2112         unsigned int            payload_offset  = lntmsg->msg_offset;
2113         unsigned int            payload_nob     = lntmsg->msg_len;
2114         kmx_ctx_t              *tx              = NULL;
2115         kmx_msg_t              *txmsg           = NULL;
2116         kmx_ctx_t              *rx              = (kmx_ctx_t *) private; /* for REPLY */
2117         kmx_ctx_t              *rx_data         = NULL;
2118         kmx_conn_t             *conn            = NULL;
2119         int                     nob             = 0;
2120         uint32_t                length          = 0;
2121         kmx_peer_t             *peer            = NULL;
2122         cfs_rwlock_t           *g_lock          = &kmxlnd_data.kmx_global_lock;
2123
2124         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
2125                        payload_nob, payload_niov, libcfs_id2str(target));
2126
2127         LASSERT (payload_nob == 0 || payload_niov > 0);
2128         LASSERT (payload_niov <= LNET_MAX_IOV);
2129         /* payload is either all vaddrs or all pages */
2130         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
2131
2132         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
2133
2134         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
2135          * to a new peer, so create one if not found */
2136         peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
2137         if (peer == NULL || peer->mxp_conn == NULL) {
2138                 /* we could not find it nor could we create one or
2139                  * one exists but we cannot create a conn,
2140                  * fail this message */
2141                 if (peer) {
2142                         /* found peer without conn, drop ref taken above */
2143                         LASSERT(peer->mxp_conn == NULL);
2144                         mxlnd_peer_decref(peer);
2145                 }
2146                 return -ENOMEM;
2147         }
2148
2149         /* we have a peer with a conn */
2150
2151         if (unlikely(peer->mxp_incompatible)) {
2152                 mxlnd_peer_decref(peer); /* drop ref taken above */
2153         } else {
2154                 cfs_read_lock(g_lock);
2155                 conn = peer->mxp_conn;
2156                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
2157                         mxlnd_conn_addref(conn);
2158                 } else {
2159                         conn = NULL;
2160                 }
2161                 cfs_read_unlock(g_lock);
2162                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
2163                 if (!conn)
2164                         return -ENOTCONN;
2165         }
2166
2167         LASSERT(peer && conn);
2168
2169         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
2170
2171         switch (type) {
2172         case LNET_MSG_ACK:
2173                 LASSERT (payload_nob == 0);
2174                 break;
2175
2176         case LNET_MSG_REPLY:
2177         case LNET_MSG_PUT:
2178                 /* Is the payload small enough not to need DATA? */
2179                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
2180                 if (nob <= MXLND_MSG_SIZE)
2181                         break;                  /* send EAGER */
2182
2183                 tx = mxlnd_get_idle_tx();
2184                 if (unlikely(tx == NULL)) {
2185                         CNETERR("Can't allocate %s tx for %s\n",
2186                                 type == LNET_MSG_PUT ? "PUT" : "REPLY",
2187                                 libcfs_nid2str(nid));
2188                         if (conn) mxlnd_conn_decref(conn);
2189                         return -ENOMEM;
2190                 }
2191
2192                 tx->mxc_peer = peer;
2193                 tx->mxc_conn = conn;
2194                 /* we added a conn ref above */
2195                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
2196                 txmsg = tx->mxc_msg;
2197                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
2198                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
2199                 tx->mxc_match = mxlnd_create_match(tx, 0);
2200
2201                 /* we must post a receive _before_ sending the request.
2202                  * we need to determine how much to receive, it will be either
2203                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
2204
2205                 rx = mxlnd_get_idle_rx(conn);
2206                 if (unlikely(rx == NULL)) {
2207                         CNETERR("Can't allocate rx for PUT_ACK for %s\n",
2208                                 libcfs_nid2str(nid));
2209                         mxlnd_put_idle_tx(tx);
2210                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
2211                         return -ENOMEM;
2212                 }
2213                 rx->mxc_nid = nid;
2214                 rx->mxc_peer = peer;
2215                 mxlnd_conn_addref(conn); /* for this rx */
2216                 rx->mxc_conn = conn;
2217                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
2218                 rx->mxc_cookie = tx->mxc_cookie;
2219                 rx->mxc_match = mxlnd_create_match(rx, 0);
2220
2221                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
2222                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
2223                 if (unlikely(ret != 0)) {
2224                         CNETERR("recv_msg() failed for PUT_ACK for %s\n",
2225                                            libcfs_nid2str(nid));
2226                         rx->mxc_lntmsg[0] = NULL;
2227                         mxlnd_put_idle_rx(rx);
2228                         mxlnd_put_idle_tx(tx);
2229                         mxlnd_conn_decref(conn); /* for the rx... */
2230                         mxlnd_conn_decref(conn); /* and for the tx */
2231                         return -EHOSTUNREACH;
2232                 }
2233
2234                 mxlnd_queue_tx(tx);
2235                 return 0;
2236
2237         case LNET_MSG_GET:
2238                 if (routing || target_is_router)
2239                         break;                  /* send EAGER */
2240
2241                 /* is the REPLY message too small for DATA? */
2242                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
2243                 if (nob <= MXLND_MSG_SIZE)
2244                         break;                  /* send EAGER */
2245
2246                 /* get tx (we need the cookie) , post rx for incoming DATA,
2247                  * then post GET_REQ tx */
2248                 tx = mxlnd_get_idle_tx();
2249                 if (unlikely(tx == NULL)) {
2250                         CNETERR("Can't allocate GET tx for %s\n",
2251                                 libcfs_nid2str(nid));
2252                         mxlnd_conn_decref(conn); /* for the ref taken above */
2253                         return -ENOMEM;
2254                 }
2255                 rx_data = mxlnd_get_idle_rx(conn);
2256                 if (unlikely(rx_data == NULL)) {
2257                         CNETERR("Can't allocate DATA rx for %s\n",
2258                                 libcfs_nid2str(nid));
2259                         mxlnd_put_idle_tx(tx);
2260                         mxlnd_conn_decref(conn); /* for the ref taken above */
2261                         return -ENOMEM;
2262                 }
2263                 rx_data->mxc_peer = peer;
2264                 /* NOTE no need to lock peer before adding conn ref since we took
2265                  * a conn ref for the tx (it cannot be freed between there and here ) */
2266                 mxlnd_conn_addref(conn); /* for the rx_data */
2267                 rx_data->mxc_conn = conn;
2268
2269                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
2270                 if (unlikely(ret != 0)) {
2271                         CNETERR("Can't setup GET sink for %s\n",
2272                                 libcfs_nid2str(nid));
2273                         mxlnd_put_idle_rx(rx_data);
2274                         mxlnd_put_idle_tx(tx);
2275                         mxlnd_conn_decref(conn); /* for the rx_data... */
2276                         mxlnd_conn_decref(conn); /* and for the tx */
2277                         return -EIO;
2278                 }
2279
2280                 tx->mxc_peer = peer;
2281                 tx->mxc_conn = conn;
2282                 /* conn ref taken above */
2283                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
2284                 txmsg = tx->mxc_msg;
2285                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
2286                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
2287                 tx->mxc_match = mxlnd_create_match(tx, 0);
2288
2289                 mxlnd_queue_tx(tx);
2290                 return 0;
2291
2292         default:
2293                 LBUG();
2294                 mxlnd_conn_decref(conn); /* drop ref taken above */
2295                 return -EIO;
2296         }
2297
2298         /* send EAGER */
2299
2300         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
2301                 <= MXLND_MSG_SIZE);
2302
2303         tx = mxlnd_get_idle_tx();
2304         if (unlikely(tx == NULL)) {
2305                 CNETERR("Can't send %s to %s: tx descs exhausted\n",
2306                         mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2307                 mxlnd_conn_decref(conn); /* drop ref taken above */
2308                 return -ENOMEM;
2309         }
2310
2311         tx->mxc_peer = peer;
2312         tx->mxc_conn = conn;
2313         /* conn ref taken above */
2314         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2315         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2316         tx->mxc_match = mxlnd_create_match(tx, 0);
2317
2318         txmsg = tx->mxc_msg;
2319         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2320
2321         if (payload_kiov != NULL)
2322                 lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
2323                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2324                             payload_niov, payload_kiov, payload_offset, payload_nob);
2325         else
2326                 lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
2327                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2328                             payload_niov, payload_iov, payload_offset, payload_nob);
2329
2330         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2331         mxlnd_queue_tx(tx);
2332         return 0;
2333 }
2334
2335 /**
2336  * mxlnd_recv - the LND required recv function
2337  * @ni
2338  * @private
2339  * @lntmsg
2340  * @delayed
2341  * @niov
2342  * @kiov
2343  * @offset
2344  * @mlen
2345  * @rlen
2346  *
2347  * This must not block.
2348  */
2349 int
2350 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2351              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2352              unsigned int offset, unsigned int mlen, unsigned int rlen)
2353 {
2354         int             ret             = 0;
2355         int             nob             = 0;
2356         int             len             = 0;
2357         kmx_ctx_t       *rx             = private;
2358         kmx_msg_t       *rxmsg          = rx->mxc_msg;
2359         lnet_nid_t       nid            = rx->mxc_nid;
2360         kmx_ctx_t       *tx             = NULL;
2361         kmx_msg_t       *txmsg          = NULL;
2362         kmx_peer_t      *peer           = rx->mxc_peer;
2363         kmx_conn_t      *conn           = peer->mxp_conn;
2364         u64              cookie         = 0ULL;
2365         int              msg_type       = rxmsg->mxm_type;
2366         int              repost         = 1;
2367         int              credit         = 0;
2368         int              finalize       = 0;
2369
2370         LASSERT (mlen <= rlen);
2371         /* Either all pages or all vaddrs */
2372         LASSERT (!(kiov != NULL && iov != NULL));
2373         LASSERT (peer && conn);
2374
2375         /* conn_addref(conn) already taken for the primary rx */
2376
2377         switch (msg_type) {
2378         case MXLND_MSG_EAGER:
2379                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2380                 len = rx->mxc_status.xfer_length;
2381                 if (unlikely(nob > len)) {
2382                         CNETERR("Eager message from %s too big: %d(%d)\n",
2383                                 libcfs_nid2str(nid), nob, len);
2384                         ret = -EPROTO;
2385                         break;
2386                 }
2387
2388                 if (kiov != NULL)
2389                         lnet_copy_flat2kiov(niov, kiov, offset,
2390                                 MXLND_MSG_SIZE, rxmsg,
2391                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2392                                 mlen);
2393                 else
2394                         lnet_copy_flat2iov(niov, iov, offset,
2395                                 MXLND_MSG_SIZE, rxmsg,
2396                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2397                                 mlen);
2398                 finalize = 1;
2399                 credit = 1;
2400                 break;
2401
2402         case MXLND_MSG_PUT_REQ:
2403                 /* we are going to reuse the rx, store the needed info */
2404                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2405
2406                 /* get tx, post rx, send PUT_ACK */
2407
2408                 tx = mxlnd_get_idle_tx();
2409                 if (unlikely(tx == NULL)) {
2410                         CNETERR("Can't allocate tx for %s\n", libcfs_nid2str(nid));
2411                         /* Not replying will break the connection */
2412                         ret = -ENOMEM;
2413                         break;
2414                 }
2415                 if (unlikely(mlen == 0)) {
2416                         finalize = 1;
2417                         tx->mxc_peer = peer;
2418                         tx->mxc_conn = conn;
2419                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2420                         /* repost = 1 */
2421                         break;
2422                 }
2423
2424                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2425                 tx->mxc_peer = peer;
2426                 tx->mxc_conn = conn;
2427                 /* no need to lock peer first since we already have a ref */
2428                 mxlnd_conn_addref(conn); /* for the tx */
2429                 txmsg = tx->mxc_msg;
2430                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2431                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2432                 tx->mxc_cookie = cookie;
2433                 tx->mxc_match = mxlnd_create_match(tx, 0);
2434
2435                 /* we must post a receive _before_ sending the PUT_ACK */
2436                 mxlnd_ctx_init(rx);
2437                 rx->mxc_state = MXLND_CTX_PREP;
2438                 rx->mxc_peer = peer;
2439                 rx->mxc_conn = conn;
2440                 /* do not take another ref for this rx, it is already taken */
2441                 rx->mxc_nid = peer->mxp_nid;
2442                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2443                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2444
2445                 if (unlikely(ret != 0)) {
2446                         /* Notify peer that it's over */
2447                         CNETERR("Can't setup PUT_DATA rx for %s: %d\n",
2448                                 libcfs_nid2str(nid), ret);
2449                         mxlnd_ctx_init(tx);
2450                         tx->mxc_state = MXLND_CTX_PREP;
2451                         tx->mxc_peer = peer;
2452                         tx->mxc_conn = conn;
2453                         /* finalize = 0, let the PUT_ACK tx finalize this */
2454                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2455                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2456                         /* conn ref already taken above */
2457                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2458                         /* repost = 1 */
2459                         break;
2460                 }
2461
2462                 mxlnd_queue_tx(tx);
2463                 /* do not return a credit until after PUT_DATA returns */
2464                 repost = 0;
2465                 break;
2466
2467         case MXLND_MSG_GET_REQ:
2468                 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2469
2470                 if (likely(lntmsg != NULL)) {
2471                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2472                                         cookie);
2473                 } else {
2474                         /* GET didn't match anything */
2475                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2476                          * We have to embed the error code in the match bits.
2477                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2478                         tx = mxlnd_get_idle_tx();
2479                         if (unlikely(tx == NULL)) {
2480                                 CNETERR("Can't get tx for GET NAK for %s\n",
2481                                         libcfs_nid2str(nid));
2482                                 /* we can't get a tx, notify the peer that the GET failed */
2483                                 mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
2484                                                    ENODATA, cookie);
2485                                 ret = -ENOMEM;
2486                                 break;
2487                         }
2488                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2489                         tx->mxc_state = MXLND_CTX_PENDING;
2490                         tx->mxc_nid = nid;
2491                         tx->mxc_peer = peer;
2492                         tx->mxc_conn = conn;
2493                         /* no need to lock peer first since we already have a ref */
2494                         mxlnd_conn_addref(conn); /* for this tx */
2495                         tx->mxc_cookie = cookie;
2496                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2497                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2498                         mxlnd_queue_tx(tx);
2499                 }
2500                 /* finalize lntmsg after tx completes */
2501                 break;
2502
2503         default:
2504                 LBUG();
2505         }
2506
2507         if (repost) {
2508                 /* we received a message, increment peer's outstanding credits */
2509                 if (credit == 1) {
2510                         cfs_spin_lock(&conn->mxk_lock);
2511                         conn->mxk_outstanding++;
2512                         cfs_spin_unlock(&conn->mxk_lock);
2513                 }
2514                 /* we are done with the rx */
2515                 mxlnd_put_idle_rx(rx);
2516                 mxlnd_conn_decref(conn);
2517         }
2518
2519         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2520
2521         /* we received a credit, see if we can use it to send a msg */
2522         if (credit) mxlnd_check_sends(peer);
2523
2524         return ret;
2525 }
2526
2527 void
2528 mxlnd_sleep(unsigned long timeout)
2529 {
2530         cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
2531         cfs_schedule_timeout(timeout);
2532         return;
2533 }
2534
2535 /**
2536  * mxlnd_tx_queued - the generic send queue thread
2537  * @arg - thread id (as a void *)
2538  *
2539  * This thread moves send messages from the global tx_queue to the owning
2540  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2541  * it to the global peer list.
2542  */
2543 int
2544 mxlnd_tx_queued(void *arg)
2545 {
2546         long                    id      = (long) arg;
2547         int                     ret     = 0;
2548         int                     found   = 0;
2549         kmx_ctx_t              *tx      = NULL;
2550         kmx_peer_t             *peer    = NULL;
2551         cfs_list_t             *queue   = &kmxlnd_data.kmx_tx_queue;
2552         cfs_spinlock_t         *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
2553         cfs_rwlock_t           *g_lock  = &kmxlnd_data.kmx_global_lock;
2554
2555         cfs_daemonize("mxlnd_tx_queued");
2556
2557         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
2558                 ret = cfs_down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2559                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
2560                         break;
2561                 if (ret != 0) // Should we check for -EINTR?
2562                         continue;
2563                 cfs_spin_lock(tx_q_lock);
2564                 if (cfs_list_empty (&kmxlnd_data.kmx_tx_queue)) {
2565                         cfs_spin_unlock(tx_q_lock);
2566                         continue;
2567                 }
2568                 tx = cfs_list_entry (queue->next, kmx_ctx_t, mxc_list);
2569                 cfs_list_del_init(&tx->mxc_list);
2570                 cfs_spin_unlock(tx_q_lock);
2571
2572                 found = 0;
2573                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds peer ref */
2574                 if (peer != NULL) {
2575                         tx->mxc_peer = peer;
2576                         cfs_write_lock(g_lock);
2577                         if (peer->mxp_conn == NULL) {
2578                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2579                                 if (ret != 0) {
2580                                         /* out of memory, give up and fail tx */
2581                                         tx->mxc_errno = -ENOMEM;
2582                                         mxlnd_peer_decref(peer);
2583                                         cfs_write_unlock(g_lock);
2584                                         mxlnd_put_idle_tx(tx);
2585                                         continue;
2586                                 }
2587                         }
2588                         tx->mxc_conn = peer->mxp_conn;
2589                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2590                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2591                         cfs_write_unlock(g_lock);
2592                         mxlnd_queue_tx(tx);
2593                         found = 1;
2594                 }
2595                 if (found == 0) {
2596                         int             hash    = 0;
2597                         kmx_peer_t     *peer    = NULL;
2598                         kmx_peer_t     *old     = NULL;
2599
2600                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2601
2602                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2603                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2604                         /* create peer */
2605                         /* adds conn ref for this function */
2606                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2607                                         *kmxlnd_tunables.kmx_board,
2608                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2609                         if (ret != 0) {
2610                                 /* finalize message */
2611                                 tx->mxc_errno = ret;
2612                                 mxlnd_put_idle_tx(tx);
2613                                 continue;
2614                         }
2615                         tx->mxc_peer = peer;
2616                         tx->mxc_conn = peer->mxp_conn;
2617                         /* this tx will keep the conn ref taken in peer_alloc() */
2618
2619                         /* add peer to global peer list, but look to see
2620                          * if someone already created it after we released
2621                          * the read lock */
2622                         cfs_write_lock(g_lock);
2623                         old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
2624                         if (old) {
2625                                 /* we have a peer ref on old */
2626                                 if (old->mxp_conn) {
2627                                         found = 1;
2628                                 } else {
2629                                         /* no conn */
2630                                         /* drop our ref taken above... */
2631                                         mxlnd_peer_decref(old);
2632                                         /* and delete it */
2633                                         mxlnd_del_peer_locked(old);
2634                                 }
2635                         }
2636
2637                         if (found == 0) {
2638                                 cfs_list_add_tail(&peer->mxp_list,
2639                                                   &kmxlnd_data.kmx_peers[hash]);
2640                                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
2641                         } else {
2642                                 tx->mxc_peer = old;
2643                                 tx->mxc_conn = old->mxp_conn;
2644                                 LASSERT(old->mxp_conn != NULL);
2645                                 mxlnd_conn_addref(old->mxp_conn);
2646                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2647                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2648                                 mxlnd_peer_decref(peer);
2649                         }
2650                         cfs_write_unlock(g_lock);
2651
2652                         mxlnd_queue_tx(tx);
2653                 }
2654         }
2655         mxlnd_thread_stop(id);
2656         return 0;
2657 }
2658
2659 /* When calling this, we must not have the peer lock. */
2660 void
2661 mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
2662 {
2663         mx_return_t     mxret           = MX_SUCCESS;
2664         mx_request_t    request;
2665         kmx_conn_t      *conn           = peer->mxp_conn;
2666         u64             match           = ((u64) msg_type) << MXLND_MSG_OFFSET;
2667
2668         /* NOTE we are holding a conn ref every time we call this function,
2669          * we do not need to lock the peer before taking another ref */
2670         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2671
2672         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2673
2674         if (peer->mxp_reconnect_time == 0) {
2675                 peer->mxp_reconnect_time = jiffies;
2676         }
2677
2678         if (peer->mxp_nic_id == 0ULL) {
2679                 int     ret     = 0;
2680
2681                 ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
2682                                       &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
2683                 if (ret == 0) {
2684                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2685                 }
2686                 if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
2687                         /* not mapped yet, return */
2688                         cfs_spin_lock(&conn->mxk_lock);
2689                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
2690                         cfs_spin_unlock(&conn->mxk_lock);
2691                 }
2692         }
2693
2694         if (cfs_time_after(jiffies,
2695                            peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
2696             conn->mxk_status != MXLND_CONN_DISCONNECT) {
2697                 /* give up and notify LNET */
2698                 CDEBUG(D_NET, "timeout trying to connect to %s\n",
2699                        libcfs_nid2str(peer->mxp_nid));
2700                 mxlnd_conn_disconnect(conn, 0, 0);
2701                 mxlnd_conn_decref(conn);
2702                 return;
2703         }
2704
2705         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2706                             peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
2707                             (void *) peer, &request);
2708         if (unlikely(mxret != MX_SUCCESS)) {
2709                 cfs_spin_lock(&conn->mxk_lock);
2710                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
2711                 cfs_spin_unlock(&conn->mxk_lock);
2712                 CNETERR("mx_iconnect() failed with %s (%d) to %s\n",
2713                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2714                 mxlnd_conn_decref(conn);
2715         }
2716         mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
2717                                MXLND_CONNECT_TIMEOUT/CFS_HZ*1000);
2718         return;
2719 }
2720
2721 #define MXLND_STATS 0
2722
2723 int
2724 mxlnd_check_sends(kmx_peer_t *peer)
2725 {
2726         int             ret             = 0;
2727         int             found           = 0;
2728         mx_return_t     mxret           = MX_SUCCESS;
2729         kmx_ctx_t       *tx             = NULL;
2730         kmx_conn_t      *conn           = NULL;
2731         u8              msg_type        = 0;
2732         int             credit          = 0;
2733         int             status          = 0;
2734         int             ntx_posted      = 0;
2735         int             credits         = 0;
2736 #if MXLND_STATS
2737         static unsigned long last       = 0;
2738 #endif
2739
2740         if (unlikely(peer == NULL)) {
2741                 LASSERT(peer != NULL);
2742                 return -1;
2743         }
2744         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
2745         conn = peer->mxp_conn;
2746         /* NOTE take a ref for the duration of this function since it is called
2747          * when there might not be any queued txs for this peer */
2748         if (conn) {
2749                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2750                         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2751                         return -1;
2752                 }
2753                 mxlnd_conn_addref(conn); /* for duration of this function */
2754         }
2755         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2756
2757         /* do not add another ref for this tx */
2758
2759         if (conn == NULL) {
2760                 /* we do not have any conns */
2761                 CNETERR("peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
2762                 return -1;
2763         }
2764
2765 #if MXLND_STATS
2766         if (cfs_time_after(jiffies, last)) {
2767                 last = jiffies + CFS_HZ;
2768                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2769                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2770                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2771                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2772                               conn->mxk_ntx_data, conn->mxk_data_posted);
2773         }
2774 #endif
2775
2776         cfs_spin_lock(&conn->mxk_lock);
2777         ntx_posted = conn->mxk_ntx_posted;
2778         credits = conn->mxk_credits;
2779
2780         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
2781         LASSERT(ntx_posted >= 0);
2782
2783         LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
2784         LASSERT(credits >= 0);
2785
2786         /* check number of queued msgs, ignore data */
2787         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
2788                 /* check if any txs queued that could return credits... */
2789                 if (cfs_list_empty(&conn->mxk_tx_credit_queue) ||
2790                     conn->mxk_ntx_msgs == 0) {
2791                         /* if not, send a NOOP */
2792                         tx = mxlnd_get_idle_tx();
2793                         if (likely(tx != NULL)) {
2794                                 tx->mxc_peer = peer;
2795                                 tx->mxc_conn = peer->mxp_conn;
2796                                 mxlnd_conn_addref(conn); /* for this tx */
2797                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2798                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2799                                 mxlnd_peer_queue_tx_locked(tx);
2800                                 found = 1;
2801                                 goto done_locked;
2802                         }
2803                 }
2804         }
2805
2806         /* if the peer is not ready, try to connect */
2807         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2808             conn->mxk_status == MXLND_CONN_FAIL)) {
2809                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2810                 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
2811                 cfs_spin_unlock(&conn->mxk_lock);
2812                 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
2813                 goto done;
2814         }
2815
2816         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
2817                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
2818                 /* We have something to send. If we have a queued tx that does not
2819                  * require a credit (free), choose it since its completion will
2820                  * return a credit (here or at the peer), complete a DATA or
2821                  * CONN_REQ or CONN_ACK. */
2822                 cfs_list_t *tmp_tx = NULL;
2823                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
2824                         tmp_tx = &conn->mxk_tx_free_queue;
2825                 } else {
2826                         tmp_tx = &conn->mxk_tx_credit_queue;
2827                 }
2828                 tx = cfs_list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
2829
2830                 msg_type = tx->mxc_msg_type;
2831
2832                 /* don't try to send a rx */
2833                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2834
2835                 /* ensure that it is a valid msg type */
2836                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2837                         msg_type == MXLND_MSG_CONN_ACK ||
2838                         msg_type == MXLND_MSG_NOOP     ||
2839                         msg_type == MXLND_MSG_EAGER    ||
2840                         msg_type == MXLND_MSG_PUT_REQ  ||
2841                         msg_type == MXLND_MSG_PUT_ACK  ||
2842                         msg_type == MXLND_MSG_PUT_DATA ||
2843                         msg_type == MXLND_MSG_GET_REQ  ||
2844                         msg_type == MXLND_MSG_GET_DATA);
2845                 LASSERT(tx->mxc_peer == peer);
2846                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2847
2848                 credit = mxlnd_tx_requires_credit(tx);
2849                 if (credit) {
2850
2851                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
2852                                 CDEBUG(D_NET, "%s: posted enough\n",
2853                                               libcfs_nid2str(peer->mxp_nid));
2854                                 goto done_locked;
2855                         }
2856
2857                         if (conn->mxk_credits == 0) {
2858                                 CDEBUG(D_NET, "%s: no credits\n",
2859                                               libcfs_nid2str(peer->mxp_nid));
2860                                 goto done_locked;
2861                         }
2862
2863                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2864                             conn->mxk_outstanding == 0) {  /* giving back credits */
2865                                 CDEBUG(D_NET, "%s: not using last credit\n",
2866                                               libcfs_nid2str(peer->mxp_nid));
2867                                 goto done_locked;
2868                         }
2869                 }
2870
2871                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2872                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2873                                 msg_type == MXLND_MSG_CONN_ACK)) {
2874                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2875                                              mxlnd_connstatus_to_str(conn->mxk_status),
2876                                              tx->mxc_cookie,
2877                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2878                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
2879                                     cfs_time_aftereq(jiffies, tx->mxc_deadline)) {
2880                                         cfs_list_del_init(&tx->mxc_list);
2881                                         tx->mxc_errno = -ECONNABORTED;
2882                                         cfs_spin_unlock(&conn->mxk_lock);
2883                                         mxlnd_put_idle_tx(tx);
2884                                         mxlnd_conn_decref(conn);
2885                                         goto done;
2886                                 }
2887                                 goto done_locked;
2888                         }
2889                 }
2890
2891                 cfs_list_del_init(&tx->mxc_list);
2892
2893                 /* handle credits, etc now while we have the lock to avoid races */
2894                 if (credit) {
2895                         conn->mxk_credits--;
2896                         conn->mxk_ntx_posted++;
2897                 }
2898                 if (msg_type != MXLND_MSG_PUT_DATA &&
2899                     msg_type != MXLND_MSG_GET_DATA) {
2900                         if (msg_type != MXLND_MSG_CONN_REQ &&
2901                             msg_type != MXLND_MSG_CONN_ACK) {
2902                                 conn->mxk_ntx_msgs--;
2903                         }
2904                 }
2905                 if (tx->mxc_incarnation == 0 &&
2906                     conn->mxk_incarnation != 0) {
2907                         tx->mxc_incarnation = conn->mxk_incarnation;
2908                 }
2909
2910                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2911                  * or (2) there is a non-DATA msg that can return credits in the
2912                  * queue, then drop this duplicate NOOP */
2913                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2914                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
2915                             (conn->mxk_ntx_msgs >= 1)) {
2916                                 conn->mxk_credits++;
2917                                 conn->mxk_ntx_posted--;
2918                                 cfs_spin_unlock(&conn->mxk_lock);
2919                                 /* redundant NOOP */
2920                                 mxlnd_put_idle_tx(tx);
2921                                 mxlnd_conn_decref(conn);
2922                                 CDEBUG(D_NET, "%s: redundant noop\n",
2923                                               libcfs_nid2str(peer->mxp_nid));
2924                                 found = 1;
2925                                 goto done;
2926                         }
2927                 }
2928
2929                 found = 1;
2930                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2931                     (msg_type != MXLND_MSG_GET_DATA))) {
2932                         mxlnd_pack_msg_locked(tx);
2933                 }
2934
2935                 mxret = MX_SUCCESS;
2936
2937                 status = conn->mxk_status;
2938                 cfs_spin_unlock(&conn->mxk_lock);
2939
2940                 if (likely((status == MXLND_CONN_READY) ||
2941                     (msg_type == MXLND_MSG_CONN_REQ) ||
2942                     (msg_type == MXLND_MSG_CONN_ACK))) {
2943                         ret = 0;
2944                         if (msg_type != MXLND_MSG_CONN_REQ &&
2945                             msg_type != MXLND_MSG_CONN_ACK) {
2946                                 /* add to the pending list */
2947                                 ret = mxlnd_q_pending_ctx(tx);
2948                         } else {
2949                                 /* CONN_REQ/ACK */
2950                                 tx->mxc_state = MXLND_CTX_PENDING;
2951                         }
2952
2953                         if (ret == 0) {
2954                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2955                                     msg_type != MXLND_MSG_GET_DATA)) {
2956                                         /* send a msg style tx */
2957                                         LASSERT(tx->mxc_nseg == 1);
2958                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2959                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2960                                                mxlnd_msgtype_to_str(msg_type),
2961                                                tx->mxc_cookie);
2962                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2963                                                           &tx->mxc_seg,
2964                                                           tx->mxc_nseg,
2965                                                           tx->mxc_pin_type,
2966                                                           conn->mxk_epa,
2967                                                           tx->mxc_match,
2968                                                           (void *) tx,
2969                                                           &tx->mxc_mxreq);
2970                                 } else {
2971                                         /* send a DATA tx */
2972                                         cfs_spin_lock(&conn->mxk_lock);
2973                                         conn->mxk_ntx_data--;
2974                                         conn->mxk_data_posted++;
2975                                         cfs_spin_unlock(&conn->mxk_lock);
2976                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2977                                                mxlnd_msgtype_to_str(msg_type),
2978                                                tx->mxc_cookie);
2979                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2980                                                           tx->mxc_seg_list,
2981                                                           tx->mxc_nseg,
2982                                                           tx->mxc_pin_type,
2983                                                           conn->mxk_epa,
2984                                                           tx->mxc_match,
2985                                                           (void *) tx,
2986                                                           &tx->mxc_mxreq);
2987                                 }
2988                         } else {
2989                                 /* ret != 0 */
2990                                 mxret = MX_CONNECTION_FAILED;
2991                         }
2992                         if (likely(mxret == MX_SUCCESS)) {
2993                                 ret = 0;
2994                         } else {
2995                                 CNETERR("mx_kisend() failed with %s (%d) "
2996                                         "sending to %s\n", mx_strerror(mxret), (int) mxret,
2997                                        libcfs_nid2str(peer->mxp_nid));
2998                                 /* NOTE mx_kisend() only fails if there are not enough
2999                                 * resources. Do not change the connection status. */
3000                                 if (mxret == MX_NO_RESOURCES) {
3001                                         tx->mxc_errno = -ENOMEM;
3002                                 } else {
3003                                         tx->mxc_errno = -ECONNABORTED;
3004                                 }
3005                                 if (credit) {
3006                                         cfs_spin_lock(&conn->mxk_lock);
3007                                         conn->mxk_ntx_posted--;
3008                                         conn->mxk_credits++;
3009                                         cfs_spin_unlock(&conn->mxk_lock);
3010                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
3011                                         msg_type == MXLND_MSG_GET_DATA) {
3012                                         cfs_spin_lock(&conn->mxk_lock);
3013                                         conn->mxk_data_posted--;
3014                                         cfs_spin_unlock(&conn->mxk_lock);
3015                                 }
3016                                 if (msg_type != MXLND_MSG_PUT_DATA &&
3017                                     msg_type != MXLND_MSG_GET_DATA &&
3018                                     msg_type != MXLND_MSG_CONN_REQ &&
3019                                     msg_type != MXLND_MSG_CONN_ACK) {
3020                                         cfs_spin_lock(&conn->mxk_lock);
3021                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
3022                                         cfs_spin_unlock(&conn->mxk_lock);
3023                                 }
3024                                 if (msg_type != MXLND_MSG_CONN_REQ &&
3025                                     msg_type != MXLND_MSG_CONN_ACK) {
3026                                         /* remove from the pending list */
3027                                         mxlnd_deq_pending_ctx(tx);
3028                                 }
3029                                 mxlnd_put_idle_tx(tx);
3030                                 mxlnd_conn_decref(conn);
3031                         }
3032                 }
3033                 cfs_spin_lock(&conn->mxk_lock);
3034         }
3035 done_locked:
3036         cfs_spin_unlock(&conn->mxk_lock);
3037 done:
3038         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
3039         return found;
3040 }
3041
3042
3043 /**
3044  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
3045  * @ctx - the tx descriptor
3046  *
3047  * Determine which type of send request it was and start the next step, if needed,
3048  * or, if done, signal completion to LNET. After we are done, put back on the
3049  * idle tx list.
3050  */
3051 void
3052 mxlnd_handle_tx_completion(kmx_ctx_t *tx)
3053 {
3054         int             code    = tx->mxc_status.code;
3055         int             failed  = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
3056         kmx_msg_t       *msg    = tx->mxc_msg;
3057         kmx_peer_t      *peer   = tx->mxc_peer;
3058         kmx_conn_t      *conn   = tx->mxc_conn;
3059         u8              type    = tx->mxc_msg_type;
3060         int             credit  = mxlnd_tx_requires_credit(tx);
3061         u64             cookie  = tx->mxc_cookie;
3062
3063         CDEBUG(D_NET, "entering %s (0x%llx):\n",
3064                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
3065
3066         LASSERT (peer != NULL);
3067         LASSERT (conn != NULL);
3068
3069         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
3070                 LASSERT (type == msg->mxm_type);
3071         }
3072
3073         if (failed) {
3074                 if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
3075         } else {
3076                 cfs_spin_lock(&conn->mxk_lock);
3077                 conn->mxk_last_tx = cfs_time_current(); /* jiffies */
3078                 cfs_spin_unlock(&conn->mxk_lock);
3079         }
3080
3081         switch (type) {
3082
3083         case MXLND_MSG_GET_DATA:
3084                 cfs_spin_lock(&conn->mxk_lock);
3085                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3086                         conn->mxk_outstanding++;
3087                         conn->mxk_data_posted--;
3088                 }
3089                 cfs_spin_unlock(&conn->mxk_lock);
3090                 break;
3091
3092         case MXLND_MSG_PUT_DATA:
3093                 cfs_spin_lock(&conn->mxk_lock);
3094                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3095                         conn->mxk_data_posted--;
3096                 }
3097                 cfs_spin_unlock(&conn->mxk_lock);
3098                 break;
3099
3100         case MXLND_MSG_NOOP:
3101         case MXLND_MSG_PUT_REQ:
3102         case MXLND_MSG_PUT_ACK:
3103         case MXLND_MSG_GET_REQ:
3104         case MXLND_MSG_EAGER:
3105                 break;
3106
3107         case MXLND_MSG_CONN_ACK:
3108                 if (peer->mxp_incompatible) {
3109                         /* we sent our params, now close this conn */
3110                         mxlnd_conn_disconnect(conn, 0, 1);
3111                 }
3112         case MXLND_MSG_CONN_REQ:
3113                 if (failed) {
3114                         CNETERR("%s failed with %s (%d) (errno = %d) to %s\n",
3115                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
3116                                mx_strstatus(code), code, tx->mxc_errno,
3117                                libcfs_nid2str(tx->mxc_nid));
3118                         if (!peer->mxp_incompatible) {
3119                                 cfs_spin_lock(&conn->mxk_lock);
3120                                 if (code == MX_STATUS_BAD_SESSION)
3121                                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
3122                                 else
3123                                         mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3124                                 cfs_spin_unlock(&conn->mxk_lock);
3125                         }
3126                 }
3127                 break;
3128
3129         default:
3130                 CNETERR("Unknown msg type of %d\n", type);
3131                 LBUG();
3132         }
3133
3134         if (credit) {
3135                 cfs_spin_lock(&conn->mxk_lock);
3136                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3137                         conn->mxk_ntx_posted--;
3138                 }
3139                 cfs_spin_unlock(&conn->mxk_lock);
3140         }
3141
3142         mxlnd_put_idle_tx(tx);
3143         mxlnd_conn_decref(conn);
3144
3145         mxlnd_check_sends(peer);
3146
3147         CDEBUG(D_NET, "leaving\n");
3148         return;
3149 }
3150
3151 /* Handle completion of MSG or DATA rx.
3152  * CONN_REQ and CONN_ACK are handled elsewhere. */
3153 void
3154 mxlnd_handle_rx_completion(kmx_ctx_t *rx)
3155 {
3156         int             ret             = 0;
3157         int             repost          = 1;
3158         int             credit          = 1;
3159         u32             nob             = rx->mxc_status.xfer_length;
3160         u64             bits            = rx->mxc_status.match_info;
3161         kmx_msg_t      *msg             = rx->mxc_msg;
3162         kmx_peer_t     *peer            = rx->mxc_peer;
3163         kmx_conn_t     *conn            = rx->mxc_conn;
3164         u8              type            = rx->mxc_msg_type;
3165         u64             seq             = bits;
3166         lnet_msg_t     *lntmsg[2];
3167         int             result          = 0;
3168         int             peer_ref        = 0;
3169         int             conn_ref        = 0;
3170
3171         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
3172          * failed GET reply */
3173
3174         /* NOTE peer may still be NULL if it is a new peer and
3175          *      conn may be NULL if this is a re-connect */
3176         if (likely(peer != NULL && conn != NULL)) {
3177                 /* we have a reference on the conn */
3178                 conn_ref = 1;
3179         } else if (peer != NULL && conn == NULL) {
3180                 /* we have a reference on the peer */
3181                 peer_ref = 1;
3182         } else if (peer == NULL && conn != NULL) {
3183                 /* fatal error */
3184                 CERROR("rx 0x%llx from %s has conn but no peer\n",
3185                        bits, libcfs_nid2str(rx->mxc_nid));
3186                 LBUG();
3187         } /* else peer and conn == NULL */
3188
3189         if (conn == NULL && peer != NULL) {
3190                 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3191                 conn = peer->mxp_conn;
3192                 if (conn) {
3193                         mxlnd_conn_addref(conn); /* conn takes ref... */
3194                         mxlnd_peer_decref(peer); /* from peer */
3195                         conn_ref = 1;
3196                         peer_ref = 0;
3197                 }
3198                 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3199                 rx->mxc_conn = conn;
3200         }
3201
3202 #if MXLND_DEBUG
3203         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
3204 #endif
3205
3206         lntmsg[0] = NULL;
3207         lntmsg[1] = NULL;
3208
3209         if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
3210             rx->mxc_status.code != MX_STATUS_TRUNCATED) {
3211                 CNETERR("rx from %s failed with %s (%d)\n",
3212                         libcfs_nid2str(rx->mxc_nid),
3213                         mx_strstatus(rx->mxc_status.code),
3214                         rx->mxc_status.code);
3215                 credit = 0;
3216                 goto cleanup;
3217         }
3218
3219         if (nob == 0) {
3220                 /* this may be a failed GET reply */
3221                 if (type == MXLND_MSG_GET_DATA) {
3222                         /* get the error (52-59) bits from the match bits */
3223                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
3224                         lntmsg[0] = rx->mxc_lntmsg[0];
3225                         result = -ret;
3226                         goto cleanup;
3227                 } else {
3228                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
3229                         CNETERR("rx from %s returned with 0 bytes\n",
3230                                 libcfs_nid2str(rx->mxc_nid));
3231                         goto cleanup;
3232                 }
3233         }
3234
3235         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
3236         if (type == MXLND_MSG_PUT_DATA) {
3237                 /* result = 0; */
3238                 lntmsg[0] = rx->mxc_lntmsg[0];
3239                 goto cleanup;
3240         } else if (type == MXLND_MSG_GET_DATA) {
3241                 /* result = 0; */
3242                 lntmsg[0] = rx->mxc_lntmsg[0];
3243                 lntmsg[1] = rx->mxc_lntmsg[1];
3244                 goto cleanup;
3245         }
3246
3247         ret = mxlnd_unpack_msg(msg, nob);
3248         if (ret != 0) {
3249                 CNETERR("Error %d unpacking rx from %s\n",
3250                         ret, libcfs_nid2str(rx->mxc_nid));
3251                 goto cleanup;
3252         }
3253         rx->mxc_nob = nob;
3254         type = msg->mxm_type;
3255
3256         if (rx->mxc_nid != msg->mxm_srcnid ||
3257             kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3258                 CNETERR("rx with mismatched NID (type %s) (my nid is "
3259                        "0x%llx and rx msg dst is 0x%llx)\n",
3260                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
3261                        msg->mxm_dstnid);
3262                 goto cleanup;
3263         }
3264
3265         if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
3266             msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3267                 CNETERR("Stale rx from %s with type %s "
3268                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3269                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3270                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3271                        msg->mxm_srcstamp, conn->mxk_incarnation,
3272                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3273                 credit = 0;
3274                 goto cleanup;
3275         }
3276
3277         CDEBUG(D_NET, "Received %s with %d credits\n",
3278                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
3279
3280         LASSERT(peer != NULL && conn != NULL);
3281         if (msg->mxm_credits != 0) {
3282                 cfs_spin_lock(&conn->mxk_lock);
3283                 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3284                         if ((conn->mxk_credits + msg->mxm_credits) >
3285                              *kmxlnd_tunables.kmx_peercredits) {
3286                                 CNETERR("mxk_credits %d  mxm_credits %d\n",
3287                                         conn->mxk_credits, msg->mxm_credits);
3288                         }
3289                         conn->mxk_credits += msg->mxm_credits;
3290                         LASSERT(conn->mxk_credits >= 0);
3291                         LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
3292                 }
3293                 cfs_spin_unlock(&conn->mxk_lock);
3294         }
3295
3296         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3297         switch (type) {
3298         case MXLND_MSG_NOOP:
3299                 break;
3300
3301         case MXLND_MSG_EAGER:
3302                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3303                                         msg->mxm_srcnid, rx, 0);
3304                 repost = ret < 0;
3305                 break;
3306
3307         case MXLND_MSG_PUT_REQ:
3308                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3309                                         msg->mxm_srcnid, rx, 1);
3310                 repost = ret < 0;
3311                 break;
3312
3313         case MXLND_MSG_PUT_ACK: {
3314                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3315                 if (cookie > MXLND_MAX_COOKIE) {
3316                         CNETERR("NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3317                                 libcfs_nid2str(rx->mxc_nid));
3318                         result = -((u32) MXLND_ERROR_VAL(cookie));
3319                         lntmsg[0] = rx->mxc_lntmsg[0];
3320                 } else {
3321                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3322                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3323                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3324                 }
3325                 /* repost == 1 */
3326                 break;
3327         }
3328         case MXLND_MSG_GET_REQ:
3329                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3330                                         msg->mxm_srcnid, rx, 1);
3331                 repost = ret < 0;
3332                 break;
3333
3334         default:
3335                 CNETERR("Bad MXLND message type %x from %s\n", msg->mxm_type,
3336                         libcfs_nid2str(rx->mxc_nid));
3337                 ret = -EPROTO;
3338                 break;
3339         }
3340
3341         if (ret < 0) {
3342                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3343                 cfs_spin_lock(&conn->mxk_lock);
3344                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3345                 cfs_spin_unlock(&conn->mxk_lock);
3346         }
3347
3348 cleanup:
3349         if (conn != NULL) {
3350                 cfs_spin_lock(&conn->mxk_lock);
3351                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3352                 cfs_spin_unlock(&conn->mxk_lock);
3353         }
3354
3355         if (repost) {
3356                 /* lnet_parse() failed, etc., repost now */
3357                 mxlnd_put_idle_rx(rx);
3358                 if (conn != NULL && credit == 1) {
3359                         if (type == MXLND_MSG_PUT_DATA ||
3360                             type == MXLND_MSG_EAGER ||
3361                             type == MXLND_MSG_PUT_REQ ||
3362                             type == MXLND_MSG_NOOP) {
3363                                 cfs_spin_lock(&conn->mxk_lock);
3364                                 conn->mxk_outstanding++;
3365                                 cfs_spin_unlock(&conn->mxk_lock);
3366                         }
3367                 }
3368                 if (conn_ref) mxlnd_conn_decref(conn);
3369                 LASSERT(peer_ref == 0);
3370         }
3371
3372         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3373                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3374         } else {
3375                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3376         }
3377
3378         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3379         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3380
3381         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3382
3383         return;
3384 }
3385
3386 void
3387 mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
3388 {
3389         kmx_ctx_t       *tx     = NULL;
3390         kmx_msg_t       *txmsg  = NULL;
3391         kmx_conn_t      *conn   = peer->mxp_conn;
3392         u64             nic_id  = 0ULL;
3393         u32             ep_id   = 0;
3394         u32             sid     = 0;
3395         u8              type    = (msg_type == MXLND_MSG_ICON_REQ ?
3396                                    MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
3397
3398         /* a conn ref was taken when calling mx_iconnect(),
3399          * hold it until CONN_REQ or CONN_ACK completes */
3400
3401         CDEBUG(D_NET, "entering\n");
3402         if (status.code != MX_STATUS_SUCCESS) {
3403                 int send_bye    = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
3404
3405                 CNETERR("mx_iconnect() failed for %s with %s (%d) "
3406                         "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3407                         mxlnd_msgtype_to_str(msg_type),
3408                         mx_strstatus(status.code), status.code,
3409                         libcfs_nid2str(peer->mxp_nid),
3410                         peer->mxp_nid,
3411                         peer->mxp_nic_id,
3412                         peer->mxp_ep_id);
3413                 cfs_spin_lock(&conn->mxk_lock);
3414                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3415                 cfs_spin_unlock(&conn->mxk_lock);
3416
3417                 if (cfs_time_after(jiffies, peer->mxp_reconnect_time +
3418                                    MXLND_CONNECT_TIMEOUT)) {
3419                         CNETERR("timeout, calling conn_disconnect()\n");
3420                         mxlnd_conn_disconnect(conn, 0, send_bye);
3421                 }
3422
3423                 mxlnd_conn_decref(conn);
3424                 return;
3425         }
3426         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3427         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3428         cfs_spin_lock(&conn->mxk_lock);
3429         conn->mxk_epa = status.source;
3430         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3431         if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
3432                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3433         }
3434         cfs_spin_unlock(&conn->mxk_lock);
3435         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3436
3437         /* mx_iconnect() succeeded, reset delay to 0 */
3438         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3439         peer->mxp_reconnect_time = 0;
3440         peer->mxp_conn->mxk_sid = sid;
3441         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3442
3443         /* marshal CONN_REQ or CONN_ACK msg */
3444         /* we are still using the conn ref from iconnect() - do not take another */
3445         tx = mxlnd_get_idle_tx();
3446         if (tx == NULL) {
3447                 CNETERR("Can't obtain %s tx for %s\n",
3448                        mxlnd_msgtype_to_str(type),
3449                        libcfs_nid2str(peer->mxp_nid));
3450                 cfs_spin_lock(&conn->mxk_lock);
3451                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3452                 cfs_spin_unlock(&conn->mxk_lock);
3453                 mxlnd_conn_decref(conn);
3454                 return;
3455         }
3456
3457         tx->mxc_peer = peer;
3458         tx->mxc_conn = conn;
3459         tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
3460         CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
3461         mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3462         txmsg = tx->mxc_msg;
3463         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
3464         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
3465         tx->mxc_match = mxlnd_create_match(tx, 0);
3466
3467         mxlnd_queue_tx(tx);
3468         return;
3469 }
3470
3471 /**
3472  * mxlnd_request_waitd - the MX request completion thread(s)
3473  * @arg - thread id (as a void *)
3474  *
3475  * This thread waits for a MX completion and then completes the request.
3476  * We will create one thread per CPU.
3477  */
3478 int
3479 mxlnd_request_waitd(void *arg)
3480 {
3481         long                    id              = (long) arg;
3482         char                    name[24];
3483         __u32                   result          = 0;
3484         mx_return_t             mxret           = MX_SUCCESS;
3485         mx_status_t             status;
3486         kmx_ctx_t              *ctx             = NULL;
3487         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3488         kmx_peer_t             *peer            = NULL;
3489         kmx_conn_t             *conn            = NULL;
3490 #if MXLND_POLLING
3491         int                     count           = 0;
3492 #endif
3493
3494         memset(name, 0, sizeof(name));
3495         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3496         cfs_daemonize(name);
3497
3498         memset(&status, 0, sizeof(status));
3499
3500         CDEBUG(D_NET, "%s starting\n", name);
3501
3502         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3503                 u8      msg_type        = 0;
3504
3505                 mxret = MX_SUCCESS;
3506                 result = 0;
3507 #if MXLND_POLLING
3508                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3509                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3510                                             &status, &result);
3511                 } else {
3512                         count = 0;
3513                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3514                                             0ULL, 0ULL, &status, &result);
3515                 }
3516 #else
3517                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3518                                     0ULL, 0ULL, &status, &result);
3519 #endif
3520                 if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown)))
3521                         break;
3522
3523                 if (result != 1) {
3524                         /* nothing completed... */
3525                         continue;
3526                 }
3527
3528                 CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
3529                        "match_info 0x%llx and length %d\n",
3530                        mx_strstatus(status.code), status.code,
3531                        (u64) status.match_info, status.msg_length);
3532
3533                 if (status.code != MX_STATUS_SUCCESS) {
3534                         CNETERR("wait_any() failed with %s (%d) with "
3535                                 "match_info 0x%llx and length %d\n",
3536                                 mx_strstatus(status.code), status.code,
3537                                 (u64) status.match_info, status.msg_length);
3538                 }
3539
3540                 msg_type = MXLND_MSG_TYPE(status.match_info);
3541
3542                 /* This may be a mx_iconnect() request completing,
3543                  * check the bit mask for CONN_REQ and CONN_ACK */
3544                 if (msg_type == MXLND_MSG_ICON_REQ ||
3545                     msg_type == MXLND_MSG_ICON_ACK) {
3546                         peer = (kmx_peer_t*) status.context;
3547                         mxlnd_handle_connect_msg(peer, msg_type, status);
3548                         continue;
3549                 }
3550
3551                 /* This must be a tx or rx */
3552
3553                 /* NOTE: if this is a RX from the unexpected callback, it may
3554                  * have very little info. If we dropped it in unexpected_recv(),
3555                  * it will not have a context. If so, ignore it. */
3556                 ctx = (kmx_ctx_t *) status.context;
3557                 if (ctx != NULL) {
3558
3559                         req_type = ctx->mxc_type;
3560                         conn = ctx->mxc_conn; /* this may be NULL */
3561                         mxlnd_deq_pending_ctx(ctx);
3562
3563                         /* copy status to ctx->mxc_status */
3564                         ctx->mxc_status = status;
3565
3566                         switch (req_type) {
3567                         case MXLND_REQ_TX:
3568                                 mxlnd_handle_tx_completion(ctx);
3569                                 break;
3570                         case MXLND_REQ_RX:
3571                                 mxlnd_handle_rx_completion(ctx);
3572                                 break;
3573                         default:
3574                                 CNETERR("Unknown ctx type %d\n", req_type);
3575                                 LBUG();
3576                                 break;
3577                         }
3578
3579                         /* conn is always set except for the first CONN_REQ rx
3580                          * from a new peer */
3581                         if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
3582                                 mxlnd_conn_disconnect(conn, 1, 1);
3583                         }
3584                 }
3585                 CDEBUG(D_NET, "waitd() completed task\n");
3586         }
3587         CDEBUG(D_NET, "%s stopping\n", name);
3588         mxlnd_thread_stop(id);
3589         return 0;
3590 }
3591
3592
3593 unsigned long
3594 mxlnd_check_timeouts(unsigned long now)
3595 {
3596         int             i               = 0;
3597         int             disconnect      = 0;
3598         unsigned long   next            = 0; /* jiffies */
3599         kmx_peer_t      *peer           = NULL;
3600         kmx_conn_t      *conn           = NULL;
3601         cfs_rwlock_t    *g_lock         = &kmxlnd_data.kmx_global_lock;
3602
3603         cfs_read_lock(g_lock);
3604         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3605                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
3606                                         mxp_list) {
3607
3608                         if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3609                                 cfs_read_unlock(g_lock);
3610                                 return next;
3611                         }
3612
3613                         conn = peer->mxp_conn;
3614                         if (conn) {
3615                                 mxlnd_conn_addref(conn);
3616                         } else {
3617                                 continue;
3618                         }
3619
3620                         cfs_spin_lock(&conn->mxk_lock);
3621
3622                         /* if nothing pending (timeout == 0) or
3623                          * if conn is already disconnected,
3624                          * skip this conn */
3625                         if (conn->mxk_timeout == 0 ||
3626                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3627                                 cfs_spin_unlock(&conn->mxk_lock);
3628                                 mxlnd_conn_decref(conn);
3629                                 continue;
3630                         }
3631
3632                         /* we want to find the timeout that will occur first.
3633                          * if it is in the future, we will sleep until then.
3634                          * if it is in the past, then we will sleep one
3635                          * second and repeat the process. */
3636                         if ((next == 0) ||
3637                             (cfs_time_before(conn->mxk_timeout, next))) {
3638                                 next = conn->mxk_timeout;
3639                         }
3640
3641                         disconnect = 0;
3642
3643                         if (cfs_time_aftereq(now, conn->mxk_timeout))  {
3644                                 disconnect = 1;
3645                         }
3646                         cfs_spin_unlock(&conn->mxk_lock);
3647
3648                         if (disconnect) {
3649                                 mxlnd_conn_disconnect(conn, 1, 1);
3650                         }
3651                         mxlnd_conn_decref(conn);
3652                 }
3653         }
3654         cfs_read_unlock(g_lock);
3655         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3656
3657         return next;
3658 }
3659
3660 void
3661 mxlnd_passive_connect(kmx_connparams_t *cp)
3662 {
3663         int             ret             = 0;
3664         int             incompatible    = 0;
3665         u64             nic_id          = 0ULL;
3666         u32             ep_id           = 0;
3667         u32             sid             = 0;
3668         int             conn_ref        = 0;
3669         kmx_msg_t       *msg            = &cp->mxr_msg;
3670         kmx_peer_t      *peer           = cp->mxr_peer;
3671         kmx_conn_t      *conn           = NULL;
3672         cfs_rwlock_t    *g_lock         = &kmxlnd_data.kmx_global_lock;
3673
3674         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3675
3676         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3677         if (ret != 0) {
3678                 if (peer) {
3679                         CNETERR("Error %d unpacking CONN_REQ from %s\n",
3680                                ret, libcfs_nid2str(peer->mxp_nid));
3681                 } else {
3682                         CNETERR("Error %d unpacking CONN_REQ from "
3683                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3684                 }
3685                 goto cleanup;
3686         }
3687         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3688                 CNETERR("Can't accept %s: bad dst nid %s\n",
3689                                 libcfs_nid2str(msg->mxm_srcnid),
3690                                 libcfs_nid2str(msg->mxm_dstnid));
3691                 goto cleanup;
3692         }
3693         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3694                 CNETERR("Can't accept %s: incompatible queue depth "
3695                             "%d (%d wanted)\n",
3696                                 libcfs_nid2str(msg->mxm_srcnid),
3697                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3698                                 *kmxlnd_tunables.kmx_peercredits);
3699                 incompatible = 1;
3700         }
3701         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3702                 CNETERR("Can't accept %s: incompatible EAGER size "
3703                             "%d (%d wanted)\n",
3704                                 libcfs_nid2str(msg->mxm_srcnid),
3705                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3706                                 (int) MXLND_MSG_SIZE);
3707                 incompatible = 1;
3708         }
3709
3710         if (peer == NULL) {
3711                 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
3712                 if (peer == NULL) {
3713                         int             hash    = 0;
3714                         u32             board   = 0;
3715                         kmx_peer_t      *existing_peer    = NULL;
3716
3717                         hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3718
3719                         mx_nic_id_to_board_number(nic_id, &board);
3720
3721                         /* adds conn ref for peer and one for this function */
3722                         ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3723                                                board, ep_id, 0ULL);
3724                         if (ret != 0) {
3725                                 goto cleanup;
3726                         }
3727                         peer->mxp_conn->mxk_sid = sid;
3728                         LASSERT(peer->mxp_ep_id == ep_id);
3729                         cfs_write_lock(g_lock);
3730                         existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3731                         if (existing_peer) {
3732                                 mxlnd_conn_decref(peer->mxp_conn);
3733                                 mxlnd_peer_decref(peer);
3734                                 peer = existing_peer;
3735                                 mxlnd_conn_addref(peer->mxp_conn);
3736                                 conn = peer->mxp_conn;
3737                         } else {
3738                                 cfs_list_add_tail(&peer->mxp_list,
3739                                                   &kmxlnd_data.kmx_peers[hash]);
3740                                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
3741                         }
3742                         cfs_write_unlock(g_lock);
3743                 } else {
3744                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3745                         cfs_write_lock(g_lock);
3746                         mxlnd_peer_decref(peer); /* drop ref taken above */
3747                         cfs_write_unlock(g_lock);
3748                         if (ret != 0) {
3749                                 CNETERR("Cannot allocate mxp_conn\n");
3750                                 goto cleanup;
3751                         }
3752                 }
3753                 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3754                 conn = peer->mxp_conn;
3755         } else { /* unexpected handler found peer */
3756                 kmx_conn_t      *old_conn       = peer->mxp_conn;
3757
3758                 if (sid != peer->mxp_conn->mxk_sid) {
3759                         /* do not call mx_disconnect() or send a BYE */
3760                         mxlnd_conn_disconnect(old_conn, 0, 0);
3761
3762                         /* This allocs a conn, points peer->mxp_conn to this one.
3763                         * The old conn is still on the peer->mxp_conns list.
3764                         * As the pending requests complete, they will call
3765                         * conn_decref() which will eventually free it. */
3766                         ret = mxlnd_conn_alloc(&conn, peer);
3767                         if (ret != 0) {
3768                                 CNETERR("Cannot allocate peer->mxp_conn\n");
3769                                 goto cleanup;
3770                         }
3771                         /* conn_alloc() adds one ref for the peer and one
3772                          * for this function */
3773                         conn_ref = 1;
3774
3775                         peer->mxp_conn->mxk_sid = sid;
3776                 } else {
3777                         /* same sid */
3778                         conn = peer->mxp_conn;
3779                 }
3780         }
3781         cfs_write_lock(g_lock);
3782         peer->mxp_incompatible = incompatible;
3783         cfs_write_unlock(g_lock);
3784         cfs_spin_lock(&conn->mxk_lock);
3785         conn->mxk_incarnation = msg->mxm_srcstamp;
3786         mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
3787         cfs_spin_unlock(&conn->mxk_lock);
3788
3789         /* handle_conn_ack() will create the CONN_ACK msg */
3790         mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
3791
3792 cleanup:
3793         if (conn_ref) mxlnd_conn_decref(conn);
3794
3795         mxlnd_connparams_free(cp);
3796         return;
3797 }
3798
3799 void
3800 mxlnd_check_conn_ack(kmx_connparams_t *cp)
3801 {
3802         int             ret             = 0;
3803         int             incompatible    = 0;
3804         u64             nic_id          = 0ULL;
3805         u32             ep_id           = 0;
3806         u32             sid             = 0;
3807         kmx_msg_t       *msg            = &cp->mxr_msg;
3808         kmx_peer_t      *peer           = cp->mxr_peer;
3809         kmx_conn_t      *conn           = cp->mxr_conn;
3810
3811         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3812
3813         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3814         if (ret != 0) {
3815                 if (peer) {
3816                         CNETERR("Error %d unpacking CONN_ACK from %s\n",
3817                                ret, libcfs_nid2str(peer->mxp_nid));
3818                 } else {
3819                         CNETERR("Error %d unpacking CONN_ACK from "
3820                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3821                 }
3822                 ret = -1;
3823                 incompatible = 1;
3824                 goto failed;
3825         }
3826         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3827                 CNETERR("Can't accept CONN_ACK from %s: "
3828                        "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3829                         libcfs_nid2str(msg->mxm_dstnid));
3830                 ret = -1;
3831                 goto failed;
3832         }
3833         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3834                 CNETERR("Can't accept CONN_ACK from %s: "
3835                        "incompatible queue depth %d (%d wanted)\n",
3836                         libcfs_nid2str(msg->mxm_srcnid),
3837                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3838                         *kmxlnd_tunables.kmx_peercredits);
3839                 incompatible = 1;
3840                 ret = -1;
3841                 goto failed;
3842         }
3843         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3844                 CNETERR("Can't accept CONN_ACK from %s: "
3845                         "incompatible EAGER size %d (%d wanted)\n",
3846                         libcfs_nid2str(msg->mxm_srcnid),
3847                         msg->mxm_u.conn_req.mxcrm_eager_size,
3848                         (int) MXLND_MSG_SIZE);
3849                 incompatible = 1;
3850                 ret = -1;
3851                 goto failed;
3852         }
3853         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3854         peer->mxp_incompatible = incompatible;
3855         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3856         cfs_spin_lock(&conn->mxk_lock);
3857         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
3858         conn->mxk_outstanding = 0;
3859         conn->mxk_incarnation = msg->mxm_srcstamp;
3860         conn->mxk_timeout = 0;
3861         if (!incompatible) {
3862                 CDEBUG(D_NET, "setting peer %s CONN_READY\n",
3863                        libcfs_nid2str(msg->mxm_srcnid));
3864                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3865         }
3866         cfs_spin_unlock(&conn->mxk_lock);
3867
3868         if (!incompatible)
3869                 mxlnd_check_sends(peer);
3870
3871 failed:
3872         if (ret < 0) {
3873                 cfs_spin_lock(&conn->mxk_lock);
3874                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3875                 cfs_spin_unlock(&conn->mxk_lock);
3876         }
3877
3878         if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
3879
3880         mxlnd_connparams_free(cp);
3881         return;
3882 }
3883
3884 int
3885 mxlnd_abort_msgs(void)
3886 {
3887         int                     count           = 0;
3888         cfs_list_t              *orphans        = &kmxlnd_data.kmx_orphan_msgs;
3889         cfs_spinlock_t          *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3890
3891         /* abort orphans */
3892         cfs_spin_lock(g_conn_lock);
3893         while (!cfs_list_empty(orphans)) {
3894                 kmx_ctx_t       *ctx     = NULL;
3895                 kmx_conn_t      *conn   = NULL;
3896
3897                 ctx = cfs_list_entry(orphans->next, kmx_ctx_t, mxc_list);
3898                 cfs_list_del_init(&ctx->mxc_list);
3899                 cfs_spin_unlock(g_conn_lock);
3900
3901                 ctx->mxc_errno = -ECONNABORTED;
3902                 conn = ctx->mxc_conn;
3903                 CDEBUG(D_NET, "aborting %s %s %s\n",
3904                        mxlnd_msgtype_to_str(ctx->mxc_msg_type),
3905                        ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
3906                        libcfs_nid2str(ctx->mxc_nid));
3907                 if (ctx->mxc_type == MXLND_REQ_TX) {
3908                         mxlnd_put_idle_tx(ctx); /* do not hold any locks */
3909                         if (conn) mxlnd_conn_decref(conn); /* for this tx */
3910                 } else {
3911                         ctx->mxc_state = MXLND_CTX_CANCELED;
3912                         mxlnd_handle_rx_completion(ctx);
3913                 }
3914
3915                 count++;
3916                 cfs_spin_lock(g_conn_lock);
3917         }
3918         cfs_spin_unlock(g_conn_lock);
3919
3920         return count;
3921 }
3922
3923 int
3924 mxlnd_free_conn_zombies(void)
3925 {
3926         int                     count           = 0;
3927         cfs_list_t             *zombies        = &kmxlnd_data.kmx_conn_zombies;
3928         cfs_spinlock_t         *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3929         cfs_rwlock_t           *g_lock         = &kmxlnd_data.kmx_global_lock;
3930
3931         /* cleanup any zombies */
3932         cfs_spin_lock(g_conn_lock);
3933         while (!cfs_list_empty(zombies)) {
3934                 kmx_conn_t      *conn   = NULL;
3935
3936                 conn = cfs_list_entry(zombies->next, kmx_conn_t, mxk_zombie);
3937                 cfs_list_del_init(&conn->mxk_zombie);
3938                 cfs_spin_unlock(g_conn_lock);
3939
3940                 cfs_write_lock(g_lock);
3941                 mxlnd_conn_free_locked(conn);
3942                 cfs_write_unlock(g_lock);
3943
3944                 count++;
3945                 cfs_spin_lock(g_conn_lock);
3946         }
3947         cfs_spin_unlock(g_conn_lock);
3948         CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
3949         return count;
3950 }
3951
3952 /**
3953  * mxlnd_connd - handles incoming connection requests
3954  * @arg - thread id (as a void *)
3955  *
3956  * This thread handles incoming connection requests
3957  */
3958 int
3959 mxlnd_connd(void *arg)
3960 {
3961         long                    id              = (long) arg;
3962
3963         cfs_daemonize("mxlnd_connd");
3964
3965         CDEBUG(D_NET, "connd starting\n");
3966
3967         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3968                 int                ret             = 0;
3969                 kmx_connparams_t  *cp              = NULL;
3970                 cfs_spinlock_t    *g_conn_lock     = &kmxlnd_data.kmx_conn_lock;
3971                 cfs_list_t        *conn_reqs       = &kmxlnd_data.kmx_conn_reqs;
3972
3973                 ret = cfs_down_interruptible(&kmxlnd_data.kmx_conn_sem);
3974
3975                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
3976                         break;
3977
3978                 if (ret != 0)
3979                         continue;
3980
3981                 ret = mxlnd_abort_msgs();
3982                 ret += mxlnd_free_conn_zombies();
3983
3984                 cfs_spin_lock(g_conn_lock);
3985                 if (cfs_list_empty(conn_reqs)) {
3986                         if (ret == 0)
3987                                 CNETERR("connd woke up but did not "
3988                                        "find a kmx_connparams_t or zombie conn\n");
3989                         cfs_spin_unlock(g_conn_lock);
3990                         continue;
3991                 }
3992                 cp = cfs_list_entry(conn_reqs->next, kmx_connparams_t,
3993                                     mxr_list);
3994                 cfs_list_del_init(&cp->mxr_list);
3995                 cfs_spin_unlock(g_conn_lock);
3996
3997                 switch (MXLND_MSG_TYPE(cp->mxr_match)) {
3998                 case MXLND_MSG_CONN_REQ:
3999                         /* We have a connection request. Handle it. */
4000                         mxlnd_passive_connect(cp);
4001                         break;
4002                 case MXLND_MSG_CONN_ACK:
4003                         /* The peer is ready for messages */
4004                         mxlnd_check_conn_ack(cp);
4005                         break;
4006                 }
4007         }
4008
4009         mxlnd_free_conn_zombies();
4010
4011         CDEBUG(D_NET, "connd stopping\n");
4012         mxlnd_thread_stop(id);
4013         return 0;
4014 }
4015
4016 /**
4017  * mxlnd_timeoutd - enforces timeouts on messages
4018  * @arg - thread id (as a void *)
4019  *
4020  * This thread queries each peer for its earliest timeout. If a peer has timed out,
4021  * it calls mxlnd_conn_disconnect().
4022  *
4023  * After checking for timeouts, try progressing sends (call check_sends()).
4024  */
4025 int
4026 mxlnd_timeoutd(void *arg)
4027 {
4028         int             i       = 0;
4029         long            id      = (long) arg;
4030         unsigned long   now     = 0;
4031         unsigned long   next    = 0;
4032         unsigned long   delay   = CFS_HZ;
4033         kmx_peer_t     *peer    = NULL;
4034         kmx_peer_t     *temp    = NULL;
4035         kmx_conn_t     *conn    = NULL;
4036         cfs_rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
4037
4038         cfs_daemonize("mxlnd_timeoutd");
4039
4040         CDEBUG(D_NET, "timeoutd starting\n");
4041
4042         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
4043
4044                 now = jiffies;
4045                 /* if the next timeout has not arrived, go back to sleep */
4046                 if (cfs_time_after(now, next)) {
4047                         next = mxlnd_check_timeouts(now);
4048                 }
4049
4050                 /* try to progress peers' txs */
4051                cfs_write_lock(g_lock);
4052                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
4053                         cfs_list_t *peers = &kmxlnd_data.kmx_peers[i];
4054
4055                         /* NOTE we are safe against the removal of peer, but
4056                          * not against the removal of temp */
4057                         cfs_list_for_each_entry_safe(peer, temp, peers,
4058                                                      mxp_list) {
4059                                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
4060                                         break;
4061                                 mxlnd_peer_addref(peer); /* add ref... */
4062                                 conn = peer->mxp_conn;
4063                                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
4064                                         mxlnd_conn_addref(conn); /* take ref... */
4065                                 } else {
4066                                         CDEBUG(D_NET, "ignoring %s\n",
4067                                                libcfs_nid2str(peer->mxp_nid));
4068                                         mxlnd_peer_decref(peer); /* ...to here */
4069                                         continue;
4070                                 }
4071
4072                                 if ((conn->mxk_status == MXLND_CONN_READY ||
4073                                     conn->mxk_status == MXLND_CONN_FAIL) &&
4074                                     cfs_time_after(now,
4075                                                    conn->mxk_last_tx +
4076                                                    CFS_HZ)) {
4077                                         cfs_write_unlock(g_lock);
4078                                         mxlnd_check_sends(peer);
4079                                         cfs_write_lock(g_lock);
4080                                 }
4081                                 mxlnd_conn_decref(conn); /* until here */
4082                                 mxlnd_peer_decref(peer); /* ...to here */
4083                         }
4084                 }
4085                 cfs_write_unlock(g_lock);
4086
4087                 mxlnd_sleep(delay);
4088         }
4089         CDEBUG(D_NET, "timeoutd stopping\n");
4090         mxlnd_thread_stop(id);
4091         return 0;
4092 }