Whamcloud - gitweb
ddc3bed4b30568e7eec0ff74c32e69ae556c01f3
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
47
48 inline int
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
50 {
51         /* if memcmp() == 0, it is NULL */
52         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
53 }
54
55 char *
56 mxlnd_ctxstate_to_str(int mxc_state)
57 {
58         switch (mxc_state) {
59         case MXLND_CTX_INIT:
60                 return "MXLND_CTX_INIT";
61         case MXLND_CTX_IDLE:
62                 return "MXLND_CTX_IDLE";
63         case MXLND_CTX_PREP:
64                 return "MXLND_CTX_PREP";
65         case MXLND_CTX_PENDING:
66                 return "MXLND_CTX_PENDING";
67         case MXLND_CTX_COMPLETED:
68                 return "MXLND_CTX_COMPLETED";
69         case MXLND_CTX_CANCELED:
70                 return "MXLND_CTX_CANCELED";
71         default:
72                 return "*unknown*";
73         }
74 }
75
76 char *
77 mxlnd_connstatus_to_str(int mxk_status)
78 {
79         switch (mxk_status) {
80         case MXLND_CONN_READY:
81                 return "MXLND_CONN_READY";
82         case MXLND_CONN_INIT:
83                 return "MXLND_CONN_INIT";
84         case MXLND_CONN_WAIT:
85                 return "MXLND_CONN_WAIT";
86         case MXLND_CONN_DISCONNECT:
87                 return "MXLND_CONN_DISCONNECT";
88         case MXLND_CONN_FAIL:
89                 return "MXLND_CONN_FAIL";
90         default:
91                 return "unknown";
92         }
93 }
94
95 char *
96 mxlnd_msgtype_to_str(int type) {
97         switch (type) {
98         case MXLND_MSG_EAGER:
99                 return "MXLND_MSG_EAGER";
100         case MXLND_MSG_CONN_REQ:
101                 return "MXLND_MSG_CONN_REQ";
102         case MXLND_MSG_CONN_ACK:
103                 return "MXLND_MSG_CONN_ACK";
104         case MXLND_MSG_BYE:
105                 return "MXLND_MSG_BYE";
106         case MXLND_MSG_NOOP:
107                 return "MXLND_MSG_NOOP";
108         case MXLND_MSG_PUT_REQ:
109                 return "MXLND_MSG_PUT_REQ";
110         case MXLND_MSG_PUT_ACK:
111                 return "MXLND_MSG_PUT_ACK";
112         case MXLND_MSG_PUT_DATA:
113                 return "MXLND_MSG_PUT_DATA";
114         case MXLND_MSG_GET_REQ:
115                 return "MXLND_MSG_GET_REQ";
116         case MXLND_MSG_GET_DATA:
117                 return "MXLND_MSG_GET_DATA";
118         default:
119                 return "unknown";
120         }
121 }
122
123 char *
124 mxlnd_lnetmsg_to_str(int type)
125 {
126         switch (type) {
127         case LNET_MSG_ACK:
128                 return "LNET_MSG_ACK";
129         case LNET_MSG_PUT:
130                 return "LNET_MSG_PUT";
131         case LNET_MSG_GET:
132                 return "LNET_MSG_GET";
133         case LNET_MSG_REPLY:
134                 return "LNET_MSG_REPLY";
135         case LNET_MSG_HELLO:
136                 return "LNET_MSG_HELLO";
137         default:
138                 LBUG();
139                 return "*unknown*";
140         }
141 }
142
143 static inline u64
144 mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
145 {
146         u64 type        = (u64) ctx->mxc_msg_type;
147         u64 err         = (u64) error;
148         u64 match       = 0ULL;
149
150         mxlnd_valid_msg_type(ctx->mxc_msg_type);
151         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
152         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
153         return match;
154 }
155
156 static inline void
157 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
158 {
159         *msg_type = (u8) MXLND_MSG_TYPE(match);
160         *error    = (u8) MXLND_ERROR_VAL(match);
161         *cookie   = match & MXLND_MAX_COOKIE;
162         mxlnd_valid_msg_type(*msg_type);
163         return;
164 }
165
166 kmx_ctx_t *
167 mxlnd_get_idle_rx(kmx_conn_t *conn)
168 {
169         struct list_head        *rxs    = NULL;
170         kmx_ctx_t               *rx     = NULL;
171
172         LASSERT(conn != NULL);
173
174         rxs = &conn->mxk_rx_idle;
175
176         spin_lock(&conn->mxk_lock);
177
178         if (list_empty (rxs)) {
179                 spin_unlock(&conn->mxk_lock);
180                 return NULL;
181         }
182
183         rx = list_entry (rxs->next, kmx_ctx_t, mxc_list);
184         list_del_init(&rx->mxc_list);
185         spin_unlock(&conn->mxk_lock);
186
187 #if MXLND_DEBUG
188         if (rx->mxc_get != rx->mxc_put) {
189                 CDEBUG(D_NETERROR, "*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
190                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
191                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
192                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
193                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
194                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
195                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
196                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
197                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
198                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
199         }
200 #endif
201         LASSERT (rx->mxc_get == rx->mxc_put);
202
203         rx->mxc_get++;
204
205         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
206         rx->mxc_state = MXLND_CTX_PREP;
207         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
208
209         return rx;
210 }
211
212 int
213 mxlnd_put_idle_rx(kmx_ctx_t *rx)
214 {
215         kmx_conn_t              *conn   = rx->mxc_conn;
216         struct list_head        *rxs    = &conn->mxk_rx_idle;
217
218         LASSERT(rx->mxc_type == MXLND_REQ_RX);
219
220         mxlnd_ctx_init(rx);
221
222         rx->mxc_put++;
223         LASSERT(rx->mxc_get == rx->mxc_put);
224
225         spin_lock(&conn->mxk_lock);
226         list_add(&rx->mxc_list, rxs);
227         spin_unlock(&conn->mxk_lock);
228         return 0;
229 }
230
231 kmx_ctx_t *
232 mxlnd_get_idle_tx(void)
233 {
234         struct list_head        *tmp    = &kmxlnd_data.kmx_tx_idle;
235         kmx_ctx_t               *tx     = NULL;
236
237         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
238
239         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
240                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
241                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
242                 return NULL;
243         }
244
245         tmp = &kmxlnd_data.kmx_tx_idle;
246         tx = list_entry (tmp->next, kmx_ctx_t, mxc_list);
247         list_del_init(&tx->mxc_list);
248
249         /* Allocate a new completion cookie.  It might not be needed,
250          * but we've got a lock right now and we're unlikely to
251          * wrap... */
252         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
253         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
254                 kmxlnd_data.kmx_tx_next_cookie = 1;
255         }
256         kmxlnd_data.kmx_tx_used++;
257         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
258
259         LASSERT (tx->mxc_get == tx->mxc_put);
260
261         tx->mxc_get++;
262
263         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
264         LASSERT (tx->mxc_lntmsg[0] == NULL);
265         LASSERT (tx->mxc_lntmsg[1] == NULL);
266
267         tx->mxc_state = MXLND_CTX_PREP;
268         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
269
270         return tx;
271 }
272
273 void
274 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
275
276 int
277 mxlnd_put_idle_tx(kmx_ctx_t *tx)
278 {
279         int             result  = 0;
280         lnet_msg_t      *lntmsg[2];
281
282         LASSERT(tx->mxc_type == MXLND_REQ_TX);
283
284         if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
285                 kmx_conn_t      *conn   = tx->mxc_conn;
286
287                 result = -EIO;
288                 if (tx->mxc_errno != 0) result = tx->mxc_errno;
289                 /* FIXME should we set mx_dis? */
290                 mxlnd_conn_disconnect(conn, 0, 1);
291         }
292
293         lntmsg[0] = tx->mxc_lntmsg[0];
294         lntmsg[1] = tx->mxc_lntmsg[1];
295
296         mxlnd_ctx_init(tx);
297
298         tx->mxc_put++;
299         LASSERT(tx->mxc_get == tx->mxc_put);
300
301         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
302         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
303         kmxlnd_data.kmx_tx_used--;
304         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
305
306         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
307         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
308         return 0;
309 }
310
311
312 void
313 mxlnd_connparams_free(kmx_connparams_t *cp)
314 {
315         LASSERT(list_empty(&cp->mxr_list));
316         MXLND_FREE(cp, sizeof(*cp));
317         return;
318 }
319
320 int
321 mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
322                             mx_endpoint_addr_t epa, u64 match, u32 length,
323                             kmx_conn_t *conn, kmx_peer_t *peer, void *data)
324 {
325         kmx_connparams_t *c = NULL;
326
327         MXLND_ALLOC(c, sizeof(*c));
328         if (!c) return -ENOMEM;
329
330         INIT_LIST_HEAD(&c->mxr_list);
331         c->mxr_context = context;
332         c->mxr_epa = epa;
333         c->mxr_match = match;
334         c->mxr_nob = length;
335         c->mxr_conn = conn;
336         c->mxr_peer = peer;
337         c->mxr_msg = *((kmx_msg_t *) data);
338
339         *cp = c;
340         return 0;
341 }
342
343 static inline void
344 mxlnd_set_conn_status(kmx_conn_t *conn, int status)
345 {
346         conn->mxk_status = status;
347         mb();
348 }
349
350 /**
351  * mxlnd_conn_free_locked - free the conn
352  * @conn - a kmx_conn pointer
353  *
354  * The calling function should remove the conn from the conns list first
355  * then destroy it. Caller should have write-locked kmx_global_lock.
356  */
357 void
358 mxlnd_conn_free_locked(kmx_conn_t *conn)
359 {
360         int             valid   = !mxlnd_endpoint_addr_null(conn->mxk_epa);
361         kmx_peer_t      *peer   = conn->mxk_peer;
362
363         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
365                  list_empty (&conn->mxk_tx_free_queue) &&
366                  list_empty (&conn->mxk_pending));
367         if (!list_empty(&conn->mxk_list)) {
368                 list_del_init(&conn->mxk_list);
369                 if (peer->mxp_conn == conn) {
370                         peer->mxp_conn = NULL;
371                         if (valid) {
372                                 kmx_conn_t      *temp   = NULL;
373
374                                 mx_get_endpoint_addr_context(conn->mxk_epa,
375                                                              (void **) &temp);
376                                 if (conn == temp) {
377                                         mx_set_endpoint_addr_context(conn->mxk_epa,
378                                                                      (void *) NULL);
379                                 }
380                         }
381                         /* unlink from global list and drop its ref */
382                         list_del_init(&peer->mxp_list);
383                         mxlnd_peer_decref(peer);
384                 }
385         }
386         mxlnd_peer_decref(peer); /* drop conn's ref to peer */
387         if (conn->mxk_rx_pages) {
388                 LASSERT (conn->mxk_rxs != NULL);
389                 mxlnd_free_pages(conn->mxk_rx_pages);
390         }
391         if (conn->mxk_rxs) {
392                 int             i       = 0;
393                 kmx_ctx_t       *rx     = NULL;
394
395                 for (i = 0; i < MXLND_RX_MSGS(); i++) {
396                         rx = &conn->mxk_rxs[i];
397                         if (rx->mxc_seg_list != NULL) {
398                                 LASSERT(rx->mxc_nseg > 0);
399                                 MXLND_FREE(rx->mxc_seg_list,
400                                            rx->mxc_nseg *
401                                            sizeof(*rx->mxc_seg_list));
402                         }
403                 }
404                 MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
405         }
406
407         MXLND_FREE(conn, sizeof (*conn));
408         return;
409 }
410
411
412 int
413 mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
414 {
415         int             found   = 0;
416         int             count   = 0;
417         kmx_ctx_t       *ctx    = NULL;
418         kmx_ctx_t       *next   = NULL;
419         mx_return_t     mxret   = MX_SUCCESS;
420         u32             result  = 0;
421
422         do {
423                 found = 0;
424                 spin_lock(&conn->mxk_lock);
425                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
426                         list_del_init(&ctx->mxc_list);
427                         if (ctx->mxc_type == MXLND_REQ_RX) {
428                                 found = 1;
429                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
430                                                   &ctx->mxc_mxreq,
431                                                   &result);
432                                 if (mxret != MX_SUCCESS) {
433                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
434                                 }
435                                 if (result == 1) {
436                                         ctx->mxc_errno = -ECONNABORTED;
437                                         ctx->mxc_state = MXLND_CTX_CANCELED;
438                                         spin_unlock(&conn->mxk_lock);
439                                         spin_lock(&kmxlnd_data.kmx_conn_lock);
440                                         /* we may be holding the global lock,
441                                          * move to orphan list so that it can free it */
442                                         list_add_tail(&ctx->mxc_list,
443                                                       &kmxlnd_data.kmx_orphan_msgs);
444                                         count++;
445                                         spin_unlock(&kmxlnd_data.kmx_conn_lock);
446                                         spin_lock(&conn->mxk_lock);
447                                 }
448                                 break;
449                         }
450                 }
451                 spin_unlock(&conn->mxk_lock);
452         }
453         while (found);
454
455         return count;
456 }
457
458 int
459 mxlnd_cancel_queued_txs(kmx_conn_t *conn)
460 {
461         int                     count   = 0;
462         struct list_head        *tmp    = NULL;
463
464         spin_lock(&conn->mxk_lock);
465         while (!list_empty(&conn->mxk_tx_free_queue) ||
466                !list_empty(&conn->mxk_tx_credit_queue)) {
467
468                 kmx_ctx_t       *tx     = NULL;
469
470                 if (!list_empty(&conn->mxk_tx_free_queue)) {
471                         tmp = &conn->mxk_tx_free_queue;
472                 } else {
473                         tmp = &conn->mxk_tx_credit_queue;
474                 }
475
476                 tx = list_entry(tmp->next, kmx_ctx_t, mxc_list);
477                 list_del_init(&tx->mxc_list);
478                 spin_unlock(&conn->mxk_lock);
479                 tx->mxc_errno = -ECONNABORTED;
480                 tx->mxc_state = MXLND_CTX_CANCELED;
481                 /* move to orphan list and then abort */
482                 spin_lock(&kmxlnd_data.kmx_conn_lock);
483                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
484                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
485                 count++;
486                 spin_lock(&conn->mxk_lock);
487         }
488         spin_unlock(&conn->mxk_lock);
489
490         return count;
491 }
492
493 void
494 mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
495 {
496         u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
497                     (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
498
499         mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
500                   epa, match, NULL, NULL);
501         return;
502 }
503
504 /**
505  * mxlnd_conn_disconnect - shutdown a connection
506  * @conn - a kmx_conn pointer
507  * @mx_dis - call mx_disconnect()
508  * @send_bye - send peer a BYE msg
509  *
510  * This function sets the status to DISCONNECT, completes queued
511  * txs with failure, calls mx_disconnect, which will complete
512  * pending txs and matched rxs with failure.
513  */
514 void
515 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
516 {
517         mx_endpoint_addr_t      epa     = conn->mxk_epa;
518         int                     valid   = !mxlnd_endpoint_addr_null(epa);
519         int                     count   = 0;
520
521         spin_lock(&conn->mxk_lock);
522         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
523                 spin_unlock(&conn->mxk_lock);
524                 return;
525         }
526         mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
527         conn->mxk_timeout = 0;
528         spin_unlock(&conn->mxk_lock);
529
530         count = mxlnd_cancel_queued_txs(conn);
531         count += mxlnd_conn_cancel_pending_rxs(conn);
532
533         if (count)
534                 up(&kmxlnd_data.kmx_conn_sem); /* let connd call kmxlnd_abort_msgs() */
535
536         if (send_bye && valid &&
537             conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
538                 /* send a BYE to the peer */
539                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
540                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
541                 mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
542                 /* wait to allow the peer to ack our message */
543                 mxlnd_sleep(msecs_to_jiffies(20));
544         }
545
546         if (atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
547                 unsigned long   last_msg        = 0;
548
549                 /* notify LNET that we are giving up on this peer */
550                 if (time_after(conn->mxk_last_rx, conn->mxk_last_tx))
551                         last_msg = conn->mxk_last_rx;
552                 else
553                         last_msg = conn->mxk_last_tx;
554
555                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
556
557                 if (mx_dis && valid &&
558                     (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0)))
559                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
560         }
561         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
562
563         return;
564 }
565
566 /**
567  * mxlnd_conn_alloc - allocate and initialize a new conn struct
568  * @connp - address of a kmx_conn pointer
569  * @peer - owning kmx_peer
570  *
571  * Returns 0 on success and -ENOMEM on failure
572  */
573 int
574 mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
575 {
576         int             i       = 0;
577         int             ret     = 0;
578         int             ipage   = 0;
579         int             offset  = 0;
580         void           *addr    = NULL;
581         kmx_conn_t     *conn    = NULL;
582         kmx_pages_t    *pages   = NULL;
583         struct page    *page    = NULL;
584         kmx_ctx_t      *rx      = NULL;
585
586         LASSERT(peer != NULL);
587
588         MXLND_ALLOC(conn, sizeof (*conn));
589         if (conn == NULL) {
590                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
591                 return -ENOMEM;
592         }
593         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
594
595         memset(conn, 0, sizeof(*conn));
596
597         ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
598         if (ret != 0) {
599                 CERROR("Can't allocate rx pages\n");
600                 MXLND_FREE(conn, sizeof(*conn));
601                 return -ENOMEM;
602         }
603         conn->mxk_rx_pages = pages;
604
605         MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
606         if (conn->mxk_rxs == NULL) {
607                 CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
608                 mxlnd_free_pages(pages);
609                 MXLND_FREE(conn, sizeof(*conn));
610                 return -ENOMEM;
611         }
612
613         memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
614
615         conn->mxk_peer = peer;
616         INIT_LIST_HEAD(&conn->mxk_list);
617         INIT_LIST_HEAD(&conn->mxk_zombie);
618         atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer
619                                                    and one for the caller */
620         if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
621                 u64     nic_id  = 0ULL;
622                 u32     ep_id   = 0;
623
624                 /* this is localhost, set the epa and status as up */
625                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
626                 conn->mxk_epa = kmxlnd_data.kmx_epa;
627                 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
628                 peer->mxp_reconnect_time = 0;
629                 mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
630                 peer->mxp_nic_id = nic_id;
631                 peer->mxp_ep_id = ep_id;
632                 conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
633                 conn->mxk_timeout = 0;
634         } else {
635                 /* conn->mxk_incarnation = 0 - will be set by peer */
636                 /* conn->mxk_sid = 0 - will be set by peer */
637                 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
638                 /* mxk_epa - to be set after mx_iconnect() */
639         }
640         spin_lock_init(&conn->mxk_lock);
641         /* conn->mxk_timeout = 0 */
642         /* conn->mxk_last_tx = 0 */
643         /* conn->mxk_last_rx = 0 */
644         INIT_LIST_HEAD(&conn->mxk_rx_idle);
645
646         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
647         /* mxk_outstanding = 0 */
648
649         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
650         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
651         /* conn->mxk_ntx_msgs = 0 */
652         /* conn->mxk_ntx_data = 0 */
653         /* conn->mxk_ntx_posted = 0 */
654         /* conn->mxk_data_posted = 0 */
655         INIT_LIST_HEAD(&conn->mxk_pending);
656
657         for (i = 0; i < MXLND_RX_MSGS(); i++) {
658
659                 rx = &conn->mxk_rxs[i];
660                 rx->mxc_type = MXLND_REQ_RX;
661                 INIT_LIST_HEAD(&rx->mxc_list);
662
663                 /* map mxc_msg to page */
664                 page = pages->mxg_pages[ipage];
665                 addr = page_address(page);
666                 LASSERT(addr != NULL);
667                 rx->mxc_msg = (kmx_msg_t *)(addr + offset);
668                 rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
669
670                 rx->mxc_conn = conn;
671                 rx->mxc_peer = peer;
672                 rx->mxc_nid = peer->mxp_nid;
673
674                 mxlnd_ctx_init(rx);
675
676                 offset += MXLND_MSG_SIZE;
677                 LASSERT (offset <= PAGE_SIZE);
678
679                 if (offset == PAGE_SIZE) {
680                         offset = 0;
681                         ipage++;
682                         LASSERT (ipage <= MXLND_TX_MSG_PAGES());
683                 }
684
685                 list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
686         }
687
688         *connp = conn;
689
690         mxlnd_peer_addref(peer);        /* add a ref for this conn */
691
692         /* add to front of peer's conns list */
693         list_add(&conn->mxk_list, &peer->mxp_conns);
694         peer->mxp_conn = conn;
695         return 0;
696 }
697
698 int
699 mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
700 {
701         int             ret     = 0;
702         rwlock_t       *g_lock  = &kmxlnd_data.kmx_global_lock;
703
704         write_lock(g_lock);
705         ret = mxlnd_conn_alloc_locked(connp, peer);
706         write_unlock(g_lock);
707         return ret;
708 }
709
710 int
711 mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
712 {
713         int             ret     = 0;
714         kmx_conn_t      *conn   = ctx->mxc_conn;
715
716         ctx->mxc_state = MXLND_CTX_PENDING;
717         if (conn != NULL) {
718                 spin_lock(&conn->mxk_lock);
719                 if (conn->mxk_status >= MXLND_CONN_INIT) {
720                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
721                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
722                                 conn->mxk_timeout = ctx->mxc_deadline;
723                         }
724                 } else {
725                         ctx->mxc_state = MXLND_CTX_COMPLETED;
726                         ret = -1;
727                 }
728                 spin_unlock(&conn->mxk_lock);
729         }
730         return ret;
731 }
732
733 int
734 mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
735 {
736         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
737                 ctx->mxc_state == MXLND_CTX_COMPLETED);
738         if (ctx->mxc_state != MXLND_CTX_PENDING &&
739             ctx->mxc_state != MXLND_CTX_COMPLETED) {
740                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
741                        mxlnd_ctxstate_to_str(ctx->mxc_state));
742         }
743         ctx->mxc_state = MXLND_CTX_COMPLETED;
744         if (!list_empty(&ctx->mxc_list)) {
745                 kmx_conn_t      *conn = ctx->mxc_conn;
746                 kmx_ctx_t       *next = NULL;
747
748                 LASSERT(conn != NULL);
749                 spin_lock(&conn->mxk_lock);
750                 list_del_init(&ctx->mxc_list);
751                 conn->mxk_timeout = 0;
752                 if (!list_empty(&conn->mxk_pending)) {
753                         next = list_entry(conn->mxk_pending.next, kmx_ctx_t, mxc_list);
754                         conn->mxk_timeout = next->mxc_deadline;
755                 }
756                 spin_unlock(&conn->mxk_lock);
757         }
758         return 0;
759 }
760
761 /**
762  * mxlnd_peer_free - free the peer
763  * @peer - a kmx_peer pointer
764  *
765  * The calling function should decrement the rxs, drain the tx queues and
766  * remove the peer from the peers list first then destroy it.
767  */
768 void
769 mxlnd_peer_free(kmx_peer_t *peer)
770 {
771         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
772
773         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
774
775         if (!list_empty(&peer->mxp_list)) {
776                 /* assume we are locked */
777                 list_del_init(&peer->mxp_list);
778         }
779
780         MXLND_FREE(peer, sizeof (*peer));
781         atomic_dec(&kmxlnd_data.kmx_npeers);
782         return;
783 }
784
785 static int
786 mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
787 {
788         int                     ret     = -EHOSTUNREACH;
789         unsigned char           *haddr  = NULL;
790         struct net_device       *dev    = NULL;
791         struct neighbour        *n      = NULL;
792         __be32                  dst_ip  = htonl(ip);
793
794         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
795         if (dev == NULL)
796                 return -ENODEV;
797
798         haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
799
800         n = neigh_lookup(&arp_tbl, &dst_ip, dev);
801         if (n) {
802                 n->used = jiffies;
803                 if (n->nud_state & NUD_VALID) {
804                         memcpy(haddr, n->ha, dev->addr_len);
805                         neigh_release(n);
806                         ret = 0;
807                 }
808         }
809
810         dev_put(dev);
811
812         return ret;
813 }
814
815
816 /* We only want the MAC address of the peer's Myricom NIC. We
817  * require that each node has the IPoMX interface (myriN) up.
818  * We will not pass any traffic over IPoMX, but it allows us
819  * to get the MAC address. */
820 static int
821 mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
822 {
823         int                     ret     = 0;
824         int                     try     = 1;
825         int                     fatal   = 0;
826         u64                     tmp_id  = 0ULL;
827         cfs_socket_t            *sock   = NULL;
828
829         do {
830                 CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
831                 ret = mxlnd_lookup_mac(ip, &tmp_id);
832                 if (ret == 0) {
833                         break;
834                 } else {
835                         /* not found, try to connect (force an arp) */
836                         ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
837                         if (ret == -ECONNREFUSED) {
838                                 /* peer is there, get the MAC address */
839                                 mxlnd_lookup_mac(ip, &tmp_id);
840                                 if (tmp_id != 0ULL)
841                                         ret = 0;
842                                 break;
843                         } else if (ret == -EHOSTUNREACH && try < tries) {
844                                 /* add a little backoff */
845                                 CDEBUG(D_NET, "sleeping for %d jiffies\n", HZ/4);
846                                 mxlnd_sleep(HZ/4);
847                         }
848                 }
849         } while (try++ < tries);
850         CDEBUG(D_NET, "done trying. ret = %d\n", ret);
851
852         if (tmp_id == 0ULL)
853                 ret = -EHOSTUNREACH;
854 #ifdef __LITTLE_ENDIAN
855         *nic_id = ___arch__swab64(tmp_id);
856 #else
857         *nic_id = tmp_id;
858 #endif
859         return ret;
860 }
861
862 /**
863  * mxlnd_peer_alloc - allocate and initialize a new peer struct
864  * @peerp - address of a kmx_peer pointer
865  * @nid - LNET node id
866  *
867  * Returns 0 on success and -ENOMEM on failure
868  */
869 int
870 mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
871 {
872         int             ret     = 0;
873         u32             ip      = LNET_NIDADDR(nid);
874         kmx_peer_t     *peer    = NULL;
875
876         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
877
878         MXLND_ALLOC(peer, sizeof (*peer));
879         if (peer == NULL) {
880                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
881                 return -ENOMEM;
882         }
883         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
884
885         memset(peer, 0, sizeof(*peer));
886
887         INIT_LIST_HEAD(&peer->mxp_list);
888         peer->mxp_nid = nid;
889         /* peer->mxp_ni unused - may be used for multi-rail */
890         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
891
892         peer->mxp_board = board;
893         peer->mxp_ep_id = ep_id;
894         peer->mxp_nic_id = nic_id;
895
896         INIT_LIST_HEAD(&peer->mxp_conns);
897         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
898         if (ret != 0) {
899                 mxlnd_peer_decref(peer);
900                 return ret;
901         }
902         INIT_LIST_HEAD(&peer->mxp_tx_queue);
903
904         if (peer->mxp_nic_id != 0ULL)
905                 nic_id = peer->mxp_nic_id;
906
907         if (nic_id == 0ULL) {
908                 ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
909                 if (ret == 0) {
910                         peer->mxp_nic_id = nic_id;
911                         mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
912                 }
913         }
914
915         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
916
917         /* peer->mxp_reconnect_time = 0 */
918         /* peer->mxp_incompatible = 0 */
919
920         *peerp = peer;
921         return 0;
922 }
923
924 static inline kmx_peer_t *
925 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
926 {
927         int             found   = 0;
928         int             hash    = 0;
929         kmx_peer_t      *peer   = NULL;
930
931         hash = mxlnd_nid_to_hash(nid);
932
933         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
934                 if (peer->mxp_nid == nid) {
935                         found = 1;
936                         mxlnd_peer_addref(peer);
937                         break;
938                 }
939         }
940         return (found ? peer : NULL);
941 }
942
943 static kmx_peer_t *
944 mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
945 {
946         int             ret     = 0;
947         int             hash    = 0;
948         kmx_peer_t      *peer   = NULL;
949         kmx_peer_t      *old    = NULL;
950         rwlock_t        *g_lock = &kmxlnd_data.kmx_global_lock;
951
952         read_lock(g_lock);
953         peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
954
955         if ((peer && peer->mxp_conn) || /* found peer with conn or */
956             (!peer && !create)) {       /* did not find peer and do not create one */
957                 read_unlock(g_lock);
958                 return peer;
959         }
960
961         read_unlock(g_lock);
962
963         /* if peer but _not_ conn */
964         if (peer && !peer->mxp_conn) {
965                 if (create) {
966                         write_lock(g_lock);
967                         if (!peer->mxp_conn) { /* check again */
968                                 /* create the conn */
969                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
970                                 if (ret != 0) {
971                                         /* we tried, return the peer only.
972                                          * the caller needs to see if the conn exists */
973                                         CDEBUG(D_NETERROR, "%s: %s could not alloc conn\n",
974                                         __func__, libcfs_nid2str(peer->mxp_nid));
975                                 } else {
976                                         /* drop extra conn ref */
977                                         mxlnd_conn_decref(peer->mxp_conn);
978                                 }
979                         }
980                         write_unlock(g_lock);
981                 }
982                 return peer;
983         }
984
985         /* peer not found and we need to create one */
986         hash = mxlnd_nid_to_hash(nid);
987
988         /* create peer (and conn) */
989         /* adds conn ref for peer and one for this function */
990         ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
991                                *kmxlnd_tunables.kmx_ep_id, 0ULL);
992         if (ret != 0) /* no memory, peer is NULL */
993                 return NULL;
994
995         write_lock(g_lock);
996
997         /* look again */
998         old = mxlnd_find_peer_by_nid_locked(nid);
999         if (old) {
1000                 /* someone already created one */
1001                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
1002                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
1003                 mxlnd_peer_decref(peer);
1004                 peer = old;
1005         } else {
1006                 /* no other peer, use this one */
1007                 list_add_tail(&peer->mxp_list, &kmxlnd_data.kmx_peers[hash]);
1008                 atomic_inc(&kmxlnd_data.kmx_npeers);
1009                 mxlnd_peer_addref(peer);
1010                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
1011         }
1012
1013         write_unlock(g_lock);
1014
1015         return peer;
1016 }
1017
1018 static inline int
1019 mxlnd_tx_requires_credit(kmx_ctx_t *tx)
1020 {
1021         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
1022                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
1023                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
1024                 tx->mxc_msg_type == MXLND_MSG_NOOP);
1025 }
1026
1027 /**
1028  * mxlnd_init_msg - set type and number of bytes
1029  * @msg - msg pointer
1030  * @type - of message
1031  * @body_nob - bytes in msg body
1032  */
1033 static inline void
1034 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
1035 {
1036         msg->mxm_type = type;
1037         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
1038 }
1039
1040 static inline void
1041 mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
1042 {
1043         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
1044         kmx_msg_t       *msg    = NULL;
1045
1046         LASSERT (tx != NULL);
1047         LASSERT (nob <= MXLND_MSG_SIZE);
1048
1049         tx->mxc_nid = nid;
1050         /* tx->mxc_peer should have already been set if we know it */
1051         tx->mxc_msg_type = type;
1052         tx->mxc_nseg = 1;
1053         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
1054         tx->mxc_seg.segment_length = nob;
1055         tx->mxc_pin_type = MX_PIN_PHYSICAL;
1056
1057         msg = tx->mxc_msg;
1058         msg->mxm_type = type;
1059         msg->mxm_nob  = nob;
1060
1061         return;
1062 }
1063
1064 static inline __u32
1065 mxlnd_cksum (void *ptr, int nob)
1066 {
1067         char  *c  = ptr;
1068         __u32  sum = 0;
1069
1070         while (nob-- > 0)
1071                 sum = ((sum << 1) | (sum >> 31)) + *c++;
1072
1073         /* ensure I don't return 0 (== no checksum) */
1074         return (sum == 0) ? 1 : sum;
1075 }
1076
1077 /**
1078  * mxlnd_pack_msg_locked - complete msg info
1079  * @tx - msg to send
1080  */
1081 static inline void
1082 mxlnd_pack_msg_locked(kmx_ctx_t *tx)
1083 {
1084         kmx_msg_t       *msg    = tx->mxc_msg;
1085
1086         /* type and nob should already be set in init_msg() */
1087         msg->mxm_magic    = MXLND_MSG_MAGIC;
1088         msg->mxm_version  = MXLND_MSG_VERSION;
1089         /*   mxm_type */
1090         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
1091          * return credits as well */
1092         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
1093             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1094                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
1095                 tx->mxc_conn->mxk_outstanding = 0;
1096         } else {
1097                 msg->mxm_credits  = 0;
1098         }
1099         /*   mxm_nob */
1100         msg->mxm_cksum    = 0;
1101         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
1102         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
1103         msg->mxm_dstnid   = tx->mxc_nid;
1104         /* if it is a new peer, the dststamp will be 0 */
1105         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
1106
1107         if (*kmxlnd_tunables.kmx_cksum) {
1108                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
1109         }
1110 }
1111
1112 int
1113 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
1114 {
1115         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
1116         __u32     msg_cksum     = 0;
1117         int       flip          = 0;
1118         int       msg_nob       = 0;
1119
1120         /* 6 bytes are enough to have received magic + version */
1121         if (nob < 6) {
1122                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
1123                 return -EPROTO;
1124         }
1125
1126         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
1127                 flip = 0;
1128         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
1129                 flip = 1;
1130         } else {
1131                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
1132                 return -EPROTO;
1133         }
1134
1135         if (msg->mxm_version !=
1136             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
1137                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
1138                 return -EPROTO;
1139         }
1140
1141         if (nob < hdr_size) {
1142                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
1143                 return -EPROTO;
1144         }
1145
1146         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
1147         if (msg_nob > nob) {
1148                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
1149                 return -EPROTO;
1150         }
1151
1152         /* checksum must be computed with mxm_cksum zero and BEFORE anything
1153          * gets flipped */
1154         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
1155         msg->mxm_cksum = 0;
1156         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
1157                 CDEBUG(D_NETERROR, "Bad checksum\n");
1158                 return -EPROTO;
1159         }
1160         msg->mxm_cksum = msg_cksum;
1161
1162         if (flip) {
1163                 /* leave magic unflipped as a clue to peer endianness */
1164                 __swab16s(&msg->mxm_version);
1165                 CLASSERT (sizeof(msg->mxm_type) == 1);
1166                 CLASSERT (sizeof(msg->mxm_credits) == 1);
1167                 msg->mxm_nob = msg_nob;
1168                 __swab64s(&msg->mxm_srcnid);
1169                 __swab64s(&msg->mxm_srcstamp);
1170                 __swab64s(&msg->mxm_dstnid);
1171                 __swab64s(&msg->mxm_dststamp);
1172         }
1173
1174         if (msg->mxm_srcnid == LNET_NID_ANY) {
1175                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
1176                 return -EPROTO;
1177         }
1178
1179         switch (msg->mxm_type) {
1180         default:
1181                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
1182                 return -EPROTO;
1183
1184         case MXLND_MSG_NOOP:
1185                 break;
1186
1187         case MXLND_MSG_EAGER:
1188                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
1189                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
1190                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
1191                         return -EPROTO;
1192                 }
1193                 break;
1194
1195         case MXLND_MSG_PUT_REQ:
1196                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
1197                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
1198                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1199                         return -EPROTO;
1200                 }
1201                 if (flip)
1202                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1203                 break;
1204
1205         case MXLND_MSG_PUT_ACK:
1206                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1207                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1208                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1209                         return -EPROTO;
1210                 }
1211                 if (flip) {
1212                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1213                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1214                 }
1215                 break;
1216
1217         case MXLND_MSG_GET_REQ:
1218                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1219                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1220                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1221                         return -EPROTO;
1222                 }
1223                 if (flip) {
1224                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1225                 }
1226                 break;
1227
1228         case MXLND_MSG_CONN_REQ:
1229         case MXLND_MSG_CONN_ACK:
1230                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1231                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1232                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1233                         return -EPROTO;
1234                 }
1235                 if (flip) {
1236                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1237                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1238                 }
1239                 break;
1240         }
1241         return 0;
1242 }
1243
1244
1245 /**
1246  * mxlnd_recv_msg
1247  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1248  * @rx
1249  * @msg_type
1250  * @cookie
1251  * @length - length of incoming message
1252  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1253  *
1254  * The caller gets the rx and sets nid, peer and conn if known.
1255  *
1256  * Returns 0 on success and -1 on failure
1257  */
1258 int
1259 mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
1260 {
1261         int             ret     = 0;
1262         mx_return_t     mxret   = MX_SUCCESS;
1263         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1264
1265         rx->mxc_msg_type = msg_type;
1266         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1267         rx->mxc_cookie = cookie;
1268         /* rx->mxc_match may already be set */
1269         /* rx->mxc_seg.segment_ptr is already set */
1270         rx->mxc_seg.segment_length = length;
1271         ret = mxlnd_q_pending_ctx(rx);
1272         if (ret == -1) {
1273                 /* the caller is responsible for calling conn_decref() if needed */
1274                 return -1;
1275         }
1276         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1277                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1278         if (mxret != MX_SUCCESS) {
1279                 mxlnd_deq_pending_ctx(rx);
1280                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1281                                    mx_strerror(mxret), (int) mxret);
1282                 return -1;
1283         }
1284         return 0;
1285 }
1286
1287
1288 /**
1289  * mxlnd_unexpected_recv - this is the callback function that will handle
1290  *                         unexpected receives
1291  * @context - NULL, ignore
1292  * @source - the peer's mx_endpoint_addr_t
1293  * @match_value - the msg's bits, should be MXLND_MSG_EAGER
1294  * @length - length of incoming message
1295  * @data_if_available - used for CONN_[REQ|ACK]
1296  *
1297  * If it is an eager-sized msg, we will call recv_msg() with the actual
1298  * length. If it is a large message, we will call recv_msg() with a
1299  * length of 0 bytes to drop it because we should never have a large,
1300  * unexpected message.
1301  *
1302  * NOTE - The MX library blocks until this function completes. Make it as fast as
1303  * possible. DO NOT allocate memory which can block!
1304  *
1305  * If we cannot get a rx or the conn is closed, drop the message on the floor
1306  * (i.e. recv 0 bytes and ignore).
1307  */
1308 mx_unexp_handler_action_t
1309 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1310                  uint64_t match_value, uint32_t length, void *data_if_available)
1311 {
1312         int             ret             = 0;
1313         kmx_ctx_t       *rx             = NULL;
1314         mx_ksegment_t   seg;
1315         u8              msg_type        = 0;
1316         u8              error           = 0;
1317         u64             cookie          = 0ULL;
1318         kmx_conn_t      *conn           = NULL;
1319         kmx_peer_t      *peer           = NULL;
1320         u64             nic_id          = 0ULL;
1321         u32             ep_id           = 0;
1322         u32             sid             = 0;
1323
1324         /* TODO this will change to the net struct */
1325         if (context != NULL) {
1326                 CDEBUG(D_NETERROR, "non-NULL context\n");
1327         }
1328
1329 #if MXLND_DEBUG
1330         CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);
1331 #endif
1332
1333         mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
1334         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1335         read_lock(&kmxlnd_data.kmx_global_lock);
1336         mx_get_endpoint_addr_context(source, (void **) &conn);
1337         if (conn) {
1338                 mxlnd_conn_addref(conn); /* add ref for this function */
1339                 peer = conn->mxk_peer;
1340         }
1341         read_unlock(&kmxlnd_data.kmx_global_lock);
1342
1343         if (msg_type == MXLND_MSG_BYE) {
1344                 if (conn) {
1345                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1346                                         libcfs_nid2str(peer->mxp_nid));
1347                         mxlnd_conn_disconnect(conn, 1, 0);
1348                         mxlnd_conn_decref(conn); /* drop ref taken above */
1349                 }
1350                 return MX_RECV_FINISHED;
1351         }
1352
1353         if (msg_type == MXLND_MSG_CONN_REQ) {
1354                 kmx_connparams_t       *cp      = NULL;
1355                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1356                                                   sizeof(kmx_connreq_msg_t);
1357
1358                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1359                 if (unlikely(length != expected || !data_if_available)) {
1360                         CDEBUG(D_NETERROR, "received invalid CONN_REQ from %llx "
1361                                "length=%d (expected %d)\n", nic_id, length, expected);
1362                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
1363                         return MX_RECV_FINISHED;
1364                 }
1365
1366                 ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1367                                          conn, peer, data_if_available);
1368                 if (unlikely(ret != 0)) {
1369                         CDEBUG(D_NETERROR, "unable to alloc CONN_REQ from %llx:%d\n",
1370                                         nic_id, ep_id);
1371                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
1372                         return MX_RECV_FINISHED;
1373                 }
1374                 spin_lock(&kmxlnd_data.kmx_conn_lock);
1375                 list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
1376                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
1377                 up(&kmxlnd_data.kmx_conn_sem);
1378                 return MX_RECV_FINISHED;
1379         }
1380         if (msg_type == MXLND_MSG_CONN_ACK) {
1381                 kmx_connparams_t  *cp           = NULL;
1382                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1383                                                   sizeof(kmx_connreq_msg_t);
1384
1385                 LASSERT(conn);
1386                 if (unlikely(error != 0)) {
1387                         CDEBUG(D_NETERROR, "received CONN_ACK from %s "
1388                                "with error -%d\n",
1389                                libcfs_nid2str(peer->mxp_nid), (int) error);
1390                         mxlnd_conn_disconnect(conn, 1, 0);
1391                 } else if (unlikely(length != expected || !data_if_available)) {
1392                         CDEBUG(D_NETERROR, "received %s CONN_ACK from %s "
1393                                "length=%d (expected %d)\n",
1394                                data_if_available ? "short" : "missing",
1395                                libcfs_nid2str(peer->mxp_nid), length, expected);
1396                         mxlnd_conn_disconnect(conn, 1, 1);
1397                 } else {
1398                         /* peer is ready for messages */
1399                         ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1400                                          conn, peer, data_if_available);
1401                         if (unlikely(ret != 0)) {
1402                                 CDEBUG(D_NETERROR, "unable to alloc kmx_connparams_t"
1403                                                " from %llx:%d\n", nic_id, ep_id);
1404                                 mxlnd_conn_disconnect(conn, 1, 1);
1405                         } else {
1406                                 spin_lock(&kmxlnd_data.kmx_conn_lock);
1407                                 list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
1408                                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
1409                                 up(&kmxlnd_data.kmx_conn_sem);
1410                         }
1411                 }
1412                 mxlnd_conn_decref(conn); /* drop ref taken above */
1413
1414                 return MX_RECV_FINISHED;
1415         }
1416
1417         /* Handle unexpected messages (PUT_REQ and GET_REQ) */
1418
1419         LASSERT(peer != NULL && conn != NULL);
1420
1421         rx = mxlnd_get_idle_rx(conn);
1422         if (rx != NULL) {
1423                 if (length <= MXLND_MSG_SIZE) {
1424                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1425                 } else {
1426                         CDEBUG(D_NETERROR, "unexpected large receive with "
1427                                            "match_value=0x%llx length=%d\n",
1428                                            match_value, length);
1429                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1430                 }
1431
1432                 if (ret == 0) {
1433                         /* hold conn ref until rx completes */
1434                         rx->mxc_conn = conn;
1435                         rx->mxc_peer = peer;
1436                         rx->mxc_nid = peer->mxp_nid;
1437                 } else {
1438                         CDEBUG(D_NETERROR, "could not post receive\n");
1439                         mxlnd_put_idle_rx(rx);
1440                 }
1441         }
1442
1443         /* Encountered error, drop incoming message on the floor */
1444         /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
1445          * uses the standard code path and acks the sender normally */
1446
1447         if (rx == NULL || ret != 0) {
1448                 mxlnd_conn_decref(conn); /* drop ref taken above */
1449                 if (rx == NULL) {
1450                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx"
1451                                " 0x%llx from %s\n", match_value,
1452                                libcfs_nid2str(peer->mxp_nid));
1453                 } else {
1454                         /* ret != 0 */
1455                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1456                 }
1457                 seg.segment_ptr = 0ULL;
1458                 seg.segment_length = 0;
1459                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1460                           match_value, ~0ULL, NULL, NULL);
1461         }
1462
1463         return MX_RECV_CONTINUE;
1464 }
1465
1466
1467 int
1468 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1469 {
1470         int              i      = 0;
1471         int              ret    = -ENOENT;
1472         kmx_peer_t      *peer   = NULL;
1473
1474         read_lock(&kmxlnd_data.kmx_global_lock);
1475         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1476                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) {
1477                         if (index-- == 0) {
1478                                 *nidp = peer->mxp_nid;
1479                                 *count = atomic_read(&peer->mxp_refcount);
1480                                 ret = 0;
1481                                 break;
1482                         }
1483                 }
1484         }
1485         read_unlock(&kmxlnd_data.kmx_global_lock);
1486
1487         return ret;
1488 }
1489
1490 void
1491 mxlnd_del_peer_locked(kmx_peer_t *peer)
1492 {
1493         if (peer->mxp_conn) {
1494                 mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1495         } else {
1496                 list_del_init(&peer->mxp_list); /* remove from the global list */
1497                 mxlnd_peer_decref(peer); /* drop global list ref */
1498         }
1499         return;
1500 }
1501
1502 int
1503 mxlnd_del_peer(lnet_nid_t nid)
1504 {
1505         int             i       = 0;
1506         int             ret     = 0;
1507         kmx_peer_t      *peer   = NULL;
1508         kmx_peer_t      *next   = NULL;
1509
1510         if (nid != LNET_NID_ANY) {
1511                 peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
1512         }
1513         write_lock(&kmxlnd_data.kmx_global_lock);
1514         if (nid != LNET_NID_ANY) {
1515                 if (peer == NULL) {
1516                         ret = -ENOENT;
1517                 } else {
1518                         mxlnd_peer_decref(peer); /* and drops it */
1519                         mxlnd_del_peer_locked(peer);
1520                 }
1521         } else { /* LNET_NID_ANY */
1522                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1523                         list_for_each_entry_safe(peer, next,
1524                                                  &kmxlnd_data.kmx_peers[i], mxp_list) {
1525                                 mxlnd_del_peer_locked(peer);
1526                         }
1527                 }
1528         }
1529         write_unlock(&kmxlnd_data.kmx_global_lock);
1530
1531         return ret;
1532 }
1533
1534 kmx_conn_t *
1535 mxlnd_get_conn_by_idx(int index)
1536 {
1537         int              i      = 0;
1538         kmx_peer_t      *peer   = NULL;
1539         kmx_conn_t      *conn   = NULL;
1540
1541         read_lock(&kmxlnd_data.kmx_global_lock);
1542         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1543                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) {
1544                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1545                                 if (index-- > 0) {
1546                                         continue;
1547                                 }
1548
1549                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1550                                 read_unlock(&kmxlnd_data.kmx_global_lock);
1551                                 return conn;
1552                         }
1553                 }
1554         }
1555         read_unlock(&kmxlnd_data.kmx_global_lock);
1556
1557         return NULL;
1558 }
1559
1560 void
1561 mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
1562 {
1563         kmx_conn_t      *conn   = NULL;
1564         kmx_conn_t      *next   = NULL;
1565
1566         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1567                 mxlnd_conn_disconnect(conn, 0, 1);
1568
1569         return;
1570 }
1571
1572 int
1573 mxlnd_close_matching_conns(lnet_nid_t nid)
1574 {
1575         int             i       = 0;
1576         int             ret     = 0;
1577         kmx_peer_t      *peer   = NULL;
1578
1579         write_lock(&kmxlnd_data.kmx_global_lock);
1580         if (nid != LNET_NID_ANY) {
1581                 peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
1582                 if (peer == NULL) {
1583                         ret = -ENOENT;
1584                 } else {
1585                         mxlnd_close_matching_conns_locked(peer);
1586                         mxlnd_peer_decref(peer); /* and drops it here */
1587                 }
1588         } else { /* LNET_NID_ANY */
1589                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1590                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list)
1591                                 mxlnd_close_matching_conns_locked(peer);
1592                 }
1593         }
1594         write_unlock(&kmxlnd_data.kmx_global_lock);
1595
1596         return ret;
1597 }
1598
1599 /**
1600  * mxlnd_ctl - modify MXLND parameters
1601  * @ni - LNET interface handle
1602  * @cmd - command to change
1603  * @arg - the ioctl data
1604  */
1605 int
1606 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1607 {
1608         struct libcfs_ioctl_data *data  = arg;
1609         int                       ret   = -EINVAL;
1610
1611         LASSERT (ni == kmxlnd_data.kmx_ni);
1612
1613         switch (cmd) {
1614         case IOC_LIBCFS_GET_PEER: {
1615                 lnet_nid_t      nid     = 0;
1616                 int             count   = 0;
1617
1618                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1619                 data->ioc_nid    = nid;
1620                 data->ioc_count  = count;
1621                 break;
1622         }
1623         case IOC_LIBCFS_DEL_PEER: {
1624                 ret = mxlnd_del_peer(data->ioc_nid);
1625                 break;
1626         }
1627         case IOC_LIBCFS_GET_CONN: {
1628                 kmx_conn_t      *conn = NULL;
1629
1630                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1631                 if (conn == NULL) {
1632                         ret = -ENOENT;
1633                 } else {
1634                         ret = 0;
1635                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1636                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1637                 }
1638                 break;
1639         }
1640         case IOC_LIBCFS_CLOSE_CONNECTION: {
1641                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1642                 break;
1643         }
1644         default:
1645                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1646                 break;
1647         }
1648
1649         return ret;
1650 }
1651
1652 /**
1653  * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
1654  * @tx
1655  *
1656  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1657  */
1658 void
1659 mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
1660 {
1661         u8              msg_type        = tx->mxc_msg_type;
1662         kmx_conn_t      *conn           = tx->mxc_conn;
1663
1664         LASSERT (msg_type != 0);
1665         LASSERT (tx->mxc_nid != 0);
1666         LASSERT (tx->mxc_peer != NULL);
1667         LASSERT (tx->mxc_conn != NULL);
1668
1669         tx->mxc_incarnation = conn->mxk_incarnation;
1670
1671         if (msg_type != MXLND_MSG_PUT_DATA &&
1672             msg_type != MXLND_MSG_GET_DATA) {
1673                 /* msg style tx */
1674                 if (mxlnd_tx_requires_credit(tx)) {
1675                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1676                         conn->mxk_ntx_msgs++;
1677                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1678                            msg_type == MXLND_MSG_CONN_ACK) {
1679                         /* put conn msgs at the front of the queue */
1680                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1681                 } else {
1682                         /* PUT_ACK, PUT_NAK */
1683                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1684                         conn->mxk_ntx_msgs++;
1685                 }
1686         } else {
1687                 /* data style tx */
1688                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1689                 conn->mxk_ntx_data++;
1690         }
1691
1692         return;
1693 }
1694
1695 /**
1696  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1697  * @tx
1698  *
1699  * Add the tx to the peer's msg or data queue
1700  */
1701 static inline void
1702 mxlnd_peer_queue_tx(kmx_ctx_t *tx)
1703 {
1704         LASSERT(tx->mxc_peer != NULL);
1705         LASSERT(tx->mxc_conn != NULL);
1706         spin_lock(&tx->mxc_conn->mxk_lock);
1707         mxlnd_peer_queue_tx_locked(tx);
1708         spin_unlock(&tx->mxc_conn->mxk_lock);
1709
1710         return;
1711 }
1712
1713 /**
1714  * mxlnd_queue_tx - add the tx to the global tx queue
1715  * @tx
1716  *
1717  * Add the tx to the global queue and up the tx_queue_sem
1718  */
1719 void
1720 mxlnd_queue_tx(kmx_ctx_t *tx)
1721 {
1722         kmx_peer_t *peer   = tx->mxc_peer;
1723         LASSERT (tx->mxc_nid != 0);
1724
1725         if (peer != NULL) {
1726                 if (peer->mxp_incompatible &&
1727                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1728                         /* let this fail now */
1729                         tx->mxc_errno = -ECONNABORTED;
1730                         mxlnd_conn_decref(peer->mxp_conn);
1731                         mxlnd_put_idle_tx(tx);
1732                         return;
1733                 }
1734                 if (tx->mxc_conn == NULL) {
1735                         int             ret     = 0;
1736                         kmx_conn_t      *conn   = NULL;
1737
1738                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1739                         if (ret != 0) {
1740                                 tx->mxc_errno = ret;
1741                                 mxlnd_put_idle_tx(tx);
1742                                 goto done;
1743                         }
1744                         tx->mxc_conn = conn;
1745                         mxlnd_peer_decref(peer); /* and takes it from peer */
1746                 }
1747                 LASSERT(tx->mxc_conn != NULL);
1748                 mxlnd_peer_queue_tx(tx);
1749                 mxlnd_check_sends(peer);
1750         } else {
1751                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1752                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1753                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1754                 up(&kmxlnd_data.kmx_tx_queue_sem);
1755         }
1756 done:
1757         return;
1758 }
1759
1760 int
1761 mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1762 {
1763         int             i                       = 0;
1764         int             sum                     = 0;
1765         int             old_sum                 = 0;
1766         int             nseg                    = 0;
1767         int             first_iov               = -1;
1768         int             first_iov_offset        = 0;
1769         int             first_found             = 0;
1770         int             last_iov                = -1;
1771         int             last_iov_length         = 0;
1772         mx_ksegment_t  *seg                     = NULL;
1773
1774         if (niov == 0) return 0;
1775         LASSERT(iov != NULL);
1776
1777         for (i = 0; i < niov; i++) {
1778                 sum = old_sum + (u32) iov[i].iov_len;
1779                 if (!first_found && (sum > offset)) {
1780                         first_iov = i;
1781                         first_iov_offset = offset - old_sum;
1782                         first_found = 1;
1783                         sum = (u32) iov[i].iov_len - first_iov_offset;
1784                         old_sum = 0;
1785                 }
1786                 if (sum >= nob) {
1787                         last_iov = i;
1788                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1789                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1790                         break;
1791                 }
1792                 old_sum = sum;
1793         }
1794         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1795         nseg = last_iov - first_iov + 1;
1796         LASSERT(nseg > 0);
1797
1798         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1799         if (seg == NULL) {
1800                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1801                 return -1;
1802         }
1803         memset(seg, 0, nseg * sizeof(*seg));
1804         ctx->mxc_nseg = nseg;
1805         sum = 0;
1806         for (i = 0; i < nseg; i++) {
1807                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1808                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1809                 if (i == 0) {
1810                         seg[i].segment_ptr += (u64) first_iov_offset;
1811                         seg[i].segment_length -= (u32) first_iov_offset;
1812                 }
1813                 if (i == (nseg - 1)) {
1814                         seg[i].segment_length = (u32) last_iov_length;
1815                 }
1816                 sum += seg[i].segment_length;
1817         }
1818         ctx->mxc_seg_list = seg;
1819         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1820 #ifdef MX_PIN_FULLPAGES
1821         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1822 #endif
1823         LASSERT(nob == sum);
1824         return 0;
1825 }
1826
1827 int
1828 mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1829 {
1830         int             i                       = 0;
1831         int             sum                     = 0;
1832         int             old_sum                 = 0;
1833         int             nseg                    = 0;
1834         int             first_kiov              = -1;
1835         int             first_kiov_offset       = 0;
1836         int             first_found             = 0;
1837         int             last_kiov               = -1;
1838         int             last_kiov_length        = 0;
1839         mx_ksegment_t  *seg                     = NULL;
1840
1841         if (niov == 0) return 0;
1842         LASSERT(kiov != NULL);
1843
1844         for (i = 0; i < niov; i++) {
1845                 sum = old_sum + kiov[i].kiov_len;
1846                 if (i == 0) sum -= kiov[i].kiov_offset;
1847                 if (!first_found && (sum > offset)) {
1848                         first_kiov = i;
1849                         first_kiov_offset = offset - old_sum;
1850                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1851                         first_found = 1;
1852                         sum = kiov[i].kiov_len - first_kiov_offset;
1853                         old_sum = 0;
1854                 }
1855                 if (sum >= nob) {
1856                         last_kiov = i;
1857                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1858                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1859                         break;
1860                 }
1861                 old_sum = sum;
1862         }
1863         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1864         nseg = last_kiov - first_kiov + 1;
1865         LASSERT(nseg > 0);
1866
1867         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1868         if (seg == NULL) {
1869                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1870                 return -1;
1871         }
1872         memset(seg, 0, niov * sizeof(*seg));
1873         ctx->mxc_nseg = niov;
1874         sum = 0;
1875         for (i = 0; i < niov; i++) {
1876                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1877                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1878                 if (i == 0) {
1879                         seg[i].segment_ptr += (u64) first_kiov_offset;
1880                         /* we have to add back the original kiov_offset */
1881                         seg[i].segment_length -= first_kiov_offset +
1882                                                  kiov[first_kiov].kiov_offset;
1883                 }
1884                 if (i == (nseg - 1)) {
1885                         seg[i].segment_length = last_kiov_length;
1886                 }
1887                 sum += seg[i].segment_length;
1888         }
1889         ctx->mxc_seg_list = seg;
1890         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1891 #ifdef MX_PIN_FULLPAGES
1892         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1893 #endif
1894         LASSERT(nob == sum);
1895         return 0;
1896 }
1897
1898 void
1899 mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1900 {
1901         LASSERT(type == MXLND_MSG_PUT_ACK);
1902         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1903         tx->mxc_cookie = cookie;
1904         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1905         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1906         tx->mxc_match = mxlnd_create_match(tx, status);
1907
1908         mxlnd_queue_tx(tx);
1909 }
1910
1911
1912 /**
1913  * mxlnd_send_data - get tx, map [k]iov, queue tx
1914  * @ni
1915  * @lntmsg
1916  * @peer
1917  * @msg_type
1918  * @cookie
1919  *
1920  * This setups the DATA send for PUT or GET.
1921  *
1922  * On success, it queues the tx, on failure it calls lnet_finalize()
1923  */
1924 void
1925 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
1926 {
1927         int                     ret     = 0;
1928         lnet_process_id_t       target  = lntmsg->msg_target;
1929         unsigned int            niov    = lntmsg->msg_niov;
1930         struct iovec           *iov     = lntmsg->msg_iov;
1931         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1932         unsigned int            offset  = lntmsg->msg_offset;
1933         unsigned int            nob     = lntmsg->msg_len;
1934         kmx_ctx_t              *tx      = NULL;
1935
1936         LASSERT(lntmsg != NULL);
1937         LASSERT(peer != NULL);
1938         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1939         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1940
1941         tx = mxlnd_get_idle_tx();
1942         if (tx == NULL) {
1943                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1944                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1945                         libcfs_nid2str(target.nid));
1946                 goto failed_0;
1947         }
1948         tx->mxc_nid = target.nid;
1949         /* NOTE called when we have a ref on the conn, get one for this tx */
1950         mxlnd_conn_addref(peer->mxp_conn);
1951         tx->mxc_peer = peer;
1952         tx->mxc_conn = peer->mxp_conn;
1953         tx->mxc_msg_type = msg_type;
1954         tx->mxc_lntmsg[0] = lntmsg;
1955         tx->mxc_cookie = cookie;
1956         tx->mxc_match = mxlnd_create_match(tx, 0);
1957
1958         /* This setups up the mx_ksegment_t to send the DATA payload  */
1959         if (nob == 0) {
1960                 /* do not setup the segments */
1961                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1962                                    "to %s?\n", libcfs_nid2str(target.nid));
1963                 ret = 0;
1964         } else if (kiov == NULL) {
1965                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1966         } else {
1967                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1968         }
1969         if (ret != 0) {
1970                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1971                                    libcfs_nid2str(target.nid));
1972                 tx->mxc_errno = -EIO;
1973                 goto failed_1;
1974         }
1975         mxlnd_queue_tx(tx);
1976         return;
1977
1978 failed_1:
1979         mxlnd_conn_decref(peer->mxp_conn);
1980         mxlnd_put_idle_tx(tx);
1981         return;
1982
1983 failed_0:
1984         CDEBUG(D_NETERROR, "no tx avail\n");
1985         lnet_finalize(ni, lntmsg, -EIO);
1986         return;
1987 }
1988
1989 /**
1990  * mxlnd_recv_data - map [k]iov, post rx
1991  * @ni
1992  * @lntmsg
1993  * @rx
1994  * @msg_type
1995  * @cookie
1996  *
1997  * This setups the DATA receive for PUT or GET.
1998  *
1999  * On success, it returns 0, on failure it returns -1
2000  */
2001 int
2002 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
2003 {
2004         int                     ret     = 0;
2005         lnet_process_id_t       target  = lntmsg->msg_target;
2006         unsigned int            niov    = lntmsg->msg_niov;
2007         struct iovec           *iov     = lntmsg->msg_iov;
2008         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
2009         unsigned int            offset  = lntmsg->msg_offset;
2010         unsigned int            nob     = lntmsg->msg_len;
2011         mx_return_t             mxret   = MX_SUCCESS;
2012         u64                     mask    = ~(MXLND_ERROR_MASK);
2013
2014         /* above assumes MXLND_MSG_PUT_DATA */
2015         if (msg_type == MXLND_MSG_GET_DATA) {
2016                 niov = lntmsg->msg_md->md_niov;
2017                 iov = lntmsg->msg_md->md_iov.iov;
2018                 kiov = lntmsg->msg_md->md_iov.kiov;
2019                 offset = 0;
2020                 nob = lntmsg->msg_md->md_length;
2021         }
2022
2023         LASSERT(lntmsg != NULL);
2024         LASSERT(rx != NULL);
2025         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
2026         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
2027
2028         rx->mxc_msg_type = msg_type;
2029         rx->mxc_state = MXLND_CTX_PENDING;
2030         rx->mxc_nid = target.nid;
2031         /* if posting a GET_DATA, we may not yet know the peer */
2032         if (rx->mxc_peer != NULL) {
2033                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
2034         }
2035         rx->mxc_lntmsg[0] = lntmsg;
2036         rx->mxc_cookie = cookie;
2037         rx->mxc_match = mxlnd_create_match(rx, 0);
2038         /* This setups up the mx_ksegment_t to receive the DATA payload  */
2039         if (kiov == NULL) {
2040                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
2041         } else {
2042                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
2043         }
2044         if (msg_type == MXLND_MSG_GET_DATA) {
2045                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
2046                 if (rx->mxc_lntmsg[1] == NULL) {
2047                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
2048                                            libcfs_nid2str(target.nid));
2049                         ret = -1;
2050                 }
2051         }
2052         if (ret != 0) {
2053                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
2054                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
2055                        libcfs_nid2str(target.nid));
2056                 return -1;
2057         }
2058         ret = mxlnd_q_pending_ctx(rx);
2059         if (ret == -1) {
2060                 return -1;
2061         }
2062         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
2063         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
2064                           rx->mxc_seg_list, rx->mxc_nseg,
2065                           rx->mxc_pin_type, rx->mxc_match,
2066                           mask, (void *) rx,
2067                           &rx->mxc_mxreq);
2068         if (mxret != MX_SUCCESS) {
2069                 if (rx->mxc_conn != NULL) {
2070                         mxlnd_deq_pending_ctx(rx);
2071                 }
2072                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
2073                                    (int) mxret, libcfs_nid2str(target.nid));
2074                 return -1;
2075         }
2076
2077         return 0;
2078 }
2079
2080 /**
2081  * mxlnd_send - the LND required send function
2082  * @ni
2083  * @private
2084  * @lntmsg
2085  *
2086  * This must not block. Since we may not have a peer struct for the receiver,
2087  * it will append send messages on a global tx list. We will then up the
2088  * tx_queued's semaphore to notify it of the new send.
2089  */
2090 int
2091 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
2092 {
2093         int                     ret             = 0;
2094         int                     type            = lntmsg->msg_type;
2095         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
2096         lnet_process_id_t       target          = lntmsg->msg_target;
2097         lnet_nid_t              nid             = target.nid;
2098         int                     target_is_router = lntmsg->msg_target_is_router;
2099         int                     routing         = lntmsg->msg_routing;
2100         unsigned int            payload_niov    = lntmsg->msg_niov;
2101         struct iovec           *payload_iov     = lntmsg->msg_iov;
2102         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
2103         unsigned int            payload_offset  = lntmsg->msg_offset;
2104         unsigned int            payload_nob     = lntmsg->msg_len;
2105         kmx_ctx_t              *tx              = NULL;
2106         kmx_msg_t              *txmsg           = NULL;
2107         kmx_ctx_t              *rx              = (kmx_ctx_t *) private; /* for REPLY */
2108         kmx_ctx_t              *rx_data         = NULL;
2109         kmx_conn_t             *conn            = NULL;
2110         int                     nob             = 0;
2111         uint32_t                length          = 0;
2112         kmx_peer_t             *peer            = NULL;
2113         rwlock_t               *g_lock          = &kmxlnd_data.kmx_global_lock;
2114
2115         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
2116                        payload_nob, payload_niov, libcfs_id2str(target));
2117
2118         LASSERT (payload_nob == 0 || payload_niov > 0);
2119         LASSERT (payload_niov <= LNET_MAX_IOV);
2120         /* payload is either all vaddrs or all pages */
2121         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
2122
2123         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
2124
2125         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
2126          * to a new peer, so create one if not found */
2127         peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
2128         if (peer == NULL || peer->mxp_conn == NULL) {
2129                 /* we could not find it nor could we create one or
2130                  * one exists but we cannot create a conn,
2131                  * fail this message */
2132                 if (peer) {
2133                         /* found peer without conn, drop ref taken above */
2134                         LASSERT(peer->mxp_conn == NULL);
2135                         mxlnd_peer_decref(peer);
2136                 }
2137                 return -ENOMEM;
2138         }
2139
2140         /* we have a peer with a conn */
2141
2142         if (unlikely(peer->mxp_incompatible)) {
2143                 mxlnd_peer_decref(peer); /* drop ref taken above */
2144         } else {
2145                 read_lock(g_lock);
2146                 conn = peer->mxp_conn;
2147                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
2148                         mxlnd_conn_addref(conn);
2149                 } else {
2150                         conn = NULL;
2151                 }
2152                 read_unlock(g_lock);
2153                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
2154                 if (!conn)
2155                         return -ENOTCONN;
2156         }
2157
2158         LASSERT(peer && conn);
2159
2160         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
2161
2162         switch (type) {
2163         case LNET_MSG_ACK:
2164                 LASSERT (payload_nob == 0);
2165                 break;
2166
2167         case LNET_MSG_REPLY:
2168         case LNET_MSG_PUT:
2169                 /* Is the payload small enough not to need DATA? */
2170                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
2171                 if (nob <= MXLND_MSG_SIZE)
2172                         break;                  /* send EAGER */
2173
2174                 tx = mxlnd_get_idle_tx();
2175                 if (unlikely(tx == NULL)) {
2176                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
2177                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
2178                                libcfs_nid2str(nid));
2179                         if (conn) mxlnd_conn_decref(conn);
2180                         return -ENOMEM;
2181                 }
2182
2183                 tx->mxc_peer = peer;
2184                 tx->mxc_conn = conn;
2185                 /* we added a conn ref above */
2186                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
2187                 txmsg = tx->mxc_msg;
2188                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
2189                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
2190                 tx->mxc_match = mxlnd_create_match(tx, 0);
2191
2192                 /* we must post a receive _before_ sending the request.
2193                  * we need to determine how much to receive, it will be either
2194                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
2195
2196                 rx = mxlnd_get_idle_rx(conn);
2197                 if (unlikely(rx == NULL)) {
2198                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
2199                                            libcfs_nid2str(nid));
2200                         mxlnd_put_idle_tx(tx);
2201                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
2202                         return -ENOMEM;
2203                 }
2204                 rx->mxc_nid = nid;
2205                 rx->mxc_peer = peer;
2206                 mxlnd_conn_addref(conn); /* for this rx */
2207                 rx->mxc_conn = conn;
2208                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
2209                 rx->mxc_cookie = tx->mxc_cookie;
2210                 rx->mxc_match = mxlnd_create_match(rx, 0);
2211
2212                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
2213                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
2214                 if (unlikely(ret != 0)) {
2215                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
2216                                            libcfs_nid2str(nid));
2217                         rx->mxc_lntmsg[0] = NULL;
2218                         mxlnd_put_idle_rx(rx);
2219                         mxlnd_put_idle_tx(tx);
2220                         mxlnd_conn_decref(conn); /* for the rx... */
2221                         mxlnd_conn_decref(conn); /* and for the tx */
2222                         return -EHOSTUNREACH;
2223                 }
2224
2225                 mxlnd_queue_tx(tx);
2226                 return 0;
2227
2228         case LNET_MSG_GET:
2229                 if (routing || target_is_router)
2230                         break;                  /* send EAGER */
2231
2232                 /* is the REPLY message too small for DATA? */
2233                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
2234                 if (nob <= MXLND_MSG_SIZE)
2235                         break;                  /* send EAGER */
2236
2237                 /* get tx (we need the cookie) , post rx for incoming DATA,
2238                  * then post GET_REQ tx */
2239                 tx = mxlnd_get_idle_tx();
2240                 if (unlikely(tx == NULL)) {
2241                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
2242                                            libcfs_nid2str(nid));
2243                         mxlnd_conn_decref(conn); /* for the ref taken above */
2244                         return -ENOMEM;
2245                 }
2246                 rx_data = mxlnd_get_idle_rx(conn);
2247                 if (unlikely(rx_data == NULL)) {
2248                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
2249                                            libcfs_nid2str(nid));
2250                         mxlnd_put_idle_tx(tx);
2251                         mxlnd_conn_decref(conn); /* for the ref taken above */
2252                         return -ENOMEM;
2253                 }
2254                 rx_data->mxc_peer = peer;
2255                 /* NOTE no need to lock peer before adding conn ref since we took
2256                  * a conn ref for the tx (it cannot be freed between there and here ) */
2257                 mxlnd_conn_addref(conn); /* for the rx_data */
2258                 rx_data->mxc_conn = conn;
2259
2260                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
2261                 if (unlikely(ret != 0)) {
2262                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
2263                                            libcfs_nid2str(nid));
2264                         mxlnd_put_idle_rx(rx_data);
2265                         mxlnd_put_idle_tx(tx);
2266                         mxlnd_conn_decref(conn); /* for the rx_data... */
2267                         mxlnd_conn_decref(conn); /* and for the tx */
2268                         return -EIO;
2269                 }
2270
2271                 tx->mxc_peer = peer;
2272                 tx->mxc_conn = conn;
2273                 /* conn ref taken above */
2274                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
2275                 txmsg = tx->mxc_msg;
2276                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
2277                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
2278                 tx->mxc_match = mxlnd_create_match(tx, 0);
2279
2280                 mxlnd_queue_tx(tx);
2281                 return 0;
2282
2283         default:
2284                 LBUG();
2285                 mxlnd_conn_decref(conn); /* drop ref taken above */
2286                 return -EIO;
2287         }
2288
2289         /* send EAGER */
2290
2291         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
2292                 <= MXLND_MSG_SIZE);
2293
2294         tx = mxlnd_get_idle_tx();
2295         if (unlikely(tx == NULL)) {
2296                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2297                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2298                 mxlnd_conn_decref(conn); /* drop ref taken above */
2299                 return -ENOMEM;
2300         }
2301
2302         tx->mxc_peer = peer;
2303         tx->mxc_conn = conn;
2304         /* conn ref taken above */
2305         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2306         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2307         tx->mxc_match = mxlnd_create_match(tx, 0);
2308
2309         txmsg = tx->mxc_msg;
2310         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2311
2312         if (payload_kiov != NULL)
2313                 lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
2314                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2315                             payload_niov, payload_kiov, payload_offset, payload_nob);
2316         else
2317                 lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
2318                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2319                             payload_niov, payload_iov, payload_offset, payload_nob);
2320
2321         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2322         mxlnd_queue_tx(tx);
2323         return 0;
2324 }
2325
2326 /**
2327  * mxlnd_recv - the LND required recv function
2328  * @ni
2329  * @private
2330  * @lntmsg
2331  * @delayed
2332  * @niov
2333  * @kiov
2334  * @offset
2335  * @mlen
2336  * @rlen
2337  *
2338  * This must not block.
2339  */
2340 int
2341 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2342              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2343              unsigned int offset, unsigned int mlen, unsigned int rlen)
2344 {
2345         int             ret             = 0;
2346         int             nob             = 0;
2347         int             len             = 0;
2348         kmx_ctx_t       *rx             = private;
2349         kmx_msg_t       *rxmsg          = rx->mxc_msg;
2350         lnet_nid_t       nid            = rx->mxc_nid;
2351         kmx_ctx_t       *tx             = NULL;
2352         kmx_msg_t       *txmsg          = NULL;
2353         kmx_peer_t      *peer           = rx->mxc_peer;
2354         kmx_conn_t      *conn           = peer->mxp_conn;
2355         u64              cookie         = 0ULL;
2356         int              msg_type       = rxmsg->mxm_type;
2357         int              repost         = 1;
2358         int              credit         = 0;
2359         int              finalize       = 0;
2360
2361         LASSERT (mlen <= rlen);
2362         /* Either all pages or all vaddrs */
2363         LASSERT (!(kiov != NULL && iov != NULL));
2364         LASSERT (peer && conn);
2365
2366         /* conn_addref(conn) already taken for the primary rx */
2367
2368         switch (msg_type) {
2369         case MXLND_MSG_EAGER:
2370                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2371                 len = rx->mxc_status.xfer_length;
2372                 if (unlikely(nob > len)) {
2373                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2374                                            libcfs_nid2str(nid), nob, len);
2375                         ret = -EPROTO;
2376                         break;
2377                 }
2378
2379                 if (kiov != NULL)
2380                         lnet_copy_flat2kiov(niov, kiov, offset,
2381                                 MXLND_MSG_SIZE, rxmsg,
2382                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2383                                 mlen);
2384                 else
2385                         lnet_copy_flat2iov(niov, iov, offset,
2386                                 MXLND_MSG_SIZE, rxmsg,
2387                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2388                                 mlen);
2389                 finalize = 1;
2390                 credit = 1;
2391                 break;
2392
2393         case MXLND_MSG_PUT_REQ:
2394                 /* we are going to reuse the rx, store the needed info */
2395                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2396
2397                 /* get tx, post rx, send PUT_ACK */
2398
2399                 tx = mxlnd_get_idle_tx();
2400                 if (unlikely(tx == NULL)) {
2401                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2402                         /* Not replying will break the connection */
2403                         ret = -ENOMEM;
2404                         break;
2405                 }
2406                 if (unlikely(mlen == 0)) {
2407                         finalize = 1;
2408                         tx->mxc_peer = peer;
2409                         tx->mxc_conn = conn;
2410                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2411                         /* repost = 1 */
2412                         break;
2413                 }
2414
2415                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2416                 tx->mxc_peer = peer;
2417                 tx->mxc_conn = conn;
2418                 /* no need to lock peer first since we already have a ref */
2419                 mxlnd_conn_addref(conn); /* for the tx */
2420                 txmsg = tx->mxc_msg;
2421                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2422                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2423                 tx->mxc_cookie = cookie;
2424                 tx->mxc_match = mxlnd_create_match(tx, 0);
2425
2426                 /* we must post a receive _before_ sending the PUT_ACK */
2427                 mxlnd_ctx_init(rx);
2428                 rx->mxc_state = MXLND_CTX_PREP;
2429                 rx->mxc_peer = peer;
2430                 rx->mxc_conn = conn;
2431                 /* do not take another ref for this rx, it is already taken */
2432                 rx->mxc_nid = peer->mxp_nid;
2433                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2434                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2435
2436                 if (unlikely(ret != 0)) {
2437                         /* Notify peer that it's over */
2438                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2439                                            libcfs_nid2str(nid), ret);
2440                         mxlnd_ctx_init(tx);
2441                         tx->mxc_state = MXLND_CTX_PREP;
2442                         tx->mxc_peer = peer;
2443                         tx->mxc_conn = conn;
2444                         /* finalize = 0, let the PUT_ACK tx finalize this */
2445                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2446                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2447                         /* conn ref already taken above */
2448                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2449                         /* repost = 1 */
2450                         break;
2451                 }
2452
2453                 mxlnd_queue_tx(tx);
2454                 /* do not return a credit until after PUT_DATA returns */
2455                 repost = 0;
2456                 break;
2457
2458         case MXLND_MSG_GET_REQ:
2459                 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2460
2461                 if (likely(lntmsg != NULL)) {
2462                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2463                                         cookie);
2464                 } else {
2465                         /* GET didn't match anything */
2466                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2467                          * We have to embed the error code in the match bits.
2468                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2469                         tx = mxlnd_get_idle_tx();
2470                         if (unlikely(tx == NULL)) {
2471                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2472                                                    libcfs_nid2str(nid));
2473                                 /* we can't get a tx, notify the peer that the GET failed */
2474                                 mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
2475                                                    ENODATA, cookie);
2476                                 ret = -ENOMEM;
2477                                 break;
2478                         }
2479                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2480                         tx->mxc_state = MXLND_CTX_PENDING;
2481                         tx->mxc_nid = nid;
2482                         tx->mxc_peer = peer;
2483                         tx->mxc_conn = conn;
2484                         /* no need to lock peer first since we already have a ref */
2485                         mxlnd_conn_addref(conn); /* for this tx */
2486                         tx->mxc_cookie = cookie;
2487                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2488                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2489                         mxlnd_queue_tx(tx);
2490                 }
2491                 /* finalize lntmsg after tx completes */
2492                 break;
2493
2494         default:
2495                 LBUG();
2496         }
2497
2498         if (repost) {
2499                 /* we received a message, increment peer's outstanding credits */
2500                 if (credit == 1) {
2501                         spin_lock(&conn->mxk_lock);
2502                         conn->mxk_outstanding++;
2503                         spin_unlock(&conn->mxk_lock);
2504                 }
2505                 /* we are done with the rx */
2506                 mxlnd_put_idle_rx(rx);
2507                 mxlnd_conn_decref(conn);
2508         }
2509
2510         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2511
2512         /* we received a credit, see if we can use it to send a msg */
2513         if (credit) mxlnd_check_sends(peer);
2514
2515         return ret;
2516 }
2517
2518 void
2519 mxlnd_sleep(unsigned long timeout)
2520 {
2521         set_current_state(TASK_INTERRUPTIBLE);
2522         schedule_timeout(timeout);
2523         return;
2524 }
2525
2526 /**
2527  * mxlnd_tx_queued - the generic send queue thread
2528  * @arg - thread id (as a void *)
2529  *
2530  * This thread moves send messages from the global tx_queue to the owning
2531  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2532  * it to the global peer list.
2533  */
2534 int
2535 mxlnd_tx_queued(void *arg)
2536 {
2537         long                    id      = (long) arg;
2538         int                     ret     = 0;
2539         int                     found   = 0;
2540         kmx_ctx_t              *tx      = NULL;
2541         kmx_peer_t             *peer    = NULL;
2542         struct list_head       *queue   = &kmxlnd_data.kmx_tx_queue;
2543         spinlock_t             *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
2544         rwlock_t               *g_lock  = &kmxlnd_data.kmx_global_lock;
2545
2546         cfs_daemonize("mxlnd_tx_queued");
2547
2548         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
2549                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2550                 if (atomic_read(&kmxlnd_data.kmx_shutdown))
2551                         break;
2552                 if (ret != 0) // Should we check for -EINTR?
2553                         continue;
2554                 spin_lock(tx_q_lock);
2555                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2556                         spin_unlock(tx_q_lock);
2557                         continue;
2558                 }
2559                 tx = list_entry (queue->next, kmx_ctx_t, mxc_list);
2560                 list_del_init(&tx->mxc_list);
2561                 spin_unlock(tx_q_lock);
2562
2563                 found = 0;
2564                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds peer ref */
2565                 if (peer != NULL) {
2566                         tx->mxc_peer = peer;
2567                         write_lock(g_lock);
2568                         if (peer->mxp_conn == NULL) {
2569                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2570                                 if (ret != 0) {
2571                                         /* out of memory, give up and fail tx */
2572                                         tx->mxc_errno = -ENOMEM;
2573                                         mxlnd_peer_decref(peer);
2574                                         write_unlock(g_lock);
2575                                         mxlnd_put_idle_tx(tx);
2576                                         continue;
2577                                 }
2578                         }
2579                         tx->mxc_conn = peer->mxp_conn;
2580                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2581                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2582                         write_unlock(g_lock);
2583                         mxlnd_queue_tx(tx);
2584                         found = 1;
2585                 }
2586                 if (found == 0) {
2587                         int             hash    = 0;
2588                         kmx_peer_t     *peer    = NULL;
2589                         kmx_peer_t     *old     = NULL;
2590
2591                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2592
2593                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2594                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2595                         /* create peer */
2596                         /* adds conn ref for this function */
2597                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2598                                         *kmxlnd_tunables.kmx_board,
2599                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2600                         if (ret != 0) {
2601                                 /* finalize message */
2602                                 tx->mxc_errno = ret;
2603                                 mxlnd_put_idle_tx(tx);
2604                                 continue;
2605                         }
2606                         tx->mxc_peer = peer;
2607                         tx->mxc_conn = peer->mxp_conn;
2608                         /* this tx will keep the conn ref taken in peer_alloc() */
2609
2610                         /* add peer to global peer list, but look to see
2611                          * if someone already created it after we released
2612                          * the read lock */
2613                         write_lock(g_lock);
2614                         old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
2615                         if (old) {
2616                                 /* we have a peer ref on old */
2617                                 if (old->mxp_conn) {
2618                                         found = 1;
2619                                 } else {
2620                                         /* no conn */
2621                                         /* drop our ref taken above... */
2622                                         mxlnd_peer_decref(old);
2623                                         /* and delete it */
2624                                         mxlnd_del_peer_locked(old);
2625                                 }
2626                         }
2627
2628                         if (found == 0) {
2629                                 list_add_tail(&peer->mxp_list, &kmxlnd_data.kmx_peers[hash]);
2630                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2631                         } else {
2632                                 tx->mxc_peer = old;
2633                                 tx->mxc_conn = old->mxp_conn;
2634                                 LASSERT(old->mxp_conn != NULL);
2635                                 mxlnd_conn_addref(old->mxp_conn);
2636                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2637                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2638                                 mxlnd_peer_decref(peer);
2639                         }
2640                         write_unlock(g_lock);
2641
2642                         mxlnd_queue_tx(tx);
2643                 }
2644         }
2645         mxlnd_thread_stop(id);
2646         return 0;
2647 }
2648
2649 /* When calling this, we must not have the peer lock. */
2650 void
2651 mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
2652 {
2653         mx_return_t     mxret           = MX_SUCCESS;
2654         mx_request_t    request;
2655         kmx_conn_t      *conn           = peer->mxp_conn;
2656         u64             match           = ((u64) msg_type) << MXLND_MSG_OFFSET;
2657
2658         /* NOTE we are holding a conn ref every time we call this function,
2659          * we do not need to lock the peer before taking another ref */
2660         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2661
2662         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2663
2664         if (peer->mxp_reconnect_time == 0) {
2665                 peer->mxp_reconnect_time = jiffies;
2666         }
2667
2668         if (peer->mxp_nic_id == 0ULL) {
2669                 int     ret     = 0;
2670
2671                 ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
2672                                       &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
2673                 if (ret == 0) {
2674                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2675                 }
2676                 if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
2677                         /* not mapped yet, return */
2678                         spin_lock(&conn->mxk_lock);
2679                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
2680                         spin_unlock(&conn->mxk_lock);
2681                 }
2682         }
2683
2684         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
2685             conn->mxk_status != MXLND_CONN_DISCONNECT) {
2686                 /* give up and notify LNET */
2687                 CDEBUG(D_NET, "timeout trying to connect to %s\n",
2688                        libcfs_nid2str(peer->mxp_nid));
2689                 mxlnd_conn_disconnect(conn, 0, 0);
2690                 mxlnd_conn_decref(conn);
2691                 return;
2692         }
2693
2694         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2695                             peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
2696                             (void *) peer, &request);
2697         if (unlikely(mxret != MX_SUCCESS)) {
2698                 spin_lock(&conn->mxk_lock);
2699                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
2700                 spin_unlock(&conn->mxk_lock);
2701                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2702                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2703                 mxlnd_conn_decref(conn);
2704         }
2705         mx_set_request_timeout(kmxlnd_data.kmx_endpt, request, MXLND_CONNECT_TIMEOUT/HZ*1000);
2706         return;
2707 }
2708
2709 #define MXLND_STATS 0
2710
2711 int
2712 mxlnd_check_sends(kmx_peer_t *peer)
2713 {
2714         int             ret             = 0;
2715         int             found           = 0;
2716         mx_return_t     mxret           = MX_SUCCESS;
2717         kmx_ctx_t       *tx             = NULL;
2718         kmx_conn_t      *conn           = NULL;
2719         u8              msg_type        = 0;
2720         int             credit          = 0;
2721         int             status          = 0;
2722         int             ntx_posted      = 0;
2723         int             credits         = 0;
2724 #if MXLND_STATS
2725         static unsigned long last       = 0;
2726 #endif
2727
2728         if (unlikely(peer == NULL)) {
2729                 LASSERT(peer != NULL);
2730                 return -1;
2731         }
2732         write_lock(&kmxlnd_data.kmx_global_lock);
2733         conn = peer->mxp_conn;
2734         /* NOTE take a ref for the duration of this function since it is called
2735          * when there might not be any queued txs for this peer */
2736         if (conn) {
2737                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2738                         write_unlock(&kmxlnd_data.kmx_global_lock);
2739                         return -1;
2740                 }
2741                 mxlnd_conn_addref(conn); /* for duration of this function */
2742         }
2743         write_unlock(&kmxlnd_data.kmx_global_lock);
2744
2745         /* do not add another ref for this tx */
2746
2747         if (conn == NULL) {
2748                 /* we do not have any conns */
2749                 CDEBUG(D_NETERROR, "peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
2750                 return -1;
2751         }
2752
2753 #if MXLND_STATS
2754         if (time_after(jiffies, last)) {
2755                 last = jiffies + HZ;
2756                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2757                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2758                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2759                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2760                               conn->mxk_ntx_data, conn->mxk_data_posted);
2761         }
2762 #endif
2763
2764         spin_lock(&conn->mxk_lock);
2765         ntx_posted = conn->mxk_ntx_posted;
2766         credits = conn->mxk_credits;
2767
2768         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
2769         LASSERT(ntx_posted >= 0);
2770
2771         LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
2772         LASSERT(credits >= 0);
2773
2774         /* check number of queued msgs, ignore data */
2775         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
2776                 /* check if any txs queued that could return credits... */
2777                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2778                         /* if not, send a NOOP */
2779                         tx = mxlnd_get_idle_tx();
2780                         if (likely(tx != NULL)) {
2781                                 tx->mxc_peer = peer;
2782                                 tx->mxc_conn = peer->mxp_conn;
2783                                 mxlnd_conn_addref(conn); /* for this tx */
2784                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2785                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2786                                 mxlnd_peer_queue_tx_locked(tx);
2787                                 found = 1;
2788                                 goto done_locked;
2789                         }
2790                 }
2791         }
2792
2793         /* if the peer is not ready, try to connect */
2794         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2795             conn->mxk_status == MXLND_CONN_FAIL)) {
2796                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2797                 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
2798                 spin_unlock(&conn->mxk_lock);
2799                 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
2800                 goto done;
2801         }
2802
2803         while (!list_empty(&conn->mxk_tx_free_queue) ||
2804                !list_empty(&conn->mxk_tx_credit_queue)) {
2805                 /* We have something to send. If we have a queued tx that does not
2806                  * require a credit (free), choose it since its completion will
2807                  * return a credit (here or at the peer), complete a DATA or
2808                  * CONN_REQ or CONN_ACK. */
2809                 struct list_head *tmp_tx = NULL;
2810                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2811                         tmp_tx = &conn->mxk_tx_free_queue;
2812                 } else {
2813                         tmp_tx = &conn->mxk_tx_credit_queue;
2814                 }
2815                 tx = list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
2816
2817                 msg_type = tx->mxc_msg_type;
2818
2819                 /* don't try to send a rx */
2820                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2821
2822                 /* ensure that it is a valid msg type */
2823                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2824                         msg_type == MXLND_MSG_CONN_ACK ||
2825                         msg_type == MXLND_MSG_NOOP     ||
2826                         msg_type == MXLND_MSG_EAGER    ||
2827                         msg_type == MXLND_MSG_PUT_REQ  ||
2828                         msg_type == MXLND_MSG_PUT_ACK  ||
2829                         msg_type == MXLND_MSG_PUT_DATA ||
2830                         msg_type == MXLND_MSG_GET_REQ  ||
2831                         msg_type == MXLND_MSG_GET_DATA);
2832                 LASSERT(tx->mxc_peer == peer);
2833                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2834
2835                 credit = mxlnd_tx_requires_credit(tx);
2836                 if (credit) {
2837
2838                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
2839                                 CDEBUG(D_NET, "%s: posted enough\n",
2840                                               libcfs_nid2str(peer->mxp_nid));
2841                                 goto done_locked;
2842                         }
2843
2844                         if (conn->mxk_credits == 0) {
2845                                 CDEBUG(D_NET, "%s: no credits\n",
2846                                               libcfs_nid2str(peer->mxp_nid));
2847                                 goto done_locked;
2848                         }
2849
2850                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2851                             conn->mxk_outstanding == 0) {  /* giving back credits */
2852                                 CDEBUG(D_NET, "%s: not using last credit\n",
2853                                               libcfs_nid2str(peer->mxp_nid));
2854                                 goto done_locked;
2855                         }
2856                 }
2857
2858                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2859                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2860                                 msg_type == MXLND_MSG_CONN_ACK)) {
2861                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2862                                              mxlnd_connstatus_to_str(conn->mxk_status),
2863                                              tx->mxc_cookie,
2864                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2865                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
2866                                     time_after_eq(jiffies, tx->mxc_deadline)) {
2867                                         list_del_init(&tx->mxc_list);
2868                                         tx->mxc_errno = -ECONNABORTED;
2869                                         spin_unlock(&conn->mxk_lock);
2870                                         mxlnd_put_idle_tx(tx);
2871                                         mxlnd_conn_decref(conn);
2872                                         goto done;
2873                                 }
2874                                 goto done_locked;
2875                         }
2876                 }
2877
2878                 list_del_init(&tx->mxc_list);
2879
2880                 /* handle credits, etc now while we have the lock to avoid races */
2881                 if (credit) {
2882                         conn->mxk_credits--;
2883                         conn->mxk_ntx_posted++;
2884                 }
2885                 if (msg_type != MXLND_MSG_PUT_DATA &&
2886                     msg_type != MXLND_MSG_GET_DATA) {
2887                         if (msg_type != MXLND_MSG_CONN_REQ &&
2888                             msg_type != MXLND_MSG_CONN_ACK) {
2889                                 conn->mxk_ntx_msgs--;
2890                         }
2891                 }
2892                 if (tx->mxc_incarnation == 0 &&
2893                     conn->mxk_incarnation != 0) {
2894                         tx->mxc_incarnation = conn->mxk_incarnation;
2895                 }
2896
2897                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2898                  * or (2) there is a non-DATA msg that can return credits in the
2899                  * queue, then drop this duplicate NOOP */
2900                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2901                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
2902                             (conn->mxk_ntx_msgs >= 1)) {
2903                                 conn->mxk_credits++;
2904                                 conn->mxk_ntx_posted--;
2905                                 spin_unlock(&conn->mxk_lock);
2906                                 /* redundant NOOP */
2907                                 mxlnd_put_idle_tx(tx);
2908                                 mxlnd_conn_decref(conn);
2909                                 CDEBUG(D_NET, "%s: redundant noop\n",
2910                                               libcfs_nid2str(peer->mxp_nid));
2911                                 found = 1;
2912                                 goto done;
2913                         }
2914                 }
2915
2916                 found = 1;
2917                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2918                     (msg_type != MXLND_MSG_GET_DATA))) {
2919                         mxlnd_pack_msg_locked(tx);
2920                 }
2921
2922                 mxret = MX_SUCCESS;
2923
2924                 status = conn->mxk_status;
2925                 spin_unlock(&conn->mxk_lock);
2926
2927                 if (likely((status == MXLND_CONN_READY) ||
2928                     (msg_type == MXLND_MSG_CONN_REQ) ||
2929                     (msg_type == MXLND_MSG_CONN_ACK))) {
2930                         ret = 0;
2931                         if (msg_type != MXLND_MSG_CONN_REQ &&
2932                             msg_type != MXLND_MSG_CONN_ACK) {
2933                                 /* add to the pending list */
2934                                 ret = mxlnd_q_pending_ctx(tx);
2935                         } else {
2936                                 /* CONN_REQ/ACK */
2937                                 tx->mxc_state = MXLND_CTX_PENDING;
2938                         }
2939
2940                         if (ret == 0) {
2941                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2942                                     msg_type != MXLND_MSG_GET_DATA)) {
2943                                         /* send a msg style tx */
2944                                         LASSERT(tx->mxc_nseg == 1);
2945                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2946                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2947                                                mxlnd_msgtype_to_str(msg_type),
2948                                                tx->mxc_cookie);
2949                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2950                                                           &tx->mxc_seg,
2951                                                           tx->mxc_nseg,
2952                                                           tx->mxc_pin_type,
2953                                                           conn->mxk_epa,
2954                                                           tx->mxc_match,
2955                                                           (void *) tx,
2956                                                           &tx->mxc_mxreq);
2957                                 } else {
2958                                         /* send a DATA tx */
2959                                         spin_lock(&conn->mxk_lock);
2960                                         conn->mxk_ntx_data--;
2961                                         conn->mxk_data_posted++;
2962                                         spin_unlock(&conn->mxk_lock);
2963                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2964                                                mxlnd_msgtype_to_str(msg_type),
2965                                                tx->mxc_cookie);
2966                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2967                                                           tx->mxc_seg_list,
2968                                                           tx->mxc_nseg,
2969                                                           tx->mxc_pin_type,
2970                                                           conn->mxk_epa,
2971                                                           tx->mxc_match,
2972                                                           (void *) tx,
2973                                                           &tx->mxc_mxreq);
2974                                 }
2975                         } else {
2976                                 /* ret != 0 */
2977                                 mxret = MX_CONNECTION_FAILED;
2978                         }
2979                         if (likely(mxret == MX_SUCCESS)) {
2980                                 ret = 0;
2981                         } else {
2982                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2983                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
2984                                        libcfs_nid2str(peer->mxp_nid));
2985                                 /* NOTE mx_kisend() only fails if there are not enough
2986                                 * resources. Do not change the connection status. */
2987                                 if (mxret == MX_NO_RESOURCES) {
2988                                         tx->mxc_errno = -ENOMEM;
2989                                 } else {
2990                                         tx->mxc_errno = -ECONNABORTED;
2991                                 }
2992                                 if (credit) {
2993                                         spin_lock(&conn->mxk_lock);
2994                                         conn->mxk_ntx_posted--;
2995                                         conn->mxk_credits++;
2996                                         spin_unlock(&conn->mxk_lock);
2997                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2998                                         msg_type == MXLND_MSG_GET_DATA) {
2999                                         spin_lock(&conn->mxk_lock);
3000                                         conn->mxk_data_posted--;
3001                                         spin_unlock(&conn->mxk_lock);
3002                                 }
3003                                 if (msg_type != MXLND_MSG_PUT_DATA &&
3004                                     msg_type != MXLND_MSG_GET_DATA &&
3005                                     msg_type != MXLND_MSG_CONN_REQ &&
3006                                     msg_type != MXLND_MSG_CONN_ACK) {
3007                                         spin_lock(&conn->mxk_lock);
3008                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
3009                                         spin_unlock(&conn->mxk_lock);
3010                                 }
3011                                 if (msg_type != MXLND_MSG_CONN_REQ &&
3012                                     msg_type != MXLND_MSG_CONN_ACK) {
3013                                         /* remove from the pending list */
3014                                         mxlnd_deq_pending_ctx(tx);
3015                                 }
3016                                 mxlnd_put_idle_tx(tx);
3017                                 mxlnd_conn_decref(conn);
3018                         }
3019                 }
3020                 spin_lock(&conn->mxk_lock);
3021         }
3022 done_locked:
3023         spin_unlock(&conn->mxk_lock);
3024 done:
3025         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
3026         return found;
3027 }
3028
3029
3030 /**
3031  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
3032  * @ctx - the tx descriptor
3033  *
3034  * Determine which type of send request it was and start the next step, if needed,
3035  * or, if done, signal completion to LNET. After we are done, put back on the
3036  * idle tx list.
3037  */
3038 void
3039 mxlnd_handle_tx_completion(kmx_ctx_t *tx)
3040 {
3041         int             code    = tx->mxc_status.code;
3042         int             failed  = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
3043         kmx_msg_t       *msg    = tx->mxc_msg;
3044         kmx_peer_t      *peer   = tx->mxc_peer;
3045         kmx_conn_t      *conn   = tx->mxc_conn;
3046         u8              type    = tx->mxc_msg_type;
3047         int             credit  = mxlnd_tx_requires_credit(tx);
3048         u64             cookie  = tx->mxc_cookie;
3049
3050         CDEBUG(D_NET, "entering %s (0x%llx):\n",
3051                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
3052
3053         LASSERT (peer != NULL);
3054         LASSERT (conn != NULL);
3055
3056         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
3057                 LASSERT (type == msg->mxm_type);
3058         }
3059
3060         if (failed) {
3061                 if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
3062         } else {
3063                 spin_lock(&conn->mxk_lock);
3064                 conn->mxk_last_tx = cfs_time_current(); /* jiffies */
3065                 spin_unlock(&conn->mxk_lock);
3066         }
3067
3068         switch (type) {
3069
3070         case MXLND_MSG_GET_DATA:
3071                 spin_lock(&conn->mxk_lock);
3072                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3073                         conn->mxk_outstanding++;
3074                         conn->mxk_data_posted--;
3075                 }
3076                 spin_unlock(&conn->mxk_lock);
3077                 break;
3078
3079         case MXLND_MSG_PUT_DATA:
3080                 spin_lock(&conn->mxk_lock);
3081                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3082                         conn->mxk_data_posted--;
3083                 }
3084                 spin_unlock(&conn->mxk_lock);
3085                 break;
3086
3087         case MXLND_MSG_NOOP:
3088         case MXLND_MSG_PUT_REQ:
3089         case MXLND_MSG_PUT_ACK:
3090         case MXLND_MSG_GET_REQ:
3091         case MXLND_MSG_EAGER:
3092                 break;
3093
3094         case MXLND_MSG_CONN_ACK:
3095                 if (peer->mxp_incompatible) {
3096                         /* we sent our params, now close this conn */
3097                         mxlnd_conn_disconnect(conn, 0, 1);
3098                 }
3099         case MXLND_MSG_CONN_REQ:
3100                 if (failed) {
3101                         CDEBUG(D_NETERROR, "%s failed with %s (%d) (errno = %d)"
3102                                " to %s\n",
3103                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
3104                                mx_strstatus(code), code, tx->mxc_errno,
3105                                libcfs_nid2str(tx->mxc_nid));
3106                         if (!peer->mxp_incompatible) {
3107                                 spin_lock(&conn->mxk_lock);
3108                                 if (code == MX_STATUS_BAD_SESSION)
3109                                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
3110                                 else
3111                                         mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3112                                 spin_unlock(&conn->mxk_lock);
3113                         }
3114                 }
3115                 break;
3116
3117         default:
3118                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
3119                 LBUG();
3120         }
3121
3122         if (credit) {
3123                 spin_lock(&conn->mxk_lock);
3124                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3125                         conn->mxk_ntx_posted--;
3126                 }
3127                 spin_unlock(&conn->mxk_lock);
3128         }
3129
3130         mxlnd_put_idle_tx(tx);
3131         mxlnd_conn_decref(conn);
3132
3133         mxlnd_check_sends(peer);
3134
3135         CDEBUG(D_NET, "leaving\n");
3136         return;
3137 }
3138
3139 /* Handle completion of MSG or DATA rx.
3140  * CONN_REQ and CONN_ACK are handled elsewhere. */
3141 void
3142 mxlnd_handle_rx_completion(kmx_ctx_t *rx)
3143 {
3144         int             ret             = 0;
3145         int             repost          = 1;
3146         int             credit          = 1;
3147         u32             nob             = rx->mxc_status.xfer_length;
3148         u64             bits            = rx->mxc_status.match_info;
3149         kmx_msg_t      *msg             = rx->mxc_msg;
3150         kmx_peer_t     *peer            = rx->mxc_peer;
3151         kmx_conn_t     *conn            = rx->mxc_conn;
3152         u8              type            = rx->mxc_msg_type;
3153         u64             seq             = bits;
3154         lnet_msg_t     *lntmsg[2];
3155         int             result          = 0;
3156         int             peer_ref        = 0;
3157         int             conn_ref        = 0;
3158
3159         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
3160          * failed GET reply */
3161
3162         /* NOTE peer may still be NULL if it is a new peer and
3163          *      conn may be NULL if this is a re-connect */
3164         if (likely(peer != NULL && conn != NULL)) {
3165                 /* we have a reference on the conn */
3166                 conn_ref = 1;
3167         } else if (peer != NULL && conn == NULL) {
3168                 /* we have a reference on the peer */
3169                 peer_ref = 1;
3170         } else if (peer == NULL && conn != NULL) {
3171                 /* fatal error */
3172                 CERROR("rx 0x%llx from %s has conn but no peer\n",
3173                        bits, libcfs_nid2str(rx->mxc_nid));
3174                 LBUG();
3175         } /* else peer and conn == NULL */
3176
3177         if (conn == NULL && peer != NULL) {
3178                 write_lock(&kmxlnd_data.kmx_global_lock);
3179                 conn = peer->mxp_conn;
3180                 if (conn) {
3181                         mxlnd_conn_addref(conn); /* conn takes ref... */
3182                         mxlnd_peer_decref(peer); /* from peer */
3183                         conn_ref = 1;
3184                         peer_ref = 0;
3185                 }
3186                 write_unlock(&kmxlnd_data.kmx_global_lock);
3187                 rx->mxc_conn = conn;
3188         }
3189
3190 #if MXLND_DEBUG
3191         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
3192 #endif
3193
3194         lntmsg[0] = NULL;
3195         lntmsg[1] = NULL;
3196
3197         if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
3198             rx->mxc_status.code != MX_STATUS_TRUNCATED) {
3199                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
3200                                    libcfs_nid2str(rx->mxc_nid),
3201                                    mx_strstatus(rx->mxc_status.code),
3202                                    rx->mxc_status.code);
3203                 credit = 0;
3204                 goto cleanup;
3205         }
3206
3207         if (nob == 0) {
3208                 /* this may be a failed GET reply */
3209                 if (type == MXLND_MSG_GET_DATA) {
3210                         /* get the error (52-59) bits from the match bits */
3211                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
3212                         lntmsg[0] = rx->mxc_lntmsg[0];
3213                         result = -ret;
3214                         goto cleanup;
3215                 } else {
3216                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
3217                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
3218                                            libcfs_nid2str(rx->mxc_nid));
3219                         goto cleanup;
3220                 }
3221         }
3222
3223         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
3224         if (type == MXLND_MSG_PUT_DATA) {
3225                 /* result = 0; */
3226                 lntmsg[0] = rx->mxc_lntmsg[0];
3227                 goto cleanup;
3228         } else if (type == MXLND_MSG_GET_DATA) {
3229                 /* result = 0; */
3230                 lntmsg[0] = rx->mxc_lntmsg[0];
3231                 lntmsg[1] = rx->mxc_lntmsg[1];
3232                 goto cleanup;
3233         }
3234
3235         ret = mxlnd_unpack_msg(msg, nob);
3236         if (ret != 0) {
3237                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
3238                                    ret, libcfs_nid2str(rx->mxc_nid));
3239                 goto cleanup;
3240         }
3241         rx->mxc_nob = nob;
3242         type = msg->mxm_type;
3243
3244         if (rx->mxc_nid != msg->mxm_srcnid ||
3245             kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3246                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
3247                        "0x%llx and rx msg dst is 0x%llx)\n",
3248                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
3249                        msg->mxm_dstnid);
3250                 goto cleanup;
3251         }
3252
3253         if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
3254             msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3255                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
3256                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3257                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3258                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3259                        msg->mxm_srcstamp, conn->mxk_incarnation,
3260                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3261                 credit = 0;
3262                 goto cleanup;
3263         }
3264
3265         CDEBUG(D_NET, "Received %s with %d credits\n",
3266                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
3267
3268         LASSERT(peer != NULL && conn != NULL);
3269         if (msg->mxm_credits != 0) {
3270                 spin_lock(&conn->mxk_lock);
3271                 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3272                         if ((conn->mxk_credits + msg->mxm_credits) >
3273                              *kmxlnd_tunables.kmx_peercredits) {
3274                                 CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
3275                                        conn->mxk_credits, msg->mxm_credits);
3276                         }
3277                         conn->mxk_credits += msg->mxm_credits;
3278                         LASSERT(conn->mxk_credits >= 0);
3279                         LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
3280                 }
3281                 spin_unlock(&conn->mxk_lock);
3282         }
3283
3284         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3285         switch (type) {
3286         case MXLND_MSG_NOOP:
3287                 break;
3288
3289         case MXLND_MSG_EAGER:
3290                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3291                                         msg->mxm_srcnid, rx, 0);
3292                 repost = ret < 0;
3293                 break;
3294
3295         case MXLND_MSG_PUT_REQ:
3296                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3297                                         msg->mxm_srcnid, rx, 1);
3298                 repost = ret < 0;
3299                 break;
3300
3301         case MXLND_MSG_PUT_ACK: {
3302                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3303                 if (cookie > MXLND_MAX_COOKIE) {
3304                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3305                                            libcfs_nid2str(rx->mxc_nid));
3306                         result = -((u32) MXLND_ERROR_VAL(cookie));
3307                         lntmsg[0] = rx->mxc_lntmsg[0];
3308                 } else {
3309                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3310                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3311                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3312                 }
3313                 /* repost == 1 */
3314                 break;
3315         }
3316         case MXLND_MSG_GET_REQ:
3317                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3318                                         msg->mxm_srcnid, rx, 1);
3319                 repost = ret < 0;
3320                 break;
3321
3322         default:
3323                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3324                                 libcfs_nid2str(rx->mxc_nid));
3325                 ret = -EPROTO;
3326                 break;
3327         }
3328
3329         if (ret < 0) {
3330                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3331                 spin_lock(&conn->mxk_lock);
3332                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3333                 spin_unlock(&conn->mxk_lock);
3334         }
3335
3336 cleanup:
3337         if (conn != NULL) {
3338                 spin_lock(&conn->mxk_lock);
3339                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3340                 spin_unlock(&conn->mxk_lock);
3341         }
3342
3343         if (repost) {
3344                 /* lnet_parse() failed, etc., repost now */
3345                 mxlnd_put_idle_rx(rx);
3346                 if (conn != NULL && credit == 1) {
3347                         if (type == MXLND_MSG_PUT_DATA ||
3348                             type == MXLND_MSG_EAGER ||
3349                             type == MXLND_MSG_PUT_REQ ||
3350                             type == MXLND_MSG_NOOP) {
3351                                 spin_lock(&conn->mxk_lock);
3352                                 conn->mxk_outstanding++;
3353                                 spin_unlock(&conn->mxk_lock);
3354                         }
3355                 }
3356                 if (conn_ref) mxlnd_conn_decref(conn);
3357                 LASSERT(peer_ref == 0);
3358         }
3359
3360         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3361                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3362         } else {
3363                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3364         }
3365
3366         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3367         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3368
3369         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3370
3371         return;
3372 }
3373
3374 void
3375 mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
3376 {
3377         kmx_ctx_t       *tx     = NULL;
3378         kmx_msg_t       *txmsg  = NULL;
3379         kmx_conn_t      *conn   = peer->mxp_conn;
3380         u64             nic_id  = 0ULL;
3381         u32             ep_id   = 0;
3382         u32             sid     = 0;
3383         u8              type    = (msg_type == MXLND_MSG_ICON_REQ ?
3384                                    MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
3385
3386         /* a conn ref was taken when calling mx_iconnect(),
3387          * hold it until CONN_REQ or CONN_ACK completes */
3388
3389         CDEBUG(D_NET, "entering\n");
3390         if (status.code != MX_STATUS_SUCCESS) {
3391                 int send_bye    = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
3392
3393                 CDEBUG(D_NETERROR, "mx_iconnect() failed for %s with %s (%d) "
3394                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3395                         mxlnd_msgtype_to_str(msg_type),
3396                         mx_strstatus(status.code), status.code,
3397                         libcfs_nid2str(peer->mxp_nid),
3398                         peer->mxp_nid,
3399                         peer->mxp_nic_id,
3400                         peer->mxp_ep_id);
3401                 spin_lock(&conn->mxk_lock);
3402                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3403                 spin_unlock(&conn->mxk_lock);
3404
3405                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT)) {
3406                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3407                         mxlnd_conn_disconnect(conn, 0, send_bye);
3408                 }
3409
3410                 mxlnd_conn_decref(conn);
3411                 return;
3412         }
3413         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3414         write_lock(&kmxlnd_data.kmx_global_lock);
3415         spin_lock(&conn->mxk_lock);
3416         conn->mxk_epa = status.source;
3417         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3418         if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
3419                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3420         }
3421         spin_unlock(&conn->mxk_lock);
3422         write_unlock(&kmxlnd_data.kmx_global_lock);
3423
3424         /* mx_iconnect() succeeded, reset delay to 0 */
3425         write_lock(&kmxlnd_data.kmx_global_lock);
3426         peer->mxp_reconnect_time = 0;
3427         peer->mxp_conn->mxk_sid = sid;
3428         write_unlock(&kmxlnd_data.kmx_global_lock);
3429
3430         /* marshal CONN_REQ or CONN_ACK msg */
3431         /* we are still using the conn ref from iconnect() - do not take another */
3432         tx = mxlnd_get_idle_tx();
3433         if (tx == NULL) {
3434                 CDEBUG(D_NETERROR, "Can't obtain %s tx for %s\n",
3435                        mxlnd_msgtype_to_str(type),
3436                        libcfs_nid2str(peer->mxp_nid));
3437                 spin_lock(&conn->mxk_lock);
3438                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3439                 spin_unlock(&conn->mxk_lock);
3440                 mxlnd_conn_decref(conn);
3441                 return;
3442         }
3443
3444         tx->mxc_peer = peer;
3445         tx->mxc_conn = conn;
3446         tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
3447         CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
3448         mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3449         txmsg = tx->mxc_msg;
3450         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
3451         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
3452         tx->mxc_match = mxlnd_create_match(tx, 0);
3453
3454         mxlnd_queue_tx(tx);
3455         return;
3456 }
3457
3458 /**
3459  * mxlnd_request_waitd - the MX request completion thread(s)
3460  * @arg - thread id (as a void *)
3461  *
3462  * This thread waits for a MX completion and then completes the request.
3463  * We will create one thread per CPU.
3464  */
3465 int
3466 mxlnd_request_waitd(void *arg)
3467 {
3468         long                    id              = (long) arg;
3469         char                    name[24];
3470         __u32                   result          = 0;
3471         mx_return_t             mxret           = MX_SUCCESS;
3472         mx_status_t             status;
3473         kmx_ctx_t              *ctx             = NULL;
3474         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3475         kmx_peer_t             *peer            = NULL;
3476         kmx_conn_t             *conn            = NULL;
3477 #if MXLND_POLLING
3478         int                     count           = 0;
3479 #endif
3480
3481         memset(name, 0, sizeof(name));
3482         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3483         cfs_daemonize(name);
3484
3485         memset(&status, 0, sizeof(status));
3486
3487         CDEBUG(D_NET, "%s starting\n", name);
3488
3489         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
3490                 u8      msg_type        = 0;
3491
3492                 mxret = MX_SUCCESS;
3493                 result = 0;
3494 #if MXLND_POLLING
3495                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3496                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3497                                             &status, &result);
3498                 } else {
3499                         count = 0;
3500                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3501                                             0ULL, 0ULL, &status, &result);
3502                 }
3503 #else
3504                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3505                                     0ULL, 0ULL, &status, &result);
3506 #endif
3507                 if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown)))
3508                         break;
3509
3510                 if (result != 1) {
3511                         /* nothing completed... */
3512                         continue;
3513                 }
3514
3515                 CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
3516                        "match_info 0x%llx and length %d\n",
3517                        mx_strstatus(status.code), status.code,
3518                        (u64) status.match_info, status.msg_length);
3519
3520                 if (status.code != MX_STATUS_SUCCESS) {
3521                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3522                                "match_info 0x%llx and length %d\n",
3523                                mx_strstatus(status.code), status.code,
3524                                (u64) status.match_info, status.msg_length);
3525                 }
3526
3527                 msg_type = MXLND_MSG_TYPE(status.match_info);
3528
3529                 /* This may be a mx_iconnect() request completing,
3530                  * check the bit mask for CONN_REQ and CONN_ACK */
3531                 if (msg_type == MXLND_MSG_ICON_REQ ||
3532                     msg_type == MXLND_MSG_ICON_ACK) {
3533                         peer = (kmx_peer_t*) status.context;
3534                         mxlnd_handle_connect_msg(peer, msg_type, status);
3535                         continue;
3536                 }
3537
3538                 /* This must be a tx or rx */
3539
3540                 /* NOTE: if this is a RX from the unexpected callback, it may
3541                  * have very little info. If we dropped it in unexpected_recv(),
3542                  * it will not have a context. If so, ignore it. */
3543                 ctx = (kmx_ctx_t *) status.context;
3544                 if (ctx != NULL) {
3545
3546                         req_type = ctx->mxc_type;
3547                         conn = ctx->mxc_conn; /* this may be NULL */
3548                         mxlnd_deq_pending_ctx(ctx);
3549
3550                         /* copy status to ctx->mxc_status */
3551                         ctx->mxc_status = status;
3552
3553                         switch (req_type) {
3554                         case MXLND_REQ_TX:
3555                                 mxlnd_handle_tx_completion(ctx);
3556                                 break;
3557                         case MXLND_REQ_RX:
3558                                 mxlnd_handle_rx_completion(ctx);
3559                                 break;
3560                         default:
3561                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3562                                 LBUG();
3563                                 break;
3564                         }
3565
3566                         /* conn is always set except for the first CONN_REQ rx
3567                          * from a new peer */
3568                         if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
3569                                 mxlnd_conn_disconnect(conn, 1, 1);
3570                         }
3571                 }
3572                 CDEBUG(D_NET, "waitd() completed task\n");
3573         }
3574         CDEBUG(D_NET, "%s stopping\n", name);
3575         mxlnd_thread_stop(id);
3576         return 0;
3577 }
3578
3579
3580 unsigned long
3581 mxlnd_check_timeouts(unsigned long now)
3582 {
3583         int             i               = 0;
3584         int             disconnect      = 0;
3585         unsigned long   next            = 0; /* jiffies */
3586         kmx_peer_t      *peer           = NULL;
3587         kmx_conn_t      *conn           = NULL;
3588         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3589
3590         read_lock(g_lock);
3591         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3592                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_list) {
3593
3594                         if (unlikely(atomic_read(&kmxlnd_data.kmx_shutdown))) {
3595                                 read_unlock(g_lock);
3596                                 return next;
3597                         }
3598
3599                         conn = peer->mxp_conn;
3600                         if (conn) {
3601                                 mxlnd_conn_addref(conn);
3602                         } else {
3603                                 continue;
3604                         }
3605
3606                         spin_lock(&conn->mxk_lock);
3607
3608                         /* if nothing pending (timeout == 0) or
3609                          * if conn is already disconnected,
3610                          * skip this conn */
3611                         if (conn->mxk_timeout == 0 ||
3612                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3613                                 spin_unlock(&conn->mxk_lock);
3614                                 mxlnd_conn_decref(conn);
3615                                 continue;
3616                         }
3617
3618                         /* we want to find the timeout that will occur first.
3619                          * if it is in the future, we will sleep until then.
3620                          * if it is in the past, then we will sleep one
3621                          * second and repeat the process. */
3622                         if ((next == 0) || (time_before(conn->mxk_timeout, next))) {
3623                                 next = conn->mxk_timeout;
3624                         }
3625
3626                         disconnect = 0;
3627
3628                         if (time_after_eq(now, conn->mxk_timeout))  {
3629                                 disconnect = 1;
3630                         }
3631                         spin_unlock(&conn->mxk_lock);
3632
3633                         if (disconnect) {
3634                                 mxlnd_conn_disconnect(conn, 1, 1);
3635                         }
3636                         mxlnd_conn_decref(conn);
3637                 }
3638         }
3639         read_unlock(g_lock);
3640         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3641
3642         return next;
3643 }
3644
3645 void
3646 mxlnd_passive_connect(kmx_connparams_t *cp)
3647 {
3648         int             ret             = 0;
3649         int             incompatible    = 0;
3650         u64             nic_id          = 0ULL;
3651         u32             ep_id           = 0;
3652         u32             sid             = 0;
3653         int             conn_ref        = 0;
3654         kmx_msg_t       *msg            = &cp->mxr_msg;
3655         kmx_peer_t      *peer           = cp->mxr_peer;
3656         kmx_conn_t      *conn           = NULL;
3657         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3658
3659         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3660
3661         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3662         if (ret != 0) {
3663                 if (peer) {
3664                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from %s\n",
3665                                ret, libcfs_nid2str(peer->mxp_nid));
3666                 } else {
3667                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from "
3668                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3669                 }
3670                 goto cleanup;
3671         }
3672         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3673                 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3674                                 libcfs_nid2str(msg->mxm_srcnid),
3675                                 libcfs_nid2str(msg->mxm_dstnid));
3676                 goto cleanup;
3677         }
3678         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3679                 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3680                             "%d (%d wanted)\n",
3681                                 libcfs_nid2str(msg->mxm_srcnid),
3682                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3683                                 *kmxlnd_tunables.kmx_peercredits);
3684                 incompatible = 1;
3685         }
3686         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3687                 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3688                             "%d (%d wanted)\n",
3689                                 libcfs_nid2str(msg->mxm_srcnid),
3690                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3691                                 (int) MXLND_MSG_SIZE);
3692                 incompatible = 1;
3693         }
3694
3695         if (peer == NULL) {
3696                 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
3697                 if (peer == NULL) {
3698                         int             hash    = 0;
3699                         u32             board   = 0;
3700                         kmx_peer_t      *existing_peer    = NULL;
3701
3702                         hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3703
3704                         mx_nic_id_to_board_number(nic_id, &board);
3705
3706                         /* adds conn ref for peer and one for this function */
3707                         ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3708                                                board, ep_id, 0ULL);
3709                         if (ret != 0) {
3710                                 goto cleanup;
3711                         }
3712                         peer->mxp_conn->mxk_sid = sid;
3713                         LASSERT(peer->mxp_ep_id == ep_id);
3714                         write_lock(g_lock);
3715                         existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3716                         if (existing_peer) {
3717                                 mxlnd_conn_decref(peer->mxp_conn);
3718                                 mxlnd_peer_decref(peer);
3719                                 peer = existing_peer;
3720                                 mxlnd_conn_addref(peer->mxp_conn);
3721                                 conn = peer->mxp_conn;
3722                         } else {
3723                                 list_add_tail(&peer->mxp_list,
3724                                               &kmxlnd_data.kmx_peers[hash]);
3725                                 atomic_inc(&kmxlnd_data.kmx_npeers);
3726                         }
3727                         write_unlock(g_lock);
3728                 } else {
3729                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3730                         write_lock(g_lock);
3731                         mxlnd_peer_decref(peer); /* drop ref taken above */
3732                         write_unlock(g_lock);
3733                         if (ret != 0) {
3734                                 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3735                                 goto cleanup;
3736                         }
3737                 }
3738                 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3739                 conn = peer->mxp_conn;
3740         } else { /* unexpected handler found peer */
3741                 kmx_conn_t      *old_conn       = peer->mxp_conn;
3742
3743                 if (sid != peer->mxp_conn->mxk_sid) {
3744                         /* do not call mx_disconnect() or send a BYE */
3745                         mxlnd_conn_disconnect(old_conn, 0, 0);
3746
3747                         /* This allocs a conn, points peer->mxp_conn to this one.
3748                         * The old conn is still on the peer->mxp_conns list.
3749                         * As the pending requests complete, they will call
3750                         * conn_decref() which will eventually free it. */
3751                         ret = mxlnd_conn_alloc(&conn, peer);
3752                         if (ret != 0) {
3753                                 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3754                                 goto cleanup;
3755                         }
3756                         /* conn_alloc() adds one ref for the peer and one
3757                          * for this function */
3758                         conn_ref = 1;
3759
3760                         peer->mxp_conn->mxk_sid = sid;
3761                 } else {
3762                         /* same sid */
3763                         conn = peer->mxp_conn;
3764                 }
3765         }
3766         write_lock(g_lock);
3767         peer->mxp_incompatible = incompatible;
3768         write_unlock(g_lock);
3769         spin_lock(&conn->mxk_lock);
3770         conn->mxk_incarnation = msg->mxm_srcstamp;
3771         mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
3772         spin_unlock(&conn->mxk_lock);
3773
3774         /* handle_conn_ack() will create the CONN_ACK msg */
3775         mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
3776
3777 cleanup:
3778         if (conn_ref) mxlnd_conn_decref(conn);
3779
3780         mxlnd_connparams_free(cp);
3781         return;
3782 }
3783
3784 void
3785 mxlnd_check_conn_ack(kmx_connparams_t *cp)
3786 {
3787         int             ret             = 0;
3788         int             incompatible    = 0;
3789         u64             nic_id          = 0ULL;
3790         u32             ep_id           = 0;
3791         u32             sid             = 0;
3792         kmx_msg_t       *msg            = &cp->mxr_msg;
3793         kmx_peer_t      *peer           = cp->mxr_peer;
3794         kmx_conn_t      *conn           = cp->mxr_conn;
3795
3796         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3797
3798         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3799         if (ret != 0) {
3800                 if (peer) {
3801                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from %s\n",
3802                                ret, libcfs_nid2str(peer->mxp_nid));
3803                 } else {
3804                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from "
3805                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3806                 }
3807                 ret = -1;
3808                 incompatible = 1;
3809                 goto failed;
3810         }
3811         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3812                 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3813                        "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3814                         libcfs_nid2str(msg->mxm_dstnid));
3815                 ret = -1;
3816                 goto failed;
3817         }
3818         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3819                 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3820                        "incompatible queue depth %d (%d wanted)\n",
3821                         libcfs_nid2str(msg->mxm_srcnid),
3822                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3823                         *kmxlnd_tunables.kmx_peercredits);
3824                 incompatible = 1;
3825                 ret = -1;
3826                 goto failed;
3827         }
3828         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3829                 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3830                        "incompatible EAGER size %d (%d wanted)\n",
3831                         libcfs_nid2str(msg->mxm_srcnid),
3832                         msg->mxm_u.conn_req.mxcrm_eager_size,
3833                         (int) MXLND_MSG_SIZE);
3834                 incompatible = 1;
3835                 ret = -1;
3836                 goto failed;
3837         }
3838         write_lock(&kmxlnd_data.kmx_global_lock);
3839         peer->mxp_incompatible = incompatible;
3840         write_unlock(&kmxlnd_data.kmx_global_lock);
3841         spin_lock(&conn->mxk_lock);
3842         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
3843         conn->mxk_outstanding = 0;
3844         conn->mxk_incarnation = msg->mxm_srcstamp;
3845         conn->mxk_timeout = 0;
3846         if (!incompatible) {
3847                 CDEBUG(D_NET, "setting peer %s CONN_READY\n",
3848                        libcfs_nid2str(msg->mxm_srcnid));
3849                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3850         }
3851         spin_unlock(&conn->mxk_lock);
3852
3853         if (!incompatible)
3854                 mxlnd_check_sends(peer);
3855
3856 failed:
3857         if (ret < 0) {
3858                 spin_lock(&conn->mxk_lock);
3859                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3860                 spin_unlock(&conn->mxk_lock);
3861         }
3862
3863         if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
3864
3865         mxlnd_connparams_free(cp);
3866         return;
3867 }
3868
3869 int
3870 mxlnd_abort_msgs(void)
3871 {
3872         int                     count           = 0;
3873         struct list_head        *orphans        = &kmxlnd_data.kmx_orphan_msgs;
3874         spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3875
3876         /* abort orphans */
3877         spin_lock(g_conn_lock);
3878         while (!list_empty(orphans)) {
3879                 kmx_ctx_t       *ctx     = NULL;
3880                 kmx_conn_t      *conn   = NULL;
3881
3882                 ctx = list_entry(orphans->next, kmx_ctx_t, mxc_list);
3883                 list_del_init(&ctx->mxc_list);
3884                 spin_unlock(g_conn_lock);
3885
3886                 ctx->mxc_errno = -ECONNABORTED;
3887                 conn = ctx->mxc_conn;
3888                 CDEBUG(D_NET, "aborting %s %s %s\n",
3889                        mxlnd_msgtype_to_str(ctx->mxc_msg_type),
3890                        ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
3891                        libcfs_nid2str(ctx->mxc_nid));
3892                 if (ctx->mxc_type == MXLND_REQ_TX) {
3893                         mxlnd_put_idle_tx(ctx); /* do not hold any locks */
3894                         if (conn) mxlnd_conn_decref(conn); /* for this tx */
3895                 } else {
3896                         ctx->mxc_state = MXLND_CTX_CANCELED;
3897                         mxlnd_handle_rx_completion(ctx);
3898                 }
3899
3900                 count++;
3901                 spin_lock(g_conn_lock);
3902         }
3903         spin_unlock(g_conn_lock);
3904
3905         return count;
3906 }
3907
3908 int
3909 mxlnd_free_conn_zombies(void)
3910 {
3911         int                     count           = 0;
3912         struct list_head        *zombies        = &kmxlnd_data.kmx_conn_zombies;
3913         spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3914         rwlock_t                *g_lock         = &kmxlnd_data.kmx_global_lock;
3915
3916         /* cleanup any zombies */
3917         spin_lock(g_conn_lock);
3918         while (!list_empty(zombies)) {
3919                 kmx_conn_t      *conn   = NULL;
3920
3921                 conn = list_entry(zombies->next, kmx_conn_t, mxk_zombie);
3922                 list_del_init(&conn->mxk_zombie);
3923                 spin_unlock(g_conn_lock);
3924
3925                 write_lock(g_lock);
3926                 mxlnd_conn_free_locked(conn);
3927                 write_unlock(g_lock);
3928
3929                 count++;
3930                 spin_lock(g_conn_lock);
3931         }
3932         spin_unlock(g_conn_lock);
3933         CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
3934         return count;
3935 }
3936
3937 /**
3938  * mxlnd_connd - handles incoming connection requests
3939  * @arg - thread id (as a void *)
3940  *
3941  * This thread handles incoming connection requests
3942  */
3943 int
3944 mxlnd_connd(void *arg)
3945 {
3946         long                    id              = (long) arg;
3947
3948         cfs_daemonize("mxlnd_connd");
3949
3950         CDEBUG(D_NET, "connd starting\n");
3951
3952         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
3953                 int                     ret             = 0;
3954                 kmx_connparams_t       *cp              = NULL;
3955                 spinlock_t             *g_conn_lock     = &kmxlnd_data.kmx_conn_lock;
3956                 struct list_head       *conn_reqs       = &kmxlnd_data.kmx_conn_reqs;
3957
3958                 ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);
3959
3960                 if (atomic_read(&kmxlnd_data.kmx_shutdown))
3961                         break;
3962
3963                 if (ret != 0)
3964                         continue;
3965
3966                 ret = mxlnd_abort_msgs();
3967                 ret += mxlnd_free_conn_zombies();
3968
3969                 spin_lock(g_conn_lock);
3970                 if (list_empty(conn_reqs)) {
3971                         if (ret == 0)
3972                                 CDEBUG(D_NETERROR, "connd woke up but did not "
3973                                        "find a kmx_connparams_t or zombie conn\n");
3974                         spin_unlock(g_conn_lock);
3975                         continue;
3976                 }
3977                 cp = list_entry(conn_reqs->next, kmx_connparams_t, mxr_list);
3978                 list_del_init(&cp->mxr_list);
3979                 spin_unlock(g_conn_lock);
3980
3981                 switch (MXLND_MSG_TYPE(cp->mxr_match)) {
3982                 case MXLND_MSG_CONN_REQ:
3983                         /* We have a connection request. Handle it. */
3984                         mxlnd_passive_connect(cp);
3985                         break;
3986                 case MXLND_MSG_CONN_ACK:
3987                         /* The peer is ready for messages */
3988                         mxlnd_check_conn_ack(cp);
3989                         break;
3990                 }
3991         }
3992
3993         mxlnd_free_conn_zombies();
3994
3995         CDEBUG(D_NET, "connd stopping\n");
3996         mxlnd_thread_stop(id);
3997         return 0;
3998 }
3999
4000 /**
4001  * mxlnd_timeoutd - enforces timeouts on messages
4002  * @arg - thread id (as a void *)
4003  *
4004  * This thread queries each peer for its earliest timeout. If a peer has timed out,
4005  * it calls mxlnd_conn_disconnect().
4006  *
4007  * After checking for timeouts, try progressing sends (call check_sends()).
4008  */
4009 int
4010 mxlnd_timeoutd(void *arg)
4011 {
4012         int             i       = 0;
4013         long            id      = (long) arg;
4014         unsigned long   now     = 0;
4015         unsigned long   next    = 0;
4016         unsigned long   delay   = HZ;
4017         kmx_peer_t     *peer    = NULL;
4018         kmx_peer_t     *temp    = NULL;
4019         kmx_conn_t     *conn    = NULL;
4020         rwlock_t       *g_lock  = &kmxlnd_data.kmx_global_lock;
4021
4022         cfs_daemonize("mxlnd_timeoutd");
4023
4024         CDEBUG(D_NET, "timeoutd starting\n");
4025
4026         while (!(atomic_read(&kmxlnd_data.kmx_shutdown))) {
4027
4028                 now = jiffies;
4029                 /* if the next timeout has not arrived, go back to sleep */
4030                 if (time_after(now, next)) {
4031                         next = mxlnd_check_timeouts(now);
4032                 }
4033
4034                 /* try to progress peers' txs */
4035                write_lock(g_lock);
4036                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
4037                         struct list_head *peers = &kmxlnd_data.kmx_peers[i];
4038
4039                         /* NOTE we are safe against the removal of peer, but
4040                          * not against the removal of temp */
4041                         list_for_each_entry_safe(peer, temp, peers, mxp_list) {
4042                                 if (atomic_read(&kmxlnd_data.kmx_shutdown))
4043                                         break;
4044                                 mxlnd_peer_addref(peer); /* add ref... */
4045                                 conn = peer->mxp_conn;
4046                                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
4047                                         mxlnd_conn_addref(conn); /* take ref... */
4048                                 } else {
4049                                         CDEBUG(D_NET, "ignoring %s\n",
4050                                                libcfs_nid2str(peer->mxp_nid));
4051                                         mxlnd_peer_decref(peer); /* ...to here */
4052                                         continue;
4053                                 }
4054
4055                                 if ((conn->mxk_status == MXLND_CONN_READY ||
4056                                     conn->mxk_status == MXLND_CONN_FAIL) &&
4057                                     time_after(now, conn->mxk_last_tx + HZ)) {
4058                                         write_unlock(g_lock);
4059                                         mxlnd_check_sends(peer);
4060                                         write_lock(g_lock);
4061                                 }
4062                                 mxlnd_conn_decref(conn); /* until here */
4063                                 mxlnd_peer_decref(peer); /* ...to here */
4064                         }
4065                 }
4066                 write_unlock(g_lock);
4067
4068                 mxlnd_sleep(delay);
4069         }
4070         CDEBUG(D_NET, "timeoutd stopping\n");
4071         mxlnd_thread_stop(id);
4072         return 0;
4073 }