Whamcloud - gitweb
b=20918 t-f max recovery time estimation
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
47
48 inline int
49 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
50 {
51         /* if memcmp() == 0, it is NULL */
52         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
53 }
54
55 char *
56 mxlnd_ctxstate_to_str(int mxc_state)
57 {
58         switch (mxc_state) {
59         case MXLND_CTX_INIT:
60                 return "MXLND_CTX_INIT";
61         case MXLND_CTX_IDLE:
62                 return "MXLND_CTX_IDLE";
63         case MXLND_CTX_PREP:
64                 return "MXLND_CTX_PREP";
65         case MXLND_CTX_PENDING:
66                 return "MXLND_CTX_PENDING";
67         case MXLND_CTX_COMPLETED:
68                 return "MXLND_CTX_COMPLETED";
69         case MXLND_CTX_CANCELED:
70                 return "MXLND_CTX_CANCELED";
71         default:
72                 return "*unknown*";
73         }
74 }
75
76 char *
77 mxlnd_connstatus_to_str(int mxk_status)
78 {
79         switch (mxk_status) {
80         case MXLND_CONN_READY:
81                 return "MXLND_CONN_READY";
82         case MXLND_CONN_INIT:
83                 return "MXLND_CONN_INIT";
84         case MXLND_CONN_WAIT:
85                 return "MXLND_CONN_WAIT";
86         case MXLND_CONN_DISCONNECT:
87                 return "MXLND_CONN_DISCONNECT";
88         case MXLND_CONN_FAIL:
89                 return "MXLND_CONN_FAIL";
90         default:
91                 return "unknown";
92         }
93 }
94
95 char *
96 mxlnd_msgtype_to_str(int type) {
97         switch (type) {
98         case MXLND_MSG_EAGER:
99                 return "MXLND_MSG_EAGER";
100         case MXLND_MSG_CONN_REQ:
101                 return "MXLND_MSG_CONN_REQ";
102         case MXLND_MSG_CONN_ACK:
103                 return "MXLND_MSG_CONN_ACK";
104         case MXLND_MSG_BYE:
105                 return "MXLND_MSG_BYE";
106         case MXLND_MSG_NOOP:
107                 return "MXLND_MSG_NOOP";
108         case MXLND_MSG_PUT_REQ:
109                 return "MXLND_MSG_PUT_REQ";
110         case MXLND_MSG_PUT_ACK:
111                 return "MXLND_MSG_PUT_ACK";
112         case MXLND_MSG_PUT_DATA:
113                 return "MXLND_MSG_PUT_DATA";
114         case MXLND_MSG_GET_REQ:
115                 return "MXLND_MSG_GET_REQ";
116         case MXLND_MSG_GET_DATA:
117                 return "MXLND_MSG_GET_DATA";
118         default:
119                 return "unknown";
120         }
121 }
122
123 char *
124 mxlnd_lnetmsg_to_str(int type)
125 {
126         switch (type) {
127         case LNET_MSG_ACK:
128                 return "LNET_MSG_ACK";
129         case LNET_MSG_PUT:
130                 return "LNET_MSG_PUT";
131         case LNET_MSG_GET:
132                 return "LNET_MSG_GET";
133         case LNET_MSG_REPLY:
134                 return "LNET_MSG_REPLY";
135         case LNET_MSG_HELLO:
136                 return "LNET_MSG_HELLO";
137         default:
138                 LBUG();
139                 return "*unknown*";
140         }
141 }
142
143 static inline u64
144 mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
145 {
146         u64 type        = (u64) ctx->mxc_msg_type;
147         u64 err         = (u64) error;
148         u64 match       = 0ULL;
149
150         mxlnd_valid_msg_type(ctx->mxc_msg_type);
151         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
152         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
153         return match;
154 }
155
156 static inline void
157 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
158 {
159         *msg_type = (u8) MXLND_MSG_TYPE(match);
160         *error    = (u8) MXLND_ERROR_VAL(match);
161         *cookie   = match & MXLND_MAX_COOKIE;
162         mxlnd_valid_msg_type(*msg_type);
163         return;
164 }
165
166 kmx_ctx_t *
167 mxlnd_get_idle_rx(kmx_conn_t *conn)
168 {
169         cfs_list_t              *rxs    = NULL;
170         kmx_ctx_t               *rx     = NULL;
171
172         LASSERT(conn != NULL);
173
174         rxs = &conn->mxk_rx_idle;
175
176         cfs_spin_lock(&conn->mxk_lock);
177
178         if (cfs_list_empty (rxs)) {
179                 cfs_spin_unlock(&conn->mxk_lock);
180                 return NULL;
181         }
182
183         rx = cfs_list_entry (rxs->next, kmx_ctx_t, mxc_list);
184         cfs_list_del_init(&rx->mxc_list);
185         cfs_spin_unlock(&conn->mxk_lock);
186
187 #if MXLND_DEBUG
188         if (rx->mxc_get != rx->mxc_put) {
189                 CDEBUG(D_NETERROR, "*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
190                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
191                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
192                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
193                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !cfs_list_empty(&rx->mxc_list));
194                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
195                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
196                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
197                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
198                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
199         }
200 #endif
201         LASSERT (rx->mxc_get == rx->mxc_put);
202
203         rx->mxc_get++;
204
205         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
206         rx->mxc_state = MXLND_CTX_PREP;
207         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
208
209         return rx;
210 }
211
212 int
213 mxlnd_put_idle_rx(kmx_ctx_t *rx)
214 {
215         kmx_conn_t              *conn   = rx->mxc_conn;
216         cfs_list_t              *rxs    = &conn->mxk_rx_idle;
217
218         LASSERT(rx->mxc_type == MXLND_REQ_RX);
219
220         mxlnd_ctx_init(rx);
221
222         rx->mxc_put++;
223         LASSERT(rx->mxc_get == rx->mxc_put);
224
225         cfs_spin_lock(&conn->mxk_lock);
226         cfs_list_add(&rx->mxc_list, rxs);
227         cfs_spin_unlock(&conn->mxk_lock);
228         return 0;
229 }
230
231 kmx_ctx_t *
232 mxlnd_get_idle_tx(void)
233 {
234         cfs_list_t              *tmp    = &kmxlnd_data.kmx_tx_idle;
235         kmx_ctx_t               *tx     = NULL;
236
237         cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
238
239         if (cfs_list_empty (&kmxlnd_data.kmx_tx_idle)) {
240                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
241                 cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
242                 return NULL;
243         }
244
245         tmp = &kmxlnd_data.kmx_tx_idle;
246         tx = cfs_list_entry (tmp->next, kmx_ctx_t, mxc_list);
247         cfs_list_del_init(&tx->mxc_list);
248
249         /* Allocate a new completion cookie.  It might not be needed,
250          * but we've got a lock right now and we're unlikely to
251          * wrap... */
252         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
253         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
254                 kmxlnd_data.kmx_tx_next_cookie = 1;
255         }
256         kmxlnd_data.kmx_tx_used++;
257         cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
258
259         LASSERT (tx->mxc_get == tx->mxc_put);
260
261         tx->mxc_get++;
262
263         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
264         LASSERT (tx->mxc_lntmsg[0] == NULL);
265         LASSERT (tx->mxc_lntmsg[1] == NULL);
266
267         tx->mxc_state = MXLND_CTX_PREP;
268         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
269
270         return tx;
271 }
272
273 void
274 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
275
276 int
277 mxlnd_put_idle_tx(kmx_ctx_t *tx)
278 {
279         int             result  = 0;
280         lnet_msg_t      *lntmsg[2];
281
282         LASSERT(tx->mxc_type == MXLND_REQ_TX);
283
284         if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
285                 kmx_conn_t      *conn   = tx->mxc_conn;
286
287                 result = -EIO;
288                 if (tx->mxc_errno != 0) result = tx->mxc_errno;
289                 /* FIXME should we set mx_dis? */
290                 mxlnd_conn_disconnect(conn, 0, 1);
291         }
292
293         lntmsg[0] = tx->mxc_lntmsg[0];
294         lntmsg[1] = tx->mxc_lntmsg[1];
295
296         mxlnd_ctx_init(tx);
297
298         tx->mxc_put++;
299         LASSERT(tx->mxc_get == tx->mxc_put);
300
301         cfs_spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
302         cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
303         kmxlnd_data.kmx_tx_used--;
304         cfs_spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
305
306         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
307         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
308         return 0;
309 }
310
311
312 void
313 mxlnd_connparams_free(kmx_connparams_t *cp)
314 {
315         LASSERT(cfs_list_empty(&cp->mxr_list));
316         MXLND_FREE(cp, sizeof(*cp));
317         return;
318 }
319
320 int
321 mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
322                             mx_endpoint_addr_t epa, u64 match, u32 length,
323                             kmx_conn_t *conn, kmx_peer_t *peer, void *data)
324 {
325         kmx_connparams_t *c = NULL;
326
327         MXLND_ALLOC(c, sizeof(*c));
328         if (!c) return -ENOMEM;
329
330         CFS_INIT_LIST_HEAD(&c->mxr_list);
331         c->mxr_context = context;
332         c->mxr_epa = epa;
333         c->mxr_match = match;
334         c->mxr_nob = length;
335         c->mxr_conn = conn;
336         c->mxr_peer = peer;
337         c->mxr_msg = *((kmx_msg_t *) data);
338
339         *cp = c;
340         return 0;
341 }
342
343 static inline void
344 mxlnd_set_conn_status(kmx_conn_t *conn, int status)
345 {
346         conn->mxk_status = status;
347         cfs_mb();
348 }
349
350 /**
351  * mxlnd_conn_free_locked - free the conn
352  * @conn - a kmx_conn pointer
353  *
354  * The calling function should remove the conn from the conns list first
355  * then destroy it. Caller should have write-locked kmx_global_lock.
356  */
357 void
358 mxlnd_conn_free_locked(kmx_conn_t *conn)
359 {
360         int             valid   = !mxlnd_endpoint_addr_null(conn->mxk_epa);
361         kmx_peer_t      *peer   = conn->mxk_peer;
362
363         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364         LASSERT (cfs_list_empty (&conn->mxk_tx_credit_queue) &&
365                  cfs_list_empty (&conn->mxk_tx_free_queue) &&
366                  cfs_list_empty (&conn->mxk_pending));
367         if (!cfs_list_empty(&conn->mxk_list)) {
368                 cfs_list_del_init(&conn->mxk_list);
369                 if (peer->mxp_conn == conn) {
370                         peer->mxp_conn = NULL;
371                         if (valid) {
372                                 kmx_conn_t      *temp   = NULL;
373
374                                 mx_get_endpoint_addr_context(conn->mxk_epa,
375                                                              (void **) &temp);
376                                 if (conn == temp) {
377                                         mx_set_endpoint_addr_context(conn->mxk_epa,
378                                                                      (void *) NULL);
379                                 }
380                         }
381                         /* unlink from global list and drop its ref */
382                         cfs_list_del_init(&peer->mxp_list);
383                         mxlnd_peer_decref(peer);
384                 }
385         }
386         mxlnd_peer_decref(peer); /* drop conn's ref to peer */
387         if (conn->mxk_rx_pages) {
388                 LASSERT (conn->mxk_rxs != NULL);
389                 mxlnd_free_pages(conn->mxk_rx_pages);
390         }
391         if (conn->mxk_rxs) {
392                 int             i       = 0;
393                 kmx_ctx_t       *rx     = NULL;
394
395                 for (i = 0; i < MXLND_RX_MSGS(); i++) {
396                         rx = &conn->mxk_rxs[i];
397                         if (rx->mxc_seg_list != NULL) {
398                                 LASSERT(rx->mxc_nseg > 0);
399                                 MXLND_FREE(rx->mxc_seg_list,
400                                            rx->mxc_nseg *
401                                            sizeof(*rx->mxc_seg_list));
402                         }
403                 }
404                 MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
405         }
406
407         MXLND_FREE(conn, sizeof (*conn));
408         return;
409 }
410
411
412 int
413 mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
414 {
415         int             found   = 0;
416         int             count   = 0;
417         kmx_ctx_t       *ctx    = NULL;
418         kmx_ctx_t       *next   = NULL;
419         mx_return_t     mxret   = MX_SUCCESS;
420         u32             result  = 0;
421
422         do {
423                 found = 0;
424                 cfs_spin_lock(&conn->mxk_lock);
425                 cfs_list_for_each_entry_safe(ctx, next, &conn->mxk_pending,
426                                              mxc_list) {
427                         cfs_list_del_init(&ctx->mxc_list);
428                         if (ctx->mxc_type == MXLND_REQ_RX) {
429                                 found = 1;
430                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
431                                                   &ctx->mxc_mxreq,
432                                                   &result);
433                                 if (mxret != MX_SUCCESS) {
434                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
435                                 }
436                                 if (result == 1) {
437                                         ctx->mxc_errno = -ECONNABORTED;
438                                         ctx->mxc_state = MXLND_CTX_CANCELED;
439                                         cfs_spin_unlock(&conn->mxk_lock);
440                                         cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
441                                         /* we may be holding the global lock,
442                                          * move to orphan list so that it can free it */
443                                         cfs_list_add_tail(&ctx->mxc_list,
444                                                           &kmxlnd_data.kmx_orphan_msgs);
445                                         count++;
446                                         cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
447                                         cfs_spin_lock(&conn->mxk_lock);
448                                 }
449                                 break;
450                         }
451                 }
452                 cfs_spin_unlock(&conn->mxk_lock);
453         }
454         while (found);
455
456         return count;
457 }
458
459 int
460 mxlnd_cancel_queued_txs(kmx_conn_t *conn)
461 {
462         int                     count   = 0;
463         cfs_list_t             *tmp    = NULL;
464
465         cfs_spin_lock(&conn->mxk_lock);
466         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
467                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
468
469                 kmx_ctx_t       *tx     = NULL;
470
471                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
472                         tmp = &conn->mxk_tx_free_queue;
473                 } else {
474                         tmp = &conn->mxk_tx_credit_queue;
475                 }
476
477                 tx = cfs_list_entry(tmp->next, kmx_ctx_t, mxc_list);
478                 cfs_list_del_init(&tx->mxc_list);
479                 cfs_spin_unlock(&conn->mxk_lock);
480                 tx->mxc_errno = -ECONNABORTED;
481                 tx->mxc_state = MXLND_CTX_CANCELED;
482                 /* move to orphan list and then abort */
483                 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
484                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
485                 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
486                 count++;
487                 cfs_spin_lock(&conn->mxk_lock);
488         }
489         cfs_spin_unlock(&conn->mxk_lock);
490
491         return count;
492 }
493
494 void
495 mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
496 {
497         u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
498                     (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
499
500         mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
501                   epa, match, NULL, NULL);
502         return;
503 }
504
505 /**
506  * mxlnd_conn_disconnect - shutdown a connection
507  * @conn - a kmx_conn pointer
508  * @mx_dis - call mx_disconnect()
509  * @send_bye - send peer a BYE msg
510  *
511  * This function sets the status to DISCONNECT, completes queued
512  * txs with failure, calls mx_disconnect, which will complete
513  * pending txs and matched rxs with failure.
514  */
515 void
516 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
517 {
518         mx_endpoint_addr_t      epa     = conn->mxk_epa;
519         int                     valid   = !mxlnd_endpoint_addr_null(epa);
520         int                     count   = 0;
521
522         cfs_spin_lock(&conn->mxk_lock);
523         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
524                 cfs_spin_unlock(&conn->mxk_lock);
525                 return;
526         }
527         mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
528         conn->mxk_timeout = 0;
529         cfs_spin_unlock(&conn->mxk_lock);
530
531         count = mxlnd_cancel_queued_txs(conn);
532         count += mxlnd_conn_cancel_pending_rxs(conn);
533
534         if (count)
535                 cfs_up(&kmxlnd_data.kmx_conn_sem); /* let connd call kmxlnd_abort_msgs() */
536
537         if (send_bye && valid &&
538             conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
539                 /* send a BYE to the peer */
540                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
541                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
542                 mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
543                 /* wait to allow the peer to ack our message */
544                 mxlnd_sleep(msecs_to_jiffies(20));
545         }
546
547         if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
548                 unsigned long   last_msg        = 0;
549
550                 /* notify LNET that we are giving up on this peer */
551                 if (cfs_time_after(conn->mxk_last_rx, conn->mxk_last_tx))
552                         last_msg = conn->mxk_last_rx;
553                 else
554                         last_msg = conn->mxk_last_tx;
555
556                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
557
558                 if (mx_dis && valid &&
559                     (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0)))
560                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
561         }
562         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
563
564         return;
565 }
566
567 /**
568  * mxlnd_conn_alloc - allocate and initialize a new conn struct
569  * @connp - address of a kmx_conn pointer
570  * @peer - owning kmx_peer
571  *
572  * Returns 0 on success and -ENOMEM on failure
573  */
574 int
575 mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
576 {
577         int             i       = 0;
578         int             ret     = 0;
579         int             ipage   = 0;
580         int             offset  = 0;
581         void           *addr    = NULL;
582         kmx_conn_t     *conn    = NULL;
583         kmx_pages_t    *pages   = NULL;
584         struct page    *page    = NULL;
585         kmx_ctx_t      *rx      = NULL;
586
587         LASSERT(peer != NULL);
588
589         MXLND_ALLOC(conn, sizeof (*conn));
590         if (conn == NULL) {
591                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
592                 return -ENOMEM;
593         }
594         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
595
596         memset(conn, 0, sizeof(*conn));
597
598         ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
599         if (ret != 0) {
600                 CERROR("Can't allocate rx pages\n");
601                 MXLND_FREE(conn, sizeof(*conn));
602                 return -ENOMEM;
603         }
604         conn->mxk_rx_pages = pages;
605
606         MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
607         if (conn->mxk_rxs == NULL) {
608                 CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
609                 mxlnd_free_pages(pages);
610                 MXLND_FREE(conn, sizeof(*conn));
611                 return -ENOMEM;
612         }
613
614         memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
615
616         conn->mxk_peer = peer;
617         CFS_INIT_LIST_HEAD(&conn->mxk_list);
618         CFS_INIT_LIST_HEAD(&conn->mxk_zombie);
619         cfs_atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
620                                                    and one for the caller */
621         if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
622                 u64     nic_id  = 0ULL;
623                 u32     ep_id   = 0;
624
625                 /* this is localhost, set the epa and status as up */
626                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
627                 conn->mxk_epa = kmxlnd_data.kmx_epa;
628                 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
629                 peer->mxp_reconnect_time = 0;
630                 mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
631                 peer->mxp_nic_id = nic_id;
632                 peer->mxp_ep_id = ep_id;
633                 conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
634                 conn->mxk_timeout = 0;
635         } else {
636                 /* conn->mxk_incarnation = 0 - will be set by peer */
637                 /* conn->mxk_sid = 0 - will be set by peer */
638                 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
639                 /* mxk_epa - to be set after mx_iconnect() */
640         }
641         cfs_spin_lock_init(&conn->mxk_lock);
642         /* conn->mxk_timeout = 0 */
643         /* conn->mxk_last_tx = 0 */
644         /* conn->mxk_last_rx = 0 */
645         CFS_INIT_LIST_HEAD(&conn->mxk_rx_idle);
646
647         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
648         /* mxk_outstanding = 0 */
649
650         CFS_INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
651         CFS_INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
652         /* conn->mxk_ntx_msgs = 0 */
653         /* conn->mxk_ntx_data = 0 */
654         /* conn->mxk_ntx_posted = 0 */
655         /* conn->mxk_data_posted = 0 */
656         CFS_INIT_LIST_HEAD(&conn->mxk_pending);
657
658         for (i = 0; i < MXLND_RX_MSGS(); i++) {
659
660                 rx = &conn->mxk_rxs[i];
661                 rx->mxc_type = MXLND_REQ_RX;
662                 CFS_INIT_LIST_HEAD(&rx->mxc_list);
663
664                 /* map mxc_msg to page */
665                 page = pages->mxg_pages[ipage];
666                 addr = page_address(page);
667                 LASSERT(addr != NULL);
668                 rx->mxc_msg = (kmx_msg_t *)(addr + offset);
669                 rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
670
671                 rx->mxc_conn = conn;
672                 rx->mxc_peer = peer;
673                 rx->mxc_nid = peer->mxp_nid;
674
675                 mxlnd_ctx_init(rx);
676
677                 offset += MXLND_MSG_SIZE;
678                 LASSERT (offset <= PAGE_SIZE);
679
680                 if (offset == PAGE_SIZE) {
681                         offset = 0;
682                         ipage++;
683                         LASSERT (ipage <= MXLND_TX_MSG_PAGES());
684                 }
685
686                 cfs_list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
687         }
688
689         *connp = conn;
690
691         mxlnd_peer_addref(peer);        /* add a ref for this conn */
692
693         /* add to front of peer's conns list */
694         cfs_list_add(&conn->mxk_list, &peer->mxp_conns);
695         peer->mxp_conn = conn;
696         return 0;
697 }
698
699 int
700 mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
701 {
702         int             ret     = 0;
703         cfs_rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
704
705         cfs_write_lock(g_lock);
706         ret = mxlnd_conn_alloc_locked(connp, peer);
707         cfs_write_unlock(g_lock);
708         return ret;
709 }
710
711 int
712 mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
713 {
714         int             ret     = 0;
715         kmx_conn_t      *conn   = ctx->mxc_conn;
716
717         ctx->mxc_state = MXLND_CTX_PENDING;
718         if (conn != NULL) {
719                 cfs_spin_lock(&conn->mxk_lock);
720                 if (conn->mxk_status >= MXLND_CONN_INIT) {
721                         cfs_list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
722                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
723                                 conn->mxk_timeout = ctx->mxc_deadline;
724                         }
725                 } else {
726                         ctx->mxc_state = MXLND_CTX_COMPLETED;
727                         ret = -1;
728                 }
729                 cfs_spin_unlock(&conn->mxk_lock);
730         }
731         return ret;
732 }
733
734 int
735 mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
736 {
737         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
738                 ctx->mxc_state == MXLND_CTX_COMPLETED);
739         if (ctx->mxc_state != MXLND_CTX_PENDING &&
740             ctx->mxc_state != MXLND_CTX_COMPLETED) {
741                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n",
742                        mxlnd_ctxstate_to_str(ctx->mxc_state));
743         }
744         ctx->mxc_state = MXLND_CTX_COMPLETED;
745         if (!cfs_list_empty(&ctx->mxc_list)) {
746                 kmx_conn_t      *conn = ctx->mxc_conn;
747                 kmx_ctx_t       *next = NULL;
748
749                 LASSERT(conn != NULL);
750                 cfs_spin_lock(&conn->mxk_lock);
751                 cfs_list_del_init(&ctx->mxc_list);
752                 conn->mxk_timeout = 0;
753                 if (!cfs_list_empty(&conn->mxk_pending)) {
754                         next = cfs_list_entry(conn->mxk_pending.next,
755                                               kmx_ctx_t, mxc_list);
756                         conn->mxk_timeout = next->mxc_deadline;
757                 }
758                 cfs_spin_unlock(&conn->mxk_lock);
759         }
760         return 0;
761 }
762
763 /**
764  * mxlnd_peer_free - free the peer
765  * @peer - a kmx_peer pointer
766  *
767  * The calling function should decrement the rxs, drain the tx queues and
768  * remove the peer from the peers list first then destroy it.
769  */
770 void
771 mxlnd_peer_free(kmx_peer_t *peer)
772 {
773         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
774
775         LASSERT (cfs_atomic_read(&peer->mxp_refcount) == 0);
776
777         if (!cfs_list_empty(&peer->mxp_list)) {
778                 /* assume we are locked */
779                 cfs_list_del_init(&peer->mxp_list);
780         }
781
782         MXLND_FREE(peer, sizeof (*peer));
783         cfs_atomic_dec(&kmxlnd_data.kmx_npeers);
784         return;
785 }
786
787 static int
788 mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
789 {
790         int                     ret     = -EHOSTUNREACH;
791         unsigned char           *haddr  = NULL;
792         struct net_device       *dev    = NULL;
793         struct neighbour        *n      = NULL;
794         __be32                  dst_ip  = htonl(ip);
795
796         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
797         if (dev == NULL)
798                 return -ENODEV;
799
800         haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
801
802         n = neigh_lookup(&arp_tbl, &dst_ip, dev);
803         if (n) {
804                 n->used = jiffies;
805                 if (n->nud_state & NUD_VALID) {
806                         memcpy(haddr, n->ha, dev->addr_len);
807                         neigh_release(n);
808                         ret = 0;
809                 }
810         }
811
812         dev_put(dev);
813
814         return ret;
815 }
816
817
818 /* We only want the MAC address of the peer's Myricom NIC. We
819  * require that each node has the IPoMX interface (myriN) up.
820  * We will not pass any traffic over IPoMX, but it allows us
821  * to get the MAC address. */
822 static int
823 mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
824 {
825         int                     ret     = 0;
826         int                     try     = 1;
827         int                     fatal   = 0;
828         u64                     tmp_id  = 0ULL;
829         cfs_socket_t            *sock   = NULL;
830
831         do {
832                 CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
833                 ret = mxlnd_lookup_mac(ip, &tmp_id);
834                 if (ret == 0) {
835                         break;
836                 } else {
837                         /* not found, try to connect (force an arp) */
838                         ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
839                         if (ret == -ECONNREFUSED) {
840                                 /* peer is there, get the MAC address */
841                                 mxlnd_lookup_mac(ip, &tmp_id);
842                                 if (tmp_id != 0ULL)
843                                         ret = 0;
844                                 break;
845                         } else if (ret == -EHOSTUNREACH && try < tries) {
846                                 /* add a little backoff */
847                                 CDEBUG(D_NET, "sleeping for %d jiffies\n",
848                                        CFS_HZ/4);
849                                 mxlnd_sleep(CFS_HZ/4);
850                         }
851                 }
852         } while (try++ < tries);
853         CDEBUG(D_NET, "done trying. ret = %d\n", ret);
854
855         if (tmp_id == 0ULL)
856                 ret = -EHOSTUNREACH;
857 #ifdef __LITTLE_ENDIAN
858         *nic_id = ___arch__swab64(tmp_id);
859 #else
860         *nic_id = tmp_id;
861 #endif
862         return ret;
863 }
864
865 /**
866  * mxlnd_peer_alloc - allocate and initialize a new peer struct
867  * @peerp - address of a kmx_peer pointer
868  * @nid - LNET node id
869  *
870  * Returns 0 on success and -ENOMEM on failure
871  */
872 int
873 mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
874 {
875         int             ret     = 0;
876         u32             ip      = LNET_NIDADDR(nid);
877         kmx_peer_t     *peer    = NULL;
878
879         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
880
881         MXLND_ALLOC(peer, sizeof (*peer));
882         if (peer == NULL) {
883                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n",
884                        nid);
885                 return -ENOMEM;
886         }
887         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
888
889         memset(peer, 0, sizeof(*peer));
890
891         CFS_INIT_LIST_HEAD(&peer->mxp_list);
892         peer->mxp_nid = nid;
893         /* peer->mxp_ni unused - may be used for multi-rail */
894         cfs_atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
895
896         peer->mxp_board = board;
897         peer->mxp_ep_id = ep_id;
898         peer->mxp_nic_id = nic_id;
899
900         CFS_INIT_LIST_HEAD(&peer->mxp_conns);
901         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
902         if (ret != 0) {
903                 mxlnd_peer_decref(peer);
904                 return ret;
905         }
906         CFS_INIT_LIST_HEAD(&peer->mxp_tx_queue);
907
908         if (peer->mxp_nic_id != 0ULL)
909                 nic_id = peer->mxp_nic_id;
910
911         if (nic_id == 0ULL) {
912                 ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
913                 if (ret == 0) {
914                         peer->mxp_nic_id = nic_id;
915                         mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
916                 }
917         }
918
919         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
920
921         /* peer->mxp_reconnect_time = 0 */
922         /* peer->mxp_incompatible = 0 */
923
924         *peerp = peer;
925         return 0;
926 }
927
928 static inline kmx_peer_t *
929 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
930 {
931         int             found   = 0;
932         int             hash    = 0;
933         kmx_peer_t      *peer   = NULL;
934
935         hash = mxlnd_nid_to_hash(nid);
936
937         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
938                 if (peer->mxp_nid == nid) {
939                         found = 1;
940                         mxlnd_peer_addref(peer);
941                         break;
942                 }
943         }
944         return (found ? peer : NULL);
945 }
946
947 static kmx_peer_t *
948 mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
949 {
950         int             ret     = 0;
951         int             hash    = 0;
952         kmx_peer_t      *peer   = NULL;
953         kmx_peer_t      *old    = NULL;
954         cfs_rwlock_t    *g_lock = &kmxlnd_data.kmx_global_lock;
955
956         cfs_read_lock(g_lock);
957         peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
958
959         if ((peer && peer->mxp_conn) || /* found peer with conn or */
960             (!peer && !create)) {       /* did not find peer and do not create one */
961                 cfs_read_unlock(g_lock);
962                 return peer;
963         }
964
965         cfs_read_unlock(g_lock);
966
967         /* if peer but _not_ conn */
968         if (peer && !peer->mxp_conn) {
969                 if (create) {
970                         cfs_write_lock(g_lock);
971                         if (!peer->mxp_conn) { /* check again */
972                                 /* create the conn */
973                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
974                                 if (ret != 0) {
975                                         /* we tried, return the peer only.
976                                          * the caller needs to see if the conn exists */
977                                         CDEBUG(D_NETERROR, "%s: %s could not alloc conn\n",
978                                         __func__, libcfs_nid2str(peer->mxp_nid));
979                                 } else {
980                                         /* drop extra conn ref */
981                                         mxlnd_conn_decref(peer->mxp_conn);
982                                 }
983                         }
984                         cfs_write_unlock(g_lock);
985                 }
986                 return peer;
987         }
988
989         /* peer not found and we need to create one */
990         hash = mxlnd_nid_to_hash(nid);
991
992         /* create peer (and conn) */
993         /* adds conn ref for peer and one for this function */
994         ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
995                                *kmxlnd_tunables.kmx_ep_id, 0ULL);
996         if (ret != 0) /* no memory, peer is NULL */
997                 return NULL;
998
999         cfs_write_lock(g_lock);
1000
1001         /* look again */
1002         old = mxlnd_find_peer_by_nid_locked(nid);
1003         if (old) {
1004                 /* someone already created one */
1005                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
1006                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
1007                 mxlnd_peer_decref(peer);
1008                 peer = old;
1009         } else {
1010                 /* no other peer, use this one */
1011                 cfs_list_add_tail(&peer->mxp_list,
1012                                   &kmxlnd_data.kmx_peers[hash]);
1013                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
1014                 mxlnd_peer_addref(peer);
1015                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
1016         }
1017
1018         cfs_write_unlock(g_lock);
1019
1020         return peer;
1021 }
1022
1023 static inline int
1024 mxlnd_tx_requires_credit(kmx_ctx_t *tx)
1025 {
1026         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
1027                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
1028                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
1029                 tx->mxc_msg_type == MXLND_MSG_NOOP);
1030 }
1031
1032 /**
1033  * mxlnd_init_msg - set type and number of bytes
1034  * @msg - msg pointer
1035  * @type - of message
1036  * @body_nob - bytes in msg body
1037  */
1038 static inline void
1039 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
1040 {
1041         msg->mxm_type = type;
1042         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
1043 }
1044
1045 static inline void
1046 mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
1047 {
1048         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
1049         kmx_msg_t       *msg    = NULL;
1050
1051         LASSERT (tx != NULL);
1052         LASSERT (nob <= MXLND_MSG_SIZE);
1053
1054         tx->mxc_nid = nid;
1055         /* tx->mxc_peer should have already been set if we know it */
1056         tx->mxc_msg_type = type;
1057         tx->mxc_nseg = 1;
1058         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
1059         tx->mxc_seg.segment_length = nob;
1060         tx->mxc_pin_type = MX_PIN_PHYSICAL;
1061
1062         msg = tx->mxc_msg;
1063         msg->mxm_type = type;
1064         msg->mxm_nob  = nob;
1065
1066         return;
1067 }
1068
1069 static inline __u32
1070 mxlnd_cksum (void *ptr, int nob)
1071 {
1072         char  *c  = ptr;
1073         __u32  sum = 0;
1074
1075         while (nob-- > 0)
1076                 sum = ((sum << 1) | (sum >> 31)) + *c++;
1077
1078         /* ensure I don't return 0 (== no checksum) */
1079         return (sum == 0) ? 1 : sum;
1080 }
1081
1082 /**
1083  * mxlnd_pack_msg_locked - complete msg info
1084  * @tx - msg to send
1085  */
1086 static inline void
1087 mxlnd_pack_msg_locked(kmx_ctx_t *tx)
1088 {
1089         kmx_msg_t       *msg    = tx->mxc_msg;
1090
1091         /* type and nob should already be set in init_msg() */
1092         msg->mxm_magic    = MXLND_MSG_MAGIC;
1093         msg->mxm_version  = MXLND_MSG_VERSION;
1094         /*   mxm_type */
1095         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
1096          * return credits as well */
1097         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
1098             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1099                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
1100                 tx->mxc_conn->mxk_outstanding = 0;
1101         } else {
1102                 msg->mxm_credits  = 0;
1103         }
1104         /*   mxm_nob */
1105         msg->mxm_cksum    = 0;
1106         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
1107         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
1108         msg->mxm_dstnid   = tx->mxc_nid;
1109         /* if it is a new peer, the dststamp will be 0 */
1110         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
1111
1112         if (*kmxlnd_tunables.kmx_cksum) {
1113                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
1114         }
1115 }
1116
1117 int
1118 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
1119 {
1120         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
1121         __u32     msg_cksum     = 0;
1122         int       flip          = 0;
1123         int       msg_nob       = 0;
1124
1125         /* 6 bytes are enough to have received magic + version */
1126         if (nob < 6) {
1127                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
1128                 return -EPROTO;
1129         }
1130
1131         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
1132                 flip = 0;
1133         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
1134                 flip = 1;
1135         } else {
1136                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
1137                 return -EPROTO;
1138         }
1139
1140         if (msg->mxm_version !=
1141             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
1142                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
1143                 return -EPROTO;
1144         }
1145
1146         if (nob < hdr_size) {
1147                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
1148                 return -EPROTO;
1149         }
1150
1151         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
1152         if (msg_nob > nob) {
1153                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
1154                 return -EPROTO;
1155         }
1156
1157         /* checksum must be computed with mxm_cksum zero and BEFORE anything
1158          * gets flipped */
1159         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
1160         msg->mxm_cksum = 0;
1161         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
1162                 CDEBUG(D_NETERROR, "Bad checksum\n");
1163                 return -EPROTO;
1164         }
1165         msg->mxm_cksum = msg_cksum;
1166
1167         if (flip) {
1168                 /* leave magic unflipped as a clue to peer endianness */
1169                 __swab16s(&msg->mxm_version);
1170                 CLASSERT (sizeof(msg->mxm_type) == 1);
1171                 CLASSERT (sizeof(msg->mxm_credits) == 1);
1172                 msg->mxm_nob = msg_nob;
1173                 __swab64s(&msg->mxm_srcnid);
1174                 __swab64s(&msg->mxm_srcstamp);
1175                 __swab64s(&msg->mxm_dstnid);
1176                 __swab64s(&msg->mxm_dststamp);
1177         }
1178
1179         if (msg->mxm_srcnid == LNET_NID_ANY) {
1180                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
1181                 return -EPROTO;
1182         }
1183
1184         switch (msg->mxm_type) {
1185         default:
1186                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
1187                 return -EPROTO;
1188
1189         case MXLND_MSG_NOOP:
1190                 break;
1191
1192         case MXLND_MSG_EAGER:
1193                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
1194                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
1195                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
1196                         return -EPROTO;
1197                 }
1198                 break;
1199
1200         case MXLND_MSG_PUT_REQ:
1201                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
1202                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
1203                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1204                         return -EPROTO;
1205                 }
1206                 if (flip)
1207                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1208                 break;
1209
1210         case MXLND_MSG_PUT_ACK:
1211                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1212                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
1213                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1214                         return -EPROTO;
1215                 }
1216                 if (flip) {
1217                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1218                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1219                 }
1220                 break;
1221
1222         case MXLND_MSG_GET_REQ:
1223                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1224                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
1225                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1226                         return -EPROTO;
1227                 }
1228                 if (flip) {
1229                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1230                 }
1231                 break;
1232
1233         case MXLND_MSG_CONN_REQ:
1234         case MXLND_MSG_CONN_ACK:
1235                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1236                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
1237                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1238                         return -EPROTO;
1239                 }
1240                 if (flip) {
1241                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1242                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1243                 }
1244                 break;
1245         }
1246         return 0;
1247 }
1248
1249
1250 /**
1251  * mxlnd_recv_msg
1252  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1253  * @rx
1254  * @msg_type
1255  * @cookie
1256  * @length - length of incoming message
1257  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1258  *
1259  * The caller gets the rx and sets nid, peer and conn if known.
1260  *
1261  * Returns 0 on success and -1 on failure
1262  */
1263 int
1264 mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
1265 {
1266         int             ret     = 0;
1267         mx_return_t     mxret   = MX_SUCCESS;
1268         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1269
1270         rx->mxc_msg_type = msg_type;
1271         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1272         rx->mxc_cookie = cookie;
1273         /* rx->mxc_match may already be set */
1274         /* rx->mxc_seg.segment_ptr is already set */
1275         rx->mxc_seg.segment_length = length;
1276         ret = mxlnd_q_pending_ctx(rx);
1277         if (ret == -1) {
1278                 /* the caller is responsible for calling conn_decref() if needed */
1279                 return -1;
1280         }
1281         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1282                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1283         if (mxret != MX_SUCCESS) {
1284                 mxlnd_deq_pending_ctx(rx);
1285                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n",
1286                                    mx_strerror(mxret), (int) mxret);
1287                 return -1;
1288         }
1289         return 0;
1290 }
1291
1292
1293 /**
1294  * mxlnd_unexpected_recv - this is the callback function that will handle
1295  *                         unexpected receives
1296  * @context - NULL, ignore
1297  * @source - the peer's mx_endpoint_addr_t
1298  * @match_value - the msg's bits, should be MXLND_MSG_EAGER
1299  * @length - length of incoming message
1300  * @data_if_available - used for CONN_[REQ|ACK]
1301  *
1302  * If it is an eager-sized msg, we will call recv_msg() with the actual
1303  * length. If it is a large message, we will call recv_msg() with a
1304  * length of 0 bytes to drop it because we should never have a large,
1305  * unexpected message.
1306  *
1307  * NOTE - The MX library blocks until this function completes. Make it as fast as
1308  * possible. DO NOT allocate memory which can block!
1309  *
1310  * If we cannot get a rx or the conn is closed, drop the message on the floor
1311  * (i.e. recv 0 bytes and ignore).
1312  */
1313 mx_unexp_handler_action_t
1314 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1315                  uint64_t match_value, uint32_t length, void *data_if_available)
1316 {
1317         int             ret             = 0;
1318         kmx_ctx_t       *rx             = NULL;
1319         mx_ksegment_t   seg;
1320         u8              msg_type        = 0;
1321         u8              error           = 0;
1322         u64             cookie          = 0ULL;
1323         kmx_conn_t      *conn           = NULL;
1324         kmx_peer_t      *peer           = NULL;
1325         u64             nic_id          = 0ULL;
1326         u32             ep_id           = 0;
1327         u32             sid             = 0;
1328
1329         /* TODO this will change to the net struct */
1330         if (context != NULL) {
1331                 CDEBUG(D_NETERROR, "non-NULL context\n");
1332         }
1333
1334 #if MXLND_DEBUG
1335         CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);
1336 #endif
1337
1338         mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
1339         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1340         cfs_read_lock(&kmxlnd_data.kmx_global_lock);
1341         mx_get_endpoint_addr_context(source, (void **) &conn);
1342         if (conn) {
1343                 mxlnd_conn_addref(conn); /* add ref for this function */
1344                 peer = conn->mxk_peer;
1345         }
1346         cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1347
1348         if (msg_type == MXLND_MSG_BYE) {
1349                 if (conn) {
1350                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1351                                         libcfs_nid2str(peer->mxp_nid));
1352                         mxlnd_conn_disconnect(conn, 1, 0);
1353                         mxlnd_conn_decref(conn); /* drop ref taken above */
1354                 }
1355                 return MX_RECV_FINISHED;
1356         }
1357
1358         if (msg_type == MXLND_MSG_CONN_REQ) {
1359                 kmx_connparams_t       *cp      = NULL;
1360                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1361                                                   sizeof(kmx_connreq_msg_t);
1362
1363                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1364                 if (unlikely(length != expected || !data_if_available)) {
1365                         CDEBUG(D_NETERROR, "received invalid CONN_REQ from %llx "
1366                                "length=%d (expected %d)\n", nic_id, length, expected);
1367                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
1368                         return MX_RECV_FINISHED;
1369                 }
1370
1371                 ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1372                                          conn, peer, data_if_available);
1373                 if (unlikely(ret != 0)) {
1374                         CDEBUG(D_NETERROR, "unable to alloc CONN_REQ from %llx:%d\n",
1375                                         nic_id, ep_id);
1376                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
1377                         return MX_RECV_FINISHED;
1378                 }
1379                 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
1380                 cfs_list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
1381                 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
1382                 cfs_up(&kmxlnd_data.kmx_conn_sem);
1383                 return MX_RECV_FINISHED;
1384         }
1385         if (msg_type == MXLND_MSG_CONN_ACK) {
1386                 kmx_connparams_t  *cp           = NULL;
1387                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1388                                                   sizeof(kmx_connreq_msg_t);
1389
1390                 LASSERT(conn);
1391                 if (unlikely(error != 0)) {
1392                         CDEBUG(D_NETERROR, "received CONN_ACK from %s "
1393                                "with error -%d\n",
1394                                libcfs_nid2str(peer->mxp_nid), (int) error);
1395                         mxlnd_conn_disconnect(conn, 1, 0);
1396                 } else if (unlikely(length != expected || !data_if_available)) {
1397                         CDEBUG(D_NETERROR, "received %s CONN_ACK from %s "
1398                                "length=%d (expected %d)\n",
1399                                data_if_available ? "short" : "missing",
1400                                libcfs_nid2str(peer->mxp_nid), length, expected);
1401                         mxlnd_conn_disconnect(conn, 1, 1);
1402                 } else {
1403                         /* peer is ready for messages */
1404                         ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1405                                          conn, peer, data_if_available);
1406                         if (unlikely(ret != 0)) {
1407                                 CDEBUG(D_NETERROR, "unable to alloc kmx_connparams_t"
1408                                                " from %llx:%d\n", nic_id, ep_id);
1409                                 mxlnd_conn_disconnect(conn, 1, 1);
1410                         } else {
1411                                 cfs_spin_lock(&kmxlnd_data.kmx_conn_lock);
1412                                 cfs_list_add_tail(&cp->mxr_list,
1413                                                   &kmxlnd_data.kmx_conn_reqs);
1414                                 cfs_spin_unlock(&kmxlnd_data.kmx_conn_lock);
1415                                 cfs_up(&kmxlnd_data.kmx_conn_sem);
1416                         }
1417                 }
1418                 mxlnd_conn_decref(conn); /* drop ref taken above */
1419
1420                 return MX_RECV_FINISHED;
1421         }
1422
1423         /* Handle unexpected messages (PUT_REQ and GET_REQ) */
1424
1425         LASSERT(peer != NULL && conn != NULL);
1426
1427         rx = mxlnd_get_idle_rx(conn);
1428         if (rx != NULL) {
1429                 if (length <= MXLND_MSG_SIZE) {
1430                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1431                 } else {
1432                         CDEBUG(D_NETERROR, "unexpected large receive with "
1433                                            "match_value=0x%llx length=%d\n",
1434                                            match_value, length);
1435                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1436                 }
1437
1438                 if (ret == 0) {
1439                         /* hold conn ref until rx completes */
1440                         rx->mxc_conn = conn;
1441                         rx->mxc_peer = peer;
1442                         rx->mxc_nid = peer->mxp_nid;
1443                 } else {
1444                         CDEBUG(D_NETERROR, "could not post receive\n");
1445                         mxlnd_put_idle_rx(rx);
1446                 }
1447         }
1448
1449         /* Encountered error, drop incoming message on the floor */
1450         /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
1451          * uses the standard code path and acks the sender normally */
1452
1453         if (rx == NULL || ret != 0) {
1454                 mxlnd_conn_decref(conn); /* drop ref taken above */
1455                 if (rx == NULL) {
1456                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx"
1457                                " 0x%llx from %s\n", match_value,
1458                                libcfs_nid2str(peer->mxp_nid));
1459                 } else {
1460                         /* ret != 0 */
1461                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1462                 }
1463                 seg.segment_ptr = 0ULL;
1464                 seg.segment_length = 0;
1465                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1466                           match_value, ~0ULL, NULL, NULL);
1467         }
1468
1469         return MX_RECV_CONTINUE;
1470 }
1471
1472
1473 int
1474 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1475 {
1476         int              i      = 0;
1477         int              ret    = -ENOENT;
1478         kmx_peer_t      *peer   = NULL;
1479
1480         cfs_read_lock(&kmxlnd_data.kmx_global_lock);
1481         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1482                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1483                                         mxp_list) {
1484                         if (index-- == 0) {
1485                                 *nidp = peer->mxp_nid;
1486                                 *count = cfs_atomic_read(&peer->mxp_refcount);
1487                                 ret = 0;
1488                                 break;
1489                         }
1490                 }
1491         }
1492         cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1493
1494         return ret;
1495 }
1496
1497 void
1498 mxlnd_del_peer_locked(kmx_peer_t *peer)
1499 {
1500         if (peer->mxp_conn) {
1501                 mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1502         } else {
1503                 cfs_list_del_init(&peer->mxp_list); /* remove from the global list */
1504                 mxlnd_peer_decref(peer); /* drop global list ref */
1505         }
1506         return;
1507 }
1508
1509 int
1510 mxlnd_del_peer(lnet_nid_t nid)
1511 {
1512         int             i       = 0;
1513         int             ret     = 0;
1514         kmx_peer_t      *peer   = NULL;
1515         kmx_peer_t      *next   = NULL;
1516
1517         if (nid != LNET_NID_ANY) {
1518                 peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
1519         }
1520         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
1521         if (nid != LNET_NID_ANY) {
1522                 if (peer == NULL) {
1523                         ret = -ENOENT;
1524                 } else {
1525                         mxlnd_peer_decref(peer); /* and drops it */
1526                         mxlnd_del_peer_locked(peer);
1527                 }
1528         } else { /* LNET_NID_ANY */
1529                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1530                         cfs_list_for_each_entry_safe(peer, next,
1531                                                      &kmxlnd_data.kmx_peers[i],
1532                                                      mxp_list) {
1533                                 mxlnd_del_peer_locked(peer);
1534                         }
1535                 }
1536         }
1537         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
1538
1539         return ret;
1540 }
1541
1542 kmx_conn_t *
1543 mxlnd_get_conn_by_idx(int index)
1544 {
1545         int              i      = 0;
1546         kmx_peer_t      *peer   = NULL;
1547         kmx_conn_t      *conn   = NULL;
1548
1549         cfs_read_lock(&kmxlnd_data.kmx_global_lock);
1550         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1551                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1552                                         mxp_list) {
1553                         cfs_list_for_each_entry(conn, &peer->mxp_conns,
1554                                                 mxk_list) {
1555                                 if (index-- > 0) {
1556                                         continue;
1557                                 }
1558
1559                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1560                                 cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1561                                 return conn;
1562                         }
1563                 }
1564         }
1565         cfs_read_unlock(&kmxlnd_data.kmx_global_lock);
1566
1567         return NULL;
1568 }
1569
1570 void
1571 mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
1572 {
1573         kmx_conn_t      *conn   = NULL;
1574         kmx_conn_t      *next   = NULL;
1575
1576         cfs_list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1577                 mxlnd_conn_disconnect(conn, 0, 1);
1578
1579         return;
1580 }
1581
1582 int
1583 mxlnd_close_matching_conns(lnet_nid_t nid)
1584 {
1585         int             i       = 0;
1586         int             ret     = 0;
1587         kmx_peer_t      *peer   = NULL;
1588
1589         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
1590         if (nid != LNET_NID_ANY) {
1591                 peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
1592                 if (peer == NULL) {
1593                         ret = -ENOENT;
1594                 } else {
1595                         mxlnd_close_matching_conns_locked(peer);
1596                         mxlnd_peer_decref(peer); /* and drops it here */
1597                 }
1598         } else { /* LNET_NID_ANY */
1599                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1600                         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],                                                mxp_list)
1601                                 mxlnd_close_matching_conns_locked(peer);
1602                 }
1603         }
1604         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
1605
1606         return ret;
1607 }
1608
1609 /**
1610  * mxlnd_ctl - modify MXLND parameters
1611  * @ni - LNET interface handle
1612  * @cmd - command to change
1613  * @arg - the ioctl data
1614  */
1615 int
1616 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1617 {
1618         struct libcfs_ioctl_data *data  = arg;
1619         int                       ret   = -EINVAL;
1620
1621         LASSERT (ni == kmxlnd_data.kmx_ni);
1622
1623         switch (cmd) {
1624         case IOC_LIBCFS_GET_PEER: {
1625                 lnet_nid_t      nid     = 0;
1626                 int             count   = 0;
1627
1628                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1629                 data->ioc_nid    = nid;
1630                 data->ioc_count  = count;
1631                 break;
1632         }
1633         case IOC_LIBCFS_DEL_PEER: {
1634                 ret = mxlnd_del_peer(data->ioc_nid);
1635                 break;
1636         }
1637         case IOC_LIBCFS_GET_CONN: {
1638                 kmx_conn_t      *conn = NULL;
1639
1640                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1641                 if (conn == NULL) {
1642                         ret = -ENOENT;
1643                 } else {
1644                         ret = 0;
1645                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1646                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1647                 }
1648                 break;
1649         }
1650         case IOC_LIBCFS_CLOSE_CONNECTION: {
1651                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1652                 break;
1653         }
1654         default:
1655                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1656                 break;
1657         }
1658
1659         return ret;
1660 }
1661
1662 /**
1663  * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
1664  * @tx
1665  *
1666  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1667  */
1668 void
1669 mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
1670 {
1671         u8              msg_type        = tx->mxc_msg_type;
1672         kmx_conn_t      *conn           = tx->mxc_conn;
1673
1674         LASSERT (msg_type != 0);
1675         LASSERT (tx->mxc_nid != 0);
1676         LASSERT (tx->mxc_peer != NULL);
1677         LASSERT (tx->mxc_conn != NULL);
1678
1679         tx->mxc_incarnation = conn->mxk_incarnation;
1680
1681         if (msg_type != MXLND_MSG_PUT_DATA &&
1682             msg_type != MXLND_MSG_GET_DATA) {
1683                 /* msg style tx */
1684                 if (mxlnd_tx_requires_credit(tx)) {
1685                         cfs_list_add_tail(&tx->mxc_list,
1686                                           &conn->mxk_tx_credit_queue);
1687                         conn->mxk_ntx_msgs++;
1688                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1689                            msg_type == MXLND_MSG_CONN_ACK) {
1690                         /* put conn msgs at the front of the queue */
1691                         cfs_list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1692                 } else {
1693                         /* PUT_ACK, PUT_NAK */
1694                         cfs_list_add_tail(&tx->mxc_list,
1695                                           &conn->mxk_tx_free_queue);
1696                         conn->mxk_ntx_msgs++;
1697                 }
1698         } else {
1699                 /* data style tx */
1700                 cfs_list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1701                 conn->mxk_ntx_data++;
1702         }
1703
1704         return;
1705 }
1706
1707 /**
1708  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1709  * @tx
1710  *
1711  * Add the tx to the peer's msg or data queue
1712  */
1713 static inline void
1714 mxlnd_peer_queue_tx(kmx_ctx_t *tx)
1715 {
1716         LASSERT(tx->mxc_peer != NULL);
1717         LASSERT(tx->mxc_conn != NULL);
1718         cfs_spin_lock(&tx->mxc_conn->mxk_lock);
1719         mxlnd_peer_queue_tx_locked(tx);
1720         cfs_spin_unlock(&tx->mxc_conn->mxk_lock);
1721
1722         return;
1723 }
1724
1725 /**
1726  * mxlnd_queue_tx - add the tx to the global tx queue
1727  * @tx
1728  *
1729  * Add the tx to the global queue and up the tx_queue_sem
1730  */
1731 void
1732 mxlnd_queue_tx(kmx_ctx_t *tx)
1733 {
1734         kmx_peer_t *peer   = tx->mxc_peer;
1735         LASSERT (tx->mxc_nid != 0);
1736
1737         if (peer != NULL) {
1738                 if (peer->mxp_incompatible &&
1739                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1740                         /* let this fail now */
1741                         tx->mxc_errno = -ECONNABORTED;
1742                         mxlnd_conn_decref(peer->mxp_conn);
1743                         mxlnd_put_idle_tx(tx);
1744                         return;
1745                 }
1746                 if (tx->mxc_conn == NULL) {
1747                         int             ret     = 0;
1748                         kmx_conn_t      *conn   = NULL;
1749
1750                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1751                         if (ret != 0) {
1752                                 tx->mxc_errno = ret;
1753                                 mxlnd_put_idle_tx(tx);
1754                                 goto done;
1755                         }
1756                         tx->mxc_conn = conn;
1757                         mxlnd_peer_decref(peer); /* and takes it from peer */
1758                 }
1759                 LASSERT(tx->mxc_conn != NULL);
1760                 mxlnd_peer_queue_tx(tx);
1761                 mxlnd_check_sends(peer);
1762         } else {
1763                 cfs_spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1764                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1765                 cfs_spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1766                 cfs_up(&kmxlnd_data.kmx_tx_queue_sem);
1767         }
1768 done:
1769         return;
1770 }
1771
1772 int
1773 mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1774 {
1775         int             i                       = 0;
1776         int             sum                     = 0;
1777         int             old_sum                 = 0;
1778         int             nseg                    = 0;
1779         int             first_iov               = -1;
1780         int             first_iov_offset        = 0;
1781         int             first_found             = 0;
1782         int             last_iov                = -1;
1783         int             last_iov_length         = 0;
1784         mx_ksegment_t  *seg                     = NULL;
1785
1786         if (niov == 0) return 0;
1787         LASSERT(iov != NULL);
1788
1789         for (i = 0; i < niov; i++) {
1790                 sum = old_sum + (u32) iov[i].iov_len;
1791                 if (!first_found && (sum > offset)) {
1792                         first_iov = i;
1793                         first_iov_offset = offset - old_sum;
1794                         first_found = 1;
1795                         sum = (u32) iov[i].iov_len - first_iov_offset;
1796                         old_sum = 0;
1797                 }
1798                 if (sum >= nob) {
1799                         last_iov = i;
1800                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1801                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1802                         break;
1803                 }
1804                 old_sum = sum;
1805         }
1806         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1807         nseg = last_iov - first_iov + 1;
1808         LASSERT(nseg > 0);
1809
1810         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1811         if (seg == NULL) {
1812                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1813                 return -1;
1814         }
1815         memset(seg, 0, nseg * sizeof(*seg));
1816         ctx->mxc_nseg = nseg;
1817         sum = 0;
1818         for (i = 0; i < nseg; i++) {
1819                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1820                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1821                 if (i == 0) {
1822                         seg[i].segment_ptr += (u64) first_iov_offset;
1823                         seg[i].segment_length -= (u32) first_iov_offset;
1824                 }
1825                 if (i == (nseg - 1)) {
1826                         seg[i].segment_length = (u32) last_iov_length;
1827                 }
1828                 sum += seg[i].segment_length;
1829         }
1830         ctx->mxc_seg_list = seg;
1831         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1832 #ifdef MX_PIN_FULLPAGES
1833         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1834 #endif
1835         LASSERT(nob == sum);
1836         return 0;
1837 }
1838
1839 int
1840 mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1841 {
1842         int             i                       = 0;
1843         int             sum                     = 0;
1844         int             old_sum                 = 0;
1845         int             nseg                    = 0;
1846         int             first_kiov              = -1;
1847         int             first_kiov_offset       = 0;
1848         int             first_found             = 0;
1849         int             last_kiov               = -1;
1850         int             last_kiov_length        = 0;
1851         mx_ksegment_t  *seg                     = NULL;
1852
1853         if (niov == 0) return 0;
1854         LASSERT(kiov != NULL);
1855
1856         for (i = 0; i < niov; i++) {
1857                 sum = old_sum + kiov[i].kiov_len;
1858                 if (i == 0) sum -= kiov[i].kiov_offset;
1859                 if (!first_found && (sum > offset)) {
1860                         first_kiov = i;
1861                         first_kiov_offset = offset - old_sum;
1862                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1863                         first_found = 1;
1864                         sum = kiov[i].kiov_len - first_kiov_offset;
1865                         old_sum = 0;
1866                 }
1867                 if (sum >= nob) {
1868                         last_kiov = i;
1869                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1870                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1871                         break;
1872                 }
1873                 old_sum = sum;
1874         }
1875         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1876         nseg = last_kiov - first_kiov + 1;
1877         LASSERT(nseg > 0);
1878
1879         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1880         if (seg == NULL) {
1881                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1882                 return -1;
1883         }
1884         memset(seg, 0, niov * sizeof(*seg));
1885         ctx->mxc_nseg = niov;
1886         sum = 0;
1887         for (i = 0; i < niov; i++) {
1888                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1889                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1890                 if (i == 0) {
1891                         seg[i].segment_ptr += (u64) first_kiov_offset;
1892                         /* we have to add back the original kiov_offset */
1893                         seg[i].segment_length -= first_kiov_offset +
1894                                                  kiov[first_kiov].kiov_offset;
1895                 }
1896                 if (i == (nseg - 1)) {
1897                         seg[i].segment_length = last_kiov_length;
1898                 }
1899                 sum += seg[i].segment_length;
1900         }
1901         ctx->mxc_seg_list = seg;
1902         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1903 #ifdef MX_PIN_FULLPAGES
1904         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1905 #endif
1906         LASSERT(nob == sum);
1907         return 0;
1908 }
1909
1910 void
1911 mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1912 {
1913         LASSERT(type == MXLND_MSG_PUT_ACK);
1914         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1915         tx->mxc_cookie = cookie;
1916         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1917         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1918         tx->mxc_match = mxlnd_create_match(tx, status);
1919
1920         mxlnd_queue_tx(tx);
1921 }
1922
1923
1924 /**
1925  * mxlnd_send_data - get tx, map [k]iov, queue tx
1926  * @ni
1927  * @lntmsg
1928  * @peer
1929  * @msg_type
1930  * @cookie
1931  *
1932  * This setups the DATA send for PUT or GET.
1933  *
1934  * On success, it queues the tx, on failure it calls lnet_finalize()
1935  */
1936 void
1937 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
1938 {
1939         int                     ret     = 0;
1940         lnet_process_id_t       target  = lntmsg->msg_target;
1941         unsigned int            niov    = lntmsg->msg_niov;
1942         struct iovec           *iov     = lntmsg->msg_iov;
1943         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1944         unsigned int            offset  = lntmsg->msg_offset;
1945         unsigned int            nob     = lntmsg->msg_len;
1946         kmx_ctx_t              *tx      = NULL;
1947
1948         LASSERT(lntmsg != NULL);
1949         LASSERT(peer != NULL);
1950         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1951         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1952
1953         tx = mxlnd_get_idle_tx();
1954         if (tx == NULL) {
1955                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1956                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1957                         libcfs_nid2str(target.nid));
1958                 goto failed_0;
1959         }
1960         tx->mxc_nid = target.nid;
1961         /* NOTE called when we have a ref on the conn, get one for this tx */
1962         mxlnd_conn_addref(peer->mxp_conn);
1963         tx->mxc_peer = peer;
1964         tx->mxc_conn = peer->mxp_conn;
1965         tx->mxc_msg_type = msg_type;
1966         tx->mxc_lntmsg[0] = lntmsg;
1967         tx->mxc_cookie = cookie;
1968         tx->mxc_match = mxlnd_create_match(tx, 0);
1969
1970         /* This setups up the mx_ksegment_t to send the DATA payload  */
1971         if (nob == 0) {
1972                 /* do not setup the segments */
1973                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1974                                    "to %s?\n", libcfs_nid2str(target.nid));
1975                 ret = 0;
1976         } else if (kiov == NULL) {
1977                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1978         } else {
1979                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1980         }
1981         if (ret != 0) {
1982                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n",
1983                                    libcfs_nid2str(target.nid));
1984                 tx->mxc_errno = -EIO;
1985                 goto failed_1;
1986         }
1987         mxlnd_queue_tx(tx);
1988         return;
1989
1990 failed_1:
1991         mxlnd_conn_decref(peer->mxp_conn);
1992         mxlnd_put_idle_tx(tx);
1993         return;
1994
1995 failed_0:
1996         CDEBUG(D_NETERROR, "no tx avail\n");
1997         lnet_finalize(ni, lntmsg, -EIO);
1998         return;
1999 }
2000
2001 /**
2002  * mxlnd_recv_data - map [k]iov, post rx
2003  * @ni
2004  * @lntmsg
2005  * @rx
2006  * @msg_type
2007  * @cookie
2008  *
2009  * This setups the DATA receive for PUT or GET.
2010  *
2011  * On success, it returns 0, on failure it returns -1
2012  */
2013 int
2014 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
2015 {
2016         int                     ret     = 0;
2017         lnet_process_id_t       target  = lntmsg->msg_target;
2018         unsigned int            niov    = lntmsg->msg_niov;
2019         struct iovec           *iov     = lntmsg->msg_iov;
2020         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
2021         unsigned int            offset  = lntmsg->msg_offset;
2022         unsigned int            nob     = lntmsg->msg_len;
2023         mx_return_t             mxret   = MX_SUCCESS;
2024         u64                     mask    = ~(MXLND_ERROR_MASK);
2025
2026         /* above assumes MXLND_MSG_PUT_DATA */
2027         if (msg_type == MXLND_MSG_GET_DATA) {
2028                 niov = lntmsg->msg_md->md_niov;
2029                 iov = lntmsg->msg_md->md_iov.iov;
2030                 kiov = lntmsg->msg_md->md_iov.kiov;
2031                 offset = 0;
2032                 nob = lntmsg->msg_md->md_length;
2033         }
2034
2035         LASSERT(lntmsg != NULL);
2036         LASSERT(rx != NULL);
2037         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
2038         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
2039
2040         rx->mxc_msg_type = msg_type;
2041         rx->mxc_state = MXLND_CTX_PENDING;
2042         rx->mxc_nid = target.nid;
2043         /* if posting a GET_DATA, we may not yet know the peer */
2044         if (rx->mxc_peer != NULL) {
2045                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
2046         }
2047         rx->mxc_lntmsg[0] = lntmsg;
2048         rx->mxc_cookie = cookie;
2049         rx->mxc_match = mxlnd_create_match(rx, 0);
2050         /* This setups up the mx_ksegment_t to receive the DATA payload  */
2051         if (kiov == NULL) {
2052                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
2053         } else {
2054                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
2055         }
2056         if (msg_type == MXLND_MSG_GET_DATA) {
2057                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
2058                 if (rx->mxc_lntmsg[1] == NULL) {
2059                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
2060                                            libcfs_nid2str(target.nid));
2061                         ret = -1;
2062                 }
2063         }
2064         if (ret != 0) {
2065                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
2066                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
2067                        libcfs_nid2str(target.nid));
2068                 return -1;
2069         }
2070         ret = mxlnd_q_pending_ctx(rx);
2071         if (ret == -1) {
2072                 return -1;
2073         }
2074         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
2075         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
2076                           rx->mxc_seg_list, rx->mxc_nseg,
2077                           rx->mxc_pin_type, rx->mxc_match,
2078                           mask, (void *) rx,
2079                           &rx->mxc_mxreq);
2080         if (mxret != MX_SUCCESS) {
2081                 if (rx->mxc_conn != NULL) {
2082                         mxlnd_deq_pending_ctx(rx);
2083                 }
2084                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
2085                                    (int) mxret, libcfs_nid2str(target.nid));
2086                 return -1;
2087         }
2088
2089         return 0;
2090 }
2091
2092 /**
2093  * mxlnd_send - the LND required send function
2094  * @ni
2095  * @private
2096  * @lntmsg
2097  *
2098  * This must not block. Since we may not have a peer struct for the receiver,
2099  * it will append send messages on a global tx list. We will then up the
2100  * tx_queued's semaphore to notify it of the new send.
2101  */
2102 int
2103 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
2104 {
2105         int                     ret             = 0;
2106         int                     type            = lntmsg->msg_type;
2107         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
2108         lnet_process_id_t       target          = lntmsg->msg_target;
2109         lnet_nid_t              nid             = target.nid;
2110         int                     target_is_router = lntmsg->msg_target_is_router;
2111         int                     routing         = lntmsg->msg_routing;
2112         unsigned int            payload_niov    = lntmsg->msg_niov;
2113         struct iovec           *payload_iov     = lntmsg->msg_iov;
2114         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
2115         unsigned int            payload_offset  = lntmsg->msg_offset;
2116         unsigned int            payload_nob     = lntmsg->msg_len;
2117         kmx_ctx_t              *tx              = NULL;
2118         kmx_msg_t              *txmsg           = NULL;
2119         kmx_ctx_t              *rx              = (kmx_ctx_t *) private; /* for REPLY */
2120         kmx_ctx_t              *rx_data         = NULL;
2121         kmx_conn_t             *conn            = NULL;
2122         int                     nob             = 0;
2123         uint32_t                length          = 0;
2124         kmx_peer_t             *peer            = NULL;
2125         cfs_rwlock_t           *g_lock          = &kmxlnd_data.kmx_global_lock;
2126
2127         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
2128                        payload_nob, payload_niov, libcfs_id2str(target));
2129
2130         LASSERT (payload_nob == 0 || payload_niov > 0);
2131         LASSERT (payload_niov <= LNET_MAX_IOV);
2132         /* payload is either all vaddrs or all pages */
2133         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
2134
2135         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
2136
2137         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
2138          * to a new peer, so create one if not found */
2139         peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
2140         if (peer == NULL || peer->mxp_conn == NULL) {
2141                 /* we could not find it nor could we create one or
2142                  * one exists but we cannot create a conn,
2143                  * fail this message */
2144                 if (peer) {
2145                         /* found peer without conn, drop ref taken above */
2146                         LASSERT(peer->mxp_conn == NULL);
2147                         mxlnd_peer_decref(peer);
2148                 }
2149                 return -ENOMEM;
2150         }
2151
2152         /* we have a peer with a conn */
2153
2154         if (unlikely(peer->mxp_incompatible)) {
2155                 mxlnd_peer_decref(peer); /* drop ref taken above */
2156         } else {
2157                 cfs_read_lock(g_lock);
2158                 conn = peer->mxp_conn;
2159                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
2160                         mxlnd_conn_addref(conn);
2161                 } else {
2162                         conn = NULL;
2163                 }
2164                 cfs_read_unlock(g_lock);
2165                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
2166                 if (!conn)
2167                         return -ENOTCONN;
2168         }
2169
2170         LASSERT(peer && conn);
2171
2172         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
2173
2174         switch (type) {
2175         case LNET_MSG_ACK:
2176                 LASSERT (payload_nob == 0);
2177                 break;
2178
2179         case LNET_MSG_REPLY:
2180         case LNET_MSG_PUT:
2181                 /* Is the payload small enough not to need DATA? */
2182                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
2183                 if (nob <= MXLND_MSG_SIZE)
2184                         break;                  /* send EAGER */
2185
2186                 tx = mxlnd_get_idle_tx();
2187                 if (unlikely(tx == NULL)) {
2188                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
2189                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
2190                                libcfs_nid2str(nid));
2191                         if (conn) mxlnd_conn_decref(conn);
2192                         return -ENOMEM;
2193                 }
2194
2195                 tx->mxc_peer = peer;
2196                 tx->mxc_conn = conn;
2197                 /* we added a conn ref above */
2198                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
2199                 txmsg = tx->mxc_msg;
2200                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
2201                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
2202                 tx->mxc_match = mxlnd_create_match(tx, 0);
2203
2204                 /* we must post a receive _before_ sending the request.
2205                  * we need to determine how much to receive, it will be either
2206                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
2207
2208                 rx = mxlnd_get_idle_rx(conn);
2209                 if (unlikely(rx == NULL)) {
2210                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
2211                                            libcfs_nid2str(nid));
2212                         mxlnd_put_idle_tx(tx);
2213                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
2214                         return -ENOMEM;
2215                 }
2216                 rx->mxc_nid = nid;
2217                 rx->mxc_peer = peer;
2218                 mxlnd_conn_addref(conn); /* for this rx */
2219                 rx->mxc_conn = conn;
2220                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
2221                 rx->mxc_cookie = tx->mxc_cookie;
2222                 rx->mxc_match = mxlnd_create_match(rx, 0);
2223
2224                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
2225                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
2226                 if (unlikely(ret != 0)) {
2227                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
2228                                            libcfs_nid2str(nid));
2229                         rx->mxc_lntmsg[0] = NULL;
2230                         mxlnd_put_idle_rx(rx);
2231                         mxlnd_put_idle_tx(tx);
2232                         mxlnd_conn_decref(conn); /* for the rx... */
2233                         mxlnd_conn_decref(conn); /* and for the tx */
2234                         return -EHOSTUNREACH;
2235                 }
2236
2237                 mxlnd_queue_tx(tx);
2238                 return 0;
2239
2240         case LNET_MSG_GET:
2241                 if (routing || target_is_router)
2242                         break;                  /* send EAGER */
2243
2244                 /* is the REPLY message too small for DATA? */
2245                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
2246                 if (nob <= MXLND_MSG_SIZE)
2247                         break;                  /* send EAGER */
2248
2249                 /* get tx (we need the cookie) , post rx for incoming DATA,
2250                  * then post GET_REQ tx */
2251                 tx = mxlnd_get_idle_tx();
2252                 if (unlikely(tx == NULL)) {
2253                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
2254                                            libcfs_nid2str(nid));
2255                         mxlnd_conn_decref(conn); /* for the ref taken above */
2256                         return -ENOMEM;
2257                 }
2258                 rx_data = mxlnd_get_idle_rx(conn);
2259                 if (unlikely(rx_data == NULL)) {
2260                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
2261                                            libcfs_nid2str(nid));
2262                         mxlnd_put_idle_tx(tx);
2263                         mxlnd_conn_decref(conn); /* for the ref taken above */
2264                         return -ENOMEM;
2265                 }
2266                 rx_data->mxc_peer = peer;
2267                 /* NOTE no need to lock peer before adding conn ref since we took
2268                  * a conn ref for the tx (it cannot be freed between there and here ) */
2269                 mxlnd_conn_addref(conn); /* for the rx_data */
2270                 rx_data->mxc_conn = conn;
2271
2272                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
2273                 if (unlikely(ret != 0)) {
2274                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
2275                                            libcfs_nid2str(nid));
2276                         mxlnd_put_idle_rx(rx_data);
2277                         mxlnd_put_idle_tx(tx);
2278                         mxlnd_conn_decref(conn); /* for the rx_data... */
2279                         mxlnd_conn_decref(conn); /* and for the tx */
2280                         return -EIO;
2281                 }
2282
2283                 tx->mxc_peer = peer;
2284                 tx->mxc_conn = conn;
2285                 /* conn ref taken above */
2286                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
2287                 txmsg = tx->mxc_msg;
2288                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
2289                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
2290                 tx->mxc_match = mxlnd_create_match(tx, 0);
2291
2292                 mxlnd_queue_tx(tx);
2293                 return 0;
2294
2295         default:
2296                 LBUG();
2297                 mxlnd_conn_decref(conn); /* drop ref taken above */
2298                 return -EIO;
2299         }
2300
2301         /* send EAGER */
2302
2303         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
2304                 <= MXLND_MSG_SIZE);
2305
2306         tx = mxlnd_get_idle_tx();
2307         if (unlikely(tx == NULL)) {
2308                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
2309                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2310                 mxlnd_conn_decref(conn); /* drop ref taken above */
2311                 return -ENOMEM;
2312         }
2313
2314         tx->mxc_peer = peer;
2315         tx->mxc_conn = conn;
2316         /* conn ref taken above */
2317         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2318         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2319         tx->mxc_match = mxlnd_create_match(tx, 0);
2320
2321         txmsg = tx->mxc_msg;
2322         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2323
2324         if (payload_kiov != NULL)
2325                 lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
2326                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2327                             payload_niov, payload_kiov, payload_offset, payload_nob);
2328         else
2329                 lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
2330                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2331                             payload_niov, payload_iov, payload_offset, payload_nob);
2332
2333         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2334         mxlnd_queue_tx(tx);
2335         return 0;
2336 }
2337
2338 /**
2339  * mxlnd_recv - the LND required recv function
2340  * @ni
2341  * @private
2342  * @lntmsg
2343  * @delayed
2344  * @niov
2345  * @kiov
2346  * @offset
2347  * @mlen
2348  * @rlen
2349  *
2350  * This must not block.
2351  */
2352 int
2353 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2354              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2355              unsigned int offset, unsigned int mlen, unsigned int rlen)
2356 {
2357         int             ret             = 0;
2358         int             nob             = 0;
2359         int             len             = 0;
2360         kmx_ctx_t       *rx             = private;
2361         kmx_msg_t       *rxmsg          = rx->mxc_msg;
2362         lnet_nid_t       nid            = rx->mxc_nid;
2363         kmx_ctx_t       *tx             = NULL;
2364         kmx_msg_t       *txmsg          = NULL;
2365         kmx_peer_t      *peer           = rx->mxc_peer;
2366         kmx_conn_t      *conn           = peer->mxp_conn;
2367         u64              cookie         = 0ULL;
2368         int              msg_type       = rxmsg->mxm_type;
2369         int              repost         = 1;
2370         int              credit         = 0;
2371         int              finalize       = 0;
2372
2373         LASSERT (mlen <= rlen);
2374         /* Either all pages or all vaddrs */
2375         LASSERT (!(kiov != NULL && iov != NULL));
2376         LASSERT (peer && conn);
2377
2378         /* conn_addref(conn) already taken for the primary rx */
2379
2380         switch (msg_type) {
2381         case MXLND_MSG_EAGER:
2382                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2383                 len = rx->mxc_status.xfer_length;
2384                 if (unlikely(nob > len)) {
2385                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2386                                            libcfs_nid2str(nid), nob, len);
2387                         ret = -EPROTO;
2388                         break;
2389                 }
2390
2391                 if (kiov != NULL)
2392                         lnet_copy_flat2kiov(niov, kiov, offset,
2393                                 MXLND_MSG_SIZE, rxmsg,
2394                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2395                                 mlen);
2396                 else
2397                         lnet_copy_flat2iov(niov, iov, offset,
2398                                 MXLND_MSG_SIZE, rxmsg,
2399                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2400                                 mlen);
2401                 finalize = 1;
2402                 credit = 1;
2403                 break;
2404
2405         case MXLND_MSG_PUT_REQ:
2406                 /* we are going to reuse the rx, store the needed info */
2407                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2408
2409                 /* get tx, post rx, send PUT_ACK */
2410
2411                 tx = mxlnd_get_idle_tx();
2412                 if (unlikely(tx == NULL)) {
2413                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2414                         /* Not replying will break the connection */
2415                         ret = -ENOMEM;
2416                         break;
2417                 }
2418                 if (unlikely(mlen == 0)) {
2419                         finalize = 1;
2420                         tx->mxc_peer = peer;
2421                         tx->mxc_conn = conn;
2422                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2423                         /* repost = 1 */
2424                         break;
2425                 }
2426
2427                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2428                 tx->mxc_peer = peer;
2429                 tx->mxc_conn = conn;
2430                 /* no need to lock peer first since we already have a ref */
2431                 mxlnd_conn_addref(conn); /* for the tx */
2432                 txmsg = tx->mxc_msg;
2433                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2434                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2435                 tx->mxc_cookie = cookie;
2436                 tx->mxc_match = mxlnd_create_match(tx, 0);
2437
2438                 /* we must post a receive _before_ sending the PUT_ACK */
2439                 mxlnd_ctx_init(rx);
2440                 rx->mxc_state = MXLND_CTX_PREP;
2441                 rx->mxc_peer = peer;
2442                 rx->mxc_conn = conn;
2443                 /* do not take another ref for this rx, it is already taken */
2444                 rx->mxc_nid = peer->mxp_nid;
2445                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2446                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2447
2448                 if (unlikely(ret != 0)) {
2449                         /* Notify peer that it's over */
2450                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n",
2451                                            libcfs_nid2str(nid), ret);
2452                         mxlnd_ctx_init(tx);
2453                         tx->mxc_state = MXLND_CTX_PREP;
2454                         tx->mxc_peer = peer;
2455                         tx->mxc_conn = conn;
2456                         /* finalize = 0, let the PUT_ACK tx finalize this */
2457                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2458                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2459                         /* conn ref already taken above */
2460                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2461                         /* repost = 1 */
2462                         break;
2463                 }
2464
2465                 mxlnd_queue_tx(tx);
2466                 /* do not return a credit until after PUT_DATA returns */
2467                 repost = 0;
2468                 break;
2469
2470         case MXLND_MSG_GET_REQ:
2471                 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2472
2473                 if (likely(lntmsg != NULL)) {
2474                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2475                                         cookie);
2476                 } else {
2477                         /* GET didn't match anything */
2478                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2479                          * We have to embed the error code in the match bits.
2480                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2481                         tx = mxlnd_get_idle_tx();
2482                         if (unlikely(tx == NULL)) {
2483                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2484                                                    libcfs_nid2str(nid));
2485                                 /* we can't get a tx, notify the peer that the GET failed */
2486                                 mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
2487                                                    ENODATA, cookie);
2488                                 ret = -ENOMEM;
2489                                 break;
2490                         }
2491                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2492                         tx->mxc_state = MXLND_CTX_PENDING;
2493                         tx->mxc_nid = nid;
2494                         tx->mxc_peer = peer;
2495                         tx->mxc_conn = conn;
2496                         /* no need to lock peer first since we already have a ref */
2497                         mxlnd_conn_addref(conn); /* for this tx */
2498                         tx->mxc_cookie = cookie;
2499                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2500                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2501                         mxlnd_queue_tx(tx);
2502                 }
2503                 /* finalize lntmsg after tx completes */
2504                 break;
2505
2506         default:
2507                 LBUG();
2508         }
2509
2510         if (repost) {
2511                 /* we received a message, increment peer's outstanding credits */
2512                 if (credit == 1) {
2513                         cfs_spin_lock(&conn->mxk_lock);
2514                         conn->mxk_outstanding++;
2515                         cfs_spin_unlock(&conn->mxk_lock);
2516                 }
2517                 /* we are done with the rx */
2518                 mxlnd_put_idle_rx(rx);
2519                 mxlnd_conn_decref(conn);
2520         }
2521
2522         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2523
2524         /* we received a credit, see if we can use it to send a msg */
2525         if (credit) mxlnd_check_sends(peer);
2526
2527         return ret;
2528 }
2529
2530 void
2531 mxlnd_sleep(unsigned long timeout)
2532 {
2533         cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
2534         cfs_schedule_timeout(timeout);
2535         return;
2536 }
2537
2538 /**
2539  * mxlnd_tx_queued - the generic send queue thread
2540  * @arg - thread id (as a void *)
2541  *
2542  * This thread moves send messages from the global tx_queue to the owning
2543  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2544  * it to the global peer list.
2545  */
2546 int
2547 mxlnd_tx_queued(void *arg)
2548 {
2549         long                    id      = (long) arg;
2550         int                     ret     = 0;
2551         int                     found   = 0;
2552         kmx_ctx_t              *tx      = NULL;
2553         kmx_peer_t             *peer    = NULL;
2554         cfs_list_t             *queue   = &kmxlnd_data.kmx_tx_queue;
2555         cfs_spinlock_t         *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
2556         cfs_rwlock_t           *g_lock  = &kmxlnd_data.kmx_global_lock;
2557
2558         cfs_daemonize("mxlnd_tx_queued");
2559
2560         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
2561                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2562                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
2563                         break;
2564                 if (ret != 0) // Should we check for -EINTR?
2565                         continue;
2566                 cfs_spin_lock(tx_q_lock);
2567                 if (cfs_list_empty (&kmxlnd_data.kmx_tx_queue)) {
2568                         cfs_spin_unlock(tx_q_lock);
2569                         continue;
2570                 }
2571                 tx = cfs_list_entry (queue->next, kmx_ctx_t, mxc_list);
2572                 cfs_list_del_init(&tx->mxc_list);
2573                 cfs_spin_unlock(tx_q_lock);
2574
2575                 found = 0;
2576                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds peer ref */
2577                 if (peer != NULL) {
2578                         tx->mxc_peer = peer;
2579                         cfs_write_lock(g_lock);
2580                         if (peer->mxp_conn == NULL) {
2581                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2582                                 if (ret != 0) {
2583                                         /* out of memory, give up and fail tx */
2584                                         tx->mxc_errno = -ENOMEM;
2585                                         mxlnd_peer_decref(peer);
2586                                         cfs_write_unlock(g_lock);
2587                                         mxlnd_put_idle_tx(tx);
2588                                         continue;
2589                                 }
2590                         }
2591                         tx->mxc_conn = peer->mxp_conn;
2592                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2593                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2594                         cfs_write_unlock(g_lock);
2595                         mxlnd_queue_tx(tx);
2596                         found = 1;
2597                 }
2598                 if (found == 0) {
2599                         int             hash    = 0;
2600                         kmx_peer_t     *peer    = NULL;
2601                         kmx_peer_t     *old     = NULL;
2602
2603                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2604
2605                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2606                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2607                         /* create peer */
2608                         /* adds conn ref for this function */
2609                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2610                                         *kmxlnd_tunables.kmx_board,
2611                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2612                         if (ret != 0) {
2613                                 /* finalize message */
2614                                 tx->mxc_errno = ret;
2615                                 mxlnd_put_idle_tx(tx);
2616                                 continue;
2617                         }
2618                         tx->mxc_peer = peer;
2619                         tx->mxc_conn = peer->mxp_conn;
2620                         /* this tx will keep the conn ref taken in peer_alloc() */
2621
2622                         /* add peer to global peer list, but look to see
2623                          * if someone already created it after we released
2624                          * the read lock */
2625                         cfs_write_lock(g_lock);
2626                         old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
2627                         if (old) {
2628                                 /* we have a peer ref on old */
2629                                 if (old->mxp_conn) {
2630                                         found = 1;
2631                                 } else {
2632                                         /* no conn */
2633                                         /* drop our ref taken above... */
2634                                         mxlnd_peer_decref(old);
2635                                         /* and delete it */
2636                                         mxlnd_del_peer_locked(old);
2637                                 }
2638                         }
2639
2640                         if (found == 0) {
2641                                 cfs_list_add_tail(&peer->mxp_list,
2642                                                   &kmxlnd_data.kmx_peers[hash]);
2643                                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
2644                         } else {
2645                                 tx->mxc_peer = old;
2646                                 tx->mxc_conn = old->mxp_conn;
2647                                 LASSERT(old->mxp_conn != NULL);
2648                                 mxlnd_conn_addref(old->mxp_conn);
2649                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2650                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2651                                 mxlnd_peer_decref(peer);
2652                         }
2653                         cfs_write_unlock(g_lock);
2654
2655                         mxlnd_queue_tx(tx);
2656                 }
2657         }
2658         mxlnd_thread_stop(id);
2659         return 0;
2660 }
2661
2662 /* When calling this, we must not have the peer lock. */
2663 void
2664 mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
2665 {
2666         mx_return_t     mxret           = MX_SUCCESS;
2667         mx_request_t    request;
2668         kmx_conn_t      *conn           = peer->mxp_conn;
2669         u64             match           = ((u64) msg_type) << MXLND_MSG_OFFSET;
2670
2671         /* NOTE we are holding a conn ref every time we call this function,
2672          * we do not need to lock the peer before taking another ref */
2673         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2674
2675         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2676
2677         if (peer->mxp_reconnect_time == 0) {
2678                 peer->mxp_reconnect_time = jiffies;
2679         }
2680
2681         if (peer->mxp_nic_id == 0ULL) {
2682                 int     ret     = 0;
2683
2684                 ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
2685                                       &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
2686                 if (ret == 0) {
2687                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2688                 }
2689                 if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
2690                         /* not mapped yet, return */
2691                         cfs_spin_lock(&conn->mxk_lock);
2692                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
2693                         cfs_spin_unlock(&conn->mxk_lock);
2694                 }
2695         }
2696
2697         if (cfs_time_after(jiffies,
2698                            peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
2699             conn->mxk_status != MXLND_CONN_DISCONNECT) {
2700                 /* give up and notify LNET */
2701                 CDEBUG(D_NET, "timeout trying to connect to %s\n",
2702                        libcfs_nid2str(peer->mxp_nid));
2703                 mxlnd_conn_disconnect(conn, 0, 0);
2704                 mxlnd_conn_decref(conn);
2705                 return;
2706         }
2707
2708         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2709                             peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
2710                             (void *) peer, &request);
2711         if (unlikely(mxret != MX_SUCCESS)) {
2712                 cfs_spin_lock(&conn->mxk_lock);
2713                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
2714                 cfs_spin_unlock(&conn->mxk_lock);
2715                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2716                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2717                 mxlnd_conn_decref(conn);
2718         }
2719         mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
2720                                MXLND_CONNECT_TIMEOUT/CFS_HZ*1000);
2721         return;
2722 }
2723
2724 #define MXLND_STATS 0
2725
2726 int
2727 mxlnd_check_sends(kmx_peer_t *peer)
2728 {
2729         int             ret             = 0;
2730         int             found           = 0;
2731         mx_return_t     mxret           = MX_SUCCESS;
2732         kmx_ctx_t       *tx             = NULL;
2733         kmx_conn_t      *conn           = NULL;
2734         u8              msg_type        = 0;
2735         int             credit          = 0;
2736         int             status          = 0;
2737         int             ntx_posted      = 0;
2738         int             credits         = 0;
2739 #if MXLND_STATS
2740         static unsigned long last       = 0;
2741 #endif
2742
2743         if (unlikely(peer == NULL)) {
2744                 LASSERT(peer != NULL);
2745                 return -1;
2746         }
2747         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
2748         conn = peer->mxp_conn;
2749         /* NOTE take a ref for the duration of this function since it is called
2750          * when there might not be any queued txs for this peer */
2751         if (conn) {
2752                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2753                         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2754                         return -1;
2755                 }
2756                 mxlnd_conn_addref(conn); /* for duration of this function */
2757         }
2758         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
2759
2760         /* do not add another ref for this tx */
2761
2762         if (conn == NULL) {
2763                 /* we do not have any conns */
2764                 CDEBUG(D_NETERROR, "peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
2765                 return -1;
2766         }
2767
2768 #if MXLND_STATS
2769         if (cfs_time_after(jiffies, last)) {
2770                 last = jiffies + CFS_HZ;
2771                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2772                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2773                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2774                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2775                               conn->mxk_ntx_data, conn->mxk_data_posted);
2776         }
2777 #endif
2778
2779         cfs_spin_lock(&conn->mxk_lock);
2780         ntx_posted = conn->mxk_ntx_posted;
2781         credits = conn->mxk_credits;
2782
2783         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
2784         LASSERT(ntx_posted >= 0);
2785
2786         LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
2787         LASSERT(credits >= 0);
2788
2789         /* check number of queued msgs, ignore data */
2790         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
2791                 /* check if any txs queued that could return credits... */
2792                 if (cfs_list_empty(&conn->mxk_tx_credit_queue) ||
2793                     conn->mxk_ntx_msgs == 0) {
2794                         /* if not, send a NOOP */
2795                         tx = mxlnd_get_idle_tx();
2796                         if (likely(tx != NULL)) {
2797                                 tx->mxc_peer = peer;
2798                                 tx->mxc_conn = peer->mxp_conn;
2799                                 mxlnd_conn_addref(conn); /* for this tx */
2800                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2801                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2802                                 mxlnd_peer_queue_tx_locked(tx);
2803                                 found = 1;
2804                                 goto done_locked;
2805                         }
2806                 }
2807         }
2808
2809         /* if the peer is not ready, try to connect */
2810         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2811             conn->mxk_status == MXLND_CONN_FAIL)) {
2812                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2813                 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
2814                 cfs_spin_unlock(&conn->mxk_lock);
2815                 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
2816                 goto done;
2817         }
2818
2819         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
2820                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
2821                 /* We have something to send. If we have a queued tx that does not
2822                  * require a credit (free), choose it since its completion will
2823                  * return a credit (here or at the peer), complete a DATA or
2824                  * CONN_REQ or CONN_ACK. */
2825                 cfs_list_t *tmp_tx = NULL;
2826                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
2827                         tmp_tx = &conn->mxk_tx_free_queue;
2828                 } else {
2829                         tmp_tx = &conn->mxk_tx_credit_queue;
2830                 }
2831                 tx = cfs_list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
2832
2833                 msg_type = tx->mxc_msg_type;
2834
2835                 /* don't try to send a rx */
2836                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2837
2838                 /* ensure that it is a valid msg type */
2839                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2840                         msg_type == MXLND_MSG_CONN_ACK ||
2841                         msg_type == MXLND_MSG_NOOP     ||
2842                         msg_type == MXLND_MSG_EAGER    ||
2843                         msg_type == MXLND_MSG_PUT_REQ  ||
2844                         msg_type == MXLND_MSG_PUT_ACK  ||
2845                         msg_type == MXLND_MSG_PUT_DATA ||
2846                         msg_type == MXLND_MSG_GET_REQ  ||
2847                         msg_type == MXLND_MSG_GET_DATA);
2848                 LASSERT(tx->mxc_peer == peer);
2849                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2850
2851                 credit = mxlnd_tx_requires_credit(tx);
2852                 if (credit) {
2853
2854                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
2855                                 CDEBUG(D_NET, "%s: posted enough\n",
2856                                               libcfs_nid2str(peer->mxp_nid));
2857                                 goto done_locked;
2858                         }
2859
2860                         if (conn->mxk_credits == 0) {
2861                                 CDEBUG(D_NET, "%s: no credits\n",
2862                                               libcfs_nid2str(peer->mxp_nid));
2863                                 goto done_locked;
2864                         }
2865
2866                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2867                             conn->mxk_outstanding == 0) {  /* giving back credits */
2868                                 CDEBUG(D_NET, "%s: not using last credit\n",
2869                                               libcfs_nid2str(peer->mxp_nid));
2870                                 goto done_locked;
2871                         }
2872                 }
2873
2874                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2875                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2876                                 msg_type == MXLND_MSG_CONN_ACK)) {
2877                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2878                                              mxlnd_connstatus_to_str(conn->mxk_status),
2879                                              tx->mxc_cookie,
2880                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2881                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
2882                                     cfs_time_aftereq(jiffies, tx->mxc_deadline)) {
2883                                         cfs_list_del_init(&tx->mxc_list);
2884                                         tx->mxc_errno = -ECONNABORTED;
2885                                         cfs_spin_unlock(&conn->mxk_lock);
2886                                         mxlnd_put_idle_tx(tx);
2887                                         mxlnd_conn_decref(conn);
2888                                         goto done;
2889                                 }
2890                                 goto done_locked;
2891                         }
2892                 }
2893
2894                 cfs_list_del_init(&tx->mxc_list);
2895
2896                 /* handle credits, etc now while we have the lock to avoid races */
2897                 if (credit) {
2898                         conn->mxk_credits--;
2899                         conn->mxk_ntx_posted++;
2900                 }
2901                 if (msg_type != MXLND_MSG_PUT_DATA &&
2902                     msg_type != MXLND_MSG_GET_DATA) {
2903                         if (msg_type != MXLND_MSG_CONN_REQ &&
2904                             msg_type != MXLND_MSG_CONN_ACK) {
2905                                 conn->mxk_ntx_msgs--;
2906                         }
2907                 }
2908                 if (tx->mxc_incarnation == 0 &&
2909                     conn->mxk_incarnation != 0) {
2910                         tx->mxc_incarnation = conn->mxk_incarnation;
2911                 }
2912
2913                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2914                  * or (2) there is a non-DATA msg that can return credits in the
2915                  * queue, then drop this duplicate NOOP */
2916                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2917                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
2918                             (conn->mxk_ntx_msgs >= 1)) {
2919                                 conn->mxk_credits++;
2920                                 conn->mxk_ntx_posted--;
2921                                 cfs_spin_unlock(&conn->mxk_lock);
2922                                 /* redundant NOOP */
2923                                 mxlnd_put_idle_tx(tx);
2924                                 mxlnd_conn_decref(conn);
2925                                 CDEBUG(D_NET, "%s: redundant noop\n",
2926                                               libcfs_nid2str(peer->mxp_nid));
2927                                 found = 1;
2928                                 goto done;
2929                         }
2930                 }
2931
2932                 found = 1;
2933                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2934                     (msg_type != MXLND_MSG_GET_DATA))) {
2935                         mxlnd_pack_msg_locked(tx);
2936                 }
2937
2938                 mxret = MX_SUCCESS;
2939
2940                 status = conn->mxk_status;
2941                 cfs_spin_unlock(&conn->mxk_lock);
2942
2943                 if (likely((status == MXLND_CONN_READY) ||
2944                     (msg_type == MXLND_MSG_CONN_REQ) ||
2945                     (msg_type == MXLND_MSG_CONN_ACK))) {
2946                         ret = 0;
2947                         if (msg_type != MXLND_MSG_CONN_REQ &&
2948                             msg_type != MXLND_MSG_CONN_ACK) {
2949                                 /* add to the pending list */
2950                                 ret = mxlnd_q_pending_ctx(tx);
2951                         } else {
2952                                 /* CONN_REQ/ACK */
2953                                 tx->mxc_state = MXLND_CTX_PENDING;
2954                         }
2955
2956                         if (ret == 0) {
2957                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2958                                     msg_type != MXLND_MSG_GET_DATA)) {
2959                                         /* send a msg style tx */
2960                                         LASSERT(tx->mxc_nseg == 1);
2961                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2962                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2963                                                mxlnd_msgtype_to_str(msg_type),
2964                                                tx->mxc_cookie);
2965                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2966                                                           &tx->mxc_seg,
2967                                                           tx->mxc_nseg,
2968                                                           tx->mxc_pin_type,
2969                                                           conn->mxk_epa,
2970                                                           tx->mxc_match,
2971                                                           (void *) tx,
2972                                                           &tx->mxc_mxreq);
2973                                 } else {
2974                                         /* send a DATA tx */
2975                                         cfs_spin_lock(&conn->mxk_lock);
2976                                         conn->mxk_ntx_data--;
2977                                         conn->mxk_data_posted++;
2978                                         cfs_spin_unlock(&conn->mxk_lock);
2979                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2980                                                mxlnd_msgtype_to_str(msg_type),
2981                                                tx->mxc_cookie);
2982                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2983                                                           tx->mxc_seg_list,
2984                                                           tx->mxc_nseg,
2985                                                           tx->mxc_pin_type,
2986                                                           conn->mxk_epa,
2987                                                           tx->mxc_match,
2988                                                           (void *) tx,
2989                                                           &tx->mxc_mxreq);
2990                                 }
2991                         } else {
2992                                 /* ret != 0 */
2993                                 mxret = MX_CONNECTION_FAILED;
2994                         }
2995                         if (likely(mxret == MX_SUCCESS)) {
2996                                 ret = 0;
2997                         } else {
2998                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2999                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
3000                                        libcfs_nid2str(peer->mxp_nid));
3001                                 /* NOTE mx_kisend() only fails if there are not enough
3002                                 * resources. Do not change the connection status. */
3003                                 if (mxret == MX_NO_RESOURCES) {
3004                                         tx->mxc_errno = -ENOMEM;
3005                                 } else {
3006                                         tx->mxc_errno = -ECONNABORTED;
3007                                 }
3008                                 if (credit) {
3009                                         cfs_spin_lock(&conn->mxk_lock);
3010                                         conn->mxk_ntx_posted--;
3011                                         conn->mxk_credits++;
3012                                         cfs_spin_unlock(&conn->mxk_lock);
3013                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
3014                                         msg_type == MXLND_MSG_GET_DATA) {
3015                                         cfs_spin_lock(&conn->mxk_lock);
3016                                         conn->mxk_data_posted--;
3017                                         cfs_spin_unlock(&conn->mxk_lock);
3018                                 }
3019                                 if (msg_type != MXLND_MSG_PUT_DATA &&
3020                                     msg_type != MXLND_MSG_GET_DATA &&
3021                                     msg_type != MXLND_MSG_CONN_REQ &&
3022                                     msg_type != MXLND_MSG_CONN_ACK) {
3023                                         cfs_spin_lock(&conn->mxk_lock);
3024                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
3025                                         cfs_spin_unlock(&conn->mxk_lock);
3026                                 }
3027                                 if (msg_type != MXLND_MSG_CONN_REQ &&
3028                                     msg_type != MXLND_MSG_CONN_ACK) {
3029                                         /* remove from the pending list */
3030                                         mxlnd_deq_pending_ctx(tx);
3031                                 }
3032                                 mxlnd_put_idle_tx(tx);
3033                                 mxlnd_conn_decref(conn);
3034                         }
3035                 }
3036                 cfs_spin_lock(&conn->mxk_lock);
3037         }
3038 done_locked:
3039         cfs_spin_unlock(&conn->mxk_lock);
3040 done:
3041         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
3042         return found;
3043 }
3044
3045
3046 /**
3047  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
3048  * @ctx - the tx descriptor
3049  *
3050  * Determine which type of send request it was and start the next step, if needed,
3051  * or, if done, signal completion to LNET. After we are done, put back on the
3052  * idle tx list.
3053  */
3054 void
3055 mxlnd_handle_tx_completion(kmx_ctx_t *tx)
3056 {
3057         int             code    = tx->mxc_status.code;
3058         int             failed  = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
3059         kmx_msg_t       *msg    = tx->mxc_msg;
3060         kmx_peer_t      *peer   = tx->mxc_peer;
3061         kmx_conn_t      *conn   = tx->mxc_conn;
3062         u8              type    = tx->mxc_msg_type;
3063         int             credit  = mxlnd_tx_requires_credit(tx);
3064         u64             cookie  = tx->mxc_cookie;
3065
3066         CDEBUG(D_NET, "entering %s (0x%llx):\n",
3067                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
3068
3069         LASSERT (peer != NULL);
3070         LASSERT (conn != NULL);
3071
3072         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
3073                 LASSERT (type == msg->mxm_type);
3074         }
3075
3076         if (failed) {
3077                 if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
3078         } else {
3079                 cfs_spin_lock(&conn->mxk_lock);
3080                 conn->mxk_last_tx = cfs_time_current(); /* jiffies */
3081                 cfs_spin_unlock(&conn->mxk_lock);
3082         }
3083
3084         switch (type) {
3085
3086         case MXLND_MSG_GET_DATA:
3087                 cfs_spin_lock(&conn->mxk_lock);
3088                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3089                         conn->mxk_outstanding++;
3090                         conn->mxk_data_posted--;
3091                 }
3092                 cfs_spin_unlock(&conn->mxk_lock);
3093                 break;
3094
3095         case MXLND_MSG_PUT_DATA:
3096                 cfs_spin_lock(&conn->mxk_lock);
3097                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3098                         conn->mxk_data_posted--;
3099                 }
3100                 cfs_spin_unlock(&conn->mxk_lock);
3101                 break;
3102
3103         case MXLND_MSG_NOOP:
3104         case MXLND_MSG_PUT_REQ:
3105         case MXLND_MSG_PUT_ACK:
3106         case MXLND_MSG_GET_REQ:
3107         case MXLND_MSG_EAGER:
3108                 break;
3109
3110         case MXLND_MSG_CONN_ACK:
3111                 if (peer->mxp_incompatible) {
3112                         /* we sent our params, now close this conn */
3113                         mxlnd_conn_disconnect(conn, 0, 1);
3114                 }
3115         case MXLND_MSG_CONN_REQ:
3116                 if (failed) {
3117                         CDEBUG(D_NETERROR, "%s failed with %s (%d) (errno = %d)"
3118                                " to %s\n",
3119                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
3120                                mx_strstatus(code), code, tx->mxc_errno,
3121                                libcfs_nid2str(tx->mxc_nid));
3122                         if (!peer->mxp_incompatible) {
3123                                 cfs_spin_lock(&conn->mxk_lock);
3124                                 if (code == MX_STATUS_BAD_SESSION)
3125                                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
3126                                 else
3127                                         mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3128                                 cfs_spin_unlock(&conn->mxk_lock);
3129                         }
3130                 }
3131                 break;
3132
3133         default:
3134                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
3135                 LBUG();
3136         }
3137
3138         if (credit) {
3139                 cfs_spin_lock(&conn->mxk_lock);
3140                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3141                         conn->mxk_ntx_posted--;
3142                 }
3143                 cfs_spin_unlock(&conn->mxk_lock);
3144         }
3145
3146         mxlnd_put_idle_tx(tx);
3147         mxlnd_conn_decref(conn);
3148
3149         mxlnd_check_sends(peer);
3150
3151         CDEBUG(D_NET, "leaving\n");
3152         return;
3153 }
3154
3155 /* Handle completion of MSG or DATA rx.
3156  * CONN_REQ and CONN_ACK are handled elsewhere. */
3157 void
3158 mxlnd_handle_rx_completion(kmx_ctx_t *rx)
3159 {
3160         int             ret             = 0;
3161         int             repost          = 1;
3162         int             credit          = 1;
3163         u32             nob             = rx->mxc_status.xfer_length;
3164         u64             bits            = rx->mxc_status.match_info;
3165         kmx_msg_t      *msg             = rx->mxc_msg;
3166         kmx_peer_t     *peer            = rx->mxc_peer;
3167         kmx_conn_t     *conn            = rx->mxc_conn;
3168         u8              type            = rx->mxc_msg_type;
3169         u64             seq             = bits;
3170         lnet_msg_t     *lntmsg[2];
3171         int             result          = 0;
3172         int             peer_ref        = 0;
3173         int             conn_ref        = 0;
3174
3175         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
3176          * failed GET reply */
3177
3178         /* NOTE peer may still be NULL if it is a new peer and
3179          *      conn may be NULL if this is a re-connect */
3180         if (likely(peer != NULL && conn != NULL)) {
3181                 /* we have a reference on the conn */
3182                 conn_ref = 1;
3183         } else if (peer != NULL && conn == NULL) {
3184                 /* we have a reference on the peer */
3185                 peer_ref = 1;
3186         } else if (peer == NULL && conn != NULL) {
3187                 /* fatal error */
3188                 CERROR("rx 0x%llx from %s has conn but no peer\n",
3189                        bits, libcfs_nid2str(rx->mxc_nid));
3190                 LBUG();
3191         } /* else peer and conn == NULL */
3192
3193         if (conn == NULL && peer != NULL) {
3194                 cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3195                 conn = peer->mxp_conn;
3196                 if (conn) {
3197                         mxlnd_conn_addref(conn); /* conn takes ref... */
3198                         mxlnd_peer_decref(peer); /* from peer */
3199                         conn_ref = 1;
3200                         peer_ref = 0;
3201                 }
3202                 cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3203                 rx->mxc_conn = conn;
3204         }
3205
3206 #if MXLND_DEBUG
3207         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
3208 #endif
3209
3210         lntmsg[0] = NULL;
3211         lntmsg[1] = NULL;
3212
3213         if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
3214             rx->mxc_status.code != MX_STATUS_TRUNCATED) {
3215                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
3216                                    libcfs_nid2str(rx->mxc_nid),
3217                                    mx_strstatus(rx->mxc_status.code),
3218                                    rx->mxc_status.code);
3219                 credit = 0;
3220                 goto cleanup;
3221         }
3222
3223         if (nob == 0) {
3224                 /* this may be a failed GET reply */
3225                 if (type == MXLND_MSG_GET_DATA) {
3226                         /* get the error (52-59) bits from the match bits */
3227                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
3228                         lntmsg[0] = rx->mxc_lntmsg[0];
3229                         result = -ret;
3230                         goto cleanup;
3231                 } else {
3232                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
3233                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
3234                                            libcfs_nid2str(rx->mxc_nid));
3235                         goto cleanup;
3236                 }
3237         }
3238
3239         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
3240         if (type == MXLND_MSG_PUT_DATA) {
3241                 /* result = 0; */
3242                 lntmsg[0] = rx->mxc_lntmsg[0];
3243                 goto cleanup;
3244         } else if (type == MXLND_MSG_GET_DATA) {
3245                 /* result = 0; */
3246                 lntmsg[0] = rx->mxc_lntmsg[0];
3247                 lntmsg[1] = rx->mxc_lntmsg[1];
3248                 goto cleanup;
3249         }
3250
3251         ret = mxlnd_unpack_msg(msg, nob);
3252         if (ret != 0) {
3253                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
3254                                    ret, libcfs_nid2str(rx->mxc_nid));
3255                 goto cleanup;
3256         }
3257         rx->mxc_nob = nob;
3258         type = msg->mxm_type;
3259
3260         if (rx->mxc_nid != msg->mxm_srcnid ||
3261             kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3262                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
3263                        "0x%llx and rx msg dst is 0x%llx)\n",
3264                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
3265                        msg->mxm_dstnid);
3266                 goto cleanup;
3267         }
3268
3269         if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
3270             msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3271                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
3272                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3273                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3274                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3275                        msg->mxm_srcstamp, conn->mxk_incarnation,
3276                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3277                 credit = 0;
3278                 goto cleanup;
3279         }
3280
3281         CDEBUG(D_NET, "Received %s with %d credits\n",
3282                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
3283
3284         LASSERT(peer != NULL && conn != NULL);
3285         if (msg->mxm_credits != 0) {
3286                 cfs_spin_lock(&conn->mxk_lock);
3287                 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3288                         if ((conn->mxk_credits + msg->mxm_credits) >
3289                              *kmxlnd_tunables.kmx_peercredits) {
3290                                 CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
3291                                        conn->mxk_credits, msg->mxm_credits);
3292                         }
3293                         conn->mxk_credits += msg->mxm_credits;
3294                         LASSERT(conn->mxk_credits >= 0);
3295                         LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
3296                 }
3297                 cfs_spin_unlock(&conn->mxk_lock);
3298         }
3299
3300         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3301         switch (type) {
3302         case MXLND_MSG_NOOP:
3303                 break;
3304
3305         case MXLND_MSG_EAGER:
3306                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3307                                         msg->mxm_srcnid, rx, 0);
3308                 repost = ret < 0;
3309                 break;
3310
3311         case MXLND_MSG_PUT_REQ:
3312                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3313                                         msg->mxm_srcnid, rx, 1);
3314                 repost = ret < 0;
3315                 break;
3316
3317         case MXLND_MSG_PUT_ACK: {
3318                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3319                 if (cookie > MXLND_MAX_COOKIE) {
3320                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3321                                            libcfs_nid2str(rx->mxc_nid));
3322                         result = -((u32) MXLND_ERROR_VAL(cookie));
3323                         lntmsg[0] = rx->mxc_lntmsg[0];
3324                 } else {
3325                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3326                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3327                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3328                 }
3329                 /* repost == 1 */
3330                 break;
3331         }
3332         case MXLND_MSG_GET_REQ:
3333                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3334                                         msg->mxm_srcnid, rx, 1);
3335                 repost = ret < 0;
3336                 break;
3337
3338         default:
3339                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3340                                 libcfs_nid2str(rx->mxc_nid));
3341                 ret = -EPROTO;
3342                 break;
3343         }
3344
3345         if (ret < 0) {
3346                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3347                 cfs_spin_lock(&conn->mxk_lock);
3348                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3349                 cfs_spin_unlock(&conn->mxk_lock);
3350         }
3351
3352 cleanup:
3353         if (conn != NULL) {
3354                 cfs_spin_lock(&conn->mxk_lock);
3355                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3356                 cfs_spin_unlock(&conn->mxk_lock);
3357         }
3358
3359         if (repost) {
3360                 /* lnet_parse() failed, etc., repost now */
3361                 mxlnd_put_idle_rx(rx);
3362                 if (conn != NULL && credit == 1) {
3363                         if (type == MXLND_MSG_PUT_DATA ||
3364                             type == MXLND_MSG_EAGER ||
3365                             type == MXLND_MSG_PUT_REQ ||
3366                             type == MXLND_MSG_NOOP) {
3367                                 cfs_spin_lock(&conn->mxk_lock);
3368                                 conn->mxk_outstanding++;
3369                                 cfs_spin_unlock(&conn->mxk_lock);
3370                         }
3371                 }
3372                 if (conn_ref) mxlnd_conn_decref(conn);
3373                 LASSERT(peer_ref == 0);
3374         }
3375
3376         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3377                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3378         } else {
3379                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3380         }
3381
3382         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3383         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3384
3385         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3386
3387         return;
3388 }
3389
3390 void
3391 mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
3392 {
3393         kmx_ctx_t       *tx     = NULL;
3394         kmx_msg_t       *txmsg  = NULL;
3395         kmx_conn_t      *conn   = peer->mxp_conn;
3396         u64             nic_id  = 0ULL;
3397         u32             ep_id   = 0;
3398         u32             sid     = 0;
3399         u8              type    = (msg_type == MXLND_MSG_ICON_REQ ?
3400                                    MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
3401
3402         /* a conn ref was taken when calling mx_iconnect(),
3403          * hold it until CONN_REQ or CONN_ACK completes */
3404
3405         CDEBUG(D_NET, "entering\n");
3406         if (status.code != MX_STATUS_SUCCESS) {
3407                 int send_bye    = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
3408
3409                 CDEBUG(D_NETERROR, "mx_iconnect() failed for %s with %s (%d) "
3410                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3411                         mxlnd_msgtype_to_str(msg_type),
3412                         mx_strstatus(status.code), status.code,
3413                         libcfs_nid2str(peer->mxp_nid),
3414                         peer->mxp_nid,
3415                         peer->mxp_nic_id,
3416                         peer->mxp_ep_id);
3417                 cfs_spin_lock(&conn->mxk_lock);
3418                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3419                 cfs_spin_unlock(&conn->mxk_lock);
3420
3421                 if (cfs_time_after(jiffies, peer->mxp_reconnect_time +
3422                                    MXLND_CONNECT_TIMEOUT)) {
3423                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3424                         mxlnd_conn_disconnect(conn, 0, send_bye);
3425                 }
3426
3427                 mxlnd_conn_decref(conn);
3428                 return;
3429         }
3430         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3431         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3432         cfs_spin_lock(&conn->mxk_lock);
3433         conn->mxk_epa = status.source;
3434         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3435         if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
3436                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3437         }
3438         cfs_spin_unlock(&conn->mxk_lock);
3439         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3440
3441         /* mx_iconnect() succeeded, reset delay to 0 */
3442         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3443         peer->mxp_reconnect_time = 0;
3444         peer->mxp_conn->mxk_sid = sid;
3445         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3446
3447         /* marshal CONN_REQ or CONN_ACK msg */
3448         /* we are still using the conn ref from iconnect() - do not take another */
3449         tx = mxlnd_get_idle_tx();
3450         if (tx == NULL) {
3451                 CDEBUG(D_NETERROR, "Can't obtain %s tx for %s\n",
3452                        mxlnd_msgtype_to_str(type),
3453                        libcfs_nid2str(peer->mxp_nid));
3454                 cfs_spin_lock(&conn->mxk_lock);
3455                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3456                 cfs_spin_unlock(&conn->mxk_lock);
3457                 mxlnd_conn_decref(conn);
3458                 return;
3459         }
3460
3461         tx->mxc_peer = peer;
3462         tx->mxc_conn = conn;
3463         tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
3464         CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
3465         mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3466         txmsg = tx->mxc_msg;
3467         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
3468         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
3469         tx->mxc_match = mxlnd_create_match(tx, 0);
3470
3471         mxlnd_queue_tx(tx);
3472         return;
3473 }
3474
3475 /**
3476  * mxlnd_request_waitd - the MX request completion thread(s)
3477  * @arg - thread id (as a void *)
3478  *
3479  * This thread waits for a MX completion and then completes the request.
3480  * We will create one thread per CPU.
3481  */
3482 int
3483 mxlnd_request_waitd(void *arg)
3484 {
3485         long                    id              = (long) arg;
3486         char                    name[24];
3487         __u32                   result          = 0;
3488         mx_return_t             mxret           = MX_SUCCESS;
3489         mx_status_t             status;
3490         kmx_ctx_t              *ctx             = NULL;
3491         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3492         kmx_peer_t             *peer            = NULL;
3493         kmx_conn_t             *conn            = NULL;
3494 #if MXLND_POLLING
3495         int                     count           = 0;
3496 #endif
3497
3498         memset(name, 0, sizeof(name));
3499         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3500         cfs_daemonize(name);
3501
3502         memset(&status, 0, sizeof(status));
3503
3504         CDEBUG(D_NET, "%s starting\n", name);
3505
3506         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3507                 u8      msg_type        = 0;
3508
3509                 mxret = MX_SUCCESS;
3510                 result = 0;
3511 #if MXLND_POLLING
3512                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3513                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3514                                             &status, &result);
3515                 } else {
3516                         count = 0;
3517                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3518                                             0ULL, 0ULL, &status, &result);
3519                 }
3520 #else
3521                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3522                                     0ULL, 0ULL, &status, &result);
3523 #endif
3524                 if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown)))
3525                         break;
3526
3527                 if (result != 1) {
3528                         /* nothing completed... */
3529                         continue;
3530                 }
3531
3532                 CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
3533                        "match_info 0x%llx and length %d\n",
3534                        mx_strstatus(status.code), status.code,
3535                        (u64) status.match_info, status.msg_length);
3536
3537                 if (status.code != MX_STATUS_SUCCESS) {
3538                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3539                                "match_info 0x%llx and length %d\n",
3540                                mx_strstatus(status.code), status.code,
3541                                (u64) status.match_info, status.msg_length);
3542                 }
3543
3544                 msg_type = MXLND_MSG_TYPE(status.match_info);
3545
3546                 /* This may be a mx_iconnect() request completing,
3547                  * check the bit mask for CONN_REQ and CONN_ACK */
3548                 if (msg_type == MXLND_MSG_ICON_REQ ||
3549                     msg_type == MXLND_MSG_ICON_ACK) {
3550                         peer = (kmx_peer_t*) status.context;
3551                         mxlnd_handle_connect_msg(peer, msg_type, status);
3552                         continue;
3553                 }
3554
3555                 /* This must be a tx or rx */
3556
3557                 /* NOTE: if this is a RX from the unexpected callback, it may
3558                  * have very little info. If we dropped it in unexpected_recv(),
3559                  * it will not have a context. If so, ignore it. */
3560                 ctx = (kmx_ctx_t *) status.context;
3561                 if (ctx != NULL) {
3562
3563                         req_type = ctx->mxc_type;
3564                         conn = ctx->mxc_conn; /* this may be NULL */
3565                         mxlnd_deq_pending_ctx(ctx);
3566
3567                         /* copy status to ctx->mxc_status */
3568                         ctx->mxc_status = status;
3569
3570                         switch (req_type) {
3571                         case MXLND_REQ_TX:
3572                                 mxlnd_handle_tx_completion(ctx);
3573                                 break;
3574                         case MXLND_REQ_RX:
3575                                 mxlnd_handle_rx_completion(ctx);
3576                                 break;
3577                         default:
3578                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3579                                 LBUG();
3580                                 break;
3581                         }
3582
3583                         /* conn is always set except for the first CONN_REQ rx
3584                          * from a new peer */
3585                         if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
3586                                 mxlnd_conn_disconnect(conn, 1, 1);
3587                         }
3588                 }
3589                 CDEBUG(D_NET, "waitd() completed task\n");
3590         }
3591         CDEBUG(D_NET, "%s stopping\n", name);
3592         mxlnd_thread_stop(id);
3593         return 0;
3594 }
3595
3596
3597 unsigned long
3598 mxlnd_check_timeouts(unsigned long now)
3599 {
3600         int             i               = 0;
3601         int             disconnect      = 0;
3602         unsigned long   next            = 0; /* jiffies */
3603         kmx_peer_t      *peer           = NULL;
3604         kmx_conn_t      *conn           = NULL;
3605         cfs_rwlock_t    *g_lock         = &kmxlnd_data.kmx_global_lock;
3606
3607         cfs_read_lock(g_lock);
3608         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3609                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
3610                                         mxp_list) {
3611
3612                         if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3613                                 cfs_read_unlock(g_lock);
3614                                 return next;
3615                         }
3616
3617                         conn = peer->mxp_conn;
3618                         if (conn) {
3619                                 mxlnd_conn_addref(conn);
3620                         } else {
3621                                 continue;
3622                         }
3623
3624                         cfs_spin_lock(&conn->mxk_lock);
3625
3626                         /* if nothing pending (timeout == 0) or
3627                          * if conn is already disconnected,
3628                          * skip this conn */
3629                         if (conn->mxk_timeout == 0 ||
3630                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3631                                 cfs_spin_unlock(&conn->mxk_lock);
3632                                 mxlnd_conn_decref(conn);
3633                                 continue;
3634                         }
3635
3636                         /* we want to find the timeout that will occur first.
3637                          * if it is in the future, we will sleep until then.
3638                          * if it is in the past, then we will sleep one
3639                          * second and repeat the process. */
3640                         if ((next == 0) ||
3641                             (cfs_time_before(conn->mxk_timeout, next))) {
3642                                 next = conn->mxk_timeout;
3643                         }
3644
3645                         disconnect = 0;
3646
3647                         if (cfs_time_aftereq(now, conn->mxk_timeout))  {
3648                                 disconnect = 1;
3649                         }
3650                         cfs_spin_unlock(&conn->mxk_lock);
3651
3652                         if (disconnect) {
3653                                 mxlnd_conn_disconnect(conn, 1, 1);
3654                         }
3655                         mxlnd_conn_decref(conn);
3656                 }
3657         }
3658         cfs_read_unlock(g_lock);
3659         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3660
3661         return next;
3662 }
3663
3664 void
3665 mxlnd_passive_connect(kmx_connparams_t *cp)
3666 {
3667         int             ret             = 0;
3668         int             incompatible    = 0;
3669         u64             nic_id          = 0ULL;
3670         u32             ep_id           = 0;
3671         u32             sid             = 0;
3672         int             conn_ref        = 0;
3673         kmx_msg_t       *msg            = &cp->mxr_msg;
3674         kmx_peer_t      *peer           = cp->mxr_peer;
3675         kmx_conn_t      *conn           = NULL;
3676         cfs_rwlock_t    *g_lock         = &kmxlnd_data.kmx_global_lock;
3677
3678         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3679
3680         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3681         if (ret != 0) {
3682                 if (peer) {
3683                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from %s\n",
3684                                ret, libcfs_nid2str(peer->mxp_nid));
3685                 } else {
3686                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_REQ from "
3687                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3688                 }
3689                 goto cleanup;
3690         }
3691         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3692                 CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3693                                 libcfs_nid2str(msg->mxm_srcnid),
3694                                 libcfs_nid2str(msg->mxm_dstnid));
3695                 goto cleanup;
3696         }
3697         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3698                 CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3699                             "%d (%d wanted)\n",
3700                                 libcfs_nid2str(msg->mxm_srcnid),
3701                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3702                                 *kmxlnd_tunables.kmx_peercredits);
3703                 incompatible = 1;
3704         }
3705         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3706                 CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3707                             "%d (%d wanted)\n",
3708                                 libcfs_nid2str(msg->mxm_srcnid),
3709                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3710                                 (int) MXLND_MSG_SIZE);
3711                 incompatible = 1;
3712         }
3713
3714         if (peer == NULL) {
3715                 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
3716                 if (peer == NULL) {
3717                         int             hash    = 0;
3718                         u32             board   = 0;
3719                         kmx_peer_t      *existing_peer    = NULL;
3720
3721                         hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3722
3723                         mx_nic_id_to_board_number(nic_id, &board);
3724
3725                         /* adds conn ref for peer and one for this function */
3726                         ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3727                                                board, ep_id, 0ULL);
3728                         if (ret != 0) {
3729                                 goto cleanup;
3730                         }
3731                         peer->mxp_conn->mxk_sid = sid;
3732                         LASSERT(peer->mxp_ep_id == ep_id);
3733                         cfs_write_lock(g_lock);
3734                         existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3735                         if (existing_peer) {
3736                                 mxlnd_conn_decref(peer->mxp_conn);
3737                                 mxlnd_peer_decref(peer);
3738                                 peer = existing_peer;
3739                                 mxlnd_conn_addref(peer->mxp_conn);
3740                                 conn = peer->mxp_conn;
3741                         } else {
3742                                 cfs_list_add_tail(&peer->mxp_list,
3743                                                   &kmxlnd_data.kmx_peers[hash]);
3744                                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
3745                         }
3746                         cfs_write_unlock(g_lock);
3747                 } else {
3748                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3749                         cfs_write_lock(g_lock);
3750                         mxlnd_peer_decref(peer); /* drop ref taken above */
3751                         cfs_write_unlock(g_lock);
3752                         if (ret != 0) {
3753                                 CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3754                                 goto cleanup;
3755                         }
3756                 }
3757                 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3758                 conn = peer->mxp_conn;
3759         } else { /* unexpected handler found peer */
3760                 kmx_conn_t      *old_conn       = peer->mxp_conn;
3761
3762                 if (sid != peer->mxp_conn->mxk_sid) {
3763                         /* do not call mx_disconnect() or send a BYE */
3764                         mxlnd_conn_disconnect(old_conn, 0, 0);
3765
3766                         /* This allocs a conn, points peer->mxp_conn to this one.
3767                         * The old conn is still on the peer->mxp_conns list.
3768                         * As the pending requests complete, they will call
3769                         * conn_decref() which will eventually free it. */
3770                         ret = mxlnd_conn_alloc(&conn, peer);
3771                         if (ret != 0) {
3772                                 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3773                                 goto cleanup;
3774                         }
3775                         /* conn_alloc() adds one ref for the peer and one
3776                          * for this function */
3777                         conn_ref = 1;
3778
3779                         peer->mxp_conn->mxk_sid = sid;
3780                 } else {
3781                         /* same sid */
3782                         conn = peer->mxp_conn;
3783                 }
3784         }
3785         cfs_write_lock(g_lock);
3786         peer->mxp_incompatible = incompatible;
3787         cfs_write_unlock(g_lock);
3788         cfs_spin_lock(&conn->mxk_lock);
3789         conn->mxk_incarnation = msg->mxm_srcstamp;
3790         mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
3791         cfs_spin_unlock(&conn->mxk_lock);
3792
3793         /* handle_conn_ack() will create the CONN_ACK msg */
3794         mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
3795
3796 cleanup:
3797         if (conn_ref) mxlnd_conn_decref(conn);
3798
3799         mxlnd_connparams_free(cp);
3800         return;
3801 }
3802
3803 void
3804 mxlnd_check_conn_ack(kmx_connparams_t *cp)
3805 {
3806         int             ret             = 0;
3807         int             incompatible    = 0;
3808         u64             nic_id          = 0ULL;
3809         u32             ep_id           = 0;
3810         u32             sid             = 0;
3811         kmx_msg_t       *msg            = &cp->mxr_msg;
3812         kmx_peer_t      *peer           = cp->mxr_peer;
3813         kmx_conn_t      *conn           = cp->mxr_conn;
3814
3815         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3816
3817         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3818         if (ret != 0) {
3819                 if (peer) {
3820                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from %s\n",
3821                                ret, libcfs_nid2str(peer->mxp_nid));
3822                 } else {
3823                         CDEBUG(D_NETERROR, "Error %d unpacking CONN_ACK from "
3824                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3825                 }
3826                 ret = -1;
3827                 incompatible = 1;
3828                 goto failed;
3829         }
3830         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3831                 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3832                        "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3833                         libcfs_nid2str(msg->mxm_dstnid));
3834                 ret = -1;
3835                 goto failed;
3836         }
3837         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3838                 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3839                        "incompatible queue depth %d (%d wanted)\n",
3840                         libcfs_nid2str(msg->mxm_srcnid),
3841                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3842                         *kmxlnd_tunables.kmx_peercredits);
3843                 incompatible = 1;
3844                 ret = -1;
3845                 goto failed;
3846         }
3847         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3848                 CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3849                        "incompatible EAGER size %d (%d wanted)\n",
3850                         libcfs_nid2str(msg->mxm_srcnid),
3851                         msg->mxm_u.conn_req.mxcrm_eager_size,
3852                         (int) MXLND_MSG_SIZE);
3853                 incompatible = 1;
3854                 ret = -1;
3855                 goto failed;
3856         }
3857         cfs_write_lock(&kmxlnd_data.kmx_global_lock);
3858         peer->mxp_incompatible = incompatible;
3859         cfs_write_unlock(&kmxlnd_data.kmx_global_lock);
3860         cfs_spin_lock(&conn->mxk_lock);
3861         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
3862         conn->mxk_outstanding = 0;
3863         conn->mxk_incarnation = msg->mxm_srcstamp;
3864         conn->mxk_timeout = 0;
3865         if (!incompatible) {
3866                 CDEBUG(D_NET, "setting peer %s CONN_READY\n",
3867                        libcfs_nid2str(msg->mxm_srcnid));
3868                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3869         }
3870         cfs_spin_unlock(&conn->mxk_lock);
3871
3872         if (!incompatible)
3873                 mxlnd_check_sends(peer);
3874
3875 failed:
3876         if (ret < 0) {
3877                 cfs_spin_lock(&conn->mxk_lock);
3878                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3879                 cfs_spin_unlock(&conn->mxk_lock);
3880         }
3881
3882         if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
3883
3884         mxlnd_connparams_free(cp);
3885         return;
3886 }
3887
3888 int
3889 mxlnd_abort_msgs(void)
3890 {
3891         int                     count           = 0;
3892         cfs_list_t              *orphans        = &kmxlnd_data.kmx_orphan_msgs;
3893         cfs_spinlock_t          *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3894
3895         /* abort orphans */
3896         cfs_spin_lock(g_conn_lock);
3897         while (!cfs_list_empty(orphans)) {
3898                 kmx_ctx_t       *ctx     = NULL;
3899                 kmx_conn_t      *conn   = NULL;
3900
3901                 ctx = cfs_list_entry(orphans->next, kmx_ctx_t, mxc_list);
3902                 cfs_list_del_init(&ctx->mxc_list);
3903                 cfs_spin_unlock(g_conn_lock);
3904
3905                 ctx->mxc_errno = -ECONNABORTED;
3906                 conn = ctx->mxc_conn;
3907                 CDEBUG(D_NET, "aborting %s %s %s\n",
3908                        mxlnd_msgtype_to_str(ctx->mxc_msg_type),
3909                        ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
3910                        libcfs_nid2str(ctx->mxc_nid));
3911                 if (ctx->mxc_type == MXLND_REQ_TX) {
3912                         mxlnd_put_idle_tx(ctx); /* do not hold any locks */
3913                         if (conn) mxlnd_conn_decref(conn); /* for this tx */
3914                 } else {
3915                         ctx->mxc_state = MXLND_CTX_CANCELED;
3916                         mxlnd_handle_rx_completion(ctx);
3917                 }
3918
3919                 count++;
3920                 cfs_spin_lock(g_conn_lock);
3921         }
3922         cfs_spin_unlock(g_conn_lock);
3923
3924         return count;
3925 }
3926
3927 int
3928 mxlnd_free_conn_zombies(void)
3929 {
3930         int                     count           = 0;
3931         cfs_list_t             *zombies        = &kmxlnd_data.kmx_conn_zombies;
3932         cfs_spinlock_t         *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3933         cfs_rwlock_t           *g_lock         = &kmxlnd_data.kmx_global_lock;
3934
3935         /* cleanup any zombies */
3936         cfs_spin_lock(g_conn_lock);
3937         while (!cfs_list_empty(zombies)) {
3938                 kmx_conn_t      *conn   = NULL;
3939
3940                 conn = cfs_list_entry(zombies->next, kmx_conn_t, mxk_zombie);
3941                 cfs_list_del_init(&conn->mxk_zombie);
3942                 cfs_spin_unlock(g_conn_lock);
3943
3944                 cfs_write_lock(g_lock);
3945                 mxlnd_conn_free_locked(conn);
3946                 cfs_write_unlock(g_lock);
3947
3948                 count++;
3949                 cfs_spin_lock(g_conn_lock);
3950         }
3951         cfs_spin_unlock(g_conn_lock);
3952         CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
3953         return count;
3954 }
3955
3956 /**
3957  * mxlnd_connd - handles incoming connection requests
3958  * @arg - thread id (as a void *)
3959  *
3960  * This thread handles incoming connection requests
3961  */
3962 int
3963 mxlnd_connd(void *arg)
3964 {
3965         long                    id              = (long) arg;
3966
3967         cfs_daemonize("mxlnd_connd");
3968
3969         CDEBUG(D_NET, "connd starting\n");
3970
3971         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3972                 int                ret             = 0;
3973                 kmx_connparams_t  *cp              = NULL;
3974                 cfs_spinlock_t    *g_conn_lock     = &kmxlnd_data.kmx_conn_lock;
3975                 cfs_list_t        *conn_reqs       = &kmxlnd_data.kmx_conn_reqs;
3976
3977                 ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);
3978
3979                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
3980                         break;
3981
3982                 if (ret != 0)
3983                         continue;
3984
3985                 ret = mxlnd_abort_msgs();
3986                 ret += mxlnd_free_conn_zombies();
3987
3988                 cfs_spin_lock(g_conn_lock);
3989                 if (cfs_list_empty(conn_reqs)) {
3990                         if (ret == 0)
3991                                 CDEBUG(D_NETERROR, "connd woke up but did not "
3992                                        "find a kmx_connparams_t or zombie conn\n");
3993                         cfs_spin_unlock(g_conn_lock);
3994                         continue;
3995                 }
3996                 cp = cfs_list_entry(conn_reqs->next, kmx_connparams_t,
3997                                     mxr_list);
3998                 cfs_list_del_init(&cp->mxr_list);
3999                 cfs_spin_unlock(g_conn_lock);
4000
4001                 switch (MXLND_MSG_TYPE(cp->mxr_match)) {
4002                 case MXLND_MSG_CONN_REQ:
4003                         /* We have a connection request. Handle it. */
4004                         mxlnd_passive_connect(cp);
4005                         break;
4006                 case MXLND_MSG_CONN_ACK:
4007                         /* The peer is ready for messages */
4008                         mxlnd_check_conn_ack(cp);
4009                         break;
4010                 }
4011         }
4012
4013         mxlnd_free_conn_zombies();
4014
4015         CDEBUG(D_NET, "connd stopping\n");
4016         mxlnd_thread_stop(id);
4017         return 0;
4018 }
4019
4020 /**
4021  * mxlnd_timeoutd - enforces timeouts on messages
4022  * @arg - thread id (as a void *)
4023  *
4024  * This thread queries each peer for its earliest timeout. If a peer has timed out,
4025  * it calls mxlnd_conn_disconnect().
4026  *
4027  * After checking for timeouts, try progressing sends (call check_sends()).
4028  */
4029 int
4030 mxlnd_timeoutd(void *arg)
4031 {
4032         int             i       = 0;
4033         long            id      = (long) arg;
4034         unsigned long   now     = 0;
4035         unsigned long   next    = 0;
4036         unsigned long   delay   = CFS_HZ;
4037         kmx_peer_t     *peer    = NULL;
4038         kmx_peer_t     *temp    = NULL;
4039         kmx_conn_t     *conn    = NULL;
4040         cfs_rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
4041
4042         cfs_daemonize("mxlnd_timeoutd");
4043
4044         CDEBUG(D_NET, "timeoutd starting\n");
4045
4046         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
4047
4048                 now = jiffies;
4049                 /* if the next timeout has not arrived, go back to sleep */
4050                 if (cfs_time_after(now, next)) {
4051                         next = mxlnd_check_timeouts(now);
4052                 }
4053
4054                 /* try to progress peers' txs */
4055                cfs_write_lock(g_lock);
4056                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
4057                         cfs_list_t *peers = &kmxlnd_data.kmx_peers[i];
4058
4059                         /* NOTE we are safe against the removal of peer, but
4060                          * not against the removal of temp */
4061                         cfs_list_for_each_entry_safe(peer, temp, peers,
4062                                                      mxp_list) {
4063                                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
4064                                         break;
4065                                 mxlnd_peer_addref(peer); /* add ref... */
4066                                 conn = peer->mxp_conn;
4067                                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
4068                                         mxlnd_conn_addref(conn); /* take ref... */
4069                                 } else {
4070                                         CDEBUG(D_NET, "ignoring %s\n",
4071                                                libcfs_nid2str(peer->mxp_nid));
4072                                         mxlnd_peer_decref(peer); /* ...to here */
4073                                         continue;
4074                                 }
4075
4076                                 if ((conn->mxk_status == MXLND_CONN_READY ||
4077                                     conn->mxk_status == MXLND_CONN_FAIL) &&
4078                                     cfs_time_after(now,
4079                                                    conn->mxk_last_tx +
4080                                                    CFS_HZ)) {
4081                                         cfs_write_unlock(g_lock);
4082                                         mxlnd_check_sends(peer);
4083                                         cfs_write_lock(g_lock);
4084                                 }
4085                                 mxlnd_conn_decref(conn); /* until here */
4086                                 mxlnd_peer_decref(peer); /* ...to here */
4087                         }
4088                 }
4089                 cfs_write_unlock(g_lock);
4090
4091                 mxlnd_sleep(delay);
4092         }
4093         CDEBUG(D_NET, "timeoutd stopping\n");
4094         mxlnd_thread_stop(id);
4095         return 0;
4096 }