Whamcloud - gitweb
LU-2675 lnet: remove lnet/include/lnet/linux/
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include <asm/page.h>
45 #include "mxlnd.h"
46
47 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
48
49 inline int
50 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
51 {
52         /* if memcmp() == 0, it is NULL */
53         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
54 }
55
56 char *
57 mxlnd_ctxstate_to_str(int mxc_state)
58 {
59         switch (mxc_state) {
60         case MXLND_CTX_INIT:
61                 return "MXLND_CTX_INIT";
62         case MXLND_CTX_IDLE:
63                 return "MXLND_CTX_IDLE";
64         case MXLND_CTX_PREP:
65                 return "MXLND_CTX_PREP";
66         case MXLND_CTX_PENDING:
67                 return "MXLND_CTX_PENDING";
68         case MXLND_CTX_COMPLETED:
69                 return "MXLND_CTX_COMPLETED";
70         case MXLND_CTX_CANCELED:
71                 return "MXLND_CTX_CANCELED";
72         default:
73                 return "*unknown*";
74         }
75 }
76
77 char *
78 mxlnd_connstatus_to_str(int mxk_status)
79 {
80         switch (mxk_status) {
81         case MXLND_CONN_READY:
82                 return "MXLND_CONN_READY";
83         case MXLND_CONN_INIT:
84                 return "MXLND_CONN_INIT";
85         case MXLND_CONN_WAIT:
86                 return "MXLND_CONN_WAIT";
87         case MXLND_CONN_DISCONNECT:
88                 return "MXLND_CONN_DISCONNECT";
89         case MXLND_CONN_FAIL:
90                 return "MXLND_CONN_FAIL";
91         default:
92                 return "unknown";
93         }
94 }
95
96 char *
97 mxlnd_msgtype_to_str(int type) {
98         switch (type) {
99         case MXLND_MSG_EAGER:
100                 return "MXLND_MSG_EAGER";
101         case MXLND_MSG_CONN_REQ:
102                 return "MXLND_MSG_CONN_REQ";
103         case MXLND_MSG_CONN_ACK:
104                 return "MXLND_MSG_CONN_ACK";
105         case MXLND_MSG_BYE:
106                 return "MXLND_MSG_BYE";
107         case MXLND_MSG_NOOP:
108                 return "MXLND_MSG_NOOP";
109         case MXLND_MSG_PUT_REQ:
110                 return "MXLND_MSG_PUT_REQ";
111         case MXLND_MSG_PUT_ACK:
112                 return "MXLND_MSG_PUT_ACK";
113         case MXLND_MSG_PUT_DATA:
114                 return "MXLND_MSG_PUT_DATA";
115         case MXLND_MSG_GET_REQ:
116                 return "MXLND_MSG_GET_REQ";
117         case MXLND_MSG_GET_DATA:
118                 return "MXLND_MSG_GET_DATA";
119         default:
120                 return "unknown";
121         }
122 }
123
124 char *
125 mxlnd_lnetmsg_to_str(int type)
126 {
127         switch (type) {
128         case LNET_MSG_ACK:
129                 return "LNET_MSG_ACK";
130         case LNET_MSG_PUT:
131                 return "LNET_MSG_PUT";
132         case LNET_MSG_GET:
133                 return "LNET_MSG_GET";
134         case LNET_MSG_REPLY:
135                 return "LNET_MSG_REPLY";
136         case LNET_MSG_HELLO:
137                 return "LNET_MSG_HELLO";
138         default:
139                 LBUG();
140                 return "*unknown*";
141         }
142 }
143
144 static inline u64
145 mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
146 {
147         u64 type        = (u64) ctx->mxc_msg_type;
148         u64 err         = (u64) error;
149         u64 match       = 0ULL;
150
151         mxlnd_valid_msg_type(ctx->mxc_msg_type);
152         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
153         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
154         return match;
155 }
156
157 static inline void
158 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
159 {
160         *msg_type = (u8) MXLND_MSG_TYPE(match);
161         *error    = (u8) MXLND_ERROR_VAL(match);
162         *cookie   = match & MXLND_MAX_COOKIE;
163         mxlnd_valid_msg_type(*msg_type);
164         return;
165 }
166
167 kmx_ctx_t *
168 mxlnd_get_idle_rx(kmx_conn_t *conn)
169 {
170         cfs_list_t              *rxs    = NULL;
171         kmx_ctx_t               *rx     = NULL;
172
173         LASSERT(conn != NULL);
174
175         rxs = &conn->mxk_rx_idle;
176
177         spin_lock(&conn->mxk_lock);
178
179         if (cfs_list_empty (rxs)) {
180                 spin_unlock(&conn->mxk_lock);
181                 return NULL;
182         }
183
184         rx = cfs_list_entry (rxs->next, kmx_ctx_t, mxc_list);
185         cfs_list_del_init(&rx->mxc_list);
186         spin_unlock(&conn->mxk_lock);
187
188 #if MXLND_DEBUG
189         if (rx->mxc_get != rx->mxc_put) {
190                 CNETERR("*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
191                 CNETERR("*** incarnation= %lld ***\n", rx->mxc_incarnation);
192                 CNETERR("*** deadline= %ld ***\n", rx->mxc_deadline);
193                 CNETERR("*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
194                 CNETERR("*** listed?= %d ***\n", !cfs_list_empty(&rx->mxc_list));
195                 CNETERR("*** nid= 0x%llx ***\n", rx->mxc_nid);
196                 CNETERR("*** peer= 0x%p ***\n", rx->mxc_peer);
197                 CNETERR("*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
198                 CNETERR("*** cookie= 0x%llx ***\n", rx->mxc_cookie);
199                 CNETERR("*** nob= %d ***\n", rx->mxc_nob);
200         }
201 #endif
202         LASSERT (rx->mxc_get == rx->mxc_put);
203
204         rx->mxc_get++;
205
206         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
207         rx->mxc_state = MXLND_CTX_PREP;
208         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
209
210         return rx;
211 }
212
213 int
214 mxlnd_put_idle_rx(kmx_ctx_t *rx)
215 {
216         kmx_conn_t              *conn   = rx->mxc_conn;
217         cfs_list_t              *rxs    = &conn->mxk_rx_idle;
218
219         LASSERT(rx->mxc_type == MXLND_REQ_RX);
220
221         mxlnd_ctx_init(rx);
222
223         rx->mxc_put++;
224         LASSERT(rx->mxc_get == rx->mxc_put);
225
226         spin_lock(&conn->mxk_lock);
227         cfs_list_add(&rx->mxc_list, rxs);
228         spin_unlock(&conn->mxk_lock);
229         return 0;
230 }
231
232 kmx_ctx_t *
233 mxlnd_get_idle_tx(void)
234 {
235         cfs_list_t              *tmp    = &kmxlnd_data.kmx_tx_idle;
236         kmx_ctx_t               *tx     = NULL;
237
238         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
239
240         if (cfs_list_empty (&kmxlnd_data.kmx_tx_idle)) {
241                 CNETERR("%d txs in use\n", kmxlnd_data.kmx_tx_used);
242                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
243                 return NULL;
244         }
245
246         tmp = &kmxlnd_data.kmx_tx_idle;
247         tx = cfs_list_entry (tmp->next, kmx_ctx_t, mxc_list);
248         cfs_list_del_init(&tx->mxc_list);
249
250         /* Allocate a new completion cookie.  It might not be needed,
251          * but we've got a lock right now and we're unlikely to
252          * wrap... */
253         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
254         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
255                 kmxlnd_data.kmx_tx_next_cookie = 1;
256         }
257         kmxlnd_data.kmx_tx_used++;
258         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
259
260         LASSERT (tx->mxc_get == tx->mxc_put);
261
262         tx->mxc_get++;
263
264         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
265         LASSERT (tx->mxc_lntmsg[0] == NULL);
266         LASSERT (tx->mxc_lntmsg[1] == NULL);
267
268         tx->mxc_state = MXLND_CTX_PREP;
269         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
270
271         return tx;
272 }
273
274 void
275 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
276
277 int
278 mxlnd_put_idle_tx(kmx_ctx_t *tx)
279 {
280         int             result  = 0;
281         lnet_msg_t      *lntmsg[2];
282
283         LASSERT(tx->mxc_type == MXLND_REQ_TX);
284
285         if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
286                 kmx_conn_t      *conn   = tx->mxc_conn;
287
288                 result = -EIO;
289                 if (tx->mxc_errno != 0) result = tx->mxc_errno;
290                 /* FIXME should we set mx_dis? */
291                 mxlnd_conn_disconnect(conn, 0, 1);
292         }
293
294         lntmsg[0] = tx->mxc_lntmsg[0];
295         lntmsg[1] = tx->mxc_lntmsg[1];
296
297         mxlnd_ctx_init(tx);
298
299         tx->mxc_put++;
300         LASSERT(tx->mxc_get == tx->mxc_put);
301
302         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
303         cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
304         kmxlnd_data.kmx_tx_used--;
305         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
306
307         if (lntmsg[0] != NULL)
308                 lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
309         if (lntmsg[1] != NULL)
310                 lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
311         return 0;
312 }
313
314
315 void
316 mxlnd_connparams_free(kmx_connparams_t *cp)
317 {
318         LASSERT(cfs_list_empty(&cp->mxr_list));
319         MXLND_FREE(cp, sizeof(*cp));
320         return;
321 }
322
323 int
324 mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
325                             mx_endpoint_addr_t epa, u64 match, u32 length,
326                             kmx_conn_t *conn, kmx_peer_t *peer, void *data)
327 {
328         kmx_connparams_t *c = NULL;
329
330         MXLND_ALLOC(c, sizeof(*c));
331         if (!c) return -ENOMEM;
332
333         CFS_INIT_LIST_HEAD(&c->mxr_list);
334         c->mxr_context = context;
335         c->mxr_epa = epa;
336         c->mxr_match = match;
337         c->mxr_nob = length;
338         c->mxr_conn = conn;
339         c->mxr_peer = peer;
340         c->mxr_msg = *((kmx_msg_t *) data);
341
342         *cp = c;
343         return 0;
344 }
345
346 static inline void
347 mxlnd_set_conn_status(kmx_conn_t *conn, int status)
348 {
349         conn->mxk_status = status;
350         smp_mb();
351 }
352
353 /**
354  * mxlnd_conn_free_locked - free the conn
355  * @conn - a kmx_conn pointer
356  *
357  * The calling function should remove the conn from the conns list first
358  * then destroy it. Caller should have write-locked kmx_global_lock.
359  */
360 void
361 mxlnd_conn_free_locked(kmx_conn_t *conn)
362 {
363         int             valid   = !mxlnd_endpoint_addr_null(conn->mxk_epa);
364         kmx_peer_t      *peer   = conn->mxk_peer;
365
366         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
367         LASSERT (cfs_list_empty (&conn->mxk_tx_credit_queue) &&
368                  cfs_list_empty (&conn->mxk_tx_free_queue) &&
369                  cfs_list_empty (&conn->mxk_pending));
370         if (!cfs_list_empty(&conn->mxk_list)) {
371                 cfs_list_del_init(&conn->mxk_list);
372                 if (peer->mxp_conn == conn) {
373                         peer->mxp_conn = NULL;
374                         if (valid) {
375                                 kmx_conn_t      *temp   = NULL;
376
377                                 mx_get_endpoint_addr_context(conn->mxk_epa,
378                                                              (void **) &temp);
379                                 if (conn == temp) {
380                                         mx_set_endpoint_addr_context(conn->mxk_epa,
381                                                                      (void *) NULL);
382                                 }
383                         }
384                         /* unlink from global list and drop its ref */
385                         cfs_list_del_init(&peer->mxp_list);
386                         mxlnd_peer_decref(peer);
387                 }
388         }
389         mxlnd_peer_decref(peer); /* drop conn's ref to peer */
390         if (conn->mxk_rx_pages) {
391                 LASSERT (conn->mxk_rxs != NULL);
392                 mxlnd_free_pages(conn->mxk_rx_pages);
393         }
394         if (conn->mxk_rxs) {
395                 int             i       = 0;
396                 kmx_ctx_t       *rx     = NULL;
397
398                 for (i = 0; i < MXLND_RX_MSGS(); i++) {
399                         rx = &conn->mxk_rxs[i];
400                         if (rx->mxc_seg_list != NULL) {
401                                 LASSERT(rx->mxc_nseg > 0);
402                                 MXLND_FREE(rx->mxc_seg_list,
403                                            rx->mxc_nseg *
404                                            sizeof(*rx->mxc_seg_list));
405                         }
406                 }
407                 MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
408         }
409
410         MXLND_FREE(conn, sizeof (*conn));
411         return;
412 }
413
414
415 int
416 mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
417 {
418         int             found   = 0;
419         int             count   = 0;
420         kmx_ctx_t       *ctx    = NULL;
421         kmx_ctx_t       *next   = NULL;
422         mx_return_t     mxret   = MX_SUCCESS;
423         u32             result  = 0;
424
425         do {
426                 found = 0;
427                 spin_lock(&conn->mxk_lock);
428                 cfs_list_for_each_entry_safe(ctx, next, &conn->mxk_pending,
429                                              mxc_list) {
430                         cfs_list_del_init(&ctx->mxc_list);
431                         if (ctx->mxc_type == MXLND_REQ_RX) {
432                                 found = 1;
433                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
434                                                   &ctx->mxc_mxreq,
435                                                   &result);
436                                 if (mxret != MX_SUCCESS) {
437                                         CNETERR("mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
438                                 }
439                                 if (result == 1) {
440                                         ctx->mxc_errno = -ECONNABORTED;
441                                         ctx->mxc_state = MXLND_CTX_CANCELED;
442                                         spin_unlock(&conn->mxk_lock);
443                                         spin_lock(&kmxlnd_data.kmx_conn_lock);
444                                         /* we may be holding the global lock,
445                                          * move to orphan list so that it can free it */
446                                         cfs_list_add_tail(&ctx->mxc_list,
447                                                           &kmxlnd_data.kmx_orphan_msgs);
448                                         count++;
449                                         spin_unlock(&kmxlnd_data.kmx_conn_lock);
450                                         spin_lock(&conn->mxk_lock);
451                                 }
452                                 break;
453                         }
454                 }
455                 spin_unlock(&conn->mxk_lock);
456         } while (found);
457
458         return count;
459 }
460
461 int
462 mxlnd_cancel_queued_txs(kmx_conn_t *conn)
463 {
464         int             count   = 0;
465         cfs_list_t      *tmp    = NULL;
466
467         spin_lock(&conn->mxk_lock);
468         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
469                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
470
471                 kmx_ctx_t       *tx     = NULL;
472
473                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
474                         tmp = &conn->mxk_tx_free_queue;
475                 } else {
476                         tmp = &conn->mxk_tx_credit_queue;
477                 }
478
479                 tx = cfs_list_entry(tmp->next, kmx_ctx_t, mxc_list);
480                 cfs_list_del_init(&tx->mxc_list);
481                 spin_unlock(&conn->mxk_lock);
482                 tx->mxc_errno = -ECONNABORTED;
483                 tx->mxc_state = MXLND_CTX_CANCELED;
484                 /* move to orphan list and then abort */
485                 spin_lock(&kmxlnd_data.kmx_conn_lock);
486                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
487                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
488                 count++;
489                 spin_lock(&conn->mxk_lock);
490         }
491         spin_unlock(&conn->mxk_lock);
492
493         return count;
494 }
495
496 void
497 mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
498 {
499         u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
500                     (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
501
502         mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
503                   epa, match, NULL, NULL);
504         return;
505 }
506
507 /**
508  * mxlnd_conn_disconnect - shutdown a connection
509  * @conn - a kmx_conn pointer
510  * @mx_dis - call mx_disconnect()
511  * @send_bye - send peer a BYE msg
512  *
513  * This function sets the status to DISCONNECT, completes queued
514  * txs with failure, calls mx_disconnect, which will complete
515  * pending txs and matched rxs with failure.
516  */
517 void
518 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
519 {
520         mx_endpoint_addr_t      epa     = conn->mxk_epa;
521         int                     valid   = !mxlnd_endpoint_addr_null(epa);
522         int                     count   = 0;
523
524         spin_lock(&conn->mxk_lock);
525         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
526                 spin_unlock(&conn->mxk_lock);
527                 return;
528         }
529         mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
530         conn->mxk_timeout = 0;
531         spin_unlock(&conn->mxk_lock);
532
533         count = mxlnd_cancel_queued_txs(conn);
534         count += mxlnd_conn_cancel_pending_rxs(conn);
535
536         if (count) /* let connd call kmxlnd_abort_msgs() */
537                 up(&kmxlnd_data.kmx_conn_sem);
538
539         if (send_bye && valid &&
540             conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
541                 /* send a BYE to the peer */
542                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
543                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
544                 mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
545                 /* wait to allow the peer to ack our message */
546                 mxlnd_sleep(msecs_to_jiffies(20));
547         }
548
549         if (atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
550                 unsigned long   last_msg        = 0;
551
552                 /* notify LNET that we are giving up on this peer */
553                 if (cfs_time_after(conn->mxk_last_rx, conn->mxk_last_tx))
554                         last_msg = conn->mxk_last_rx;
555                 else
556                         last_msg = conn->mxk_last_tx;
557
558                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
559
560                 if (mx_dis && valid &&
561                     (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0)))
562                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
563         }
564         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
565
566         return;
567 }
568
569 /**
570  * mxlnd_conn_alloc - allocate and initialize a new conn struct
571  * @connp - address of a kmx_conn pointer
572  * @peer - owning kmx_peer
573  *
574  * Returns 0 on success and -ENOMEM on failure
575  */
576 int
577 mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
578 {
579         int             i       = 0;
580         int             ret     = 0;
581         int             ipage   = 0;
582         int             offset  = 0;
583         void           *addr    = NULL;
584         kmx_conn_t     *conn    = NULL;
585         kmx_pages_t    *pages   = NULL;
586         struct page    *page    = NULL;
587         kmx_ctx_t      *rx      = NULL;
588
589         LASSERT(peer != NULL);
590
591         MXLND_ALLOC(conn, sizeof (*conn));
592         if (conn == NULL) {
593                 CNETERR("Cannot allocate conn\n");
594                 return -ENOMEM;
595         }
596         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
597
598         memset(conn, 0, sizeof(*conn));
599
600         ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
601         if (ret != 0) {
602                 CERROR("Can't allocate rx pages\n");
603                 MXLND_FREE(conn, sizeof(*conn));
604                 return -ENOMEM;
605         }
606         conn->mxk_rx_pages = pages;
607
608         MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
609         if (conn->mxk_rxs == NULL) {
610                 CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
611                 mxlnd_free_pages(pages);
612                 MXLND_FREE(conn, sizeof(*conn));
613                 return -ENOMEM;
614         }
615
616         memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
617
618         conn->mxk_peer = peer;
619         CFS_INIT_LIST_HEAD(&conn->mxk_list);
620         CFS_INIT_LIST_HEAD(&conn->mxk_zombie);
621         atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
622                                                    and one for the caller */
623         if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
624                 u64     nic_id  = 0ULL;
625                 u32     ep_id   = 0;
626
627                 /* this is localhost, set the epa and status as up */
628                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
629                 conn->mxk_epa = kmxlnd_data.kmx_epa;
630                 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
631                 peer->mxp_reconnect_time = 0;
632                 mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
633                 peer->mxp_nic_id = nic_id;
634                 peer->mxp_ep_id = ep_id;
635                 conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
636                 conn->mxk_timeout = 0;
637         } else {
638                 /* conn->mxk_incarnation = 0 - will be set by peer */
639                 /* conn->mxk_sid = 0 - will be set by peer */
640                 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
641                 /* mxk_epa - to be set after mx_iconnect() */
642         }
643         spin_lock_init(&conn->mxk_lock);
644         /* conn->mxk_timeout = 0 */
645         /* conn->mxk_last_tx = 0 */
646         /* conn->mxk_last_rx = 0 */
647         CFS_INIT_LIST_HEAD(&conn->mxk_rx_idle);
648
649         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
650         /* mxk_outstanding = 0 */
651
652         CFS_INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
653         CFS_INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
654         /* conn->mxk_ntx_msgs = 0 */
655         /* conn->mxk_ntx_data = 0 */
656         /* conn->mxk_ntx_posted = 0 */
657         /* conn->mxk_data_posted = 0 */
658         CFS_INIT_LIST_HEAD(&conn->mxk_pending);
659
660         for (i = 0; i < MXLND_RX_MSGS(); i++) {
661
662                 rx = &conn->mxk_rxs[i];
663                 rx->mxc_type = MXLND_REQ_RX;
664                 CFS_INIT_LIST_HEAD(&rx->mxc_list);
665
666                 /* map mxc_msg to page */
667                 page = pages->mxg_pages[ipage];
668                 addr = page_address(page);
669                 LASSERT(addr != NULL);
670                 rx->mxc_msg = (kmx_msg_t *)(addr + offset);
671                 rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
672
673                 rx->mxc_conn = conn;
674                 rx->mxc_peer = peer;
675                 rx->mxc_nid = peer->mxp_nid;
676
677                 mxlnd_ctx_init(rx);
678
679                 offset += MXLND_MSG_SIZE;
680                 LASSERT (offset <= PAGE_SIZE);
681
682                 if (offset == PAGE_SIZE) {
683                         offset = 0;
684                         ipage++;
685                         LASSERT (ipage <= MXLND_TX_MSG_PAGES());
686                 }
687
688                 cfs_list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
689         }
690
691         *connp = conn;
692
693         mxlnd_peer_addref(peer);        /* add a ref for this conn */
694
695         /* add to front of peer's conns list */
696         cfs_list_add(&conn->mxk_list, &peer->mxp_conns);
697         peer->mxp_conn = conn;
698         return 0;
699 }
700
701 int
702 mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
703 {
704         int             ret     = 0;
705         rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
706
707         write_lock(g_lock);
708         ret = mxlnd_conn_alloc_locked(connp, peer);
709         write_unlock(g_lock);
710         return ret;
711 }
712
713 int
714 mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
715 {
716         int             ret     = 0;
717         kmx_conn_t      *conn   = ctx->mxc_conn;
718
719         ctx->mxc_state = MXLND_CTX_PENDING;
720         if (conn != NULL) {
721                 spin_lock(&conn->mxk_lock);
722                 if (conn->mxk_status >= MXLND_CONN_INIT) {
723                         cfs_list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
724                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
725                                 conn->mxk_timeout = ctx->mxc_deadline;
726                         }
727                 } else {
728                         ctx->mxc_state = MXLND_CTX_COMPLETED;
729                         ret = -1;
730                 }
731                 spin_unlock(&conn->mxk_lock);
732         }
733         return ret;
734 }
735
736 int
737 mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
738 {
739         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
740                 ctx->mxc_state == MXLND_CTX_COMPLETED);
741         if (ctx->mxc_state != MXLND_CTX_PENDING &&
742             ctx->mxc_state != MXLND_CTX_COMPLETED) {
743                 CNETERR("deq ctx->mxc_state = %s\n",
744                         mxlnd_ctxstate_to_str(ctx->mxc_state));
745         }
746         ctx->mxc_state = MXLND_CTX_COMPLETED;
747         if (!cfs_list_empty(&ctx->mxc_list)) {
748                 kmx_conn_t      *conn = ctx->mxc_conn;
749                 kmx_ctx_t       *next = NULL;
750
751                 LASSERT(conn != NULL);
752                 spin_lock(&conn->mxk_lock);
753                 cfs_list_del_init(&ctx->mxc_list);
754                 conn->mxk_timeout = 0;
755                 if (!cfs_list_empty(&conn->mxk_pending)) {
756                         next = cfs_list_entry(conn->mxk_pending.next,
757                                               kmx_ctx_t, mxc_list);
758                         conn->mxk_timeout = next->mxc_deadline;
759                 }
760                 spin_unlock(&conn->mxk_lock);
761         }
762         return 0;
763 }
764
765 /**
766  * mxlnd_peer_free - free the peer
767  * @peer - a kmx_peer pointer
768  *
769  * The calling function should decrement the rxs, drain the tx queues and
770  * remove the peer from the peers list first then destroy it.
771  */
772 void
773 mxlnd_peer_free(kmx_peer_t *peer)
774 {
775         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
776
777         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
778
779         if (!cfs_list_empty(&peer->mxp_list)) {
780                 /* assume we are locked */
781                 cfs_list_del_init(&peer->mxp_list);
782         }
783
784         MXLND_FREE(peer, sizeof (*peer));
785         atomic_dec(&kmxlnd_data.kmx_npeers);
786         return;
787 }
788
789 static int
790 mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
791 {
792         int                     ret     = -EHOSTUNREACH;
793         unsigned char           *haddr  = NULL;
794         struct net_device       *dev    = NULL;
795         struct neighbour        *n      = NULL;
796         __be32                  dst_ip  = htonl(ip);
797
798         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
799         if (dev == NULL)
800                 return -ENODEV;
801
802         haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
803
804         n = neigh_lookup(&arp_tbl, &dst_ip, dev);
805         if (n) {
806                 n->used = jiffies;
807                 if (n->nud_state & NUD_VALID) {
808                         memcpy(haddr, n->ha, dev->addr_len);
809                         neigh_release(n);
810                         ret = 0;
811                 }
812         }
813
814         dev_put(dev);
815
816         return ret;
817 }
818
819
820 /* We only want the MAC address of the peer's Myricom NIC. We
821  * require that each node has the IPoMX interface (myriN) up.
822  * We will not pass any traffic over IPoMX, but it allows us
823  * to get the MAC address. */
824 static int
825 mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
826 {
827         int                     ret     = 0;
828         int                     try     = 1;
829         int                     fatal   = 0;
830         u64                     tmp_id  = 0ULL;
831         cfs_socket_t            *sock   = NULL;
832
833         do {
834                 CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
835                 ret = mxlnd_lookup_mac(ip, &tmp_id);
836                 if (ret == 0) {
837                         break;
838                 } else {
839                         /* not found, try to connect (force an arp) */
840                         ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
841                         if (ret == -ECONNREFUSED) {
842                                 /* peer is there, get the MAC address */
843                                 mxlnd_lookup_mac(ip, &tmp_id);
844                                 if (tmp_id != 0ULL)
845                                         ret = 0;
846                                 break;
847                         } else if (ret == -EHOSTUNREACH && try < tries) {
848                                 /* add a little backoff */
849                                 CDEBUG(D_NET, "sleeping for %lu jiffies\n",
850                                        msecs_to_jiffies(MSEC_PER_SEC / 4));
851                                 mxlnd_sleep(msecs_to_jiffies(MSEC_PER_SEC / 4));
852                         }
853                 }
854         } while (try++ < tries);
855         CDEBUG(D_NET, "done trying. ret = %d\n", ret);
856
857         if (tmp_id == 0ULL)
858                 ret = -EHOSTUNREACH;
859 #if __BYTE_ORDER == __LITTLE_ENDIAN
860         *nic_id = ___arch__swab64(tmp_id);
861 #else
862         *nic_id = tmp_id;
863 #endif
864         return ret;
865 }
866
867 /**
868  * mxlnd_peer_alloc - allocate and initialize a new peer struct
869  * @peerp - address of a kmx_peer pointer
870  * @nid - LNET node id
871  *
872  * Returns 0 on success and -ENOMEM on failure
873  */
874 int
875 mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
876 {
877         int             ret     = 0;
878         u32             ip      = LNET_NIDADDR(nid);
879         kmx_peer_t     *peer    = NULL;
880
881         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
882
883         MXLND_ALLOC(peer, sizeof (*peer));
884         if (peer == NULL) {
885                 CNETERR("Cannot allocate peer for NID 0x%llx\n",
886                        nid);
887                 return -ENOMEM;
888         }
889         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
890
891         memset(peer, 0, sizeof(*peer));
892
893         CFS_INIT_LIST_HEAD(&peer->mxp_list);
894         peer->mxp_nid = nid;
895         /* peer->mxp_ni unused - may be used for multi-rail */
896         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
897
898         peer->mxp_board = board;
899         peer->mxp_ep_id = ep_id;
900         peer->mxp_nic_id = nic_id;
901
902         CFS_INIT_LIST_HEAD(&peer->mxp_conns);
903         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
904         if (ret != 0) {
905                 mxlnd_peer_decref(peer);
906                 return ret;
907         }
908         CFS_INIT_LIST_HEAD(&peer->mxp_tx_queue);
909
910         if (peer->mxp_nic_id != 0ULL)
911                 nic_id = peer->mxp_nic_id;
912
913         if (nic_id == 0ULL) {
914                 ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
915                 if (ret == 0) {
916                         peer->mxp_nic_id = nic_id;
917                         mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
918                 }
919         }
920
921         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
922
923         /* peer->mxp_reconnect_time = 0 */
924         /* peer->mxp_incompatible = 0 */
925
926         *peerp = peer;
927         return 0;
928 }
929
930 static inline kmx_peer_t *
931 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
932 {
933         int             found   = 0;
934         int             hash    = 0;
935         kmx_peer_t      *peer   = NULL;
936
937         hash = mxlnd_nid_to_hash(nid);
938
939         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
940                 if (peer->mxp_nid == nid) {
941                         found = 1;
942                         mxlnd_peer_addref(peer);
943                         break;
944                 }
945         }
946         return (found ? peer : NULL);
947 }
948
949 static kmx_peer_t *
950 mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
951 {
952         int             ret     = 0;
953         int             hash    = 0;
954         kmx_peer_t      *peer   = NULL;
955         kmx_peer_t      *old    = NULL;
956         rwlock_t    *g_lock = &kmxlnd_data.kmx_global_lock;
957
958         read_lock(g_lock);
959         peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
960
961         if ((peer && peer->mxp_conn) || /* found peer with conn or */
962             (!peer && !create)) {       /* did not find peer and do not create one */
963                 read_unlock(g_lock);
964                 return peer;
965         }
966
967         read_unlock(g_lock);
968
969         /* if peer but _not_ conn */
970         if (peer && !peer->mxp_conn) {
971                 if (create) {
972                         write_lock(g_lock);
973                         if (!peer->mxp_conn) { /* check again */
974                                 /* create the conn */
975                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
976                                 if (ret != 0) {
977                                         /* we tried, return the peer only.
978                                          * the caller needs to see if the conn exists */
979                                         CNETERR("%s: %s could not alloc conn\n",
980                                         __func__, libcfs_nid2str(peer->mxp_nid));
981                                 } else {
982                                         /* drop extra conn ref */
983                                         mxlnd_conn_decref(peer->mxp_conn);
984                                 }
985                         }
986                         write_unlock(g_lock);
987                 }
988                 return peer;
989         }
990
991         /* peer not found and we need to create one */
992         hash = mxlnd_nid_to_hash(nid);
993
994         /* create peer (and conn) */
995         /* adds conn ref for peer and one for this function */
996         ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
997                                *kmxlnd_tunables.kmx_ep_id, 0ULL);
998         if (ret != 0) /* no memory, peer is NULL */
999                 return NULL;
1000
1001         write_lock(g_lock);
1002
1003         /* look again */
1004         old = mxlnd_find_peer_by_nid_locked(nid);
1005         if (old) {
1006                 /* someone already created one */
1007                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
1008                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
1009                 mxlnd_peer_decref(peer);
1010                 peer = old;
1011         } else {
1012                 /* no other peer, use this one */
1013                 cfs_list_add_tail(&peer->mxp_list,
1014                                   &kmxlnd_data.kmx_peers[hash]);
1015                 atomic_inc(&kmxlnd_data.kmx_npeers);
1016                 mxlnd_peer_addref(peer);
1017                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
1018         }
1019
1020         write_unlock(g_lock);
1021
1022         return peer;
1023 }
1024
1025 static inline int
1026 mxlnd_tx_requires_credit(kmx_ctx_t *tx)
1027 {
1028         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
1029                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
1030                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
1031                 tx->mxc_msg_type == MXLND_MSG_NOOP);
1032 }
1033
1034 /**
1035  * mxlnd_init_msg - set type and number of bytes
1036  * @msg - msg pointer
1037  * @type - of message
1038  * @body_nob - bytes in msg body
1039  */
1040 static inline void
1041 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
1042 {
1043         msg->mxm_type = type;
1044         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
1045 }
1046
1047 static inline void
1048 mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
1049 {
1050         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
1051         kmx_msg_t       *msg    = NULL;
1052
1053         LASSERT (tx != NULL);
1054         LASSERT (nob <= MXLND_MSG_SIZE);
1055
1056         tx->mxc_nid = nid;
1057         /* tx->mxc_peer should have already been set if we know it */
1058         tx->mxc_msg_type = type;
1059         tx->mxc_nseg = 1;
1060         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
1061         tx->mxc_seg.segment_length = nob;
1062         tx->mxc_pin_type = MX_PIN_PHYSICAL;
1063
1064         msg = tx->mxc_msg;
1065         msg->mxm_type = type;
1066         msg->mxm_nob  = nob;
1067
1068         return;
1069 }
1070
1071 static inline __u32
1072 mxlnd_cksum (void *ptr, int nob)
1073 {
1074         char  *c  = ptr;
1075         __u32  sum = 0;
1076
1077         while (nob-- > 0)
1078                 sum = ((sum << 1) | (sum >> 31)) + *c++;
1079
1080         /* ensure I don't return 0 (== no checksum) */
1081         return (sum == 0) ? 1 : sum;
1082 }
1083
1084 /**
1085  * mxlnd_pack_msg_locked - complete msg info
1086  * @tx - msg to send
1087  */
1088 static inline void
1089 mxlnd_pack_msg_locked(kmx_ctx_t *tx)
1090 {
1091         kmx_msg_t       *msg    = tx->mxc_msg;
1092
1093         /* type and nob should already be set in init_msg() */
1094         msg->mxm_magic    = MXLND_MSG_MAGIC;
1095         msg->mxm_version  = MXLND_MSG_VERSION;
1096         /*   mxm_type */
1097         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
1098          * return credits as well */
1099         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
1100             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1101                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
1102                 tx->mxc_conn->mxk_outstanding = 0;
1103         } else {
1104                 msg->mxm_credits  = 0;
1105         }
1106         /*   mxm_nob */
1107         msg->mxm_cksum    = 0;
1108         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
1109         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
1110         msg->mxm_dstnid   = tx->mxc_nid;
1111         /* if it is a new peer, the dststamp will be 0 */
1112         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
1113
1114         if (*kmxlnd_tunables.kmx_cksum) {
1115                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
1116         }
1117 }
1118
1119 int
1120 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
1121 {
1122         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
1123         __u32     msg_cksum     = 0;
1124         int       flip          = 0;
1125         int       msg_nob       = 0;
1126
1127         /* 6 bytes are enough to have received magic + version */
1128         if (nob < 6) {
1129                 CNETERR("not enough bytes for magic + hdr: %d\n", nob);
1130                 return -EPROTO;
1131         }
1132
1133         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
1134                 flip = 0;
1135         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
1136                 flip = 1;
1137         } else {
1138                 CNETERR("Bad magic: %08x\n", msg->mxm_magic);
1139                 return -EPROTO;
1140         }
1141
1142         if (msg->mxm_version !=
1143             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
1144                 CNETERR("Bad version: %d\n", msg->mxm_version);
1145                 return -EPROTO;
1146         }
1147
1148         if (nob < hdr_size) {
1149                 CNETERR("not enough for a header: %d\n", nob);
1150                 return -EPROTO;
1151         }
1152
1153         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
1154         if (msg_nob > nob) {
1155                 CNETERR("Short message: got %d, wanted %d\n", nob, msg_nob);
1156                 return -EPROTO;
1157         }
1158
1159         /* checksum must be computed with mxm_cksum zero and BEFORE anything
1160          * gets flipped */
1161         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
1162         msg->mxm_cksum = 0;
1163         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
1164                 CNETERR("Bad checksum\n");
1165                 return -EPROTO;
1166         }
1167         msg->mxm_cksum = msg_cksum;
1168
1169         if (flip) {
1170                 /* leave magic unflipped as a clue to peer endianness */
1171                 __swab16s(&msg->mxm_version);
1172                 CLASSERT (sizeof(msg->mxm_type) == 1);
1173                 CLASSERT (sizeof(msg->mxm_credits) == 1);
1174                 msg->mxm_nob = msg_nob;
1175                 __swab64s(&msg->mxm_srcnid);
1176                 __swab64s(&msg->mxm_srcstamp);
1177                 __swab64s(&msg->mxm_dstnid);
1178                 __swab64s(&msg->mxm_dststamp);
1179         }
1180
1181         if (msg->mxm_srcnid == LNET_NID_ANY) {
1182                 CNETERR("Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
1183                 return -EPROTO;
1184         }
1185
1186         switch (msg->mxm_type) {
1187         default:
1188                 CNETERR("Unknown message type %x\n", msg->mxm_type);
1189                 return -EPROTO;
1190
1191         case MXLND_MSG_NOOP:
1192                 break;
1193
1194         case MXLND_MSG_EAGER:
1195                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
1196                         CNETERR("Short EAGER: %d(%d)\n", msg_nob,
1197                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
1198                         return -EPROTO;
1199                 }
1200                 break;
1201
1202         case MXLND_MSG_PUT_REQ:
1203                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
1204                         CNETERR("Short PUT_REQ: %d(%d)\n", msg_nob,
1205                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1206                         return -EPROTO;
1207                 }
1208                 if (flip)
1209                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1210                 break;
1211
1212         case MXLND_MSG_PUT_ACK:
1213                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1214                         CNETERR("Short PUT_ACK: %d(%d)\n", msg_nob,
1215                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1216                         return -EPROTO;
1217                 }
1218                 if (flip) {
1219                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1220                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1221                 }
1222                 break;
1223
1224         case MXLND_MSG_GET_REQ:
1225                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1226                         CNETERR("Short GET_REQ: %d(%d)\n", msg_nob,
1227                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1228                         return -EPROTO;
1229                 }
1230                 if (flip) {
1231                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1232                 }
1233                 break;
1234
1235         case MXLND_MSG_CONN_REQ:
1236         case MXLND_MSG_CONN_ACK:
1237                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1238                         CNETERR("Short connreq/ack: %d(%d)\n", msg_nob,
1239                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1240                         return -EPROTO;
1241                 }
1242                 if (flip) {
1243                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1244                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1245                 }
1246                 break;
1247         }
1248         return 0;
1249 }
1250
1251
1252 /**
1253  * mxlnd_recv_msg
1254  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1255  * @rx
1256  * @msg_type
1257  * @cookie
1258  * @length - length of incoming message
1259  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1260  *
1261  * The caller gets the rx and sets nid, peer and conn if known.
1262  *
1263  * Returns 0 on success and -1 on failure
1264  */
1265 int
1266 mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
1267 {
1268         int             ret     = 0;
1269         mx_return_t     mxret   = MX_SUCCESS;
1270         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1271
1272         rx->mxc_msg_type = msg_type;
1273         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1274         rx->mxc_cookie = cookie;
1275         /* rx->mxc_match may already be set */
1276         /* rx->mxc_seg.segment_ptr is already set */
1277         rx->mxc_seg.segment_length = length;
1278         ret = mxlnd_q_pending_ctx(rx);
1279         if (ret == -1) {
1280                 /* the caller is responsible for calling conn_decref() if needed */
1281                 return -1;
1282         }
1283         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1284                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1285         if (mxret != MX_SUCCESS) {
1286                 mxlnd_deq_pending_ctx(rx);
1287                 CNETERR("mx_kirecv() failed with %s (%d)\n",
1288                         mx_strerror(mxret), (int) mxret);
1289                 return -1;
1290         }
1291         return 0;
1292 }
1293
1294
1295 /**
1296  * mxlnd_unexpected_recv - this is the callback function that will handle
1297  *                         unexpected receives
1298  * @context - NULL, ignore
1299  * @source - the peer's mx_endpoint_addr_t
1300  * @match_value - the msg's bits, should be MXLND_MSG_EAGER
1301  * @length - length of incoming message
1302  * @data_if_available - used for CONN_[REQ|ACK]
1303  *
1304  * If it is an eager-sized msg, we will call recv_msg() with the actual
1305  * length. If it is a large message, we will call recv_msg() with a
1306  * length of 0 bytes to drop it because we should never have a large,
1307  * unexpected message.
1308  *
1309  * NOTE - The MX library blocks until this function completes. Make it as fast as
1310  * possible. DO NOT allocate memory which can block!
1311  *
1312  * If we cannot get a rx or the conn is closed, drop the message on the floor
1313  * (i.e. recv 0 bytes and ignore).
1314  */
1315 mx_unexp_handler_action_t
1316 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1317                  uint64_t match_value, uint32_t length, void *data_if_available)
1318 {
1319         int             ret             = 0;
1320         kmx_ctx_t       *rx             = NULL;
1321         mx_ksegment_t   seg;
1322         u8              msg_type        = 0;
1323         u8              error           = 0;
1324         u64             cookie          = 0ULL;
1325         kmx_conn_t      *conn           = NULL;
1326         kmx_peer_t      *peer           = NULL;
1327         u64             nic_id          = 0ULL;
1328         u32             ep_id           = 0;
1329         u32             sid             = 0;
1330
1331         /* TODO this will change to the net struct */
1332         if (context != NULL) {
1333                 CNETERR("non-NULL context\n");
1334         }
1335
1336 #if MXLND_DEBUG
1337         CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);
1338 #endif
1339
1340         mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
1341         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1342         read_lock(&kmxlnd_data.kmx_global_lock);
1343         mx_get_endpoint_addr_context(source, (void **) &conn);
1344         if (conn) {
1345                 mxlnd_conn_addref(conn); /* add ref for this function */
1346                 peer = conn->mxk_peer;
1347         }
1348         read_unlock(&kmxlnd_data.kmx_global_lock);
1349
1350         if (msg_type == MXLND_MSG_BYE) {
1351                 if (conn) {
1352                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1353                                         libcfs_nid2str(peer->mxp_nid));
1354                         mxlnd_conn_disconnect(conn, 1, 0);
1355                         mxlnd_conn_decref(conn); /* drop ref taken above */
1356                 }
1357                 return MX_RECV_FINISHED;
1358         }
1359
1360         if (msg_type == MXLND_MSG_CONN_REQ) {
1361                 kmx_connparams_t       *cp      = NULL;
1362                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1363                                                   sizeof(kmx_connreq_msg_t);
1364
1365                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1366                 if (unlikely(length != expected || !data_if_available)) {
1367                         CNETERR("received invalid CONN_REQ from %llx "
1368                                 "length=%d (expected %d)\n", nic_id, length, expected);
1369                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
1370                         return MX_RECV_FINISHED;
1371                 }
1372
1373                 ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1374                                          conn, peer, data_if_available);
1375                 if (unlikely(ret != 0)) {
1376                         CNETERR("unable to alloc CONN_REQ from %llx:%d\n",
1377                                 nic_id, ep_id);
1378                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
1379                         return MX_RECV_FINISHED;
1380                 }
1381                 spin_lock(&kmxlnd_data.kmx_conn_lock);
1382                 cfs_list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
1383                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
1384                 up(&kmxlnd_data.kmx_conn_sem);
1385                 return MX_RECV_FINISHED;
1386         }
1387         if (msg_type == MXLND_MSG_CONN_ACK) {
1388                 kmx_connparams_t  *cp           = NULL;
1389                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1390                                                   sizeof(kmx_connreq_msg_t);
1391
1392                 LASSERT(conn);
1393                 if (unlikely(error != 0)) {
1394                         CNETERR("received CONN_ACK from %s with error -%d\n",
1395                                libcfs_nid2str(peer->mxp_nid), (int) error);
1396                         mxlnd_conn_disconnect(conn, 1, 0);
1397                 } else if (unlikely(length != expected || !data_if_available)) {
1398                         CNETERR("received %s CONN_ACK from %s "
1399                                "length=%d (expected %d)\n",
1400                                data_if_available ? "short" : "missing",
1401                                libcfs_nid2str(peer->mxp_nid), length, expected);
1402                         mxlnd_conn_disconnect(conn, 1, 1);
1403                 } else {
1404                         /* peer is ready for messages */
1405                         ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1406                                          conn, peer, data_if_available);
1407                         if (unlikely(ret != 0)) {
1408                                 CNETERR("unable to alloc kmx_connparams_t"
1409                                                " from %llx:%d\n", nic_id, ep_id);
1410                                 mxlnd_conn_disconnect(conn, 1, 1);
1411                         } else {
1412                                 spin_lock(&kmxlnd_data.kmx_conn_lock);
1413                                 cfs_list_add_tail(&cp->mxr_list,
1414                                                   &kmxlnd_data.kmx_conn_reqs);
1415                                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
1416                                 up(&kmxlnd_data.kmx_conn_sem);
1417                         }
1418                 }
1419                 mxlnd_conn_decref(conn); /* drop ref taken above */
1420
1421                 return MX_RECV_FINISHED;
1422         }
1423
1424         /* Handle unexpected messages (PUT_REQ and GET_REQ) */
1425
1426         LASSERT(peer != NULL && conn != NULL);
1427
1428         rx = mxlnd_get_idle_rx(conn);
1429         if (rx != NULL) {
1430                 if (length <= MXLND_MSG_SIZE) {
1431                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1432                 } else {
1433                         CNETERR("unexpected large receive with "
1434                                 "match_value=0x%llx length=%d\n",
1435                                 match_value, length);
1436                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1437                 }
1438
1439                 if (ret == 0) {
1440                         /* hold conn ref until rx completes */
1441                         rx->mxc_conn = conn;
1442                         rx->mxc_peer = peer;
1443                         rx->mxc_nid = peer->mxp_nid;
1444                 } else {
1445                         CNETERR("could not post receive\n");
1446                         mxlnd_put_idle_rx(rx);
1447                 }
1448         }
1449
1450         /* Encountered error, drop incoming message on the floor */
1451         /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
1452          * uses the standard code path and acks the sender normally */
1453
1454         if (rx == NULL || ret != 0) {
1455                 mxlnd_conn_decref(conn); /* drop ref taken above */
1456                 if (rx == NULL) {
1457                         CNETERR("no idle rxs available - dropping rx"
1458                                 " 0x%llx from %s\n", match_value,
1459                                 libcfs_nid2str(peer->mxp_nid));
1460                 } else {
1461                         /* ret != 0 */
1462                         CNETERR("disconnected peer - dropping rx\n");
1463                 }
1464                 seg.segment_ptr = 0ULL;
1465                 seg.segment_length = 0;
1466                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1467                           match_value, ~0ULL, NULL, NULL);
1468         }
1469
1470         return MX_RECV_CONTINUE;
1471 }
1472
1473
1474 int
1475 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1476 {
1477         int              i      = 0;
1478         int              ret    = -ENOENT;
1479         kmx_peer_t      *peer   = NULL;
1480
1481         read_lock(&kmxlnd_data.kmx_global_lock);
1482         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1483                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1484                                         mxp_list) {
1485                         if (index-- == 0) {
1486                                 *nidp = peer->mxp_nid;
1487                                 *count = atomic_read(&peer->mxp_refcount);
1488                                 ret = 0;
1489                                 break;
1490                         }
1491                 }
1492         }
1493         read_unlock(&kmxlnd_data.kmx_global_lock);
1494
1495         return ret;
1496 }
1497
1498 void
1499 mxlnd_del_peer_locked(kmx_peer_t *peer)
1500 {
1501         if (peer->mxp_conn) {
1502                 mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1503         } else {
1504                 cfs_list_del_init(&peer->mxp_list); /* remove from the global list */
1505                 mxlnd_peer_decref(peer); /* drop global list ref */
1506         }
1507         return;
1508 }
1509
1510 int
1511 mxlnd_del_peer(lnet_nid_t nid)
1512 {
1513         int             i       = 0;
1514         int             ret     = 0;
1515         kmx_peer_t      *peer   = NULL;
1516         kmx_peer_t      *next   = NULL;
1517
1518         if (nid != LNET_NID_ANY) {
1519                 peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
1520         }
1521         write_lock(&kmxlnd_data.kmx_global_lock);
1522         if (nid != LNET_NID_ANY) {
1523                 if (peer == NULL) {
1524                         ret = -ENOENT;
1525                 } else {
1526                         mxlnd_peer_decref(peer); /* and drops it */
1527                         mxlnd_del_peer_locked(peer);
1528                 }
1529         } else { /* LNET_NID_ANY */
1530                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1531                         cfs_list_for_each_entry_safe(peer, next,
1532                                                      &kmxlnd_data.kmx_peers[i],
1533                                                      mxp_list) {
1534                                 mxlnd_del_peer_locked(peer);
1535                         }
1536                 }
1537         }
1538         write_unlock(&kmxlnd_data.kmx_global_lock);
1539
1540         return ret;
1541 }
1542
1543 kmx_conn_t *
1544 mxlnd_get_conn_by_idx(int index)
1545 {
1546         int              i      = 0;
1547         kmx_peer_t      *peer   = NULL;
1548         kmx_conn_t      *conn   = NULL;
1549
1550         read_lock(&kmxlnd_data.kmx_global_lock);
1551         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1552                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1553                                         mxp_list) {
1554                         cfs_list_for_each_entry(conn, &peer->mxp_conns,
1555                                                 mxk_list) {
1556                                 if (index-- > 0) {
1557                                         continue;
1558                                 }
1559
1560                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1561                                 read_unlock(&kmxlnd_data.kmx_global_lock);
1562                                 return conn;
1563                         }
1564                 }
1565         }
1566         read_unlock(&kmxlnd_data.kmx_global_lock);
1567
1568         return NULL;
1569 }
1570
1571 void
1572 mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
1573 {
1574         kmx_conn_t      *conn   = NULL;
1575         kmx_conn_t      *next   = NULL;
1576
1577         cfs_list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1578                 mxlnd_conn_disconnect(conn, 0, 1);
1579
1580         return;
1581 }
1582
1583 int
1584 mxlnd_close_matching_conns(lnet_nid_t nid)
1585 {
1586         int             i       = 0;
1587         int             ret     = 0;
1588         kmx_peer_t      *peer   = NULL;
1589
1590         write_lock(&kmxlnd_data.kmx_global_lock);
1591         if (nid != LNET_NID_ANY) {
1592                 peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
1593                 if (peer == NULL) {
1594                         ret = -ENOENT;
1595                 } else {
1596                         mxlnd_close_matching_conns_locked(peer);
1597                         mxlnd_peer_decref(peer); /* and drops it here */
1598                 }
1599         } else { /* LNET_NID_ANY */
1600                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1601                         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],                                                mxp_list)
1602                                 mxlnd_close_matching_conns_locked(peer);
1603                 }
1604         }
1605         write_unlock(&kmxlnd_data.kmx_global_lock);
1606
1607         return ret;
1608 }
1609
1610 /**
1611  * mxlnd_ctl - modify MXLND parameters
1612  * @ni - LNET interface handle
1613  * @cmd - command to change
1614  * @arg - the ioctl data
1615  */
1616 int
1617 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1618 {
1619         struct libcfs_ioctl_data *data  = arg;
1620         int                       ret   = -EINVAL;
1621
1622         LASSERT (ni == kmxlnd_data.kmx_ni);
1623
1624         switch (cmd) {
1625         case IOC_LIBCFS_GET_PEER: {
1626                 lnet_nid_t      nid     = 0;
1627                 int             count   = 0;
1628
1629                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1630                 data->ioc_nid    = nid;
1631                 data->ioc_count  = count;
1632                 break;
1633         }
1634         case IOC_LIBCFS_DEL_PEER: {
1635                 ret = mxlnd_del_peer(data->ioc_nid);
1636                 break;
1637         }
1638         case IOC_LIBCFS_GET_CONN: {
1639                 kmx_conn_t      *conn = NULL;
1640
1641                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1642                 if (conn == NULL) {
1643                         ret = -ENOENT;
1644                 } else {
1645                         ret = 0;
1646                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1647                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1648                 }
1649                 break;
1650         }
1651         case IOC_LIBCFS_CLOSE_CONNECTION: {
1652                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1653                 break;
1654         }
1655         default:
1656                 CNETERR("unknown ctl(%d)\n", cmd);
1657                 break;
1658         }
1659
1660         return ret;
1661 }
1662
1663 /**
1664  * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
1665  * @tx
1666  *
1667  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1668  */
1669 void
1670 mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
1671 {
1672         u8              msg_type        = tx->mxc_msg_type;
1673         kmx_conn_t      *conn           = tx->mxc_conn;
1674
1675         LASSERT (msg_type != 0);
1676         LASSERT (tx->mxc_nid != 0);
1677         LASSERT (tx->mxc_peer != NULL);
1678         LASSERT (tx->mxc_conn != NULL);
1679
1680         tx->mxc_incarnation = conn->mxk_incarnation;
1681
1682         if (msg_type != MXLND_MSG_PUT_DATA &&
1683             msg_type != MXLND_MSG_GET_DATA) {
1684                 /* msg style tx */
1685                 if (mxlnd_tx_requires_credit(tx)) {
1686                         cfs_list_add_tail(&tx->mxc_list,
1687                                           &conn->mxk_tx_credit_queue);
1688                         conn->mxk_ntx_msgs++;
1689                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1690                            msg_type == MXLND_MSG_CONN_ACK) {
1691                         /* put conn msgs at the front of the queue */
1692                         cfs_list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1693                 } else {
1694                         /* PUT_ACK, PUT_NAK */
1695                         cfs_list_add_tail(&tx->mxc_list,
1696                                           &conn->mxk_tx_free_queue);
1697                         conn->mxk_ntx_msgs++;
1698                 }
1699         } else {
1700                 /* data style tx */
1701                 cfs_list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1702                 conn->mxk_ntx_data++;
1703         }
1704
1705         return;
1706 }
1707
1708 /**
1709  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1710  * @tx
1711  *
1712  * Add the tx to the peer's msg or data queue
1713  */
1714 static inline void
1715 mxlnd_peer_queue_tx(kmx_ctx_t *tx)
1716 {
1717         LASSERT(tx->mxc_peer != NULL);
1718         LASSERT(tx->mxc_conn != NULL);
1719         spin_lock(&tx->mxc_conn->mxk_lock);
1720         mxlnd_peer_queue_tx_locked(tx);
1721         spin_unlock(&tx->mxc_conn->mxk_lock);
1722
1723         return;
1724 }
1725
1726 /**
1727  * mxlnd_queue_tx - add the tx to the global tx queue
1728  * @tx
1729  *
1730  * Add the tx to the global queue and up the tx_queue_sem
1731  */
1732 void
1733 mxlnd_queue_tx(kmx_ctx_t *tx)
1734 {
1735         kmx_peer_t *peer   = tx->mxc_peer;
1736         LASSERT (tx->mxc_nid != 0);
1737
1738         if (peer != NULL) {
1739                 if (peer->mxp_incompatible &&
1740                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1741                         /* let this fail now */
1742                         tx->mxc_errno = -ECONNABORTED;
1743                         mxlnd_conn_decref(peer->mxp_conn);
1744                         mxlnd_put_idle_tx(tx);
1745                         return;
1746                 }
1747                 if (tx->mxc_conn == NULL) {
1748                         int             ret     = 0;
1749                         kmx_conn_t      *conn   = NULL;
1750
1751                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1752                         if (ret != 0) {
1753                                 tx->mxc_errno = ret;
1754                                 mxlnd_put_idle_tx(tx);
1755                                 goto done;
1756                         }
1757                         tx->mxc_conn = conn;
1758                         mxlnd_peer_decref(peer); /* and takes it from peer */
1759                 }
1760                 LASSERT(tx->mxc_conn != NULL);
1761                 mxlnd_peer_queue_tx(tx);
1762                 mxlnd_check_sends(peer);
1763         } else {
1764                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1765                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1766                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1767                 up(&kmxlnd_data.kmx_tx_queue_sem);
1768         }
1769 done:
1770         return;
1771 }
1772
1773 int
1774 mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1775 {
1776         int             i                       = 0;
1777         int             sum                     = 0;
1778         int             old_sum                 = 0;
1779         int             nseg                    = 0;
1780         int             first_iov               = -1;
1781         int             first_iov_offset        = 0;
1782         int             first_found             = 0;
1783         int             last_iov                = -1;
1784         int             last_iov_length         = 0;
1785         mx_ksegment_t  *seg                     = NULL;
1786
1787         if (niov == 0) return 0;
1788         LASSERT(iov != NULL);
1789
1790         for (i = 0; i < niov; i++) {
1791                 sum = old_sum + (u32) iov[i].iov_len;
1792                 if (!first_found && (sum > offset)) {
1793                         first_iov = i;
1794                         first_iov_offset = offset - old_sum;
1795                         first_found = 1;
1796                         sum = (u32) iov[i].iov_len - first_iov_offset;
1797                         old_sum = 0;
1798                 }
1799                 if (sum >= nob) {
1800                         last_iov = i;
1801                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1802                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1803                         break;
1804                 }
1805                 old_sum = sum;
1806         }
1807         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1808         nseg = last_iov - first_iov + 1;
1809         LASSERT(nseg > 0);
1810
1811         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1812         if (seg == NULL) {
1813                 CNETERR("MXLND_ALLOC() failed\n");
1814                 return -1;
1815         }
1816         memset(seg, 0, nseg * sizeof(*seg));
1817         ctx->mxc_nseg = nseg;
1818         sum = 0;
1819         for (i = 0; i < nseg; i++) {
1820                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1821                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1822                 if (i == 0) {
1823                         seg[i].segment_ptr += (u64) first_iov_offset;
1824                         seg[i].segment_length -= (u32) first_iov_offset;
1825                 }
1826                 if (i == (nseg - 1)) {
1827                         seg[i].segment_length = (u32) last_iov_length;
1828                 }
1829                 sum += seg[i].segment_length;
1830         }
1831         ctx->mxc_seg_list = seg;
1832         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1833 #ifdef MX_PIN_FULLPAGES
1834         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1835 #endif
1836         LASSERT(nob == sum);
1837         return 0;
1838 }
1839
1840 int
1841 mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1842 {
1843         int             i                       = 0;
1844         int             sum                     = 0;
1845         int             old_sum                 = 0;
1846         int             nseg                    = 0;
1847         int             first_kiov              = -1;
1848         int             first_kiov_offset       = 0;
1849         int             first_found             = 0;
1850         int             last_kiov               = -1;
1851         int             last_kiov_length        = 0;
1852         mx_ksegment_t  *seg                     = NULL;
1853
1854         if (niov == 0) return 0;
1855         LASSERT(kiov != NULL);
1856
1857         for (i = 0; i < niov; i++) {
1858                 sum = old_sum + kiov[i].kiov_len;
1859                 if (i == 0) sum -= kiov[i].kiov_offset;
1860                 if (!first_found && (sum > offset)) {
1861                         first_kiov = i;
1862                         first_kiov_offset = offset - old_sum;
1863                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1864                         first_found = 1;
1865                         sum = kiov[i].kiov_len - first_kiov_offset;
1866                         old_sum = 0;
1867                 }
1868                 if (sum >= nob) {
1869                         last_kiov = i;
1870                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1871                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1872                         break;
1873                 }
1874                 old_sum = sum;
1875         }
1876         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1877         nseg = last_kiov - first_kiov + 1;
1878         LASSERT(nseg > 0);
1879
1880         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1881         if (seg == NULL) {
1882                 CNETERR("MXLND_ALLOC() failed\n");
1883                 return -1;
1884         }
1885         memset(seg, 0, niov * sizeof(*seg));
1886         ctx->mxc_nseg = niov;
1887         sum = 0;
1888         for (i = 0; i < niov; i++) {
1889                 seg[i].segment_ptr =
1890                         page_to_phys(kiov[first_kiov + i].kiov_page);
1891                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1892                 if (i == 0) {
1893                         seg[i].segment_ptr += (u64) first_kiov_offset;
1894                         /* we have to add back the original kiov_offset */
1895                         seg[i].segment_length -= first_kiov_offset +
1896                                                  kiov[first_kiov].kiov_offset;
1897                 }
1898                 if (i == (nseg - 1)) {
1899                         seg[i].segment_length = last_kiov_length;
1900                 }
1901                 sum += seg[i].segment_length;
1902         }
1903         ctx->mxc_seg_list = seg;
1904         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1905 #ifdef MX_PIN_FULLPAGES
1906         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1907 #endif
1908         LASSERT(nob == sum);
1909         return 0;
1910 }
1911
1912 void
1913 mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1914 {
1915         LASSERT(type == MXLND_MSG_PUT_ACK);
1916         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1917         tx->mxc_cookie = cookie;
1918         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1919         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1920         tx->mxc_match = mxlnd_create_match(tx, status);
1921
1922         mxlnd_queue_tx(tx);
1923 }
1924
1925
1926 /**
1927  * mxlnd_send_data - get tx, map [k]iov, queue tx
1928  * @ni
1929  * @lntmsg
1930  * @peer
1931  * @msg_type
1932  * @cookie
1933  *
1934  * This setups the DATA send for PUT or GET.
1935  *
1936  * On success, it queues the tx, on failure it calls lnet_finalize()
1937  */
1938 void
1939 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
1940 {
1941         int                     ret     = 0;
1942         lnet_process_id_t       target  = lntmsg->msg_target;
1943         unsigned int            niov    = lntmsg->msg_niov;
1944         struct iovec           *iov     = lntmsg->msg_iov;
1945         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1946         unsigned int            offset  = lntmsg->msg_offset;
1947         unsigned int            nob     = lntmsg->msg_len;
1948         kmx_ctx_t              *tx      = NULL;
1949
1950         LASSERT(lntmsg != NULL);
1951         LASSERT(peer != NULL);
1952         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1953         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1954
1955         tx = mxlnd_get_idle_tx();
1956         if (tx == NULL) {
1957                 CNETERR("Can't allocate %s tx for %s\n",
1958                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1959                         libcfs_nid2str(target.nid));
1960                 goto failed_0;
1961         }
1962         tx->mxc_nid = target.nid;
1963         /* NOTE called when we have a ref on the conn, get one for this tx */
1964         mxlnd_conn_addref(peer->mxp_conn);
1965         tx->mxc_peer = peer;
1966         tx->mxc_conn = peer->mxp_conn;
1967         tx->mxc_msg_type = msg_type;
1968         tx->mxc_lntmsg[0] = lntmsg;
1969         tx->mxc_cookie = cookie;
1970         tx->mxc_match = mxlnd_create_match(tx, 0);
1971
1972         /* This setups up the mx_ksegment_t to send the DATA payload  */
1973         if (nob == 0) {
1974                 /* do not setup the segments */
1975                 CNETERR("nob = 0; why didn't we use an EAGER reply "
1976                         "to %s?\n", libcfs_nid2str(target.nid));
1977                 ret = 0;
1978         } else if (kiov == NULL) {
1979                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1980         } else {
1981                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1982         }
1983         if (ret != 0) {
1984                 CNETERR("Can't setup send DATA for %s\n",
1985                         libcfs_nid2str(target.nid));
1986                 tx->mxc_errno = -EIO;
1987                 goto failed_1;
1988         }
1989         mxlnd_queue_tx(tx);
1990         return;
1991
1992 failed_1:
1993         mxlnd_conn_decref(peer->mxp_conn);
1994         mxlnd_put_idle_tx(tx);
1995         return;
1996
1997 failed_0:
1998         CNETERR("no tx avail\n");
1999         lnet_finalize(ni, lntmsg, -EIO);
2000         return;
2001 }
2002
2003 /**
2004  * mxlnd_recv_data - map [k]iov, post rx
2005  * @ni
2006  * @lntmsg
2007  * @rx
2008  * @msg_type
2009  * @cookie
2010  *
2011  * This setups the DATA receive for PUT or GET.
2012  *
2013  * On success, it returns 0, on failure it returns -1
2014  */
2015 int
2016 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
2017 {
2018         int                     ret     = 0;
2019         lnet_process_id_t       target  = lntmsg->msg_target;
2020         unsigned int            niov    = lntmsg->msg_niov;
2021         struct iovec           *iov     = lntmsg->msg_iov;
2022         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
2023         unsigned int            offset  = lntmsg->msg_offset;
2024         unsigned int            nob     = lntmsg->msg_len;
2025         mx_return_t             mxret   = MX_SUCCESS;
2026         u64                     mask    = ~(MXLND_ERROR_MASK);
2027
2028         /* above assumes MXLND_MSG_PUT_DATA */
2029         if (msg_type == MXLND_MSG_GET_DATA) {
2030                 niov = lntmsg->msg_md->md_niov;
2031                 iov = lntmsg->msg_md->md_iov.iov;
2032                 kiov = lntmsg->msg_md->md_iov.kiov;
2033                 offset = 0;
2034                 nob = lntmsg->msg_md->md_length;
2035         }
2036
2037         LASSERT(lntmsg != NULL);
2038         LASSERT(rx != NULL);
2039         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
2040         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
2041
2042         rx->mxc_msg_type = msg_type;
2043         rx->mxc_state = MXLND_CTX_PENDING;
2044         rx->mxc_nid = target.nid;
2045         /* if posting a GET_DATA, we may not yet know the peer */
2046         if (rx->mxc_peer != NULL) {
2047                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
2048         }
2049         rx->mxc_lntmsg[0] = lntmsg;
2050         rx->mxc_cookie = cookie;
2051         rx->mxc_match = mxlnd_create_match(rx, 0);
2052         /* This setups up the mx_ksegment_t to receive the DATA payload  */
2053         if (kiov == NULL) {
2054                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
2055         } else {
2056                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
2057         }
2058         if (msg_type == MXLND_MSG_GET_DATA) {
2059                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
2060                 if (rx->mxc_lntmsg[1] == NULL) {
2061                         CNETERR("Can't create reply for GET -> %s\n",
2062                                 libcfs_nid2str(target.nid));
2063                         ret = -1;
2064                 }
2065         }
2066         if (ret != 0) {
2067                 CNETERR("Can't setup %s rx for %s\n",
2068                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
2069                        libcfs_nid2str(target.nid));
2070                 return -1;
2071         }
2072         ret = mxlnd_q_pending_ctx(rx);
2073         if (ret == -1) {
2074                 return -1;
2075         }
2076         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
2077         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
2078                           rx->mxc_seg_list, rx->mxc_nseg,
2079                           rx->mxc_pin_type, rx->mxc_match,
2080                           mask, (void *) rx,
2081                           &rx->mxc_mxreq);
2082         if (mxret != MX_SUCCESS) {
2083                 if (rx->mxc_conn != NULL) {
2084                         mxlnd_deq_pending_ctx(rx);
2085                 }
2086                 CNETERR("mx_kirecv() failed with %d for %s\n",
2087                         (int) mxret, libcfs_nid2str(target.nid));
2088                 return -1;
2089         }
2090
2091         return 0;
2092 }
2093
2094 /**
2095  * mxlnd_send - the LND required send function
2096  * @ni
2097  * @private
2098  * @lntmsg
2099  *
2100  * This must not block. Since we may not have a peer struct for the receiver,
2101  * it will append send messages on a global tx list. We will then up the
2102  * tx_queued's semaphore to notify it of the new send.
2103  */
2104 int
2105 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
2106 {
2107         int                     ret             = 0;
2108         int                     type            = lntmsg->msg_type;
2109         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
2110         lnet_process_id_t       target          = lntmsg->msg_target;
2111         lnet_nid_t              nid             = target.nid;
2112         int                     target_is_router = lntmsg->msg_target_is_router;
2113         int                     routing         = lntmsg->msg_routing;
2114         unsigned int            payload_niov    = lntmsg->msg_niov;
2115         struct iovec           *payload_iov     = lntmsg->msg_iov;
2116         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
2117         unsigned int            payload_offset  = lntmsg->msg_offset;
2118         unsigned int            payload_nob     = lntmsg->msg_len;
2119         kmx_ctx_t              *tx              = NULL;
2120         kmx_msg_t              *txmsg           = NULL;
2121         kmx_ctx_t              *rx              = (kmx_ctx_t *) private; /* for REPLY */
2122         kmx_ctx_t              *rx_data         = NULL;
2123         kmx_conn_t             *conn            = NULL;
2124         int                     nob             = 0;
2125         uint32_t                length          = 0;
2126         kmx_peer_t             *peer            = NULL;
2127         rwlock_t                *g_lock         =&kmxlnd_data.kmx_global_lock;
2128
2129         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
2130                        payload_nob, payload_niov, libcfs_id2str(target));
2131
2132         LASSERT (payload_nob == 0 || payload_niov > 0);
2133         LASSERT (payload_niov <= LNET_MAX_IOV);
2134         /* payload is either all vaddrs or all pages */
2135         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
2136
2137         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
2138
2139         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
2140          * to a new peer, so create one if not found */
2141         peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
2142         if (peer == NULL || peer->mxp_conn == NULL) {
2143                 /* we could not find it nor could we create one or
2144                  * one exists but we cannot create a conn,
2145                  * fail this message */
2146                 if (peer) {
2147                         /* found peer without conn, drop ref taken above */
2148                         LASSERT(peer->mxp_conn == NULL);
2149                         mxlnd_peer_decref(peer);
2150                 }
2151                 return -ENOMEM;
2152         }
2153
2154         /* we have a peer with a conn */
2155
2156         if (unlikely(peer->mxp_incompatible)) {
2157                 mxlnd_peer_decref(peer); /* drop ref taken above */
2158         } else {
2159                 read_lock(g_lock);
2160                 conn = peer->mxp_conn;
2161                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT)
2162                         mxlnd_conn_addref(conn);
2163                 else
2164                         conn = NULL;
2165                 read_unlock(g_lock);
2166                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
2167                 if (!conn)
2168                         return -ENOTCONN;
2169         }
2170
2171         LASSERT(peer && conn);
2172
2173         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
2174
2175         switch (type) {
2176         case LNET_MSG_ACK:
2177                 LASSERT (payload_nob == 0);
2178                 break;
2179
2180         case LNET_MSG_REPLY:
2181         case LNET_MSG_PUT:
2182                 /* Is the payload small enough not to need DATA? */
2183                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
2184                 if (nob <= MXLND_MSG_SIZE)
2185                         break;                  /* send EAGER */
2186
2187                 tx = mxlnd_get_idle_tx();
2188                 if (unlikely(tx == NULL)) {
2189                         CNETERR("Can't allocate %s tx for %s\n",
2190                                 type == LNET_MSG_PUT ? "PUT" : "REPLY",
2191                                 libcfs_nid2str(nid));
2192                         if (conn) mxlnd_conn_decref(conn);
2193                         return -ENOMEM;
2194                 }
2195
2196                 tx->mxc_peer = peer;
2197                 tx->mxc_conn = conn;
2198                 /* we added a conn ref above */
2199                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
2200                 txmsg = tx->mxc_msg;
2201                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
2202                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
2203                 tx->mxc_match = mxlnd_create_match(tx, 0);
2204
2205                 /* we must post a receive _before_ sending the request.
2206                  * we need to determine how much to receive, it will be either
2207                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
2208
2209                 rx = mxlnd_get_idle_rx(conn);
2210                 if (unlikely(rx == NULL)) {
2211                         CNETERR("Can't allocate rx for PUT_ACK for %s\n",
2212                                 libcfs_nid2str(nid));
2213                         mxlnd_put_idle_tx(tx);
2214                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
2215                         return -ENOMEM;
2216                 }
2217                 rx->mxc_nid = nid;
2218                 rx->mxc_peer = peer;
2219                 mxlnd_conn_addref(conn); /* for this rx */
2220                 rx->mxc_conn = conn;
2221                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
2222                 rx->mxc_cookie = tx->mxc_cookie;
2223                 rx->mxc_match = mxlnd_create_match(rx, 0);
2224
2225                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
2226                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
2227                 if (unlikely(ret != 0)) {
2228                         CNETERR("recv_msg() failed for PUT_ACK for %s\n",
2229                                            libcfs_nid2str(nid));
2230                         rx->mxc_lntmsg[0] = NULL;
2231                         mxlnd_put_idle_rx(rx);
2232                         mxlnd_put_idle_tx(tx);
2233                         mxlnd_conn_decref(conn); /* for the rx... */
2234                         mxlnd_conn_decref(conn); /* and for the tx */
2235                         return -EHOSTUNREACH;
2236                 }
2237
2238                 mxlnd_queue_tx(tx);
2239                 return 0;
2240
2241         case LNET_MSG_GET:
2242                 if (routing || target_is_router)
2243                         break;                  /* send EAGER */
2244
2245                 /* is the REPLY message too small for DATA? */
2246                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
2247                 if (nob <= MXLND_MSG_SIZE)
2248                         break;                  /* send EAGER */
2249
2250                 /* get tx (we need the cookie) , post rx for incoming DATA,
2251                  * then post GET_REQ tx */
2252                 tx = mxlnd_get_idle_tx();
2253                 if (unlikely(tx == NULL)) {
2254                         CNETERR("Can't allocate GET tx for %s\n",
2255                                 libcfs_nid2str(nid));
2256                         mxlnd_conn_decref(conn); /* for the ref taken above */
2257                         return -ENOMEM;
2258                 }
2259                 rx_data = mxlnd_get_idle_rx(conn);
2260                 if (unlikely(rx_data == NULL)) {
2261                         CNETERR("Can't allocate DATA rx for %s\n",
2262                                 libcfs_nid2str(nid));
2263                         mxlnd_put_idle_tx(tx);
2264                         mxlnd_conn_decref(conn); /* for the ref taken above */
2265                         return -ENOMEM;
2266                 }
2267                 rx_data->mxc_peer = peer;
2268                 /* NOTE no need to lock peer before adding conn ref since we took
2269                  * a conn ref for the tx (it cannot be freed between there and here ) */
2270                 mxlnd_conn_addref(conn); /* for the rx_data */
2271                 rx_data->mxc_conn = conn;
2272
2273                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
2274                 if (unlikely(ret != 0)) {
2275                         CNETERR("Can't setup GET sink for %s\n",
2276                                 libcfs_nid2str(nid));
2277                         mxlnd_put_idle_rx(rx_data);
2278                         mxlnd_put_idle_tx(tx);
2279                         mxlnd_conn_decref(conn); /* for the rx_data... */
2280                         mxlnd_conn_decref(conn); /* and for the tx */
2281                         return -EIO;
2282                 }
2283
2284                 tx->mxc_peer = peer;
2285                 tx->mxc_conn = conn;
2286                 /* conn ref taken above */
2287                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
2288                 txmsg = tx->mxc_msg;
2289                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
2290                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
2291                 tx->mxc_match = mxlnd_create_match(tx, 0);
2292
2293                 mxlnd_queue_tx(tx);
2294                 return 0;
2295
2296         default:
2297                 LBUG();
2298                 mxlnd_conn_decref(conn); /* drop ref taken above */
2299                 return -EIO;
2300         }
2301
2302         /* send EAGER */
2303
2304         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
2305                 <= MXLND_MSG_SIZE);
2306
2307         tx = mxlnd_get_idle_tx();
2308         if (unlikely(tx == NULL)) {
2309                 CNETERR("Can't send %s to %s: tx descs exhausted\n",
2310                         mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2311                 mxlnd_conn_decref(conn); /* drop ref taken above */
2312                 return -ENOMEM;
2313         }
2314
2315         tx->mxc_peer = peer;
2316         tx->mxc_conn = conn;
2317         /* conn ref taken above */
2318         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2319         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2320         tx->mxc_match = mxlnd_create_match(tx, 0);
2321
2322         txmsg = tx->mxc_msg;
2323         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2324
2325         if (payload_kiov != NULL)
2326                 lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
2327                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2328                             payload_niov, payload_kiov, payload_offset, payload_nob);
2329         else
2330                 lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
2331                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2332                             payload_niov, payload_iov, payload_offset, payload_nob);
2333
2334         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2335         mxlnd_queue_tx(tx);
2336         return 0;
2337 }
2338
2339 /**
2340  * mxlnd_recv - the LND required recv function
2341  * @ni
2342  * @private
2343  * @lntmsg
2344  * @delayed
2345  * @niov
2346  * @kiov
2347  * @offset
2348  * @mlen
2349  * @rlen
2350  *
2351  * This must not block.
2352  */
2353 int
2354 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2355              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2356              unsigned int offset, unsigned int mlen, unsigned int rlen)
2357 {
2358         int             ret             = 0;
2359         int             nob             = 0;
2360         int             len             = 0;
2361         kmx_ctx_t       *rx             = private;
2362         kmx_msg_t       *rxmsg          = rx->mxc_msg;
2363         lnet_nid_t       nid            = rx->mxc_nid;
2364         kmx_ctx_t       *tx             = NULL;
2365         kmx_msg_t       *txmsg          = NULL;
2366         kmx_peer_t      *peer           = rx->mxc_peer;
2367         kmx_conn_t      *conn           = peer->mxp_conn;
2368         u64              cookie         = 0ULL;
2369         int              msg_type       = rxmsg->mxm_type;
2370         int              repost         = 1;
2371         int              credit         = 0;
2372         int              finalize       = 0;
2373
2374         LASSERT (mlen <= rlen);
2375         /* Either all pages or all vaddrs */
2376         LASSERT (!(kiov != NULL && iov != NULL));
2377         LASSERT (peer && conn);
2378
2379         /* conn_addref(conn) already taken for the primary rx */
2380
2381         switch (msg_type) {
2382         case MXLND_MSG_EAGER:
2383                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2384                 len = rx->mxc_status.xfer_length;
2385                 if (unlikely(nob > len)) {
2386                         CNETERR("Eager message from %s too big: %d(%d)\n",
2387                                 libcfs_nid2str(nid), nob, len);
2388                         ret = -EPROTO;
2389                         break;
2390                 }
2391
2392                 if (kiov != NULL)
2393                         lnet_copy_flat2kiov(niov, kiov, offset,
2394                                 MXLND_MSG_SIZE, rxmsg,
2395                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2396                                 mlen);
2397                 else
2398                         lnet_copy_flat2iov(niov, iov, offset,
2399                                 MXLND_MSG_SIZE, rxmsg,
2400                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2401                                 mlen);
2402                 finalize = 1;
2403                 credit = 1;
2404                 break;
2405
2406         case MXLND_MSG_PUT_REQ:
2407                 /* we are going to reuse the rx, store the needed info */
2408                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2409
2410                 /* get tx, post rx, send PUT_ACK */
2411
2412                 tx = mxlnd_get_idle_tx();
2413                 if (unlikely(tx == NULL)) {
2414                         CNETERR("Can't allocate tx for %s\n", libcfs_nid2str(nid));
2415                         /* Not replying will break the connection */
2416                         ret = -ENOMEM;
2417                         break;
2418                 }
2419                 if (unlikely(mlen == 0)) {
2420                         finalize = 1;
2421                         tx->mxc_peer = peer;
2422                         tx->mxc_conn = conn;
2423                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2424                         /* repost = 1 */
2425                         break;
2426                 }
2427
2428                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2429                 tx->mxc_peer = peer;
2430                 tx->mxc_conn = conn;
2431                 /* no need to lock peer first since we already have a ref */
2432                 mxlnd_conn_addref(conn); /* for the tx */
2433                 txmsg = tx->mxc_msg;
2434                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2435                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2436                 tx->mxc_cookie = cookie;
2437                 tx->mxc_match = mxlnd_create_match(tx, 0);
2438
2439                 /* we must post a receive _before_ sending the PUT_ACK */
2440                 mxlnd_ctx_init(rx);
2441                 rx->mxc_state = MXLND_CTX_PREP;
2442                 rx->mxc_peer = peer;
2443                 rx->mxc_conn = conn;
2444                 /* do not take another ref for this rx, it is already taken */
2445                 rx->mxc_nid = peer->mxp_nid;
2446                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2447                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2448
2449                 if (unlikely(ret != 0)) {
2450                         /* Notify peer that it's over */
2451                         CNETERR("Can't setup PUT_DATA rx for %s: %d\n",
2452                                 libcfs_nid2str(nid), ret);
2453                         mxlnd_ctx_init(tx);
2454                         tx->mxc_state = MXLND_CTX_PREP;
2455                         tx->mxc_peer = peer;
2456                         tx->mxc_conn = conn;
2457                         /* finalize = 0, let the PUT_ACK tx finalize this */
2458                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2459                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2460                         /* conn ref already taken above */
2461                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2462                         /* repost = 1 */
2463                         break;
2464                 }
2465
2466                 mxlnd_queue_tx(tx);
2467                 /* do not return a credit until after PUT_DATA returns */
2468                 repost = 0;
2469                 break;
2470
2471         case MXLND_MSG_GET_REQ:
2472                 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2473
2474                 if (likely(lntmsg != NULL)) {
2475                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2476                                         cookie);
2477                 } else {
2478                         /* GET didn't match anything */
2479                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2480                          * We have to embed the error code in the match bits.
2481                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2482                         tx = mxlnd_get_idle_tx();
2483                         if (unlikely(tx == NULL)) {
2484                                 CNETERR("Can't get tx for GET NAK for %s\n",
2485                                         libcfs_nid2str(nid));
2486                                 /* we can't get a tx, notify the peer that the GET failed */
2487                                 mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
2488                                                    ENODATA, cookie);
2489                                 ret = -ENOMEM;
2490                                 break;
2491                         }
2492                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2493                         tx->mxc_state = MXLND_CTX_PENDING;
2494                         tx->mxc_nid = nid;
2495                         tx->mxc_peer = peer;
2496                         tx->mxc_conn = conn;
2497                         /* no need to lock peer first since we already have a ref */
2498                         mxlnd_conn_addref(conn); /* for this tx */
2499                         tx->mxc_cookie = cookie;
2500                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2501                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2502                         mxlnd_queue_tx(tx);
2503                 }
2504                 /* finalize lntmsg after tx completes */
2505                 break;
2506
2507         default:
2508                 LBUG();
2509         }
2510
2511         if (repost) {
2512                 /* we received a message, increment peer's outstanding credits */
2513                 if (credit == 1) {
2514                         spin_lock(&conn->mxk_lock);
2515                         conn->mxk_outstanding++;
2516                         spin_unlock(&conn->mxk_lock);
2517                 }
2518                 /* we are done with the rx */
2519                 mxlnd_put_idle_rx(rx);
2520                 mxlnd_conn_decref(conn);
2521         }
2522
2523         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2524
2525         /* we received a credit, see if we can use it to send a msg */
2526         if (credit) mxlnd_check_sends(peer);
2527
2528         return ret;
2529 }
2530
2531 void
2532 mxlnd_sleep(unsigned long timeout)
2533 {
2534         set_current_state(TASK_INTERRUPTIBLE);
2535         schedule_timeout(timeout);
2536         return;
2537 }
2538
2539 /**
2540  * mxlnd_tx_queued - the generic send queue thread
2541  * @arg - thread id (as a void *)
2542  *
2543  * This thread moves send messages from the global tx_queue to the owning
2544  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2545  * it to the global peer list.
2546  */
2547 int
2548 mxlnd_tx_queued(void *arg)
2549 {
2550         long                    id      = (long) arg;
2551         int                     ret     = 0;
2552         int                     found   = 0;
2553         kmx_ctx_t              *tx      = NULL;
2554         kmx_peer_t             *peer    = NULL;
2555         cfs_list_t             *queue   = &kmxlnd_data.kmx_tx_queue;
2556         spinlock_t              *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
2557         rwlock_t                *g_lock  = &kmxlnd_data.kmx_global_lock;
2558
2559         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
2560                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2561                 if (atomic_read(&kmxlnd_data.kmx_shutdown))
2562                         break;
2563                 if (ret != 0) /* Should we check for -EINTR? */
2564                         continue;
2565                 spin_lock(tx_q_lock);
2566                 if (cfs_list_empty(&kmxlnd_data.kmx_tx_queue)) {
2567                         spin_unlock(tx_q_lock);
2568                         continue;
2569                 }
2570                 tx = cfs_list_entry(queue->next, kmx_ctx_t, mxc_list);
2571                 cfs_list_del_init(&tx->mxc_list);
2572                 spin_unlock(tx_q_lock);
2573
2574                 found = 0;
2575                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds ref*/
2576                 if (peer != NULL) {
2577                         tx->mxc_peer = peer;
2578                         write_lock(g_lock);
2579                         if (peer->mxp_conn == NULL) {
2580                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn,
2581                                                               peer);
2582                                 if (ret != 0) {
2583                                         /* out of memory: give up, fail tx */
2584                                         tx->mxc_errno = -ENOMEM;
2585                                         mxlnd_peer_decref(peer);
2586                                         write_unlock(g_lock);
2587                                         mxlnd_put_idle_tx(tx);
2588                                         continue;
2589                                 }
2590                         }
2591                         tx->mxc_conn = peer->mxp_conn;
2592                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2593                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2594                         write_unlock(g_lock);
2595                         mxlnd_queue_tx(tx);
2596                         found = 1;
2597                 }
2598                 if (found == 0) {
2599                         int             hash    = 0;
2600                         kmx_peer_t     *peer    = NULL;
2601                         kmx_peer_t     *old     = NULL;
2602
2603                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2604
2605                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2606                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2607                         /* create peer */
2608                         /* adds conn ref for this function */
2609                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2610                                         *kmxlnd_tunables.kmx_board,
2611                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2612                         if (ret != 0) {
2613                                 /* finalize message */
2614                                 tx->mxc_errno = ret;
2615                                 mxlnd_put_idle_tx(tx);
2616                                 continue;
2617                         }
2618                         tx->mxc_peer = peer;
2619                         tx->mxc_conn = peer->mxp_conn;
2620                         /* this tx will keep the conn ref taken in peer_alloc() */
2621
2622                         /* add peer to global peer list, but look to see
2623                          * if someone already created it after we released
2624                          * the read lock */
2625                         write_lock(g_lock);
2626                         old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
2627                         if (old) {
2628                                 /* we have a peer ref on old */
2629                                 if (old->mxp_conn) {
2630                                         found = 1;
2631                                 } else {
2632                                         /* no conn */
2633                                         /* drop our ref taken above... */
2634                                         mxlnd_peer_decref(old);
2635                                         /* and delete it */
2636                                         mxlnd_del_peer_locked(old);
2637                                 }
2638                         }
2639
2640                         if (found == 0) {
2641                                 cfs_list_add_tail(&peer->mxp_list,
2642                                                   &kmxlnd_data.kmx_peers[hash]);
2643                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2644                         } else {
2645                                 tx->mxc_peer = old;
2646                                 tx->mxc_conn = old->mxp_conn;
2647                                 LASSERT(old->mxp_conn != NULL);
2648                                 mxlnd_conn_addref(old->mxp_conn);
2649                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2650                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2651                                 mxlnd_peer_decref(peer);
2652                         }
2653                         write_unlock(g_lock);
2654
2655                         mxlnd_queue_tx(tx);
2656                 }
2657         }
2658         mxlnd_thread_stop(id);
2659         return 0;
2660 }
2661
2662 /* When calling this, we must not have the peer lock. */
2663 void
2664 mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
2665 {
2666         mx_return_t     mxret           = MX_SUCCESS;
2667         mx_request_t    request;
2668         kmx_conn_t      *conn           = peer->mxp_conn;
2669         u64             match           = ((u64) msg_type) << MXLND_MSG_OFFSET;
2670
2671         /* NOTE we are holding a conn ref every time we call this function,
2672          * we do not need to lock the peer before taking another ref */
2673         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2674
2675         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2676
2677         if (peer->mxp_reconnect_time == 0) {
2678                 peer->mxp_reconnect_time = jiffies;
2679         }
2680
2681         if (peer->mxp_nic_id == 0ULL) {
2682                 int     ret     = 0;
2683
2684                 ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
2685                                       &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
2686                 if (ret == 0) {
2687                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2688                 }
2689                 if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
2690                         /* not mapped yet, return */
2691                         spin_lock(&conn->mxk_lock);
2692                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
2693                         spin_unlock(&conn->mxk_lock);
2694                 }
2695         }
2696
2697         if (cfs_time_after(jiffies,
2698                            peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
2699             conn->mxk_status != MXLND_CONN_DISCONNECT) {
2700                 /* give up and notify LNET */
2701                 CDEBUG(D_NET, "timeout trying to connect to %s\n",
2702                        libcfs_nid2str(peer->mxp_nid));
2703                 mxlnd_conn_disconnect(conn, 0, 0);
2704                 mxlnd_conn_decref(conn);
2705                 return;
2706         }
2707
2708         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2709                             peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
2710                             (void *) peer, &request);
2711         if (unlikely(mxret != MX_SUCCESS)) {
2712                 spin_lock(&conn->mxk_lock);
2713                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
2714                 spin_unlock(&conn->mxk_lock);
2715                 CNETERR("mx_iconnect() failed with %s (%d) to %s\n",
2716                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2717                 mxlnd_conn_decref(conn);
2718         }
2719         mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
2720                                jiffies_to_msecs(MXLND_CONNECT_TIMEOUT));
2721         return;
2722 }
2723
2724 #define MXLND_STATS 0
2725
2726 int
2727 mxlnd_check_sends(kmx_peer_t *peer)
2728 {
2729         int             ret             = 0;
2730         int             found           = 0;
2731         mx_return_t     mxret           = MX_SUCCESS;
2732         kmx_ctx_t       *tx             = NULL;
2733         kmx_conn_t      *conn           = NULL;
2734         u8              msg_type        = 0;
2735         int             credit          = 0;
2736         int             status          = 0;
2737         int             ntx_posted      = 0;
2738         int             credits         = 0;
2739 #if MXLND_STATS
2740         static unsigned long last       = 0;
2741 #endif
2742
2743         if (unlikely(peer == NULL)) {
2744                 LASSERT(peer != NULL);
2745                 return -1;
2746         }
2747         write_lock(&kmxlnd_data.kmx_global_lock);
2748         conn = peer->mxp_conn;
2749         /* NOTE take a ref for the duration of this function since it is
2750          * called when there might not be any queued txs for this peer */
2751         if (conn) {
2752                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2753                         write_unlock(&kmxlnd_data.kmx_global_lock);
2754                         return -1;
2755                 }
2756                 mxlnd_conn_addref(conn); /* for duration of this function */
2757         }
2758         write_unlock(&kmxlnd_data.kmx_global_lock);
2759
2760         /* do not add another ref for this tx */
2761
2762         if (conn == NULL) {
2763                 /* we do not have any conns */
2764                 CNETERR("peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
2765                 return -1;
2766         }
2767
2768 #if MXLND_STATS
2769         if (cfs_time_after(jiffies, last)) {
2770                 last = jiffies + msecs_to_jiffies(MSEC_PER_SEC);
2771                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2772                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2773                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2774                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2775                               conn->mxk_ntx_data, conn->mxk_data_posted);
2776         }
2777 #endif
2778
2779         spin_lock(&conn->mxk_lock);
2780         ntx_posted = conn->mxk_ntx_posted;
2781         credits = conn->mxk_credits;
2782
2783         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
2784         LASSERT(ntx_posted >= 0);
2785
2786         LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
2787         LASSERT(credits >= 0);
2788
2789         /* check number of queued msgs, ignore data */
2790         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
2791                 /* check if any txs queued that could return credits... */
2792                 if (cfs_list_empty(&conn->mxk_tx_credit_queue) ||
2793                     conn->mxk_ntx_msgs == 0) {
2794                         /* if not, send a NOOP */
2795                         tx = mxlnd_get_idle_tx();
2796                         if (likely(tx != NULL)) {
2797                                 tx->mxc_peer = peer;
2798                                 tx->mxc_conn = peer->mxp_conn;
2799                                 mxlnd_conn_addref(conn); /* for this tx */
2800                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2801                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2802                                 mxlnd_peer_queue_tx_locked(tx);
2803                                 found = 1;
2804                                 goto done_locked;
2805                         }
2806                 }
2807         }
2808
2809         /* if the peer is not ready, try to connect */
2810         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2811             conn->mxk_status == MXLND_CONN_FAIL)) {
2812                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2813                 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
2814                 spin_unlock(&conn->mxk_lock);
2815                 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
2816                 goto done;
2817         }
2818
2819         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
2820                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
2821                 /* We have something to send. If we have a queued tx that does not
2822                  * require a credit (free), choose it since its completion will
2823                  * return a credit (here or at the peer), complete a DATA or
2824                  * CONN_REQ or CONN_ACK. */
2825                 cfs_list_t *tmp_tx = NULL;
2826                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
2827                         tmp_tx = &conn->mxk_tx_free_queue;
2828                 } else {
2829                         tmp_tx = &conn->mxk_tx_credit_queue;
2830                 }
2831                 tx = cfs_list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
2832
2833                 msg_type = tx->mxc_msg_type;
2834
2835                 /* don't try to send a rx */
2836                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2837
2838                 /* ensure that it is a valid msg type */
2839                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2840                         msg_type == MXLND_MSG_CONN_ACK ||
2841                         msg_type == MXLND_MSG_NOOP     ||
2842                         msg_type == MXLND_MSG_EAGER    ||
2843                         msg_type == MXLND_MSG_PUT_REQ  ||
2844                         msg_type == MXLND_MSG_PUT_ACK  ||
2845                         msg_type == MXLND_MSG_PUT_DATA ||
2846                         msg_type == MXLND_MSG_GET_REQ  ||
2847                         msg_type == MXLND_MSG_GET_DATA);
2848                 LASSERT(tx->mxc_peer == peer);
2849                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2850
2851                 credit = mxlnd_tx_requires_credit(tx);
2852                 if (credit) {
2853
2854                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
2855                                 CDEBUG(D_NET, "%s: posted enough\n",
2856                                               libcfs_nid2str(peer->mxp_nid));
2857                                 goto done_locked;
2858                         }
2859
2860                         if (conn->mxk_credits == 0) {
2861                                 CDEBUG(D_NET, "%s: no credits\n",
2862                                               libcfs_nid2str(peer->mxp_nid));
2863                                 goto done_locked;
2864                         }
2865
2866                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2867                             conn->mxk_outstanding == 0) {  /* giving back credits */
2868                                 CDEBUG(D_NET, "%s: not using last credit\n",
2869                                               libcfs_nid2str(peer->mxp_nid));
2870                                 goto done_locked;
2871                         }
2872                 }
2873
2874                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2875                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2876                                 msg_type == MXLND_MSG_CONN_ACK)) {
2877                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2878                                              mxlnd_connstatus_to_str(conn->mxk_status),
2879                                              tx->mxc_cookie,
2880                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2881                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
2882                                     cfs_time_aftereq(jiffies, tx->mxc_deadline)) {
2883                                         cfs_list_del_init(&tx->mxc_list);
2884                                         tx->mxc_errno = -ECONNABORTED;
2885                                         spin_unlock(&conn->mxk_lock);
2886                                         mxlnd_put_idle_tx(tx);
2887                                         mxlnd_conn_decref(conn);
2888                                         goto done;
2889                                 }
2890                                 goto done_locked;
2891                         }
2892                 }
2893
2894                 cfs_list_del_init(&tx->mxc_list);
2895
2896                 /* handle credits, etc now while we have the lock to avoid races */
2897                 if (credit) {
2898                         conn->mxk_credits--;
2899                         conn->mxk_ntx_posted++;
2900                 }
2901                 if (msg_type != MXLND_MSG_PUT_DATA &&
2902                     msg_type != MXLND_MSG_GET_DATA) {
2903                         if (msg_type != MXLND_MSG_CONN_REQ &&
2904                             msg_type != MXLND_MSG_CONN_ACK) {
2905                                 conn->mxk_ntx_msgs--;
2906                         }
2907                 }
2908                 if (tx->mxc_incarnation == 0 &&
2909                     conn->mxk_incarnation != 0) {
2910                         tx->mxc_incarnation = conn->mxk_incarnation;
2911                 }
2912
2913                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2914                  * or (2) there is a non-DATA msg that can return credits in the
2915                  * queue, then drop this duplicate NOOP */
2916                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2917                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
2918                             (conn->mxk_ntx_msgs >= 1)) {
2919                                 conn->mxk_credits++;
2920                                 conn->mxk_ntx_posted--;
2921                                 spin_unlock(&conn->mxk_lock);
2922                                 /* redundant NOOP */
2923                                 mxlnd_put_idle_tx(tx);
2924                                 mxlnd_conn_decref(conn);
2925                                 CDEBUG(D_NET, "%s: redundant noop\n",
2926                                               libcfs_nid2str(peer->mxp_nid));
2927                                 found = 1;
2928                                 goto done;
2929                         }
2930                 }
2931
2932                 found = 1;
2933                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2934                     (msg_type != MXLND_MSG_GET_DATA))) {
2935                         mxlnd_pack_msg_locked(tx);
2936                 }
2937
2938                 mxret = MX_SUCCESS;
2939
2940                 status = conn->mxk_status;
2941                 spin_unlock(&conn->mxk_lock);
2942
2943                 if (likely((status == MXLND_CONN_READY) ||
2944                     (msg_type == MXLND_MSG_CONN_REQ) ||
2945                     (msg_type == MXLND_MSG_CONN_ACK))) {
2946                         ret = 0;
2947                         if (msg_type != MXLND_MSG_CONN_REQ &&
2948                             msg_type != MXLND_MSG_CONN_ACK) {
2949                                 /* add to the pending list */
2950                                 ret = mxlnd_q_pending_ctx(tx);
2951                         } else {
2952                                 /* CONN_REQ/ACK */
2953                                 tx->mxc_state = MXLND_CTX_PENDING;
2954                         }
2955
2956                         if (ret == 0) {
2957                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2958                                     msg_type != MXLND_MSG_GET_DATA)) {
2959                                         /* send a msg style tx */
2960                                         LASSERT(tx->mxc_nseg == 1);
2961                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2962                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2963                                                mxlnd_msgtype_to_str(msg_type),
2964                                                tx->mxc_cookie);
2965                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2966                                                           &tx->mxc_seg,
2967                                                           tx->mxc_nseg,
2968                                                           tx->mxc_pin_type,
2969                                                           conn->mxk_epa,
2970                                                           tx->mxc_match,
2971                                                           (void *) tx,
2972                                                           &tx->mxc_mxreq);
2973                                 } else {
2974                                         /* send a DATA tx */
2975                                         spin_lock(&conn->mxk_lock);
2976                                         conn->mxk_ntx_data--;
2977                                         conn->mxk_data_posted++;
2978                                         spin_unlock(&conn->mxk_lock);
2979                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2980                                                mxlnd_msgtype_to_str(msg_type),
2981                                                tx->mxc_cookie);
2982                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2983                                                           tx->mxc_seg_list,
2984                                                           tx->mxc_nseg,
2985                                                           tx->mxc_pin_type,
2986                                                           conn->mxk_epa,
2987                                                           tx->mxc_match,
2988                                                           (void *) tx,
2989                                                           &tx->mxc_mxreq);
2990                                 }
2991                         } else {
2992                                 /* ret != 0 */
2993                                 mxret = MX_CONNECTION_FAILED;
2994                         }
2995                         if (likely(mxret == MX_SUCCESS)) {
2996                                 ret = 0;
2997                         } else {
2998                                 CNETERR("mx_kisend() failed with %s (%d) "
2999                                         "sending to %s\n", mx_strerror(mxret), (int) mxret,
3000                                        libcfs_nid2str(peer->mxp_nid));
3001                                 /* NOTE mx_kisend() only fails if there are not enough
3002                                 * resources. Do not change the connection status. */
3003                                 if (mxret == MX_NO_RESOURCES) {
3004                                         tx->mxc_errno = -ENOMEM;
3005                                 } else {
3006                                         tx->mxc_errno = -ECONNABORTED;
3007                                 }
3008                                 if (credit) {
3009                                         spin_lock(&conn->mxk_lock);
3010                                         conn->mxk_ntx_posted--;
3011                                         conn->mxk_credits++;
3012                                         spin_unlock(&conn->mxk_lock);
3013                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
3014                                            msg_type == MXLND_MSG_GET_DATA) {
3015                                         spin_lock(&conn->mxk_lock);
3016                                         conn->mxk_data_posted--;
3017                                         spin_unlock(&conn->mxk_lock);
3018                                 }
3019                                 if (msg_type != MXLND_MSG_PUT_DATA &&
3020                                     msg_type != MXLND_MSG_GET_DATA &&
3021                                     msg_type != MXLND_MSG_CONN_REQ &&
3022                                     msg_type != MXLND_MSG_CONN_ACK) {
3023                                         spin_lock(&conn->mxk_lock);
3024                                         conn->mxk_outstanding +=
3025                                                 tx->mxc_msg->mxm_credits;
3026                                         spin_unlock(&conn->mxk_lock);
3027                                 }
3028                                 if (msg_type != MXLND_MSG_CONN_REQ &&
3029                                     msg_type != MXLND_MSG_CONN_ACK) {
3030                                         /* remove from the pending list */
3031                                         mxlnd_deq_pending_ctx(tx);
3032                                 }
3033                                 mxlnd_put_idle_tx(tx);
3034                                 mxlnd_conn_decref(conn);
3035                         }
3036                 }
3037                 spin_lock(&conn->mxk_lock);
3038         }
3039 done_locked:
3040         spin_unlock(&conn->mxk_lock);
3041 done:
3042         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
3043         return found;
3044 }
3045
3046
3047 /**
3048  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
3049  * @ctx - the tx descriptor
3050  *
3051  * Determine which type of send request it was and start the next step, if needed,
3052  * or, if done, signal completion to LNET. After we are done, put back on the
3053  * idle tx list.
3054  */
3055 void
3056 mxlnd_handle_tx_completion(kmx_ctx_t *tx)
3057 {
3058         int             code    = tx->mxc_status.code;
3059         int             failed  = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
3060         kmx_msg_t       *msg    = tx->mxc_msg;
3061         kmx_peer_t      *peer   = tx->mxc_peer;
3062         kmx_conn_t      *conn   = tx->mxc_conn;
3063         u8              type    = tx->mxc_msg_type;
3064         int             credit  = mxlnd_tx_requires_credit(tx);
3065         u64             cookie  = tx->mxc_cookie;
3066
3067         CDEBUG(D_NET, "entering %s (0x%llx):\n",
3068                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
3069
3070         LASSERT (peer != NULL);
3071         LASSERT (conn != NULL);
3072
3073         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
3074                 LASSERT (type == msg->mxm_type);
3075         }
3076
3077         if (failed) {
3078                 if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
3079         } else {
3080                 spin_lock(&conn->mxk_lock);
3081                 conn->mxk_last_tx = cfs_time_current(); /* jiffies */
3082                 spin_unlock(&conn->mxk_lock);
3083         }
3084
3085         switch (type) {
3086
3087         case MXLND_MSG_GET_DATA:
3088                 spin_lock(&conn->mxk_lock);
3089                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3090                         conn->mxk_outstanding++;
3091                         conn->mxk_data_posted--;
3092                 }
3093                 spin_unlock(&conn->mxk_lock);
3094                 break;
3095
3096         case MXLND_MSG_PUT_DATA:
3097                 spin_lock(&conn->mxk_lock);
3098                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3099                         conn->mxk_data_posted--;
3100                 }
3101                 spin_unlock(&conn->mxk_lock);
3102                 break;
3103
3104         case MXLND_MSG_NOOP:
3105         case MXLND_MSG_PUT_REQ:
3106         case MXLND_MSG_PUT_ACK:
3107         case MXLND_MSG_GET_REQ:
3108         case MXLND_MSG_EAGER:
3109                 break;
3110
3111         case MXLND_MSG_CONN_ACK:
3112                 if (peer->mxp_incompatible) {
3113                         /* we sent our params, now close this conn */
3114                         mxlnd_conn_disconnect(conn, 0, 1);
3115                 }
3116         case MXLND_MSG_CONN_REQ:
3117                 if (failed) {
3118                         CNETERR("%s failed with %s (%d) (errno = %d) to %s\n",
3119                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
3120                                mx_strstatus(code), code, tx->mxc_errno,
3121                                libcfs_nid2str(tx->mxc_nid));
3122                         if (!peer->mxp_incompatible) {
3123                                 spin_lock(&conn->mxk_lock);
3124                                 if (code == MX_STATUS_BAD_SESSION)
3125                                         mxlnd_set_conn_status(conn,
3126                                                               MXLND_CONN_INIT);
3127                                 else
3128                                         mxlnd_set_conn_status(conn,
3129                                                               MXLND_CONN_FAIL);
3130                                 spin_unlock(&conn->mxk_lock);
3131                         }
3132                 }
3133                 break;
3134
3135         default:
3136                 CNETERR("Unknown msg type of %d\n", type);
3137                 LBUG();
3138         }
3139
3140         if (credit) {
3141                 spin_lock(&conn->mxk_lock);
3142                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3143                         conn->mxk_ntx_posted--;
3144                 }
3145                 spin_unlock(&conn->mxk_lock);
3146         }
3147
3148         mxlnd_put_idle_tx(tx);
3149         mxlnd_conn_decref(conn);
3150
3151         mxlnd_check_sends(peer);
3152
3153         CDEBUG(D_NET, "leaving\n");
3154         return;
3155 }
3156
3157 /* Handle completion of MSG or DATA rx.
3158  * CONN_REQ and CONN_ACK are handled elsewhere. */
3159 void
3160 mxlnd_handle_rx_completion(kmx_ctx_t *rx)
3161 {
3162         int             ret             = 0;
3163         int             repost          = 1;
3164         int             credit          = 1;
3165         u32             nob             = rx->mxc_status.xfer_length;
3166         u64             bits            = rx->mxc_status.match_info;
3167         kmx_msg_t      *msg             = rx->mxc_msg;
3168         kmx_peer_t     *peer            = rx->mxc_peer;
3169         kmx_conn_t     *conn            = rx->mxc_conn;
3170         u8              type            = rx->mxc_msg_type;
3171         u64             seq             = bits;
3172         lnet_msg_t     *lntmsg[2];
3173         int             result          = 0;
3174         int             peer_ref        = 0;
3175         int             conn_ref        = 0;
3176
3177         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
3178          * failed GET reply */
3179
3180         /* NOTE peer may still be NULL if it is a new peer and
3181          *      conn may be NULL if this is a re-connect */
3182         if (likely(peer != NULL && conn != NULL)) {
3183                 /* we have a reference on the conn */
3184                 conn_ref = 1;
3185         } else if (peer != NULL && conn == NULL) {
3186                 /* we have a reference on the peer */
3187                 peer_ref = 1;
3188         } else if (peer == NULL && conn != NULL) {
3189                 /* fatal error */
3190                 CERROR("rx 0x%llx from %s has conn but no peer\n",
3191                        bits, libcfs_nid2str(rx->mxc_nid));
3192                 LBUG();
3193         } /* else peer and conn == NULL */
3194
3195         if (conn == NULL && peer != NULL) {
3196                 write_lock(&kmxlnd_data.kmx_global_lock);
3197                 conn = peer->mxp_conn;
3198                 if (conn) {
3199                         mxlnd_conn_addref(conn); /* conn takes ref... */
3200                         mxlnd_peer_decref(peer); /* from peer */
3201                         conn_ref = 1;
3202                         peer_ref = 0;
3203                 }
3204                 write_unlock(&kmxlnd_data.kmx_global_lock);
3205                 rx->mxc_conn = conn;
3206         }
3207
3208 #if MXLND_DEBUG
3209         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
3210 #endif
3211
3212         lntmsg[0] = NULL;
3213         lntmsg[1] = NULL;
3214
3215         if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
3216             rx->mxc_status.code != MX_STATUS_TRUNCATED) {
3217                 CNETERR("rx from %s failed with %s (%d)\n",
3218                         libcfs_nid2str(rx->mxc_nid),
3219                         mx_strstatus(rx->mxc_status.code),
3220                         rx->mxc_status.code);
3221                 credit = 0;
3222                 goto cleanup;
3223         }
3224
3225         if (nob == 0) {
3226                 /* this may be a failed GET reply */
3227                 if (type == MXLND_MSG_GET_DATA) {
3228                         /* get the error (52-59) bits from the match bits */
3229                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
3230                         lntmsg[0] = rx->mxc_lntmsg[0];
3231                         result = -ret;
3232                         goto cleanup;
3233                 } else {
3234                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
3235                         CNETERR("rx from %s returned with 0 bytes\n",
3236                                 libcfs_nid2str(rx->mxc_nid));
3237                         goto cleanup;
3238                 }
3239         }
3240
3241         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
3242         if (type == MXLND_MSG_PUT_DATA) {
3243                 /* result = 0; */
3244                 lntmsg[0] = rx->mxc_lntmsg[0];
3245                 goto cleanup;
3246         } else if (type == MXLND_MSG_GET_DATA) {
3247                 /* result = 0; */
3248                 lntmsg[0] = rx->mxc_lntmsg[0];
3249                 lntmsg[1] = rx->mxc_lntmsg[1];
3250                 goto cleanup;
3251         }
3252
3253         ret = mxlnd_unpack_msg(msg, nob);
3254         if (ret != 0) {
3255                 CNETERR("Error %d unpacking rx from %s\n",
3256                         ret, libcfs_nid2str(rx->mxc_nid));
3257                 goto cleanup;
3258         }
3259         rx->mxc_nob = nob;
3260         type = msg->mxm_type;
3261
3262         if (rx->mxc_nid != msg->mxm_srcnid ||
3263             kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3264                 CNETERR("rx with mismatched NID (type %s) (my nid is "
3265                        "0x%llx and rx msg dst is 0x%llx)\n",
3266                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
3267                        msg->mxm_dstnid);
3268                 goto cleanup;
3269         }
3270
3271         if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
3272             msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3273                 CNETERR("Stale rx from %s with type %s "
3274                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3275                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3276                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3277                        msg->mxm_srcstamp, conn->mxk_incarnation,
3278                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3279                 credit = 0;
3280                 goto cleanup;
3281         }
3282
3283         CDEBUG(D_NET, "Received %s with %d credits\n",
3284                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
3285
3286         LASSERT(peer != NULL && conn != NULL);
3287         if (msg->mxm_credits != 0) {
3288                 spin_lock(&conn->mxk_lock);
3289                 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3290                         if ((conn->mxk_credits + msg->mxm_credits) >
3291                              *kmxlnd_tunables.kmx_peercredits) {
3292                                 CNETERR("mxk_credits %d  mxm_credits %d\n",
3293                                         conn->mxk_credits, msg->mxm_credits);
3294                         }
3295                         conn->mxk_credits += msg->mxm_credits;
3296                         LASSERT(conn->mxk_credits >= 0);
3297                         LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
3298                 }
3299                 spin_unlock(&conn->mxk_lock);
3300         }
3301
3302         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3303         switch (type) {
3304         case MXLND_MSG_NOOP:
3305                 break;
3306
3307         case MXLND_MSG_EAGER:
3308                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3309                                         msg->mxm_srcnid, rx, 0);
3310                 repost = ret < 0;
3311                 break;
3312
3313         case MXLND_MSG_PUT_REQ:
3314                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3315                                         msg->mxm_srcnid, rx, 1);
3316                 repost = ret < 0;
3317                 break;
3318
3319         case MXLND_MSG_PUT_ACK: {
3320                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3321                 if (cookie > MXLND_MAX_COOKIE) {
3322                         CNETERR("NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3323                                 libcfs_nid2str(rx->mxc_nid));
3324                         result = -((u32) MXLND_ERROR_VAL(cookie));
3325                         lntmsg[0] = rx->mxc_lntmsg[0];
3326                 } else {
3327                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3328                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3329                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3330                 }
3331                 /* repost == 1 */
3332                 break;
3333         }
3334         case MXLND_MSG_GET_REQ:
3335                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3336                                         msg->mxm_srcnid, rx, 1);
3337                 repost = ret < 0;
3338                 break;
3339
3340         default:
3341                 CNETERR("Bad MXLND message type %x from %s\n", msg->mxm_type,
3342                         libcfs_nid2str(rx->mxc_nid));
3343                 ret = -EPROTO;
3344                 break;
3345         }
3346
3347         if (ret < 0) {
3348                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3349                 spin_lock(&conn->mxk_lock);
3350                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3351                 spin_unlock(&conn->mxk_lock);
3352         }
3353
3354 cleanup:
3355         if (conn != NULL) {
3356                 spin_lock(&conn->mxk_lock);
3357                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3358                 spin_unlock(&conn->mxk_lock);
3359         }
3360
3361         if (repost) {
3362                 /* lnet_parse() failed, etc., repost now */
3363                 mxlnd_put_idle_rx(rx);
3364                 if (conn != NULL && credit == 1) {
3365                         if (type == MXLND_MSG_PUT_DATA ||
3366                             type == MXLND_MSG_EAGER ||
3367                             type == MXLND_MSG_PUT_REQ ||
3368                             type == MXLND_MSG_NOOP) {
3369                                 spin_lock(&conn->mxk_lock);
3370                                 conn->mxk_outstanding++;
3371                                 spin_unlock(&conn->mxk_lock);
3372                         }
3373                 }
3374                 if (conn_ref) mxlnd_conn_decref(conn);
3375                 LASSERT(peer_ref == 0);
3376         }
3377
3378         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3379                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3380         } else {
3381                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3382         }
3383
3384         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3385         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3386
3387         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3388
3389         return;
3390 }
3391
3392 void
3393 mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
3394 {
3395         kmx_ctx_t       *tx     = NULL;
3396         kmx_msg_t       *txmsg  = NULL;
3397         kmx_conn_t      *conn   = peer->mxp_conn;
3398         u64             nic_id  = 0ULL;
3399         u32             ep_id   = 0;
3400         u32             sid     = 0;
3401         u8              type    = (msg_type == MXLND_MSG_ICON_REQ ?
3402                                    MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
3403
3404         /* a conn ref was taken when calling mx_iconnect(),
3405          * hold it until CONN_REQ or CONN_ACK completes */
3406
3407         CDEBUG(D_NET, "entering\n");
3408         if (status.code != MX_STATUS_SUCCESS) {
3409                 int send_bye    = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
3410
3411                 CNETERR("mx_iconnect() failed for %s with %s (%d) "
3412                         "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3413                         mxlnd_msgtype_to_str(msg_type),
3414                         mx_strstatus(status.code), status.code,
3415                         libcfs_nid2str(peer->mxp_nid),
3416                         peer->mxp_nid,
3417                         peer->mxp_nic_id,
3418                         peer->mxp_ep_id);
3419                 spin_lock(&conn->mxk_lock);
3420                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3421                 spin_unlock(&conn->mxk_lock);
3422
3423                 if (cfs_time_after(jiffies, peer->mxp_reconnect_time +
3424                                    MXLND_CONNECT_TIMEOUT)) {
3425                         CNETERR("timeout, calling conn_disconnect()\n");
3426                         mxlnd_conn_disconnect(conn, 0, send_bye);
3427                 }
3428
3429                 mxlnd_conn_decref(conn);
3430                 return;
3431         }
3432         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3433         write_lock(&kmxlnd_data.kmx_global_lock);
3434         spin_lock(&conn->mxk_lock);
3435         conn->mxk_epa = status.source;
3436         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3437         if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
3438                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3439         }
3440         spin_unlock(&conn->mxk_lock);
3441         write_unlock(&kmxlnd_data.kmx_global_lock);
3442
3443         /* mx_iconnect() succeeded, reset delay to 0 */
3444         write_lock(&kmxlnd_data.kmx_global_lock);
3445         peer->mxp_reconnect_time = 0;
3446         peer->mxp_conn->mxk_sid = sid;
3447         write_unlock(&kmxlnd_data.kmx_global_lock);
3448
3449         /* marshal CONN_REQ or CONN_ACK msg */
3450         /* we are still using the conn ref from iconnect() - do not take another */
3451         tx = mxlnd_get_idle_tx();
3452         if (tx == NULL) {
3453                 CNETERR("Can't obtain %s tx for %s\n",
3454                        mxlnd_msgtype_to_str(type),
3455                        libcfs_nid2str(peer->mxp_nid));
3456                 spin_lock(&conn->mxk_lock);
3457                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3458                 spin_unlock(&conn->mxk_lock);
3459                 mxlnd_conn_decref(conn);
3460                 return;
3461         }
3462
3463         tx->mxc_peer = peer;
3464         tx->mxc_conn = conn;
3465         tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
3466         CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
3467         mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3468         txmsg = tx->mxc_msg;
3469         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
3470         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
3471         tx->mxc_match = mxlnd_create_match(tx, 0);
3472
3473         mxlnd_queue_tx(tx);
3474         return;
3475 }
3476
3477 /**
3478  * mxlnd_request_waitd - the MX request completion thread(s)
3479  * @arg - thread id (as a void *)
3480  *
3481  * This thread waits for a MX completion and then completes the request.
3482  * We will create one thread per CPU.
3483  */
3484 int
3485 mxlnd_request_waitd(void *arg)
3486 {
3487         long                    id              = (long) arg;
3488         __u32                   result          = 0;
3489         mx_return_t             mxret           = MX_SUCCESS;
3490         mx_status_t             status;
3491         kmx_ctx_t              *ctx             = NULL;
3492         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3493         kmx_peer_t             *peer            = NULL;
3494         kmx_conn_t             *conn            = NULL;
3495 #if MXLND_POLLING
3496         int                     count           = 0;
3497 #endif
3498
3499         memset(&status, 0, sizeof(status));
3500
3501         CDEBUG(D_NET, "%s starting\n", name);
3502
3503         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
3504                 u8      msg_type        = 0;
3505
3506                 mxret = MX_SUCCESS;
3507                 result = 0;
3508 #if MXLND_POLLING
3509                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3510                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3511                                             &status, &result);
3512                 } else {
3513                         count = 0;
3514                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3515                                             0ULL, 0ULL, &status, &result);
3516                 }
3517 #else
3518                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3519                                     0ULL, 0ULL, &status, &result);
3520 #endif
3521                 if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown)))
3522                         break;
3523
3524                 if (result != 1) {
3525                         /* nothing completed... */
3526                         continue;
3527                 }
3528
3529                 CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
3530                        "match_info 0x%llx and length %d\n",
3531                        mx_strstatus(status.code), status.code,
3532                        (u64) status.match_info, status.msg_length);
3533
3534                 if (status.code != MX_STATUS_SUCCESS) {
3535                         CNETERR("wait_any() failed with %s (%d) with "
3536                                 "match_info 0x%llx and length %d\n",
3537                                 mx_strstatus(status.code), status.code,
3538                                 (u64) status.match_info, status.msg_length);
3539                 }
3540
3541                 msg_type = MXLND_MSG_TYPE(status.match_info);
3542
3543                 /* This may be a mx_iconnect() request completing,
3544                  * check the bit mask for CONN_REQ and CONN_ACK */
3545                 if (msg_type == MXLND_MSG_ICON_REQ ||
3546                     msg_type == MXLND_MSG_ICON_ACK) {
3547                         peer = (kmx_peer_t*) status.context;
3548                         mxlnd_handle_connect_msg(peer, msg_type, status);
3549                         continue;
3550                 }
3551
3552                 /* This must be a tx or rx */
3553
3554                 /* NOTE: if this is a RX from the unexpected callback, it may
3555                  * have very little info. If we dropped it in unexpected_recv(),
3556                  * it will not have a context. If so, ignore it. */
3557                 ctx = (kmx_ctx_t *) status.context;
3558                 if (ctx != NULL) {
3559
3560                         req_type = ctx->mxc_type;
3561                         conn = ctx->mxc_conn; /* this may be NULL */
3562                         mxlnd_deq_pending_ctx(ctx);
3563
3564                         /* copy status to ctx->mxc_status */
3565                         ctx->mxc_status = status;
3566
3567                         switch (req_type) {
3568                         case MXLND_REQ_TX:
3569                                 mxlnd_handle_tx_completion(ctx);
3570                                 break;
3571                         case MXLND_REQ_RX:
3572                                 mxlnd_handle_rx_completion(ctx);
3573                                 break;
3574                         default:
3575                                 CNETERR("Unknown ctx type %d\n", req_type);
3576                                 LBUG();
3577                                 break;
3578                         }
3579
3580                         /* conn is always set except for the first CONN_REQ rx
3581                          * from a new peer */
3582                         if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
3583                                 mxlnd_conn_disconnect(conn, 1, 1);
3584                         }
3585                 }
3586                 CDEBUG(D_NET, "waitd() completed task\n");
3587         }
3588         CDEBUG(D_NET, "%s stopping\n", name);
3589         mxlnd_thread_stop(id);
3590         return 0;
3591 }
3592
3593
3594 unsigned long
3595 mxlnd_check_timeouts(unsigned long now)
3596 {
3597         int             i               = 0;
3598         int             disconnect      = 0;
3599         unsigned long   next            = 0; /* jiffies */
3600         kmx_peer_t      *peer           = NULL;
3601         kmx_conn_t      *conn           = NULL;
3602         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3603
3604         read_lock(g_lock);
3605         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3606                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
3607                                         mxp_list) {
3608
3609                         if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown))) {
3610                                 read_unlock(g_lock);
3611                                 return next;
3612                         }
3613
3614                         conn = peer->mxp_conn;
3615                         if (conn) {
3616                                 mxlnd_conn_addref(conn);
3617                         } else {
3618                                 continue;
3619                         }
3620
3621                         spin_lock(&conn->mxk_lock);
3622
3623                         /* if nothing pending (timeout == 0) or
3624                          * if conn is already disconnected,
3625                          * skip this conn */
3626                         if (conn->mxk_timeout == 0 ||
3627                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3628                                 spin_unlock(&conn->mxk_lock);
3629                                 mxlnd_conn_decref(conn);
3630                                 continue;
3631                         }
3632
3633                         /* we want to find the timeout that will occur first.
3634                          * if it is in the future, we will sleep until then.
3635                          * if it is in the past, then we will sleep one
3636                          * second and repeat the process. */
3637                         if ((next == 0) ||
3638                             (cfs_time_before(conn->mxk_timeout, next))) {
3639                                 next = conn->mxk_timeout;
3640                         }
3641
3642                         disconnect = 0;
3643
3644                         if (cfs_time_aftereq(now, conn->mxk_timeout))
3645                                 disconnect = 1;
3646                         spin_unlock(&conn->mxk_lock);
3647
3648                         if (disconnect)
3649                                 mxlnd_conn_disconnect(conn, 1, 1);
3650                         mxlnd_conn_decref(conn);
3651                 }
3652         }
3653         read_unlock(g_lock);
3654         if (next == 0)
3655                 next = now + MXLND_COMM_TIMEOUT;
3656
3657         return next;
3658 }
3659
3660 void
3661 mxlnd_passive_connect(kmx_connparams_t *cp)
3662 {
3663         int             ret             = 0;
3664         int             incompatible    = 0;
3665         u64             nic_id          = 0ULL;
3666         u32             ep_id           = 0;
3667         u32             sid             = 0;
3668         int             conn_ref        = 0;
3669         kmx_msg_t       *msg            = &cp->mxr_msg;
3670         kmx_peer_t      *peer           = cp->mxr_peer;
3671         kmx_conn_t      *conn           = NULL;
3672         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3673
3674         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3675
3676         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3677         if (ret != 0) {
3678                 if (peer) {
3679                         CNETERR("Error %d unpacking CONN_REQ from %s\n",
3680                                ret, libcfs_nid2str(peer->mxp_nid));
3681                 } else {
3682                         CNETERR("Error %d unpacking CONN_REQ from "
3683                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3684                 }
3685                 goto cleanup;
3686         }
3687         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3688                 CNETERR("Can't accept %s: bad dst nid %s\n",
3689                                 libcfs_nid2str(msg->mxm_srcnid),
3690                                 libcfs_nid2str(msg->mxm_dstnid));
3691                 goto cleanup;
3692         }
3693         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3694                 CNETERR("Can't accept %s: incompatible queue depth "
3695                             "%d (%d wanted)\n",
3696                                 libcfs_nid2str(msg->mxm_srcnid),
3697                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3698                                 *kmxlnd_tunables.kmx_peercredits);
3699                 incompatible = 1;
3700         }
3701         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3702                 CNETERR("Can't accept %s: incompatible EAGER size "
3703                             "%d (%d wanted)\n",
3704                                 libcfs_nid2str(msg->mxm_srcnid),
3705                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3706                                 (int) MXLND_MSG_SIZE);
3707                 incompatible = 1;
3708         }
3709
3710         if (peer == NULL) {
3711                 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
3712                 if (peer == NULL) {
3713                         int             hash    = 0;
3714                         u32             board   = 0;
3715                         kmx_peer_t      *existing_peer    = NULL;
3716
3717                         hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3718
3719                         mx_nic_id_to_board_number(nic_id, &board);
3720
3721                         /* adds conn ref for peer and one for this function */
3722                         ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3723                                                board, ep_id, 0ULL);
3724                         if (ret != 0) {
3725                                 goto cleanup;
3726                         }
3727                         peer->mxp_conn->mxk_sid = sid;
3728                         LASSERT(peer->mxp_ep_id == ep_id);
3729                         write_lock(g_lock);
3730                         existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3731                         if (existing_peer) {
3732                                 mxlnd_conn_decref(peer->mxp_conn);
3733                                 mxlnd_peer_decref(peer);
3734                                 peer = existing_peer;
3735                                 mxlnd_conn_addref(peer->mxp_conn);
3736                                 conn = peer->mxp_conn;
3737                         } else {
3738                                 cfs_list_add_tail(&peer->mxp_list,
3739                                                   &kmxlnd_data.kmx_peers[hash]);
3740                                 atomic_inc(&kmxlnd_data.kmx_npeers);
3741                         }
3742                         write_unlock(g_lock);
3743                 } else {
3744                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3745                         write_lock(g_lock);
3746                         mxlnd_peer_decref(peer); /* drop ref taken above */
3747                         write_unlock(g_lock);
3748                         if (ret != 0) {
3749                                 CNETERR("Cannot allocate mxp_conn\n");
3750                                 goto cleanup;
3751                         }
3752                 }
3753                 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3754                 conn = peer->mxp_conn;
3755         } else { /* unexpected handler found peer */
3756                 kmx_conn_t      *old_conn       = peer->mxp_conn;
3757
3758                 if (sid != peer->mxp_conn->mxk_sid) {
3759                         /* do not call mx_disconnect() or send a BYE */
3760                         mxlnd_conn_disconnect(old_conn, 0, 0);
3761
3762                         /* This allocs a conn, points peer->mxp_conn to this one.
3763                         * The old conn is still on the peer->mxp_conns list.
3764                         * As the pending requests complete, they will call
3765                         * conn_decref() which will eventually free it. */
3766                         ret = mxlnd_conn_alloc(&conn, peer);
3767                         if (ret != 0) {
3768                                 CNETERR("Cannot allocate peer->mxp_conn\n");
3769                                 goto cleanup;
3770                         }
3771                         /* conn_alloc() adds one ref for the peer and one
3772                          * for this function */
3773                         conn_ref = 1;
3774
3775                         peer->mxp_conn->mxk_sid = sid;
3776                 } else {
3777                         /* same sid */
3778                         conn = peer->mxp_conn;
3779                 }
3780         }
3781         write_lock(g_lock);
3782         peer->mxp_incompatible = incompatible;
3783         write_unlock(g_lock);
3784         spin_lock(&conn->mxk_lock);
3785         conn->mxk_incarnation = msg->mxm_srcstamp;
3786         mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
3787         spin_unlock(&conn->mxk_lock);
3788
3789         /* handle_conn_ack() will create the CONN_ACK msg */
3790         mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
3791
3792 cleanup:
3793         if (conn_ref) mxlnd_conn_decref(conn);
3794
3795         mxlnd_connparams_free(cp);
3796         return;
3797 }
3798
3799 void
3800 mxlnd_check_conn_ack(kmx_connparams_t *cp)
3801 {
3802         int             ret             = 0;
3803         int             incompatible    = 0;
3804         u64             nic_id          = 0ULL;
3805         u32             ep_id           = 0;
3806         u32             sid             = 0;
3807         kmx_msg_t       *msg            = &cp->mxr_msg;
3808         kmx_peer_t      *peer           = cp->mxr_peer;
3809         kmx_conn_t      *conn           = cp->mxr_conn;
3810
3811         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3812
3813         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3814         if (ret != 0) {
3815                 if (peer) {
3816                         CNETERR("Error %d unpacking CONN_ACK from %s\n",
3817                                ret, libcfs_nid2str(peer->mxp_nid));
3818                 } else {
3819                         CNETERR("Error %d unpacking CONN_ACK from "
3820                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3821                 }
3822                 ret = -1;
3823                 incompatible = 1;
3824                 goto failed;
3825         }
3826         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3827                 CNETERR("Can't accept CONN_ACK from %s: "
3828                        "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3829                         libcfs_nid2str(msg->mxm_dstnid));
3830                 ret = -1;
3831                 goto failed;
3832         }
3833         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3834                 CNETERR("Can't accept CONN_ACK from %s: "
3835                        "incompatible queue depth %d (%d wanted)\n",
3836                         libcfs_nid2str(msg->mxm_srcnid),
3837                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3838                         *kmxlnd_tunables.kmx_peercredits);
3839                 incompatible = 1;
3840                 ret = -1;
3841                 goto failed;
3842         }
3843         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3844                 CNETERR("Can't accept CONN_ACK from %s: "
3845                         "incompatible EAGER size %d (%d wanted)\n",
3846                         libcfs_nid2str(msg->mxm_srcnid),
3847                         msg->mxm_u.conn_req.mxcrm_eager_size,
3848                         (int) MXLND_MSG_SIZE);
3849                 incompatible = 1;
3850                 ret = -1;
3851                 goto failed;
3852         }
3853         write_lock(&kmxlnd_data.kmx_global_lock);
3854         peer->mxp_incompatible = incompatible;
3855         write_unlock(&kmxlnd_data.kmx_global_lock);
3856         spin_lock(&conn->mxk_lock);
3857         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
3858         conn->mxk_outstanding = 0;
3859         conn->mxk_incarnation = msg->mxm_srcstamp;
3860         conn->mxk_timeout = 0;
3861         if (!incompatible) {
3862                 CDEBUG(D_NET, "setting peer %s CONN_READY\n",
3863                        libcfs_nid2str(msg->mxm_srcnid));
3864                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3865         }
3866         spin_unlock(&conn->mxk_lock);
3867
3868         if (!incompatible)
3869                 mxlnd_check_sends(peer);
3870
3871 failed:
3872         if (ret < 0) {
3873                 spin_lock(&conn->mxk_lock);
3874                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3875                 spin_unlock(&conn->mxk_lock);
3876         }
3877
3878         if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
3879
3880         mxlnd_connparams_free(cp);
3881         return;
3882 }
3883
3884 int
3885 mxlnd_abort_msgs(void)
3886 {
3887         int                     count           = 0;
3888         cfs_list_t              *orphans        = &kmxlnd_data.kmx_orphan_msgs;
3889         spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3890
3891         /* abort orphans */
3892         spin_lock(g_conn_lock);
3893         while (!cfs_list_empty(orphans)) {
3894                 kmx_ctx_t       *ctx     = NULL;
3895                 kmx_conn_t      *conn   = NULL;
3896
3897                 ctx = cfs_list_entry(orphans->next, kmx_ctx_t, mxc_list);
3898                 cfs_list_del_init(&ctx->mxc_list);
3899                 spin_unlock(g_conn_lock);
3900
3901                 ctx->mxc_errno = -ECONNABORTED;
3902                 conn = ctx->mxc_conn;
3903                 CDEBUG(D_NET, "aborting %s %s %s\n",
3904                        mxlnd_msgtype_to_str(ctx->mxc_msg_type),
3905                        ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
3906                        libcfs_nid2str(ctx->mxc_nid));
3907                 if (ctx->mxc_type == MXLND_REQ_TX) {
3908                         mxlnd_put_idle_tx(ctx); /* do not hold any locks */
3909                         if (conn) mxlnd_conn_decref(conn); /* for this tx */
3910                 } else {
3911                         ctx->mxc_state = MXLND_CTX_CANCELED;
3912                         mxlnd_handle_rx_completion(ctx);
3913                 }
3914
3915                 count++;
3916                 spin_lock(g_conn_lock);
3917         }
3918         spin_unlock(g_conn_lock);
3919
3920         return count;
3921 }
3922
3923 int
3924 mxlnd_free_conn_zombies(void)
3925 {
3926         int             count           = 0;
3927         cfs_list_t      *zombies        = &kmxlnd_data.kmx_conn_zombies;
3928         spinlock_t      *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3929         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3930
3931         /* cleanup any zombies */
3932         spin_lock(g_conn_lock);
3933         while (!cfs_list_empty(zombies)) {
3934                 kmx_conn_t      *conn   = NULL;
3935
3936                 conn = cfs_list_entry(zombies->next, kmx_conn_t, mxk_zombie);
3937                 cfs_list_del_init(&conn->mxk_zombie);
3938                 spin_unlock(g_conn_lock);
3939
3940                 write_lock(g_lock);
3941                 mxlnd_conn_free_locked(conn);
3942                 write_unlock(g_lock);
3943
3944                 count++;
3945                 spin_lock(g_conn_lock);
3946         }
3947         spin_unlock(g_conn_lock);
3948         CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
3949         return count;
3950 }
3951
3952 /**
3953  * mxlnd_connd - handles incoming connection requests
3954  * @arg - thread id (as a void *)
3955  *
3956  * This thread handles incoming connection requests
3957  */
3958 int
3959 mxlnd_connd(void *arg)
3960 {
3961         long                    id              = (long) arg;
3962
3963         CDEBUG(D_NET, "connd starting\n");
3964
3965         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
3966                 int                ret             = 0;
3967                 kmx_connparams_t  *cp              = NULL;
3968                 spinlock_t        *g_conn_lock  = &kmxlnd_data.kmx_conn_lock;
3969                 cfs_list_t        *conn_reqs    = &kmxlnd_data.kmx_conn_reqs;
3970
3971                 ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);
3972
3973                 if (atomic_read(&kmxlnd_data.kmx_shutdown))
3974                         break;
3975
3976                 if (ret != 0)
3977                         continue;
3978
3979                 ret = mxlnd_abort_msgs();
3980                 ret += mxlnd_free_conn_zombies();
3981
3982                 spin_lock(g_conn_lock);
3983                 if (cfs_list_empty(conn_reqs)) {
3984                         if (ret == 0)
3985                                 CNETERR("connd woke up but did not find a "
3986                                         "kmx_connparams_t or zombie conn\n");
3987                         spin_unlock(g_conn_lock);
3988                         continue;
3989                 }
3990                 cp = cfs_list_entry(conn_reqs->next, kmx_connparams_t,
3991                                     mxr_list);
3992                 cfs_list_del_init(&cp->mxr_list);
3993                 spin_unlock(g_conn_lock);
3994
3995                 switch (MXLND_MSG_TYPE(cp->mxr_match)) {
3996                 case MXLND_MSG_CONN_REQ:
3997                         /* We have a connection request. Handle it. */
3998                         mxlnd_passive_connect(cp);
3999                         break;
4000                 case MXLND_MSG_CONN_ACK:
4001                         /* The peer is ready for messages */
4002                         mxlnd_check_conn_ack(cp);
4003                         break;
4004                 }
4005         }
4006
4007         mxlnd_free_conn_zombies();
4008
4009         CDEBUG(D_NET, "connd stopping\n");
4010         mxlnd_thread_stop(id);
4011         return 0;
4012 }
4013
4014 /**
4015  * mxlnd_timeoutd - enforces timeouts on messages
4016  * @arg - thread id (as a void *)
4017  *
4018  * This thread queries each peer for its earliest timeout. If a peer has timed out,
4019  * it calls mxlnd_conn_disconnect().
4020  *
4021  * After checking for timeouts, try progressing sends (call check_sends()).
4022  */
4023 int
4024 mxlnd_timeoutd(void *arg)
4025 {
4026         int             i       = 0;
4027         long            id      = (long) arg;
4028         unsigned long   now     = 0;
4029         unsigned long   next    = 0;
4030         unsigned long   delay   = msecs_to_jiffies(MSEC_PER_SEC);
4031         kmx_peer_t     *peer    = NULL;
4032         kmx_peer_t     *temp    = NULL;
4033         kmx_conn_t     *conn    = NULL;
4034         rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
4035
4036         CDEBUG(D_NET, "timeoutd starting\n");
4037
4038         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
4039
4040                 now = jiffies;
4041                 /* if the next timeout has not arrived, go back to sleep */
4042                 if (cfs_time_after(now, next)) {
4043                         next = mxlnd_check_timeouts(now);
4044                 }
4045
4046                 /* try to progress peers' txs */
4047                 write_lock(g_lock);
4048                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
4049                         cfs_list_t *peers = &kmxlnd_data.kmx_peers[i];
4050
4051                         /* NOTE we are safe against the removal of peer, but
4052                          * not against the removal of temp */
4053                         cfs_list_for_each_entry_safe(peer, temp, peers,
4054                                                      mxp_list) {
4055                                 if (atomic_read(&kmxlnd_data.kmx_shutdown))
4056                                         break;
4057                                 mxlnd_peer_addref(peer); /* add ref... */
4058                                 conn = peer->mxp_conn;
4059                                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
4060                                         mxlnd_conn_addref(conn); /* take ref... */
4061                                 } else {
4062                                         CDEBUG(D_NET, "ignoring %s\n",
4063                                                libcfs_nid2str(peer->mxp_nid));
4064                                         mxlnd_peer_decref(peer); /* ...to here */
4065                                         continue;
4066                                 }
4067
4068                                 if ((conn->mxk_status == MXLND_CONN_READY ||
4069                                     conn->mxk_status == MXLND_CONN_FAIL) &&
4070                                     cfs_time_after(now,
4071                                                    conn->mxk_last_tx +
4072                                                    msecs_to_jiffies(MSEC_PER_SEC))) {
4073                                         write_unlock(g_lock);
4074                                         mxlnd_check_sends(peer);
4075                                         write_lock(g_lock);
4076                                 }
4077                                 mxlnd_conn_decref(conn); /* until here */
4078                                 mxlnd_peer_decref(peer); /* ...to here */
4079                         }
4080                 }
4081                 write_unlock(g_lock);
4082
4083                 mxlnd_sleep(delay);
4084         }
4085         CDEBUG(D_NET, "timeoutd stopping\n");
4086         mxlnd_thread_stop(id);
4087         return 0;
4088 }