Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (C) 2006 Myricom, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lnet/klnds/mxlnd/mxlnd.c
37  *
38  * Author: Eric Barton <eric@bartonsoftware.com>
39  * Author: Scott Atchley <atchley at myri.com>
40  */
41
42 #include "mxlnd.h"
43
44 mx_endpoint_addr_t MX_EPA_NULL; /* use to determine if an endpoint is NULL */
45
46 inline int
47 mxlnd_endpoint_addr_null(mx_endpoint_addr_t epa)
48 {
49         /* if memcmp() == 0, it is NULL */
50         return !(memcmp(&epa, &MX_EPA_NULL, sizeof(epa)));
51 }
52
53 char *
54 mxlnd_ctxstate_to_str(int mxc_state)
55 {
56         switch (mxc_state) {
57         case MXLND_CTX_INIT:
58                 return "MXLND_CTX_INIT";
59         case MXLND_CTX_IDLE:
60                 return "MXLND_CTX_IDLE";
61         case MXLND_CTX_PREP:
62                 return "MXLND_CTX_PREP";
63         case MXLND_CTX_PENDING:
64                 return "MXLND_CTX_PENDING";
65         case MXLND_CTX_COMPLETED:
66                 return "MXLND_CTX_COMPLETED";
67         case MXLND_CTX_CANCELED:
68                 return "MXLND_CTX_CANCELED";
69         default:
70                 return "*unknown*";
71         }
72 }
73
74 char *
75 mxlnd_connstatus_to_str(int mxk_status)
76 {
77         switch (mxk_status) {
78         case MXLND_CONN_READY:
79                 return "MXLND_CONN_READY";
80         case MXLND_CONN_INIT:
81                 return "MXLND_CONN_INIT";
82         case MXLND_CONN_WAIT:
83                 return "MXLND_CONN_WAIT";
84         case MXLND_CONN_DISCONNECT:
85                 return "MXLND_CONN_DISCONNECT";
86         case MXLND_CONN_FAIL:
87                 return "MXLND_CONN_FAIL";
88         default:
89                 return "unknown";
90         }
91 }
92
93 char *
94 mxlnd_msgtype_to_str(int type) {
95         switch (type) {
96         case MXLND_MSG_EAGER:
97                 return "MXLND_MSG_EAGER";
98         case MXLND_MSG_CONN_REQ:
99                 return "MXLND_MSG_CONN_REQ";
100         case MXLND_MSG_CONN_ACK:
101                 return "MXLND_MSG_CONN_ACK";
102         case MXLND_MSG_BYE:
103                 return "MXLND_MSG_BYE";
104         case MXLND_MSG_NOOP:
105                 return "MXLND_MSG_NOOP";
106         case MXLND_MSG_PUT_REQ:
107                 return "MXLND_MSG_PUT_REQ";
108         case MXLND_MSG_PUT_ACK:
109                 return "MXLND_MSG_PUT_ACK";
110         case MXLND_MSG_PUT_DATA:
111                 return "MXLND_MSG_PUT_DATA";
112         case MXLND_MSG_GET_REQ:
113                 return "MXLND_MSG_GET_REQ";
114         case MXLND_MSG_GET_DATA:
115                 return "MXLND_MSG_GET_DATA";
116         default:
117                 return "unknown";
118         }
119 }
120
121 char *
122 mxlnd_lnetmsg_to_str(int type)
123 {
124         switch (type) {
125         case LNET_MSG_ACK:
126                 return "LNET_MSG_ACK";
127         case LNET_MSG_PUT:
128                 return "LNET_MSG_PUT";
129         case LNET_MSG_GET:
130                 return "LNET_MSG_GET";
131         case LNET_MSG_REPLY:
132                 return "LNET_MSG_REPLY";
133         case LNET_MSG_HELLO:
134                 return "LNET_MSG_HELLO";
135         default:
136                 LBUG();
137                 return "*unknown*";
138         }
139 }
140
141 static inline u64
142 mxlnd_create_match(kmx_ctx_t *ctx, u8 error)
143 {
144         u64 type        = (u64) ctx->mxc_msg_type;
145         u64 err         = (u64) error;
146         u64 match       = 0ULL;
147
148         mxlnd_valid_msg_type(ctx->mxc_msg_type);
149         LASSERT(ctx->mxc_cookie >> MXLND_ERROR_OFFSET == 0);
150         match = (type << MXLND_MSG_OFFSET) | (err << MXLND_ERROR_OFFSET) | ctx->mxc_cookie;
151         return match;
152 }
153
154 static inline void
155 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
156 {
157         *msg_type = (u8) MXLND_MSG_TYPE(match);
158         *error    = (u8) MXLND_ERROR_VAL(match);
159         *cookie   = match & MXLND_MAX_COOKIE;
160         mxlnd_valid_msg_type(*msg_type);
161         return;
162 }
163
164 kmx_ctx_t *
165 mxlnd_get_idle_rx(kmx_conn_t *conn)
166 {
167         cfs_list_t              *rxs    = NULL;
168         kmx_ctx_t               *rx     = NULL;
169
170         LASSERT(conn != NULL);
171
172         rxs = &conn->mxk_rx_idle;
173
174         spin_lock(&conn->mxk_lock);
175
176         if (cfs_list_empty (rxs)) {
177                 spin_unlock(&conn->mxk_lock);
178                 return NULL;
179         }
180
181         rx = cfs_list_entry (rxs->next, kmx_ctx_t, mxc_list);
182         cfs_list_del_init(&rx->mxc_list);
183         spin_unlock(&conn->mxk_lock);
184
185 #if MXLND_DEBUG
186         if (rx->mxc_get != rx->mxc_put) {
187                 CNETERR("*** RX get (%llu) != put (%llu) ***\n", rx->mxc_get, rx->mxc_put);
188                 CNETERR("*** incarnation= %lld ***\n", rx->mxc_incarnation);
189                 CNETERR("*** deadline= %ld ***\n", rx->mxc_deadline);
190                 CNETERR("*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
191                 CNETERR("*** listed?= %d ***\n", !cfs_list_empty(&rx->mxc_list));
192                 CNETERR("*** nid= 0x%llx ***\n", rx->mxc_nid);
193                 CNETERR("*** peer= 0x%p ***\n", rx->mxc_peer);
194                 CNETERR("*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
195                 CNETERR("*** cookie= 0x%llx ***\n", rx->mxc_cookie);
196                 CNETERR("*** nob= %d ***\n", rx->mxc_nob);
197         }
198 #endif
199         LASSERT (rx->mxc_get == rx->mxc_put);
200
201         rx->mxc_get++;
202
203         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
204         rx->mxc_state = MXLND_CTX_PREP;
205         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
206
207         return rx;
208 }
209
210 int
211 mxlnd_put_idle_rx(kmx_ctx_t *rx)
212 {
213         kmx_conn_t              *conn   = rx->mxc_conn;
214         cfs_list_t              *rxs    = &conn->mxk_rx_idle;
215
216         LASSERT(rx->mxc_type == MXLND_REQ_RX);
217
218         mxlnd_ctx_init(rx);
219
220         rx->mxc_put++;
221         LASSERT(rx->mxc_get == rx->mxc_put);
222
223         spin_lock(&conn->mxk_lock);
224         cfs_list_add(&rx->mxc_list, rxs);
225         spin_unlock(&conn->mxk_lock);
226         return 0;
227 }
228
229 kmx_ctx_t *
230 mxlnd_get_idle_tx(void)
231 {
232         cfs_list_t              *tmp    = &kmxlnd_data.kmx_tx_idle;
233         kmx_ctx_t               *tx     = NULL;
234
235         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
236
237         if (cfs_list_empty (&kmxlnd_data.kmx_tx_idle)) {
238                 CNETERR("%d txs in use\n", kmxlnd_data.kmx_tx_used);
239                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
240                 return NULL;
241         }
242
243         tmp = &kmxlnd_data.kmx_tx_idle;
244         tx = cfs_list_entry (tmp->next, kmx_ctx_t, mxc_list);
245         cfs_list_del_init(&tx->mxc_list);
246
247         /* Allocate a new completion cookie.  It might not be needed,
248          * but we've got a lock right now and we're unlikely to
249          * wrap... */
250         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
251         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
252                 kmxlnd_data.kmx_tx_next_cookie = 1;
253         }
254         kmxlnd_data.kmx_tx_used++;
255         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
256
257         LASSERT (tx->mxc_get == tx->mxc_put);
258
259         tx->mxc_get++;
260
261         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
262         LASSERT (tx->mxc_lntmsg[0] == NULL);
263         LASSERT (tx->mxc_lntmsg[1] == NULL);
264
265         tx->mxc_state = MXLND_CTX_PREP;
266         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
267
268         return tx;
269 }
270
271 void
272 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye);
273
274 int
275 mxlnd_put_idle_tx(kmx_ctx_t *tx)
276 {
277         int             result  = 0;
278         lnet_msg_t      *lntmsg[2];
279
280         LASSERT(tx->mxc_type == MXLND_REQ_TX);
281
282         if (tx->mxc_status.code != MX_STATUS_SUCCESS || tx->mxc_errno != 0) {
283                 kmx_conn_t      *conn   = tx->mxc_conn;
284
285                 result = -EIO;
286                 if (tx->mxc_errno != 0) result = tx->mxc_errno;
287                 /* FIXME should we set mx_dis? */
288                 mxlnd_conn_disconnect(conn, 0, 1);
289         }
290
291         lntmsg[0] = tx->mxc_lntmsg[0];
292         lntmsg[1] = tx->mxc_lntmsg[1];
293
294         mxlnd_ctx_init(tx);
295
296         tx->mxc_put++;
297         LASSERT(tx->mxc_get == tx->mxc_put);
298
299         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
300         cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
301         kmxlnd_data.kmx_tx_used--;
302         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
303
304         if (lntmsg[0] != NULL)
305                 lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
306         if (lntmsg[1] != NULL)
307                 lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
308         return 0;
309 }
310
311
312 void
313 mxlnd_connparams_free(kmx_connparams_t *cp)
314 {
315         LASSERT(cfs_list_empty(&cp->mxr_list));
316         MXLND_FREE(cp, sizeof(*cp));
317         return;
318 }
319
320 int
321 mxlnd_connparams_alloc(kmx_connparams_t **cp, void *context,
322                             mx_endpoint_addr_t epa, u64 match, u32 length,
323                             kmx_conn_t *conn, kmx_peer_t *peer, void *data)
324 {
325         kmx_connparams_t *c = NULL;
326
327         MXLND_ALLOC(c, sizeof(*c));
328         if (!c) return -ENOMEM;
329
330         CFS_INIT_LIST_HEAD(&c->mxr_list);
331         c->mxr_context = context;
332         c->mxr_epa = epa;
333         c->mxr_match = match;
334         c->mxr_nob = length;
335         c->mxr_conn = conn;
336         c->mxr_peer = peer;
337         c->mxr_msg = *((kmx_msg_t *) data);
338
339         *cp = c;
340         return 0;
341 }
342
343 static inline void
344 mxlnd_set_conn_status(kmx_conn_t *conn, int status)
345 {
346         conn->mxk_status = status;
347         cfs_mb();
348 }
349
350 /**
351  * mxlnd_conn_free_locked - free the conn
352  * @conn - a kmx_conn pointer
353  *
354  * The calling function should remove the conn from the conns list first
355  * then destroy it. Caller should have write-locked kmx_global_lock.
356  */
357 void
358 mxlnd_conn_free_locked(kmx_conn_t *conn)
359 {
360         int             valid   = !mxlnd_endpoint_addr_null(conn->mxk_epa);
361         kmx_peer_t      *peer   = conn->mxk_peer;
362
363         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
364         LASSERT (cfs_list_empty (&conn->mxk_tx_credit_queue) &&
365                  cfs_list_empty (&conn->mxk_tx_free_queue) &&
366                  cfs_list_empty (&conn->mxk_pending));
367         if (!cfs_list_empty(&conn->mxk_list)) {
368                 cfs_list_del_init(&conn->mxk_list);
369                 if (peer->mxp_conn == conn) {
370                         peer->mxp_conn = NULL;
371                         if (valid) {
372                                 kmx_conn_t      *temp   = NULL;
373
374                                 mx_get_endpoint_addr_context(conn->mxk_epa,
375                                                              (void **) &temp);
376                                 if (conn == temp) {
377                                         mx_set_endpoint_addr_context(conn->mxk_epa,
378                                                                      (void *) NULL);
379                                 }
380                         }
381                         /* unlink from global list and drop its ref */
382                         cfs_list_del_init(&peer->mxp_list);
383                         mxlnd_peer_decref(peer);
384                 }
385         }
386         mxlnd_peer_decref(peer); /* drop conn's ref to peer */
387         if (conn->mxk_rx_pages) {
388                 LASSERT (conn->mxk_rxs != NULL);
389                 mxlnd_free_pages(conn->mxk_rx_pages);
390         }
391         if (conn->mxk_rxs) {
392                 int             i       = 0;
393                 kmx_ctx_t       *rx     = NULL;
394
395                 for (i = 0; i < MXLND_RX_MSGS(); i++) {
396                         rx = &conn->mxk_rxs[i];
397                         if (rx->mxc_seg_list != NULL) {
398                                 LASSERT(rx->mxc_nseg > 0);
399                                 MXLND_FREE(rx->mxc_seg_list,
400                                            rx->mxc_nseg *
401                                            sizeof(*rx->mxc_seg_list));
402                         }
403                 }
404                 MXLND_FREE(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
405         }
406
407         MXLND_FREE(conn, sizeof (*conn));
408         return;
409 }
410
411
412 int
413 mxlnd_conn_cancel_pending_rxs(kmx_conn_t *conn)
414 {
415         int             found   = 0;
416         int             count   = 0;
417         kmx_ctx_t       *ctx    = NULL;
418         kmx_ctx_t       *next   = NULL;
419         mx_return_t     mxret   = MX_SUCCESS;
420         u32             result  = 0;
421
422         do {
423                 found = 0;
424                 spin_lock(&conn->mxk_lock);
425                 cfs_list_for_each_entry_safe(ctx, next, &conn->mxk_pending,
426                                              mxc_list) {
427                         cfs_list_del_init(&ctx->mxc_list);
428                         if (ctx->mxc_type == MXLND_REQ_RX) {
429                                 found = 1;
430                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
431                                                   &ctx->mxc_mxreq,
432                                                   &result);
433                                 if (mxret != MX_SUCCESS) {
434                                         CNETERR("mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
435                                 }
436                                 if (result == 1) {
437                                         ctx->mxc_errno = -ECONNABORTED;
438                                         ctx->mxc_state = MXLND_CTX_CANCELED;
439                                         spin_unlock(&conn->mxk_lock);
440                                         spin_lock(&kmxlnd_data.kmx_conn_lock);
441                                         /* we may be holding the global lock,
442                                          * move to orphan list so that it can free it */
443                                         cfs_list_add_tail(&ctx->mxc_list,
444                                                           &kmxlnd_data.kmx_orphan_msgs);
445                                         count++;
446                                         spin_unlock(&kmxlnd_data.kmx_conn_lock);
447                                         spin_lock(&conn->mxk_lock);
448                                 }
449                                 break;
450                         }
451                 }
452                 spin_unlock(&conn->mxk_lock);
453         } while (found);
454
455         return count;
456 }
457
458 int
459 mxlnd_cancel_queued_txs(kmx_conn_t *conn)
460 {
461         int             count   = 0;
462         cfs_list_t      *tmp    = NULL;
463
464         spin_lock(&conn->mxk_lock);
465         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
466                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
467
468                 kmx_ctx_t       *tx     = NULL;
469
470                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
471                         tmp = &conn->mxk_tx_free_queue;
472                 } else {
473                         tmp = &conn->mxk_tx_credit_queue;
474                 }
475
476                 tx = cfs_list_entry(tmp->next, kmx_ctx_t, mxc_list);
477                 cfs_list_del_init(&tx->mxc_list);
478                 spin_unlock(&conn->mxk_lock);
479                 tx->mxc_errno = -ECONNABORTED;
480                 tx->mxc_state = MXLND_CTX_CANCELED;
481                 /* move to orphan list and then abort */
482                 spin_lock(&kmxlnd_data.kmx_conn_lock);
483                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_orphan_msgs);
484                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
485                 count++;
486                 spin_lock(&conn->mxk_lock);
487         }
488         spin_unlock(&conn->mxk_lock);
489
490         return count;
491 }
492
493 void
494 mxlnd_send_message(mx_endpoint_addr_t epa, u8 msg_type, int error, u64 cookie)
495 {
496         u64 match = (((u64) msg_type) << MXLND_MSG_OFFSET) |
497                     (((u64) error) << MXLND_ERROR_OFFSET) | cookie;
498
499         mx_kisend(kmxlnd_data.kmx_endpt, NULL, 0, MX_PIN_PHYSICAL,
500                   epa, match, NULL, NULL);
501         return;
502 }
503
504 /**
505  * mxlnd_conn_disconnect - shutdown a connection
506  * @conn - a kmx_conn pointer
507  * @mx_dis - call mx_disconnect()
508  * @send_bye - send peer a BYE msg
509  *
510  * This function sets the status to DISCONNECT, completes queued
511  * txs with failure, calls mx_disconnect, which will complete
512  * pending txs and matched rxs with failure.
513  */
514 void
515 mxlnd_conn_disconnect(kmx_conn_t *conn, int mx_dis, int send_bye)
516 {
517         mx_endpoint_addr_t      epa     = conn->mxk_epa;
518         int                     valid   = !mxlnd_endpoint_addr_null(epa);
519         int                     count   = 0;
520
521         spin_lock(&conn->mxk_lock);
522         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
523                 spin_unlock(&conn->mxk_lock);
524                 return;
525         }
526         mxlnd_set_conn_status(conn, MXLND_CONN_DISCONNECT);
527         conn->mxk_timeout = 0;
528         spin_unlock(&conn->mxk_lock);
529
530         count = mxlnd_cancel_queued_txs(conn);
531         count += mxlnd_conn_cancel_pending_rxs(conn);
532
533         if (count) /* let connd call kmxlnd_abort_msgs() */
534                 up(&kmxlnd_data.kmx_conn_sem);
535
536         if (send_bye && valid &&
537             conn->mxk_peer->mxp_nid != kmxlnd_data.kmx_ni->ni_nid) {
538                 /* send a BYE to the peer */
539                 CDEBUG(D_NET, "%s: sending a BYE msg to %s\n", __func__,
540                                 libcfs_nid2str(conn->mxk_peer->mxp_nid));
541                 mxlnd_send_message(epa, MXLND_MSG_BYE, 0, 0);
542                 /* wait to allow the peer to ack our message */
543                 mxlnd_sleep(msecs_to_jiffies(20));
544         }
545
546         if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown) != 1) {
547                 unsigned long   last_msg        = 0;
548
549                 /* notify LNET that we are giving up on this peer */
550                 if (cfs_time_after(conn->mxk_last_rx, conn->mxk_last_tx))
551                         last_msg = conn->mxk_last_rx;
552                 else
553                         last_msg = conn->mxk_last_tx;
554
555                 lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_msg);
556
557                 if (mx_dis && valid &&
558                     (memcmp(&epa, &kmxlnd_data.kmx_epa, sizeof(epa) != 0)))
559                         mx_disconnect(kmxlnd_data.kmx_endpt, epa);
560         }
561         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
562
563         return;
564 }
565
566 /**
567  * mxlnd_conn_alloc - allocate and initialize a new conn struct
568  * @connp - address of a kmx_conn pointer
569  * @peer - owning kmx_peer
570  *
571  * Returns 0 on success and -ENOMEM on failure
572  */
573 int
574 mxlnd_conn_alloc_locked(kmx_conn_t **connp, kmx_peer_t *peer)
575 {
576         int             i       = 0;
577         int             ret     = 0;
578         int             ipage   = 0;
579         int             offset  = 0;
580         void           *addr    = NULL;
581         kmx_conn_t     *conn    = NULL;
582         kmx_pages_t    *pages   = NULL;
583         struct page    *page    = NULL;
584         kmx_ctx_t      *rx      = NULL;
585
586         LASSERT(peer != NULL);
587
588         MXLND_ALLOC(conn, sizeof (*conn));
589         if (conn == NULL) {
590                 CNETERR("Cannot allocate conn\n");
591                 return -ENOMEM;
592         }
593         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
594
595         memset(conn, 0, sizeof(*conn));
596
597         ret = mxlnd_alloc_pages(&pages, MXLND_RX_MSG_PAGES());
598         if (ret != 0) {
599                 CERROR("Can't allocate rx pages\n");
600                 MXLND_FREE(conn, sizeof(*conn));
601                 return -ENOMEM;
602         }
603         conn->mxk_rx_pages = pages;
604
605         MXLND_ALLOC(conn->mxk_rxs, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
606         if (conn->mxk_rxs == NULL) {
607                 CERROR("Can't allocate %d rx descriptors\n", MXLND_RX_MSGS());
608                 mxlnd_free_pages(pages);
609                 MXLND_FREE(conn, sizeof(*conn));
610                 return -ENOMEM;
611         }
612
613         memset(conn->mxk_rxs, 0, MXLND_RX_MSGS() * sizeof(kmx_ctx_t));
614
615         conn->mxk_peer = peer;
616         CFS_INIT_LIST_HEAD(&conn->mxk_list);
617         CFS_INIT_LIST_HEAD(&conn->mxk_zombie);
618         cfs_atomic_set(&conn->mxk_refcount, 2); /* ref for owning peer
619                                                    and one for the caller */
620         if (peer->mxp_nid == kmxlnd_data.kmx_ni->ni_nid) {
621                 u64     nic_id  = 0ULL;
622                 u32     ep_id   = 0;
623
624                 /* this is localhost, set the epa and status as up */
625                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
626                 conn->mxk_epa = kmxlnd_data.kmx_epa;
627                 mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
628                 peer->mxp_reconnect_time = 0;
629                 mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
630                 peer->mxp_nic_id = nic_id;
631                 peer->mxp_ep_id = ep_id;
632                 conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
633                 conn->mxk_timeout = 0;
634         } else {
635                 /* conn->mxk_incarnation = 0 - will be set by peer */
636                 /* conn->mxk_sid = 0 - will be set by peer */
637                 mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
638                 /* mxk_epa - to be set after mx_iconnect() */
639         }
640         spin_lock_init(&conn->mxk_lock);
641         /* conn->mxk_timeout = 0 */
642         /* conn->mxk_last_tx = 0 */
643         /* conn->mxk_last_rx = 0 */
644         CFS_INIT_LIST_HEAD(&conn->mxk_rx_idle);
645
646         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
647         /* mxk_outstanding = 0 */
648
649         CFS_INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
650         CFS_INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
651         /* conn->mxk_ntx_msgs = 0 */
652         /* conn->mxk_ntx_data = 0 */
653         /* conn->mxk_ntx_posted = 0 */
654         /* conn->mxk_data_posted = 0 */
655         CFS_INIT_LIST_HEAD(&conn->mxk_pending);
656
657         for (i = 0; i < MXLND_RX_MSGS(); i++) {
658
659                 rx = &conn->mxk_rxs[i];
660                 rx->mxc_type = MXLND_REQ_RX;
661                 CFS_INIT_LIST_HEAD(&rx->mxc_list);
662
663                 /* map mxc_msg to page */
664                 page = pages->mxg_pages[ipage];
665                 addr = page_address(page);
666                 LASSERT(addr != NULL);
667                 rx->mxc_msg = (kmx_msg_t *)(addr + offset);
668                 rx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(rx->mxc_msg));
669
670                 rx->mxc_conn = conn;
671                 rx->mxc_peer = peer;
672                 rx->mxc_nid = peer->mxp_nid;
673
674                 mxlnd_ctx_init(rx);
675
676                 offset += MXLND_MSG_SIZE;
677                 LASSERT (offset <= PAGE_SIZE);
678
679                 if (offset == PAGE_SIZE) {
680                         offset = 0;
681                         ipage++;
682                         LASSERT (ipage <= MXLND_TX_MSG_PAGES());
683                 }
684
685                 cfs_list_add_tail(&rx->mxc_list, &conn->mxk_rx_idle);
686         }
687
688         *connp = conn;
689
690         mxlnd_peer_addref(peer);        /* add a ref for this conn */
691
692         /* add to front of peer's conns list */
693         cfs_list_add(&conn->mxk_list, &peer->mxp_conns);
694         peer->mxp_conn = conn;
695         return 0;
696 }
697
698 int
699 mxlnd_conn_alloc(kmx_conn_t **connp, kmx_peer_t *peer)
700 {
701         int             ret     = 0;
702         rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
703
704         write_lock(g_lock);
705         ret = mxlnd_conn_alloc_locked(connp, peer);
706         write_unlock(g_lock);
707         return ret;
708 }
709
710 int
711 mxlnd_q_pending_ctx(kmx_ctx_t *ctx)
712 {
713         int             ret     = 0;
714         kmx_conn_t      *conn   = ctx->mxc_conn;
715
716         ctx->mxc_state = MXLND_CTX_PENDING;
717         if (conn != NULL) {
718                 spin_lock(&conn->mxk_lock);
719                 if (conn->mxk_status >= MXLND_CONN_INIT) {
720                         cfs_list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
721                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
722                                 conn->mxk_timeout = ctx->mxc_deadline;
723                         }
724                 } else {
725                         ctx->mxc_state = MXLND_CTX_COMPLETED;
726                         ret = -1;
727                 }
728                 spin_unlock(&conn->mxk_lock);
729         }
730         return ret;
731 }
732
733 int
734 mxlnd_deq_pending_ctx(kmx_ctx_t *ctx)
735 {
736         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
737                 ctx->mxc_state == MXLND_CTX_COMPLETED);
738         if (ctx->mxc_state != MXLND_CTX_PENDING &&
739             ctx->mxc_state != MXLND_CTX_COMPLETED) {
740                 CNETERR("deq ctx->mxc_state = %s\n",
741                         mxlnd_ctxstate_to_str(ctx->mxc_state));
742         }
743         ctx->mxc_state = MXLND_CTX_COMPLETED;
744         if (!cfs_list_empty(&ctx->mxc_list)) {
745                 kmx_conn_t      *conn = ctx->mxc_conn;
746                 kmx_ctx_t       *next = NULL;
747
748                 LASSERT(conn != NULL);
749                 spin_lock(&conn->mxk_lock);
750                 cfs_list_del_init(&ctx->mxc_list);
751                 conn->mxk_timeout = 0;
752                 if (!cfs_list_empty(&conn->mxk_pending)) {
753                         next = cfs_list_entry(conn->mxk_pending.next,
754                                               kmx_ctx_t, mxc_list);
755                         conn->mxk_timeout = next->mxc_deadline;
756                 }
757                 spin_unlock(&conn->mxk_lock);
758         }
759         return 0;
760 }
761
762 /**
763  * mxlnd_peer_free - free the peer
764  * @peer - a kmx_peer pointer
765  *
766  * The calling function should decrement the rxs, drain the tx queues and
767  * remove the peer from the peers list first then destroy it.
768  */
769 void
770 mxlnd_peer_free(kmx_peer_t *peer)
771 {
772         CDEBUG(D_NET, "freeing peer 0x%p %s\n", peer, libcfs_nid2str(peer->mxp_nid));
773
774         LASSERT (cfs_atomic_read(&peer->mxp_refcount) == 0);
775
776         if (!cfs_list_empty(&peer->mxp_list)) {
777                 /* assume we are locked */
778                 cfs_list_del_init(&peer->mxp_list);
779         }
780
781         MXLND_FREE(peer, sizeof (*peer));
782         cfs_atomic_dec(&kmxlnd_data.kmx_npeers);
783         return;
784 }
785
786 static int
787 mxlnd_lookup_mac(u32 ip, u64 *tmp_id)
788 {
789         int                     ret     = -EHOSTUNREACH;
790         unsigned char           *haddr  = NULL;
791         struct net_device       *dev    = NULL;
792         struct neighbour        *n      = NULL;
793         __be32                  dst_ip  = htonl(ip);
794
795         dev = dev_get_by_name(*kmxlnd_tunables.kmx_default_ipif);
796         if (dev == NULL)
797                 return -ENODEV;
798
799         haddr = (unsigned char *) tmp_id + 2; /* MAC is only 6 bytes */
800
801         n = neigh_lookup(&arp_tbl, &dst_ip, dev);
802         if (n) {
803                 n->used = jiffies;
804                 if (n->nud_state & NUD_VALID) {
805                         memcpy(haddr, n->ha, dev->addr_len);
806                         neigh_release(n);
807                         ret = 0;
808                 }
809         }
810
811         dev_put(dev);
812
813         return ret;
814 }
815
816
817 /* We only want the MAC address of the peer's Myricom NIC. We
818  * require that each node has the IPoMX interface (myriN) up.
819  * We will not pass any traffic over IPoMX, but it allows us
820  * to get the MAC address. */
821 static int
822 mxlnd_ip2nic_id(u32 ip, u64 *nic_id, int tries)
823 {
824         int                     ret     = 0;
825         int                     try     = 1;
826         int                     fatal   = 0;
827         u64                     tmp_id  = 0ULL;
828         cfs_socket_t            *sock   = NULL;
829
830         do {
831                 CDEBUG(D_NET, "try %d of %d tries\n", try, tries);
832                 ret = mxlnd_lookup_mac(ip, &tmp_id);
833                 if (ret == 0) {
834                         break;
835                 } else {
836                         /* not found, try to connect (force an arp) */
837                         ret = libcfs_sock_connect(&sock, &fatal, 0, 0, ip, 987);
838                         if (ret == -ECONNREFUSED) {
839                                 /* peer is there, get the MAC address */
840                                 mxlnd_lookup_mac(ip, &tmp_id);
841                                 if (tmp_id != 0ULL)
842                                         ret = 0;
843                                 break;
844                         } else if (ret == -EHOSTUNREACH && try < tries) {
845                                 /* add a little backoff */
846                                 CDEBUG(D_NET, "sleeping for %d jiffies\n",
847                                        CFS_HZ/4);
848                                 mxlnd_sleep(CFS_HZ/4);
849                         }
850                 }
851         } while (try++ < tries);
852         CDEBUG(D_NET, "done trying. ret = %d\n", ret);
853
854         if (tmp_id == 0ULL)
855                 ret = -EHOSTUNREACH;
856 #ifdef __LITTLE_ENDIAN
857         *nic_id = ___arch__swab64(tmp_id);
858 #else
859         *nic_id = tmp_id;
860 #endif
861         return ret;
862 }
863
864 /**
865  * mxlnd_peer_alloc - allocate and initialize a new peer struct
866  * @peerp - address of a kmx_peer pointer
867  * @nid - LNET node id
868  *
869  * Returns 0 on success and -ENOMEM on failure
870  */
871 int
872 mxlnd_peer_alloc(kmx_peer_t **peerp, lnet_nid_t nid, u32 board, u32 ep_id, u64 nic_id)
873 {
874         int             ret     = 0;
875         u32             ip      = LNET_NIDADDR(nid);
876         kmx_peer_t     *peer    = NULL;
877
878         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
879
880         MXLND_ALLOC(peer, sizeof (*peer));
881         if (peer == NULL) {
882                 CNETERR("Cannot allocate peer for NID 0x%llx\n",
883                        nid);
884                 return -ENOMEM;
885         }
886         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
887
888         memset(peer, 0, sizeof(*peer));
889
890         CFS_INIT_LIST_HEAD(&peer->mxp_list);
891         peer->mxp_nid = nid;
892         /* peer->mxp_ni unused - may be used for multi-rail */
893         cfs_atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
894
895         peer->mxp_board = board;
896         peer->mxp_ep_id = ep_id;
897         peer->mxp_nic_id = nic_id;
898
899         CFS_INIT_LIST_HEAD(&peer->mxp_conns);
900         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
901         if (ret != 0) {
902                 mxlnd_peer_decref(peer);
903                 return ret;
904         }
905         CFS_INIT_LIST_HEAD(&peer->mxp_tx_queue);
906
907         if (peer->mxp_nic_id != 0ULL)
908                 nic_id = peer->mxp_nic_id;
909
910         if (nic_id == 0ULL) {
911                 ret = mxlnd_ip2nic_id(ip, &nic_id, 1);
912                 if (ret == 0) {
913                         peer->mxp_nic_id = nic_id;
914                         mx_nic_id_to_board_number(nic_id, &peer->mxp_board);
915                 }
916         }
917
918         peer->mxp_nic_id = nic_id; /* may be 0ULL if ip2nic_id() failed */
919
920         /* peer->mxp_reconnect_time = 0 */
921         /* peer->mxp_incompatible = 0 */
922
923         *peerp = peer;
924         return 0;
925 }
926
927 static inline kmx_peer_t *
928 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
929 {
930         int             found   = 0;
931         int             hash    = 0;
932         kmx_peer_t      *peer   = NULL;
933
934         hash = mxlnd_nid_to_hash(nid);
935
936         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_list) {
937                 if (peer->mxp_nid == nid) {
938                         found = 1;
939                         mxlnd_peer_addref(peer);
940                         break;
941                 }
942         }
943         return (found ? peer : NULL);
944 }
945
946 static kmx_peer_t *
947 mxlnd_find_peer_by_nid(lnet_nid_t nid, int create)
948 {
949         int             ret     = 0;
950         int             hash    = 0;
951         kmx_peer_t      *peer   = NULL;
952         kmx_peer_t      *old    = NULL;
953         rwlock_t    *g_lock = &kmxlnd_data.kmx_global_lock;
954
955         read_lock(g_lock);
956         peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
957
958         if ((peer && peer->mxp_conn) || /* found peer with conn or */
959             (!peer && !create)) {       /* did not find peer and do not create one */
960                 read_unlock(g_lock);
961                 return peer;
962         }
963
964         read_unlock(g_lock);
965
966         /* if peer but _not_ conn */
967         if (peer && !peer->mxp_conn) {
968                 if (create) {
969                         write_lock(g_lock);
970                         if (!peer->mxp_conn) { /* check again */
971                                 /* create the conn */
972                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
973                                 if (ret != 0) {
974                                         /* we tried, return the peer only.
975                                          * the caller needs to see if the conn exists */
976                                         CNETERR("%s: %s could not alloc conn\n",
977                                         __func__, libcfs_nid2str(peer->mxp_nid));
978                                 } else {
979                                         /* drop extra conn ref */
980                                         mxlnd_conn_decref(peer->mxp_conn);
981                                 }
982                         }
983                         write_unlock(g_lock);
984                 }
985                 return peer;
986         }
987
988         /* peer not found and we need to create one */
989         hash = mxlnd_nid_to_hash(nid);
990
991         /* create peer (and conn) */
992         /* adds conn ref for peer and one for this function */
993         ret = mxlnd_peer_alloc(&peer, nid, *kmxlnd_tunables.kmx_board,
994                                *kmxlnd_tunables.kmx_ep_id, 0ULL);
995         if (ret != 0) /* no memory, peer is NULL */
996                 return NULL;
997
998         write_lock(g_lock);
999
1000         /* look again */
1001         old = mxlnd_find_peer_by_nid_locked(nid);
1002         if (old) {
1003                 /* someone already created one */
1004                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
1005                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
1006                 mxlnd_peer_decref(peer);
1007                 peer = old;
1008         } else {
1009                 /* no other peer, use this one */
1010                 cfs_list_add_tail(&peer->mxp_list,
1011                                   &kmxlnd_data.kmx_peers[hash]);
1012                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
1013                 mxlnd_peer_addref(peer);
1014                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref from peer_alloc */
1015         }
1016
1017         write_unlock(g_lock);
1018
1019         return peer;
1020 }
1021
1022 static inline int
1023 mxlnd_tx_requires_credit(kmx_ctx_t *tx)
1024 {
1025         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
1026                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
1027                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
1028                 tx->mxc_msg_type == MXLND_MSG_NOOP);
1029 }
1030
1031 /**
1032  * mxlnd_init_msg - set type and number of bytes
1033  * @msg - msg pointer
1034  * @type - of message
1035  * @body_nob - bytes in msg body
1036  */
1037 static inline void
1038 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
1039 {
1040         msg->mxm_type = type;
1041         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
1042 }
1043
1044 static inline void
1045 mxlnd_init_tx_msg (kmx_ctx_t *tx, u8 type, int body_nob, lnet_nid_t nid)
1046 {
1047         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
1048         kmx_msg_t       *msg    = NULL;
1049
1050         LASSERT (tx != NULL);
1051         LASSERT (nob <= MXLND_MSG_SIZE);
1052
1053         tx->mxc_nid = nid;
1054         /* tx->mxc_peer should have already been set if we know it */
1055         tx->mxc_msg_type = type;
1056         tx->mxc_nseg = 1;
1057         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
1058         tx->mxc_seg.segment_length = nob;
1059         tx->mxc_pin_type = MX_PIN_PHYSICAL;
1060
1061         msg = tx->mxc_msg;
1062         msg->mxm_type = type;
1063         msg->mxm_nob  = nob;
1064
1065         return;
1066 }
1067
1068 static inline __u32
1069 mxlnd_cksum (void *ptr, int nob)
1070 {
1071         char  *c  = ptr;
1072         __u32  sum = 0;
1073
1074         while (nob-- > 0)
1075                 sum = ((sum << 1) | (sum >> 31)) + *c++;
1076
1077         /* ensure I don't return 0 (== no checksum) */
1078         return (sum == 0) ? 1 : sum;
1079 }
1080
1081 /**
1082  * mxlnd_pack_msg_locked - complete msg info
1083  * @tx - msg to send
1084  */
1085 static inline void
1086 mxlnd_pack_msg_locked(kmx_ctx_t *tx)
1087 {
1088         kmx_msg_t       *msg    = tx->mxc_msg;
1089
1090         /* type and nob should already be set in init_msg() */
1091         msg->mxm_magic    = MXLND_MSG_MAGIC;
1092         msg->mxm_version  = MXLND_MSG_VERSION;
1093         /*   mxm_type */
1094         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
1095          * return credits as well */
1096         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
1097             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1098                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
1099                 tx->mxc_conn->mxk_outstanding = 0;
1100         } else {
1101                 msg->mxm_credits  = 0;
1102         }
1103         /*   mxm_nob */
1104         msg->mxm_cksum    = 0;
1105         msg->mxm_srcnid   = kmxlnd_data.kmx_ni->ni_nid;
1106         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
1107         msg->mxm_dstnid   = tx->mxc_nid;
1108         /* if it is a new peer, the dststamp will be 0 */
1109         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
1110
1111         if (*kmxlnd_tunables.kmx_cksum) {
1112                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
1113         }
1114 }
1115
1116 int
1117 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
1118 {
1119         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
1120         __u32     msg_cksum     = 0;
1121         int       flip          = 0;
1122         int       msg_nob       = 0;
1123
1124         /* 6 bytes are enough to have received magic + version */
1125         if (nob < 6) {
1126                 CNETERR("not enough bytes for magic + hdr: %d\n", nob);
1127                 return -EPROTO;
1128         }
1129
1130         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
1131                 flip = 0;
1132         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
1133                 flip = 1;
1134         } else {
1135                 CNETERR("Bad magic: %08x\n", msg->mxm_magic);
1136                 return -EPROTO;
1137         }
1138
1139         if (msg->mxm_version !=
1140             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
1141                 CNETERR("Bad version: %d\n", msg->mxm_version);
1142                 return -EPROTO;
1143         }
1144
1145         if (nob < hdr_size) {
1146                 CNETERR("not enough for a header: %d\n", nob);
1147                 return -EPROTO;
1148         }
1149
1150         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
1151         if (msg_nob > nob) {
1152                 CNETERR("Short message: got %d, wanted %d\n", nob, msg_nob);
1153                 return -EPROTO;
1154         }
1155
1156         /* checksum must be computed with mxm_cksum zero and BEFORE anything
1157          * gets flipped */
1158         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
1159         msg->mxm_cksum = 0;
1160         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
1161                 CNETERR("Bad checksum\n");
1162                 return -EPROTO;
1163         }
1164         msg->mxm_cksum = msg_cksum;
1165
1166         if (flip) {
1167                 /* leave magic unflipped as a clue to peer endianness */
1168                 __swab16s(&msg->mxm_version);
1169                 CLASSERT (sizeof(msg->mxm_type) == 1);
1170                 CLASSERT (sizeof(msg->mxm_credits) == 1);
1171                 msg->mxm_nob = msg_nob;
1172                 __swab64s(&msg->mxm_srcnid);
1173                 __swab64s(&msg->mxm_srcstamp);
1174                 __swab64s(&msg->mxm_dstnid);
1175                 __swab64s(&msg->mxm_dststamp);
1176         }
1177
1178         if (msg->mxm_srcnid == LNET_NID_ANY) {
1179                 CNETERR("Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
1180                 return -EPROTO;
1181         }
1182
1183         switch (msg->mxm_type) {
1184         default:
1185                 CNETERR("Unknown message type %x\n", msg->mxm_type);
1186                 return -EPROTO;
1187
1188         case MXLND_MSG_NOOP:
1189                 break;
1190
1191         case MXLND_MSG_EAGER:
1192                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
1193                         CNETERR("Short EAGER: %d(%d)\n", msg_nob,
1194                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
1195                         return -EPROTO;
1196                 }
1197                 break;
1198
1199         case MXLND_MSG_PUT_REQ:
1200                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
1201                         CNETERR("Short PUT_REQ: %d(%d)\n", msg_nob,
1202                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
1203                         return -EPROTO;
1204                 }
1205                 if (flip)
1206                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
1207                 break;
1208
1209         case MXLND_MSG_PUT_ACK:
1210                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
1211                         CNETERR("Short PUT_ACK: %d(%d)\n", msg_nob,
1212                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
1213                         return -EPROTO;
1214                 }
1215                 if (flip) {
1216                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
1217                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
1218                 }
1219                 break;
1220
1221         case MXLND_MSG_GET_REQ:
1222                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
1223                         CNETERR("Short GET_REQ: %d(%d)\n", msg_nob,
1224                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
1225                         return -EPROTO;
1226                 }
1227                 if (flip) {
1228                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
1229                 }
1230                 break;
1231
1232         case MXLND_MSG_CONN_REQ:
1233         case MXLND_MSG_CONN_ACK:
1234                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
1235                         CNETERR("Short connreq/ack: %d(%d)\n", msg_nob,
1236                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
1237                         return -EPROTO;
1238                 }
1239                 if (flip) {
1240                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
1241                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
1242                 }
1243                 break;
1244         }
1245         return 0;
1246 }
1247
1248
1249 /**
1250  * mxlnd_recv_msg
1251  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1252  * @rx
1253  * @msg_type
1254  * @cookie
1255  * @length - length of incoming message
1256  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1257  *
1258  * The caller gets the rx and sets nid, peer and conn if known.
1259  *
1260  * Returns 0 on success and -1 on failure
1261  */
1262 int
1263 mxlnd_recv_msg(lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie, u32 length)
1264 {
1265         int             ret     = 0;
1266         mx_return_t     mxret   = MX_SUCCESS;
1267         uint64_t        mask    = ~(MXLND_ERROR_MASK);
1268
1269         rx->mxc_msg_type = msg_type;
1270         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1271         rx->mxc_cookie = cookie;
1272         /* rx->mxc_match may already be set */
1273         /* rx->mxc_seg.segment_ptr is already set */
1274         rx->mxc_seg.segment_length = length;
1275         ret = mxlnd_q_pending_ctx(rx);
1276         if (ret == -1) {
1277                 /* the caller is responsible for calling conn_decref() if needed */
1278                 return -1;
1279         }
1280         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1281                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1282         if (mxret != MX_SUCCESS) {
1283                 mxlnd_deq_pending_ctx(rx);
1284                 CNETERR("mx_kirecv() failed with %s (%d)\n",
1285                         mx_strerror(mxret), (int) mxret);
1286                 return -1;
1287         }
1288         return 0;
1289 }
1290
1291
1292 /**
1293  * mxlnd_unexpected_recv - this is the callback function that will handle
1294  *                         unexpected receives
1295  * @context - NULL, ignore
1296  * @source - the peer's mx_endpoint_addr_t
1297  * @match_value - the msg's bits, should be MXLND_MSG_EAGER
1298  * @length - length of incoming message
1299  * @data_if_available - used for CONN_[REQ|ACK]
1300  *
1301  * If it is an eager-sized msg, we will call recv_msg() with the actual
1302  * length. If it is a large message, we will call recv_msg() with a
1303  * length of 0 bytes to drop it because we should never have a large,
1304  * unexpected message.
1305  *
1306  * NOTE - The MX library blocks until this function completes. Make it as fast as
1307  * possible. DO NOT allocate memory which can block!
1308  *
1309  * If we cannot get a rx or the conn is closed, drop the message on the floor
1310  * (i.e. recv 0 bytes and ignore).
1311  */
1312 mx_unexp_handler_action_t
1313 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1314                  uint64_t match_value, uint32_t length, void *data_if_available)
1315 {
1316         int             ret             = 0;
1317         kmx_ctx_t       *rx             = NULL;
1318         mx_ksegment_t   seg;
1319         u8              msg_type        = 0;
1320         u8              error           = 0;
1321         u64             cookie          = 0ULL;
1322         kmx_conn_t      *conn           = NULL;
1323         kmx_peer_t      *peer           = NULL;
1324         u64             nic_id          = 0ULL;
1325         u32             ep_id           = 0;
1326         u32             sid             = 0;
1327
1328         /* TODO this will change to the net struct */
1329         if (context != NULL) {
1330                 CNETERR("non-NULL context\n");
1331         }
1332
1333 #if MXLND_DEBUG
1334         CDEBUG(D_NET, "bits=0x%llx length=%d\n", match_value, length);
1335 #endif
1336
1337         mx_decompose_endpoint_addr2(source, &nic_id, &ep_id, &sid);
1338         mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1339         read_lock(&kmxlnd_data.kmx_global_lock);
1340         mx_get_endpoint_addr_context(source, (void **) &conn);
1341         if (conn) {
1342                 mxlnd_conn_addref(conn); /* add ref for this function */
1343                 peer = conn->mxk_peer;
1344         }
1345         read_unlock(&kmxlnd_data.kmx_global_lock);
1346
1347         if (msg_type == MXLND_MSG_BYE) {
1348                 if (conn) {
1349                         CDEBUG(D_NET, "peer %s sent BYE msg\n",
1350                                         libcfs_nid2str(peer->mxp_nid));
1351                         mxlnd_conn_disconnect(conn, 1, 0);
1352                         mxlnd_conn_decref(conn); /* drop ref taken above */
1353                 }
1354                 return MX_RECV_FINISHED;
1355         }
1356
1357         if (msg_type == MXLND_MSG_CONN_REQ) {
1358                 kmx_connparams_t       *cp      = NULL;
1359                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1360                                                   sizeof(kmx_connreq_msg_t);
1361
1362                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1363                 if (unlikely(length != expected || !data_if_available)) {
1364                         CNETERR("received invalid CONN_REQ from %llx "
1365                                 "length=%d (expected %d)\n", nic_id, length, expected);
1366                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, EPROTO, 0);
1367                         return MX_RECV_FINISHED;
1368                 }
1369
1370                 ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1371                                          conn, peer, data_if_available);
1372                 if (unlikely(ret != 0)) {
1373                         CNETERR("unable to alloc CONN_REQ from %llx:%d\n",
1374                                 nic_id, ep_id);
1375                         mxlnd_send_message(source, MXLND_MSG_CONN_ACK, ENOMEM, 0);
1376                         return MX_RECV_FINISHED;
1377                 }
1378                 spin_lock(&kmxlnd_data.kmx_conn_lock);
1379                 cfs_list_add_tail(&cp->mxr_list, &kmxlnd_data.kmx_conn_reqs);
1380                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
1381                 up(&kmxlnd_data.kmx_conn_sem);
1382                 return MX_RECV_FINISHED;
1383         }
1384         if (msg_type == MXLND_MSG_CONN_ACK) {
1385                 kmx_connparams_t  *cp           = NULL;
1386                 const int       expected        = offsetof(kmx_msg_t, mxm_u) +
1387                                                   sizeof(kmx_connreq_msg_t);
1388
1389                 LASSERT(conn);
1390                 if (unlikely(error != 0)) {
1391                         CNETERR("received CONN_ACK from %s with error -%d\n",
1392                                libcfs_nid2str(peer->mxp_nid), (int) error);
1393                         mxlnd_conn_disconnect(conn, 1, 0);
1394                 } else if (unlikely(length != expected || !data_if_available)) {
1395                         CNETERR("received %s CONN_ACK from %s "
1396                                "length=%d (expected %d)\n",
1397                                data_if_available ? "short" : "missing",
1398                                libcfs_nid2str(peer->mxp_nid), length, expected);
1399                         mxlnd_conn_disconnect(conn, 1, 1);
1400                 } else {
1401                         /* peer is ready for messages */
1402                         ret = mxlnd_connparams_alloc(&cp, context, source, match_value, length,
1403                                          conn, peer, data_if_available);
1404                         if (unlikely(ret != 0)) {
1405                                 CNETERR("unable to alloc kmx_connparams_t"
1406                                                " from %llx:%d\n", nic_id, ep_id);
1407                                 mxlnd_conn_disconnect(conn, 1, 1);
1408                         } else {
1409                                 spin_lock(&kmxlnd_data.kmx_conn_lock);
1410                                 cfs_list_add_tail(&cp->mxr_list,
1411                                                   &kmxlnd_data.kmx_conn_reqs);
1412                                 spin_unlock(&kmxlnd_data.kmx_conn_lock);
1413                                 up(&kmxlnd_data.kmx_conn_sem);
1414                         }
1415                 }
1416                 mxlnd_conn_decref(conn); /* drop ref taken above */
1417
1418                 return MX_RECV_FINISHED;
1419         }
1420
1421         /* Handle unexpected messages (PUT_REQ and GET_REQ) */
1422
1423         LASSERT(peer != NULL && conn != NULL);
1424
1425         rx = mxlnd_get_idle_rx(conn);
1426         if (rx != NULL) {
1427                 if (length <= MXLND_MSG_SIZE) {
1428                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1429                 } else {
1430                         CNETERR("unexpected large receive with "
1431                                 "match_value=0x%llx length=%d\n",
1432                                 match_value, length);
1433                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1434                 }
1435
1436                 if (ret == 0) {
1437                         /* hold conn ref until rx completes */
1438                         rx->mxc_conn = conn;
1439                         rx->mxc_peer = peer;
1440                         rx->mxc_nid = peer->mxp_nid;
1441                 } else {
1442                         CNETERR("could not post receive\n");
1443                         mxlnd_put_idle_rx(rx);
1444                 }
1445         }
1446
1447         /* Encountered error, drop incoming message on the floor */
1448         /* We could use MX_RECV_FINISHED but posting the receive of 0 bytes
1449          * uses the standard code path and acks the sender normally */
1450
1451         if (rx == NULL || ret != 0) {
1452                 mxlnd_conn_decref(conn); /* drop ref taken above */
1453                 if (rx == NULL) {
1454                         CNETERR("no idle rxs available - dropping rx"
1455                                 " 0x%llx from %s\n", match_value,
1456                                 libcfs_nid2str(peer->mxp_nid));
1457                 } else {
1458                         /* ret != 0 */
1459                         CNETERR("disconnected peer - dropping rx\n");
1460                 }
1461                 seg.segment_ptr = 0ULL;
1462                 seg.segment_length = 0;
1463                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1464                           match_value, ~0ULL, NULL, NULL);
1465         }
1466
1467         return MX_RECV_CONTINUE;
1468 }
1469
1470
1471 int
1472 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1473 {
1474         int              i      = 0;
1475         int              ret    = -ENOENT;
1476         kmx_peer_t      *peer   = NULL;
1477
1478         read_lock(&kmxlnd_data.kmx_global_lock);
1479         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1480                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1481                                         mxp_list) {
1482                         if (index-- == 0) {
1483                                 *nidp = peer->mxp_nid;
1484                                 *count = cfs_atomic_read(&peer->mxp_refcount);
1485                                 ret = 0;
1486                                 break;
1487                         }
1488                 }
1489         }
1490         read_unlock(&kmxlnd_data.kmx_global_lock);
1491
1492         return ret;
1493 }
1494
1495 void
1496 mxlnd_del_peer_locked(kmx_peer_t *peer)
1497 {
1498         if (peer->mxp_conn) {
1499                 mxlnd_conn_disconnect(peer->mxp_conn, 1, 1);
1500         } else {
1501                 cfs_list_del_init(&peer->mxp_list); /* remove from the global list */
1502                 mxlnd_peer_decref(peer); /* drop global list ref */
1503         }
1504         return;
1505 }
1506
1507 int
1508 mxlnd_del_peer(lnet_nid_t nid)
1509 {
1510         int             i       = 0;
1511         int             ret     = 0;
1512         kmx_peer_t      *peer   = NULL;
1513         kmx_peer_t      *next   = NULL;
1514
1515         if (nid != LNET_NID_ANY) {
1516                 peer = mxlnd_find_peer_by_nid(nid, 0); /* adds peer ref */
1517         }
1518         write_lock(&kmxlnd_data.kmx_global_lock);
1519         if (nid != LNET_NID_ANY) {
1520                 if (peer == NULL) {
1521                         ret = -ENOENT;
1522                 } else {
1523                         mxlnd_peer_decref(peer); /* and drops it */
1524                         mxlnd_del_peer_locked(peer);
1525                 }
1526         } else { /* LNET_NID_ANY */
1527                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1528                         cfs_list_for_each_entry_safe(peer, next,
1529                                                      &kmxlnd_data.kmx_peers[i],
1530                                                      mxp_list) {
1531                                 mxlnd_del_peer_locked(peer);
1532                         }
1533                 }
1534         }
1535         write_unlock(&kmxlnd_data.kmx_global_lock);
1536
1537         return ret;
1538 }
1539
1540 kmx_conn_t *
1541 mxlnd_get_conn_by_idx(int index)
1542 {
1543         int              i      = 0;
1544         kmx_peer_t      *peer   = NULL;
1545         kmx_conn_t      *conn   = NULL;
1546
1547         read_lock(&kmxlnd_data.kmx_global_lock);
1548         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1549                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
1550                                         mxp_list) {
1551                         cfs_list_for_each_entry(conn, &peer->mxp_conns,
1552                                                 mxk_list) {
1553                                 if (index-- > 0) {
1554                                         continue;
1555                                 }
1556
1557                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1558                                 read_unlock(&kmxlnd_data.kmx_global_lock);
1559                                 return conn;
1560                         }
1561                 }
1562         }
1563         read_unlock(&kmxlnd_data.kmx_global_lock);
1564
1565         return NULL;
1566 }
1567
1568 void
1569 mxlnd_close_matching_conns_locked(kmx_peer_t *peer)
1570 {
1571         kmx_conn_t      *conn   = NULL;
1572         kmx_conn_t      *next   = NULL;
1573
1574         cfs_list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list)
1575                 mxlnd_conn_disconnect(conn, 0, 1);
1576
1577         return;
1578 }
1579
1580 int
1581 mxlnd_close_matching_conns(lnet_nid_t nid)
1582 {
1583         int             i       = 0;
1584         int             ret     = 0;
1585         kmx_peer_t      *peer   = NULL;
1586
1587         write_lock(&kmxlnd_data.kmx_global_lock);
1588         if (nid != LNET_NID_ANY) {
1589                 peer = mxlnd_find_peer_by_nid_locked(nid); /* adds peer ref */
1590                 if (peer == NULL) {
1591                         ret = -ENOENT;
1592                 } else {
1593                         mxlnd_close_matching_conns_locked(peer);
1594                         mxlnd_peer_decref(peer); /* and drops it here */
1595                 }
1596         } else { /* LNET_NID_ANY */
1597                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1598                         cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],                                                mxp_list)
1599                                 mxlnd_close_matching_conns_locked(peer);
1600                 }
1601         }
1602         write_unlock(&kmxlnd_data.kmx_global_lock);
1603
1604         return ret;
1605 }
1606
1607 /**
1608  * mxlnd_ctl - modify MXLND parameters
1609  * @ni - LNET interface handle
1610  * @cmd - command to change
1611  * @arg - the ioctl data
1612  */
1613 int
1614 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1615 {
1616         struct libcfs_ioctl_data *data  = arg;
1617         int                       ret   = -EINVAL;
1618
1619         LASSERT (ni == kmxlnd_data.kmx_ni);
1620
1621         switch (cmd) {
1622         case IOC_LIBCFS_GET_PEER: {
1623                 lnet_nid_t      nid     = 0;
1624                 int             count   = 0;
1625
1626                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1627                 data->ioc_nid    = nid;
1628                 data->ioc_count  = count;
1629                 break;
1630         }
1631         case IOC_LIBCFS_DEL_PEER: {
1632                 ret = mxlnd_del_peer(data->ioc_nid);
1633                 break;
1634         }
1635         case IOC_LIBCFS_GET_CONN: {
1636                 kmx_conn_t      *conn = NULL;
1637
1638                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1639                 if (conn == NULL) {
1640                         ret = -ENOENT;
1641                 } else {
1642                         ret = 0;
1643                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1644                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1645                 }
1646                 break;
1647         }
1648         case IOC_LIBCFS_CLOSE_CONNECTION: {
1649                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1650                 break;
1651         }
1652         default:
1653                 CNETERR("unknown ctl(%d)\n", cmd);
1654                 break;
1655         }
1656
1657         return ret;
1658 }
1659
1660 /**
1661  * mxlnd_peer_queue_tx_locked - add the tx to the peer's tx queue
1662  * @tx
1663  *
1664  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1665  */
1666 void
1667 mxlnd_peer_queue_tx_locked(kmx_ctx_t *tx)
1668 {
1669         u8              msg_type        = tx->mxc_msg_type;
1670         kmx_conn_t      *conn           = tx->mxc_conn;
1671
1672         LASSERT (msg_type != 0);
1673         LASSERT (tx->mxc_nid != 0);
1674         LASSERT (tx->mxc_peer != NULL);
1675         LASSERT (tx->mxc_conn != NULL);
1676
1677         tx->mxc_incarnation = conn->mxk_incarnation;
1678
1679         if (msg_type != MXLND_MSG_PUT_DATA &&
1680             msg_type != MXLND_MSG_GET_DATA) {
1681                 /* msg style tx */
1682                 if (mxlnd_tx_requires_credit(tx)) {
1683                         cfs_list_add_tail(&tx->mxc_list,
1684                                           &conn->mxk_tx_credit_queue);
1685                         conn->mxk_ntx_msgs++;
1686                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1687                            msg_type == MXLND_MSG_CONN_ACK) {
1688                         /* put conn msgs at the front of the queue */
1689                         cfs_list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1690                 } else {
1691                         /* PUT_ACK, PUT_NAK */
1692                         cfs_list_add_tail(&tx->mxc_list,
1693                                           &conn->mxk_tx_free_queue);
1694                         conn->mxk_ntx_msgs++;
1695                 }
1696         } else {
1697                 /* data style tx */
1698                 cfs_list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1699                 conn->mxk_ntx_data++;
1700         }
1701
1702         return;
1703 }
1704
1705 /**
1706  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1707  * @tx
1708  *
1709  * Add the tx to the peer's msg or data queue
1710  */
1711 static inline void
1712 mxlnd_peer_queue_tx(kmx_ctx_t *tx)
1713 {
1714         LASSERT(tx->mxc_peer != NULL);
1715         LASSERT(tx->mxc_conn != NULL);
1716         spin_lock(&tx->mxc_conn->mxk_lock);
1717         mxlnd_peer_queue_tx_locked(tx);
1718         spin_unlock(&tx->mxc_conn->mxk_lock);
1719
1720         return;
1721 }
1722
1723 /**
1724  * mxlnd_queue_tx - add the tx to the global tx queue
1725  * @tx
1726  *
1727  * Add the tx to the global queue and up the tx_queue_sem
1728  */
1729 void
1730 mxlnd_queue_tx(kmx_ctx_t *tx)
1731 {
1732         kmx_peer_t *peer   = tx->mxc_peer;
1733         LASSERT (tx->mxc_nid != 0);
1734
1735         if (peer != NULL) {
1736                 if (peer->mxp_incompatible &&
1737                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1738                         /* let this fail now */
1739                         tx->mxc_errno = -ECONNABORTED;
1740                         mxlnd_conn_decref(peer->mxp_conn);
1741                         mxlnd_put_idle_tx(tx);
1742                         return;
1743                 }
1744                 if (tx->mxc_conn == NULL) {
1745                         int             ret     = 0;
1746                         kmx_conn_t      *conn   = NULL;
1747
1748                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1749                         if (ret != 0) {
1750                                 tx->mxc_errno = ret;
1751                                 mxlnd_put_idle_tx(tx);
1752                                 goto done;
1753                         }
1754                         tx->mxc_conn = conn;
1755                         mxlnd_peer_decref(peer); /* and takes it from peer */
1756                 }
1757                 LASSERT(tx->mxc_conn != NULL);
1758                 mxlnd_peer_queue_tx(tx);
1759                 mxlnd_check_sends(peer);
1760         } else {
1761                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1762                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1763                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1764                 up(&kmxlnd_data.kmx_tx_queue_sem);
1765         }
1766 done:
1767         return;
1768 }
1769
1770 int
1771 mxlnd_setup_iov(kmx_ctx_t *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1772 {
1773         int             i                       = 0;
1774         int             sum                     = 0;
1775         int             old_sum                 = 0;
1776         int             nseg                    = 0;
1777         int             first_iov               = -1;
1778         int             first_iov_offset        = 0;
1779         int             first_found             = 0;
1780         int             last_iov                = -1;
1781         int             last_iov_length         = 0;
1782         mx_ksegment_t  *seg                     = NULL;
1783
1784         if (niov == 0) return 0;
1785         LASSERT(iov != NULL);
1786
1787         for (i = 0; i < niov; i++) {
1788                 sum = old_sum + (u32) iov[i].iov_len;
1789                 if (!first_found && (sum > offset)) {
1790                         first_iov = i;
1791                         first_iov_offset = offset - old_sum;
1792                         first_found = 1;
1793                         sum = (u32) iov[i].iov_len - first_iov_offset;
1794                         old_sum = 0;
1795                 }
1796                 if (sum >= nob) {
1797                         last_iov = i;
1798                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1799                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1800                         break;
1801                 }
1802                 old_sum = sum;
1803         }
1804         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1805         nseg = last_iov - first_iov + 1;
1806         LASSERT(nseg > 0);
1807
1808         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1809         if (seg == NULL) {
1810                 CNETERR("MXLND_ALLOC() failed\n");
1811                 return -1;
1812         }
1813         memset(seg, 0, nseg * sizeof(*seg));
1814         ctx->mxc_nseg = nseg;
1815         sum = 0;
1816         for (i = 0; i < nseg; i++) {
1817                 seg[i].segment_ptr = MX_PA_TO_U64(virt_to_phys(iov[first_iov + i].iov_base));
1818                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1819                 if (i == 0) {
1820                         seg[i].segment_ptr += (u64) first_iov_offset;
1821                         seg[i].segment_length -= (u32) first_iov_offset;
1822                 }
1823                 if (i == (nseg - 1)) {
1824                         seg[i].segment_length = (u32) last_iov_length;
1825                 }
1826                 sum += seg[i].segment_length;
1827         }
1828         ctx->mxc_seg_list = seg;
1829         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1830 #ifdef MX_PIN_FULLPAGES
1831         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1832 #endif
1833         LASSERT(nob == sum);
1834         return 0;
1835 }
1836
1837 int
1838 mxlnd_setup_kiov(kmx_ctx_t *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1839 {
1840         int             i                       = 0;
1841         int             sum                     = 0;
1842         int             old_sum                 = 0;
1843         int             nseg                    = 0;
1844         int             first_kiov              = -1;
1845         int             first_kiov_offset       = 0;
1846         int             first_found             = 0;
1847         int             last_kiov               = -1;
1848         int             last_kiov_length        = 0;
1849         mx_ksegment_t  *seg                     = NULL;
1850
1851         if (niov == 0) return 0;
1852         LASSERT(kiov != NULL);
1853
1854         for (i = 0; i < niov; i++) {
1855                 sum = old_sum + kiov[i].kiov_len;
1856                 if (i == 0) sum -= kiov[i].kiov_offset;
1857                 if (!first_found && (sum > offset)) {
1858                         first_kiov = i;
1859                         first_kiov_offset = offset - old_sum;
1860                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1861                         first_found = 1;
1862                         sum = kiov[i].kiov_len - first_kiov_offset;
1863                         old_sum = 0;
1864                 }
1865                 if (sum >= nob) {
1866                         last_kiov = i;
1867                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1868                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1869                         break;
1870                 }
1871                 old_sum = sum;
1872         }
1873         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1874         nseg = last_kiov - first_kiov + 1;
1875         LASSERT(nseg > 0);
1876
1877         MXLND_ALLOC(seg, nseg * sizeof(*seg));
1878         if (seg == NULL) {
1879                 CNETERR("MXLND_ALLOC() failed\n");
1880                 return -1;
1881         }
1882         memset(seg, 0, niov * sizeof(*seg));
1883         ctx->mxc_nseg = niov;
1884         sum = 0;
1885         for (i = 0; i < niov; i++) {
1886                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1887                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1888                 if (i == 0) {
1889                         seg[i].segment_ptr += (u64) first_kiov_offset;
1890                         /* we have to add back the original kiov_offset */
1891                         seg[i].segment_length -= first_kiov_offset +
1892                                                  kiov[first_kiov].kiov_offset;
1893                 }
1894                 if (i == (nseg - 1)) {
1895                         seg[i].segment_length = last_kiov_length;
1896                 }
1897                 sum += seg[i].segment_length;
1898         }
1899         ctx->mxc_seg_list = seg;
1900         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1901 #ifdef MX_PIN_FULLPAGES
1902         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1903 #endif
1904         LASSERT(nob == sum);
1905         return 0;
1906 }
1907
1908 void
1909 mxlnd_send_nak(kmx_ctx_t *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1910 {
1911         LASSERT(type == MXLND_MSG_PUT_ACK);
1912         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1913         tx->mxc_cookie = cookie;
1914         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1915         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << MXLND_ERROR_OFFSET); /* error code */
1916         tx->mxc_match = mxlnd_create_match(tx, status);
1917
1918         mxlnd_queue_tx(tx);
1919 }
1920
1921
1922 /**
1923  * mxlnd_send_data - get tx, map [k]iov, queue tx
1924  * @ni
1925  * @lntmsg
1926  * @peer
1927  * @msg_type
1928  * @cookie
1929  *
1930  * This setups the DATA send for PUT or GET.
1931  *
1932  * On success, it queues the tx, on failure it calls lnet_finalize()
1933  */
1934 void
1935 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_peer_t *peer, u8 msg_type, u64 cookie)
1936 {
1937         int                     ret     = 0;
1938         lnet_process_id_t       target  = lntmsg->msg_target;
1939         unsigned int            niov    = lntmsg->msg_niov;
1940         struct iovec           *iov     = lntmsg->msg_iov;
1941         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
1942         unsigned int            offset  = lntmsg->msg_offset;
1943         unsigned int            nob     = lntmsg->msg_len;
1944         kmx_ctx_t              *tx      = NULL;
1945
1946         LASSERT(lntmsg != NULL);
1947         LASSERT(peer != NULL);
1948         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1949         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0);
1950
1951         tx = mxlnd_get_idle_tx();
1952         if (tx == NULL) {
1953                 CNETERR("Can't allocate %s tx for %s\n",
1954                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1955                         libcfs_nid2str(target.nid));
1956                 goto failed_0;
1957         }
1958         tx->mxc_nid = target.nid;
1959         /* NOTE called when we have a ref on the conn, get one for this tx */
1960         mxlnd_conn_addref(peer->mxp_conn);
1961         tx->mxc_peer = peer;
1962         tx->mxc_conn = peer->mxp_conn;
1963         tx->mxc_msg_type = msg_type;
1964         tx->mxc_lntmsg[0] = lntmsg;
1965         tx->mxc_cookie = cookie;
1966         tx->mxc_match = mxlnd_create_match(tx, 0);
1967
1968         /* This setups up the mx_ksegment_t to send the DATA payload  */
1969         if (nob == 0) {
1970                 /* do not setup the segments */
1971                 CNETERR("nob = 0; why didn't we use an EAGER reply "
1972                         "to %s?\n", libcfs_nid2str(target.nid));
1973                 ret = 0;
1974         } else if (kiov == NULL) {
1975                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1976         } else {
1977                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1978         }
1979         if (ret != 0) {
1980                 CNETERR("Can't setup send DATA for %s\n",
1981                         libcfs_nid2str(target.nid));
1982                 tx->mxc_errno = -EIO;
1983                 goto failed_1;
1984         }
1985         mxlnd_queue_tx(tx);
1986         return;
1987
1988 failed_1:
1989         mxlnd_conn_decref(peer->mxp_conn);
1990         mxlnd_put_idle_tx(tx);
1991         return;
1992
1993 failed_0:
1994         CNETERR("no tx avail\n");
1995         lnet_finalize(ni, lntmsg, -EIO);
1996         return;
1997 }
1998
1999 /**
2000  * mxlnd_recv_data - map [k]iov, post rx
2001  * @ni
2002  * @lntmsg
2003  * @rx
2004  * @msg_type
2005  * @cookie
2006  *
2007  * This setups the DATA receive for PUT or GET.
2008  *
2009  * On success, it returns 0, on failure it returns -1
2010  */
2011 int
2012 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, kmx_ctx_t *rx, u8 msg_type, u64 cookie)
2013 {
2014         int                     ret     = 0;
2015         lnet_process_id_t       target  = lntmsg->msg_target;
2016         unsigned int            niov    = lntmsg->msg_niov;
2017         struct iovec           *iov     = lntmsg->msg_iov;
2018         lnet_kiov_t            *kiov    = lntmsg->msg_kiov;
2019         unsigned int            offset  = lntmsg->msg_offset;
2020         unsigned int            nob     = lntmsg->msg_len;
2021         mx_return_t             mxret   = MX_SUCCESS;
2022         u64                     mask    = ~(MXLND_ERROR_MASK);
2023
2024         /* above assumes MXLND_MSG_PUT_DATA */
2025         if (msg_type == MXLND_MSG_GET_DATA) {
2026                 niov = lntmsg->msg_md->md_niov;
2027                 iov = lntmsg->msg_md->md_iov.iov;
2028                 kiov = lntmsg->msg_md->md_iov.kiov;
2029                 offset = 0;
2030                 nob = lntmsg->msg_md->md_length;
2031         }
2032
2033         LASSERT(lntmsg != NULL);
2034         LASSERT(rx != NULL);
2035         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
2036         LASSERT((cookie>>MXLND_ERROR_OFFSET) == 0); /* ensure top 12 bits are 0 */
2037
2038         rx->mxc_msg_type = msg_type;
2039         rx->mxc_state = MXLND_CTX_PENDING;
2040         rx->mxc_nid = target.nid;
2041         /* if posting a GET_DATA, we may not yet know the peer */
2042         if (rx->mxc_peer != NULL) {
2043                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
2044         }
2045         rx->mxc_lntmsg[0] = lntmsg;
2046         rx->mxc_cookie = cookie;
2047         rx->mxc_match = mxlnd_create_match(rx, 0);
2048         /* This setups up the mx_ksegment_t to receive the DATA payload  */
2049         if (kiov == NULL) {
2050                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
2051         } else {
2052                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
2053         }
2054         if (msg_type == MXLND_MSG_GET_DATA) {
2055                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
2056                 if (rx->mxc_lntmsg[1] == NULL) {
2057                         CNETERR("Can't create reply for GET -> %s\n",
2058                                 libcfs_nid2str(target.nid));
2059                         ret = -1;
2060                 }
2061         }
2062         if (ret != 0) {
2063                 CNETERR("Can't setup %s rx for %s\n",
2064                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
2065                        libcfs_nid2str(target.nid));
2066                 return -1;
2067         }
2068         ret = mxlnd_q_pending_ctx(rx);
2069         if (ret == -1) {
2070                 return -1;
2071         }
2072         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
2073         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
2074                           rx->mxc_seg_list, rx->mxc_nseg,
2075                           rx->mxc_pin_type, rx->mxc_match,
2076                           mask, (void *) rx,
2077                           &rx->mxc_mxreq);
2078         if (mxret != MX_SUCCESS) {
2079                 if (rx->mxc_conn != NULL) {
2080                         mxlnd_deq_pending_ctx(rx);
2081                 }
2082                 CNETERR("mx_kirecv() failed with %d for %s\n",
2083                         (int) mxret, libcfs_nid2str(target.nid));
2084                 return -1;
2085         }
2086
2087         return 0;
2088 }
2089
2090 /**
2091  * mxlnd_send - the LND required send function
2092  * @ni
2093  * @private
2094  * @lntmsg
2095  *
2096  * This must not block. Since we may not have a peer struct for the receiver,
2097  * it will append send messages on a global tx list. We will then up the
2098  * tx_queued's semaphore to notify it of the new send.
2099  */
2100 int
2101 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
2102 {
2103         int                     ret             = 0;
2104         int                     type            = lntmsg->msg_type;
2105         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
2106         lnet_process_id_t       target          = lntmsg->msg_target;
2107         lnet_nid_t              nid             = target.nid;
2108         int                     target_is_router = lntmsg->msg_target_is_router;
2109         int                     routing         = lntmsg->msg_routing;
2110         unsigned int            payload_niov    = lntmsg->msg_niov;
2111         struct iovec           *payload_iov     = lntmsg->msg_iov;
2112         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
2113         unsigned int            payload_offset  = lntmsg->msg_offset;
2114         unsigned int            payload_nob     = lntmsg->msg_len;
2115         kmx_ctx_t              *tx              = NULL;
2116         kmx_msg_t              *txmsg           = NULL;
2117         kmx_ctx_t              *rx              = (kmx_ctx_t *) private; /* for REPLY */
2118         kmx_ctx_t              *rx_data         = NULL;
2119         kmx_conn_t             *conn            = NULL;
2120         int                     nob             = 0;
2121         uint32_t                length          = 0;
2122         kmx_peer_t             *peer            = NULL;
2123         rwlock_t                *g_lock         =&kmxlnd_data.kmx_global_lock;
2124
2125         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
2126                        payload_nob, payload_niov, libcfs_id2str(target));
2127
2128         LASSERT (payload_nob == 0 || payload_niov > 0);
2129         LASSERT (payload_niov <= LNET_MAX_IOV);
2130         /* payload is either all vaddrs or all pages */
2131         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
2132
2133         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
2134
2135         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
2136          * to a new peer, so create one if not found */
2137         peer = mxlnd_find_peer_by_nid(nid, 1); /* adds peer ref */
2138         if (peer == NULL || peer->mxp_conn == NULL) {
2139                 /* we could not find it nor could we create one or
2140                  * one exists but we cannot create a conn,
2141                  * fail this message */
2142                 if (peer) {
2143                         /* found peer without conn, drop ref taken above */
2144                         LASSERT(peer->mxp_conn == NULL);
2145                         mxlnd_peer_decref(peer);
2146                 }
2147                 return -ENOMEM;
2148         }
2149
2150         /* we have a peer with a conn */
2151
2152         if (unlikely(peer->mxp_incompatible)) {
2153                 mxlnd_peer_decref(peer); /* drop ref taken above */
2154         } else {
2155                 read_lock(g_lock);
2156                 conn = peer->mxp_conn;
2157                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT)
2158                         mxlnd_conn_addref(conn);
2159                 else
2160                         conn = NULL;
2161                 read_unlock(g_lock);
2162                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
2163                 if (!conn)
2164                         return -ENOTCONN;
2165         }
2166
2167         LASSERT(peer && conn);
2168
2169         CDEBUG(D_NET, "%s: peer 0x%llx is 0x%p\n", __func__, nid, peer);
2170
2171         switch (type) {
2172         case LNET_MSG_ACK:
2173                 LASSERT (payload_nob == 0);
2174                 break;
2175
2176         case LNET_MSG_REPLY:
2177         case LNET_MSG_PUT:
2178                 /* Is the payload small enough not to need DATA? */
2179                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
2180                 if (nob <= MXLND_MSG_SIZE)
2181                         break;                  /* send EAGER */
2182
2183                 tx = mxlnd_get_idle_tx();
2184                 if (unlikely(tx == NULL)) {
2185                         CNETERR("Can't allocate %s tx for %s\n",
2186                                 type == LNET_MSG_PUT ? "PUT" : "REPLY",
2187                                 libcfs_nid2str(nid));
2188                         if (conn) mxlnd_conn_decref(conn);
2189                         return -ENOMEM;
2190                 }
2191
2192                 tx->mxc_peer = peer;
2193                 tx->mxc_conn = conn;
2194                 /* we added a conn ref above */
2195                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
2196                 txmsg = tx->mxc_msg;
2197                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
2198                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
2199                 tx->mxc_match = mxlnd_create_match(tx, 0);
2200
2201                 /* we must post a receive _before_ sending the request.
2202                  * we need to determine how much to receive, it will be either
2203                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
2204
2205                 rx = mxlnd_get_idle_rx(conn);
2206                 if (unlikely(rx == NULL)) {
2207                         CNETERR("Can't allocate rx for PUT_ACK for %s\n",
2208                                 libcfs_nid2str(nid));
2209                         mxlnd_put_idle_tx(tx);
2210                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
2211                         return -ENOMEM;
2212                 }
2213                 rx->mxc_nid = nid;
2214                 rx->mxc_peer = peer;
2215                 mxlnd_conn_addref(conn); /* for this rx */
2216                 rx->mxc_conn = conn;
2217                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
2218                 rx->mxc_cookie = tx->mxc_cookie;
2219                 rx->mxc_match = mxlnd_create_match(rx, 0);
2220
2221                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
2222                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
2223                 if (unlikely(ret != 0)) {
2224                         CNETERR("recv_msg() failed for PUT_ACK for %s\n",
2225                                            libcfs_nid2str(nid));
2226                         rx->mxc_lntmsg[0] = NULL;
2227                         mxlnd_put_idle_rx(rx);
2228                         mxlnd_put_idle_tx(tx);
2229                         mxlnd_conn_decref(conn); /* for the rx... */
2230                         mxlnd_conn_decref(conn); /* and for the tx */
2231                         return -EHOSTUNREACH;
2232                 }
2233
2234                 mxlnd_queue_tx(tx);
2235                 return 0;
2236
2237         case LNET_MSG_GET:
2238                 if (routing || target_is_router)
2239                         break;                  /* send EAGER */
2240
2241                 /* is the REPLY message too small for DATA? */
2242                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
2243                 if (nob <= MXLND_MSG_SIZE)
2244                         break;                  /* send EAGER */
2245
2246                 /* get tx (we need the cookie) , post rx for incoming DATA,
2247                  * then post GET_REQ tx */
2248                 tx = mxlnd_get_idle_tx();
2249                 if (unlikely(tx == NULL)) {
2250                         CNETERR("Can't allocate GET tx for %s\n",
2251                                 libcfs_nid2str(nid));
2252                         mxlnd_conn_decref(conn); /* for the ref taken above */
2253                         return -ENOMEM;
2254                 }
2255                 rx_data = mxlnd_get_idle_rx(conn);
2256                 if (unlikely(rx_data == NULL)) {
2257                         CNETERR("Can't allocate DATA rx for %s\n",
2258                                 libcfs_nid2str(nid));
2259                         mxlnd_put_idle_tx(tx);
2260                         mxlnd_conn_decref(conn); /* for the ref taken above */
2261                         return -ENOMEM;
2262                 }
2263                 rx_data->mxc_peer = peer;
2264                 /* NOTE no need to lock peer before adding conn ref since we took
2265                  * a conn ref for the tx (it cannot be freed between there and here ) */
2266                 mxlnd_conn_addref(conn); /* for the rx_data */
2267                 rx_data->mxc_conn = conn;
2268
2269                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
2270                 if (unlikely(ret != 0)) {
2271                         CNETERR("Can't setup GET sink for %s\n",
2272                                 libcfs_nid2str(nid));
2273                         mxlnd_put_idle_rx(rx_data);
2274                         mxlnd_put_idle_tx(tx);
2275                         mxlnd_conn_decref(conn); /* for the rx_data... */
2276                         mxlnd_conn_decref(conn); /* and for the tx */
2277                         return -EIO;
2278                 }
2279
2280                 tx->mxc_peer = peer;
2281                 tx->mxc_conn = conn;
2282                 /* conn ref taken above */
2283                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
2284                 txmsg = tx->mxc_msg;
2285                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
2286                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
2287                 tx->mxc_match = mxlnd_create_match(tx, 0);
2288
2289                 mxlnd_queue_tx(tx);
2290                 return 0;
2291
2292         default:
2293                 LBUG();
2294                 mxlnd_conn_decref(conn); /* drop ref taken above */
2295                 return -EIO;
2296         }
2297
2298         /* send EAGER */
2299
2300         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
2301                 <= MXLND_MSG_SIZE);
2302
2303         tx = mxlnd_get_idle_tx();
2304         if (unlikely(tx == NULL)) {
2305                 CNETERR("Can't send %s to %s: tx descs exhausted\n",
2306                         mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
2307                 mxlnd_conn_decref(conn); /* drop ref taken above */
2308                 return -ENOMEM;
2309         }
2310
2311         tx->mxc_peer = peer;
2312         tx->mxc_conn = conn;
2313         /* conn ref taken above */
2314         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
2315         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
2316         tx->mxc_match = mxlnd_create_match(tx, 0);
2317
2318         txmsg = tx->mxc_msg;
2319         txmsg->mxm_u.eager.mxem_hdr = *hdr;
2320
2321         if (payload_kiov != NULL)
2322                 lnet_copy_kiov2flat(MXLND_MSG_SIZE, txmsg,
2323                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2324                             payload_niov, payload_kiov, payload_offset, payload_nob);
2325         else
2326                 lnet_copy_iov2flat(MXLND_MSG_SIZE, txmsg,
2327                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2328                             payload_niov, payload_iov, payload_offset, payload_nob);
2329
2330         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
2331         mxlnd_queue_tx(tx);
2332         return 0;
2333 }
2334
2335 /**
2336  * mxlnd_recv - the LND required recv function
2337  * @ni
2338  * @private
2339  * @lntmsg
2340  * @delayed
2341  * @niov
2342  * @kiov
2343  * @offset
2344  * @mlen
2345  * @rlen
2346  *
2347  * This must not block.
2348  */
2349 int
2350 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2351              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2352              unsigned int offset, unsigned int mlen, unsigned int rlen)
2353 {
2354         int             ret             = 0;
2355         int             nob             = 0;
2356         int             len             = 0;
2357         kmx_ctx_t       *rx             = private;
2358         kmx_msg_t       *rxmsg          = rx->mxc_msg;
2359         lnet_nid_t       nid            = rx->mxc_nid;
2360         kmx_ctx_t       *tx             = NULL;
2361         kmx_msg_t       *txmsg          = NULL;
2362         kmx_peer_t      *peer           = rx->mxc_peer;
2363         kmx_conn_t      *conn           = peer->mxp_conn;
2364         u64              cookie         = 0ULL;
2365         int              msg_type       = rxmsg->mxm_type;
2366         int              repost         = 1;
2367         int              credit         = 0;
2368         int              finalize       = 0;
2369
2370         LASSERT (mlen <= rlen);
2371         /* Either all pages or all vaddrs */
2372         LASSERT (!(kiov != NULL && iov != NULL));
2373         LASSERT (peer && conn);
2374
2375         /* conn_addref(conn) already taken for the primary rx */
2376
2377         switch (msg_type) {
2378         case MXLND_MSG_EAGER:
2379                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2380                 len = rx->mxc_status.xfer_length;
2381                 if (unlikely(nob > len)) {
2382                         CNETERR("Eager message from %s too big: %d(%d)\n",
2383                                 libcfs_nid2str(nid), nob, len);
2384                         ret = -EPROTO;
2385                         break;
2386                 }
2387
2388                 if (kiov != NULL)
2389                         lnet_copy_flat2kiov(niov, kiov, offset,
2390                                 MXLND_MSG_SIZE, rxmsg,
2391                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2392                                 mlen);
2393                 else
2394                         lnet_copy_flat2iov(niov, iov, offset,
2395                                 MXLND_MSG_SIZE, rxmsg,
2396                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2397                                 mlen);
2398                 finalize = 1;
2399                 credit = 1;
2400                 break;
2401
2402         case MXLND_MSG_PUT_REQ:
2403                 /* we are going to reuse the rx, store the needed info */
2404                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2405
2406                 /* get tx, post rx, send PUT_ACK */
2407
2408                 tx = mxlnd_get_idle_tx();
2409                 if (unlikely(tx == NULL)) {
2410                         CNETERR("Can't allocate tx for %s\n", libcfs_nid2str(nid));
2411                         /* Not replying will break the connection */
2412                         ret = -ENOMEM;
2413                         break;
2414                 }
2415                 if (unlikely(mlen == 0)) {
2416                         finalize = 1;
2417                         tx->mxc_peer = peer;
2418                         tx->mxc_conn = conn;
2419                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2420                         /* repost = 1 */
2421                         break;
2422                 }
2423
2424                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2425                 tx->mxc_peer = peer;
2426                 tx->mxc_conn = conn;
2427                 /* no need to lock peer first since we already have a ref */
2428                 mxlnd_conn_addref(conn); /* for the tx */
2429                 txmsg = tx->mxc_msg;
2430                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2431                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2432                 tx->mxc_cookie = cookie;
2433                 tx->mxc_match = mxlnd_create_match(tx, 0);
2434
2435                 /* we must post a receive _before_ sending the PUT_ACK */
2436                 mxlnd_ctx_init(rx);
2437                 rx->mxc_state = MXLND_CTX_PREP;
2438                 rx->mxc_peer = peer;
2439                 rx->mxc_conn = conn;
2440                 /* do not take another ref for this rx, it is already taken */
2441                 rx->mxc_nid = peer->mxp_nid;
2442                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA,
2443                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2444
2445                 if (unlikely(ret != 0)) {
2446                         /* Notify peer that it's over */
2447                         CNETERR("Can't setup PUT_DATA rx for %s: %d\n",
2448                                 libcfs_nid2str(nid), ret);
2449                         mxlnd_ctx_init(tx);
2450                         tx->mxc_state = MXLND_CTX_PREP;
2451                         tx->mxc_peer = peer;
2452                         tx->mxc_conn = conn;
2453                         /* finalize = 0, let the PUT_ACK tx finalize this */
2454                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2455                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2456                         /* conn ref already taken above */
2457                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2458                         /* repost = 1 */
2459                         break;
2460                 }
2461
2462                 mxlnd_queue_tx(tx);
2463                 /* do not return a credit until after PUT_DATA returns */
2464                 repost = 0;
2465                 break;
2466
2467         case MXLND_MSG_GET_REQ:
2468                 cookie = rxmsg->mxm_u.get_req.mxgrm_cookie;
2469
2470                 if (likely(lntmsg != NULL)) {
2471                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2472                                         cookie);
2473                 } else {
2474                         /* GET didn't match anything */
2475                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2476                          * We have to embed the error code in the match bits.
2477                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2478                         tx = mxlnd_get_idle_tx();
2479                         if (unlikely(tx == NULL)) {
2480                                 CNETERR("Can't get tx for GET NAK for %s\n",
2481                                         libcfs_nid2str(nid));
2482                                 /* we can't get a tx, notify the peer that the GET failed */
2483                                 mxlnd_send_message(conn->mxk_epa, MXLND_MSG_GET_DATA,
2484                                                    ENODATA, cookie);
2485                                 ret = -ENOMEM;
2486                                 break;
2487                         }
2488                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2489                         tx->mxc_state = MXLND_CTX_PENDING;
2490                         tx->mxc_nid = nid;
2491                         tx->mxc_peer = peer;
2492                         tx->mxc_conn = conn;
2493                         /* no need to lock peer first since we already have a ref */
2494                         mxlnd_conn_addref(conn); /* for this tx */
2495                         tx->mxc_cookie = cookie;
2496                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2497                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2498                         mxlnd_queue_tx(tx);
2499                 }
2500                 /* finalize lntmsg after tx completes */
2501                 break;
2502
2503         default:
2504                 LBUG();
2505         }
2506
2507         if (repost) {
2508                 /* we received a message, increment peer's outstanding credits */
2509                 if (credit == 1) {
2510                         spin_lock(&conn->mxk_lock);
2511                         conn->mxk_outstanding++;
2512                         spin_unlock(&conn->mxk_lock);
2513                 }
2514                 /* we are done with the rx */
2515                 mxlnd_put_idle_rx(rx);
2516                 mxlnd_conn_decref(conn);
2517         }
2518
2519         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0);
2520
2521         /* we received a credit, see if we can use it to send a msg */
2522         if (credit) mxlnd_check_sends(peer);
2523
2524         return ret;
2525 }
2526
2527 void
2528 mxlnd_sleep(unsigned long timeout)
2529 {
2530         cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
2531         cfs_schedule_timeout(timeout);
2532         return;
2533 }
2534
2535 /**
2536  * mxlnd_tx_queued - the generic send queue thread
2537  * @arg - thread id (as a void *)
2538  *
2539  * This thread moves send messages from the global tx_queue to the owning
2540  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2541  * it to the global peer list.
2542  */
2543 int
2544 mxlnd_tx_queued(void *arg)
2545 {
2546         long                    id      = (long) arg;
2547         int                     ret     = 0;
2548         int                     found   = 0;
2549         kmx_ctx_t              *tx      = NULL;
2550         kmx_peer_t             *peer    = NULL;
2551         cfs_list_t             *queue   = &kmxlnd_data.kmx_tx_queue;
2552         spinlock_t              *tx_q_lock = &kmxlnd_data.kmx_tx_queue_lock;
2553         rwlock_t                *g_lock  = &kmxlnd_data.kmx_global_lock;
2554
2555         cfs_daemonize("mxlnd_tx_queued");
2556
2557         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
2558                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2559                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
2560                         break;
2561                 if (ret != 0) /* Should we check for -EINTR? */
2562                         continue;
2563                 spin_lock(tx_q_lock);
2564                 if (cfs_list_empty(&kmxlnd_data.kmx_tx_queue)) {
2565                         spin_unlock(tx_q_lock);
2566                         continue;
2567                 }
2568                 tx = cfs_list_entry(queue->next, kmx_ctx_t, mxc_list);
2569                 cfs_list_del_init(&tx->mxc_list);
2570                 spin_unlock(tx_q_lock);
2571
2572                 found = 0;
2573                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid, 0); /* adds ref*/
2574                 if (peer != NULL) {
2575                         tx->mxc_peer = peer;
2576                         write_lock(g_lock);
2577                         if (peer->mxp_conn == NULL) {
2578                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn,
2579                                                               peer);
2580                                 if (ret != 0) {
2581                                         /* out of memory: give up, fail tx */
2582                                         tx->mxc_errno = -ENOMEM;
2583                                         mxlnd_peer_decref(peer);
2584                                         write_unlock(g_lock);
2585                                         mxlnd_put_idle_tx(tx);
2586                                         continue;
2587                                 }
2588                         }
2589                         tx->mxc_conn = peer->mxp_conn;
2590                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2591                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2592                         write_unlock(g_lock);
2593                         mxlnd_queue_tx(tx);
2594                         found = 1;
2595                 }
2596                 if (found == 0) {
2597                         int             hash    = 0;
2598                         kmx_peer_t     *peer    = NULL;
2599                         kmx_peer_t     *old     = NULL;
2600
2601                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2602
2603                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2604                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2605                         /* create peer */
2606                         /* adds conn ref for this function */
2607                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid,
2608                                         *kmxlnd_tunables.kmx_board,
2609                                         *kmxlnd_tunables.kmx_ep_id, 0ULL);
2610                         if (ret != 0) {
2611                                 /* finalize message */
2612                                 tx->mxc_errno = ret;
2613                                 mxlnd_put_idle_tx(tx);
2614                                 continue;
2615                         }
2616                         tx->mxc_peer = peer;
2617                         tx->mxc_conn = peer->mxp_conn;
2618                         /* this tx will keep the conn ref taken in peer_alloc() */
2619
2620                         /* add peer to global peer list, but look to see
2621                          * if someone already created it after we released
2622                          * the read lock */
2623                         write_lock(g_lock);
2624                         old = mxlnd_find_peer_by_nid_locked(peer->mxp_nid);
2625                         if (old) {
2626                                 /* we have a peer ref on old */
2627                                 if (old->mxp_conn) {
2628                                         found = 1;
2629                                 } else {
2630                                         /* no conn */
2631                                         /* drop our ref taken above... */
2632                                         mxlnd_peer_decref(old);
2633                                         /* and delete it */
2634                                         mxlnd_del_peer_locked(old);
2635                                 }
2636                         }
2637
2638                         if (found == 0) {
2639                                 cfs_list_add_tail(&peer->mxp_list,
2640                                                   &kmxlnd_data.kmx_peers[hash]);
2641                                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
2642                         } else {
2643                                 tx->mxc_peer = old;
2644                                 tx->mxc_conn = old->mxp_conn;
2645                                 LASSERT(old->mxp_conn != NULL);
2646                                 mxlnd_conn_addref(old->mxp_conn);
2647                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2648                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2649                                 mxlnd_peer_decref(peer);
2650                         }
2651                         write_unlock(g_lock);
2652
2653                         mxlnd_queue_tx(tx);
2654                 }
2655         }
2656         mxlnd_thread_stop(id);
2657         return 0;
2658 }
2659
2660 /* When calling this, we must not have the peer lock. */
2661 void
2662 mxlnd_iconnect(kmx_peer_t *peer, u8 msg_type)
2663 {
2664         mx_return_t     mxret           = MX_SUCCESS;
2665         mx_request_t    request;
2666         kmx_conn_t      *conn           = peer->mxp_conn;
2667         u64             match           = ((u64) msg_type) << MXLND_MSG_OFFSET;
2668
2669         /* NOTE we are holding a conn ref every time we call this function,
2670          * we do not need to lock the peer before taking another ref */
2671         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2672
2673         LASSERT(msg_type == MXLND_MSG_ICON_REQ || msg_type == MXLND_MSG_ICON_ACK);
2674
2675         if (peer->mxp_reconnect_time == 0) {
2676                 peer->mxp_reconnect_time = jiffies;
2677         }
2678
2679         if (peer->mxp_nic_id == 0ULL) {
2680                 int     ret     = 0;
2681
2682                 ret = mxlnd_ip2nic_id(LNET_NIDADDR(peer->mxp_nid),
2683                                       &peer->mxp_nic_id, MXLND_LOOKUP_COUNT);
2684                 if (ret == 0) {
2685                         mx_nic_id_to_board_number(peer->mxp_nic_id, &peer->mxp_board);
2686                 }
2687                 if (peer->mxp_nic_id == 0ULL && conn->mxk_status == MXLND_CONN_WAIT) {
2688                         /* not mapped yet, return */
2689                         spin_lock(&conn->mxk_lock);
2690                         mxlnd_set_conn_status(conn, MXLND_CONN_INIT);
2691                         spin_unlock(&conn->mxk_lock);
2692                 }
2693         }
2694
2695         if (cfs_time_after(jiffies,
2696                            peer->mxp_reconnect_time + MXLND_CONNECT_TIMEOUT) &&
2697             conn->mxk_status != MXLND_CONN_DISCONNECT) {
2698                 /* give up and notify LNET */
2699                 CDEBUG(D_NET, "timeout trying to connect to %s\n",
2700                        libcfs_nid2str(peer->mxp_nid));
2701                 mxlnd_conn_disconnect(conn, 0, 0);
2702                 mxlnd_conn_decref(conn);
2703                 return;
2704         }
2705
2706         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2707                             peer->mxp_ep_id, MXLND_MSG_MAGIC, match,
2708                             (void *) peer, &request);
2709         if (unlikely(mxret != MX_SUCCESS)) {
2710                 spin_lock(&conn->mxk_lock);
2711                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
2712                 spin_unlock(&conn->mxk_lock);
2713                 CNETERR("mx_iconnect() failed with %s (%d) to %s\n",
2714                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2715                 mxlnd_conn_decref(conn);
2716         }
2717         mx_set_request_timeout(kmxlnd_data.kmx_endpt, request,
2718                                MXLND_CONNECT_TIMEOUT/CFS_HZ*1000);
2719         return;
2720 }
2721
2722 #define MXLND_STATS 0
2723
2724 int
2725 mxlnd_check_sends(kmx_peer_t *peer)
2726 {
2727         int             ret             = 0;
2728         int             found           = 0;
2729         mx_return_t     mxret           = MX_SUCCESS;
2730         kmx_ctx_t       *tx             = NULL;
2731         kmx_conn_t      *conn           = NULL;
2732         u8              msg_type        = 0;
2733         int             credit          = 0;
2734         int             status          = 0;
2735         int             ntx_posted      = 0;
2736         int             credits         = 0;
2737 #if MXLND_STATS
2738         static unsigned long last       = 0;
2739 #endif
2740
2741         if (unlikely(peer == NULL)) {
2742                 LASSERT(peer != NULL);
2743                 return -1;
2744         }
2745         write_lock(&kmxlnd_data.kmx_global_lock);
2746         conn = peer->mxp_conn;
2747         /* NOTE take a ref for the duration of this function since it is
2748          * called when there might not be any queued txs for this peer */
2749         if (conn) {
2750                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2751                         write_unlock(&kmxlnd_data.kmx_global_lock);
2752                         return -1;
2753                 }
2754                 mxlnd_conn_addref(conn); /* for duration of this function */
2755         }
2756         write_unlock(&kmxlnd_data.kmx_global_lock);
2757
2758         /* do not add another ref for this tx */
2759
2760         if (conn == NULL) {
2761                 /* we do not have any conns */
2762                 CNETERR("peer %s has no conn\n", libcfs_nid2str(peer->mxp_nid));
2763                 return -1;
2764         }
2765
2766 #if MXLND_STATS
2767         if (cfs_time_after(jiffies, last)) {
2768                 last = jiffies + CFS_HZ;
2769                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2770                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2771                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2772                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2773                               conn->mxk_ntx_data, conn->mxk_data_posted);
2774         }
2775 #endif
2776
2777         spin_lock(&conn->mxk_lock);
2778         ntx_posted = conn->mxk_ntx_posted;
2779         credits = conn->mxk_credits;
2780
2781         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_peercredits);
2782         LASSERT(ntx_posted >= 0);
2783
2784         LASSERT(credits <= *kmxlnd_tunables.kmx_peercredits);
2785         LASSERT(credits >= 0);
2786
2787         /* check number of queued msgs, ignore data */
2788         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER()) {
2789                 /* check if any txs queued that could return credits... */
2790                 if (cfs_list_empty(&conn->mxk_tx_credit_queue) ||
2791                     conn->mxk_ntx_msgs == 0) {
2792                         /* if not, send a NOOP */
2793                         tx = mxlnd_get_idle_tx();
2794                         if (likely(tx != NULL)) {
2795                                 tx->mxc_peer = peer;
2796                                 tx->mxc_conn = peer->mxp_conn;
2797                                 mxlnd_conn_addref(conn); /* for this tx */
2798                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2799                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2800                                 mxlnd_peer_queue_tx_locked(tx);
2801                                 found = 1;
2802                                 goto done_locked;
2803                         }
2804                 }
2805         }
2806
2807         /* if the peer is not ready, try to connect */
2808         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2809             conn->mxk_status == MXLND_CONN_FAIL)) {
2810                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2811                 mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
2812                 spin_unlock(&conn->mxk_lock);
2813                 mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_REQ);
2814                 goto done;
2815         }
2816
2817         while (!cfs_list_empty(&conn->mxk_tx_free_queue) ||
2818                !cfs_list_empty(&conn->mxk_tx_credit_queue)) {
2819                 /* We have something to send. If we have a queued tx that does not
2820                  * require a credit (free), choose it since its completion will
2821                  * return a credit (here or at the peer), complete a DATA or
2822                  * CONN_REQ or CONN_ACK. */
2823                 cfs_list_t *tmp_tx = NULL;
2824                 if (!cfs_list_empty(&conn->mxk_tx_free_queue)) {
2825                         tmp_tx = &conn->mxk_tx_free_queue;
2826                 } else {
2827                         tmp_tx = &conn->mxk_tx_credit_queue;
2828                 }
2829                 tx = cfs_list_entry(tmp_tx->next, kmx_ctx_t, mxc_list);
2830
2831                 msg_type = tx->mxc_msg_type;
2832
2833                 /* don't try to send a rx */
2834                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2835
2836                 /* ensure that it is a valid msg type */
2837                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2838                         msg_type == MXLND_MSG_CONN_ACK ||
2839                         msg_type == MXLND_MSG_NOOP     ||
2840                         msg_type == MXLND_MSG_EAGER    ||
2841                         msg_type == MXLND_MSG_PUT_REQ  ||
2842                         msg_type == MXLND_MSG_PUT_ACK  ||
2843                         msg_type == MXLND_MSG_PUT_DATA ||
2844                         msg_type == MXLND_MSG_GET_REQ  ||
2845                         msg_type == MXLND_MSG_GET_DATA);
2846                 LASSERT(tx->mxc_peer == peer);
2847                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2848
2849                 credit = mxlnd_tx_requires_credit(tx);
2850                 if (credit) {
2851
2852                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_peercredits) {
2853                                 CDEBUG(D_NET, "%s: posted enough\n",
2854                                               libcfs_nid2str(peer->mxp_nid));
2855                                 goto done_locked;
2856                         }
2857
2858                         if (conn->mxk_credits == 0) {
2859                                 CDEBUG(D_NET, "%s: no credits\n",
2860                                               libcfs_nid2str(peer->mxp_nid));
2861                                 goto done_locked;
2862                         }
2863
2864                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2865                             conn->mxk_outstanding == 0) {  /* giving back credits */
2866                                 CDEBUG(D_NET, "%s: not using last credit\n",
2867                                               libcfs_nid2str(peer->mxp_nid));
2868                                 goto done_locked;
2869                         }
2870                 }
2871
2872                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2873                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2874                                 msg_type == MXLND_MSG_CONN_ACK)) {
2875                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2876                                              mxlnd_connstatus_to_str(conn->mxk_status),
2877                                              tx->mxc_cookie,
2878                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2879                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT ||
2880                                     cfs_time_aftereq(jiffies, tx->mxc_deadline)) {
2881                                         cfs_list_del_init(&tx->mxc_list);
2882                                         tx->mxc_errno = -ECONNABORTED;
2883                                         spin_unlock(&conn->mxk_lock);
2884                                         mxlnd_put_idle_tx(tx);
2885                                         mxlnd_conn_decref(conn);
2886                                         goto done;
2887                                 }
2888                                 goto done_locked;
2889                         }
2890                 }
2891
2892                 cfs_list_del_init(&tx->mxc_list);
2893
2894                 /* handle credits, etc now while we have the lock to avoid races */
2895                 if (credit) {
2896                         conn->mxk_credits--;
2897                         conn->mxk_ntx_posted++;
2898                 }
2899                 if (msg_type != MXLND_MSG_PUT_DATA &&
2900                     msg_type != MXLND_MSG_GET_DATA) {
2901                         if (msg_type != MXLND_MSG_CONN_REQ &&
2902                             msg_type != MXLND_MSG_CONN_ACK) {
2903                                 conn->mxk_ntx_msgs--;
2904                         }
2905                 }
2906                 if (tx->mxc_incarnation == 0 &&
2907                     conn->mxk_incarnation != 0) {
2908                         tx->mxc_incarnation = conn->mxk_incarnation;
2909                 }
2910
2911                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER
2912                  * or (2) there is a non-DATA msg that can return credits in the
2913                  * queue, then drop this duplicate NOOP */
2914                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2915                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER()) ||
2916                             (conn->mxk_ntx_msgs >= 1)) {
2917                                 conn->mxk_credits++;
2918                                 conn->mxk_ntx_posted--;
2919                                 spin_unlock(&conn->mxk_lock);
2920                                 /* redundant NOOP */
2921                                 mxlnd_put_idle_tx(tx);
2922                                 mxlnd_conn_decref(conn);
2923                                 CDEBUG(D_NET, "%s: redundant noop\n",
2924                                               libcfs_nid2str(peer->mxp_nid));
2925                                 found = 1;
2926                                 goto done;
2927                         }
2928                 }
2929
2930                 found = 1;
2931                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2932                     (msg_type != MXLND_MSG_GET_DATA))) {
2933                         mxlnd_pack_msg_locked(tx);
2934                 }
2935
2936                 mxret = MX_SUCCESS;
2937
2938                 status = conn->mxk_status;
2939                 spin_unlock(&conn->mxk_lock);
2940
2941                 if (likely((status == MXLND_CONN_READY) ||
2942                     (msg_type == MXLND_MSG_CONN_REQ) ||
2943                     (msg_type == MXLND_MSG_CONN_ACK))) {
2944                         ret = 0;
2945                         if (msg_type != MXLND_MSG_CONN_REQ &&
2946                             msg_type != MXLND_MSG_CONN_ACK) {
2947                                 /* add to the pending list */
2948                                 ret = mxlnd_q_pending_ctx(tx);
2949                         } else {
2950                                 /* CONN_REQ/ACK */
2951                                 tx->mxc_state = MXLND_CTX_PENDING;
2952                         }
2953
2954                         if (ret == 0) {
2955                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2956                                     msg_type != MXLND_MSG_GET_DATA)) {
2957                                         /* send a msg style tx */
2958                                         LASSERT(tx->mxc_nseg == 1);
2959                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2960                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2961                                                mxlnd_msgtype_to_str(msg_type),
2962                                                tx->mxc_cookie);
2963                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2964                                                           &tx->mxc_seg,
2965                                                           tx->mxc_nseg,
2966                                                           tx->mxc_pin_type,
2967                                                           conn->mxk_epa,
2968                                                           tx->mxc_match,
2969                                                           (void *) tx,
2970                                                           &tx->mxc_mxreq);
2971                                 } else {
2972                                         /* send a DATA tx */
2973                                         spin_lock(&conn->mxk_lock);
2974                                         conn->mxk_ntx_data--;
2975                                         conn->mxk_data_posted++;
2976                                         spin_unlock(&conn->mxk_lock);
2977                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2978                                                mxlnd_msgtype_to_str(msg_type),
2979                                                tx->mxc_cookie);
2980                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2981                                                           tx->mxc_seg_list,
2982                                                           tx->mxc_nseg,
2983                                                           tx->mxc_pin_type,
2984                                                           conn->mxk_epa,
2985                                                           tx->mxc_match,
2986                                                           (void *) tx,
2987                                                           &tx->mxc_mxreq);
2988                                 }
2989                         } else {
2990                                 /* ret != 0 */
2991                                 mxret = MX_CONNECTION_FAILED;
2992                         }
2993                         if (likely(mxret == MX_SUCCESS)) {
2994                                 ret = 0;
2995                         } else {
2996                                 CNETERR("mx_kisend() failed with %s (%d) "
2997                                         "sending to %s\n", mx_strerror(mxret), (int) mxret,
2998                                        libcfs_nid2str(peer->mxp_nid));
2999                                 /* NOTE mx_kisend() only fails if there are not enough
3000                                 * resources. Do not change the connection status. */
3001                                 if (mxret == MX_NO_RESOURCES) {
3002                                         tx->mxc_errno = -ENOMEM;
3003                                 } else {
3004                                         tx->mxc_errno = -ECONNABORTED;
3005                                 }
3006                                 if (credit) {
3007                                         spin_lock(&conn->mxk_lock);
3008                                         conn->mxk_ntx_posted--;
3009                                         conn->mxk_credits++;
3010                                         spin_unlock(&conn->mxk_lock);
3011                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
3012                                            msg_type == MXLND_MSG_GET_DATA) {
3013                                         spin_lock(&conn->mxk_lock);
3014                                         conn->mxk_data_posted--;
3015                                         spin_unlock(&conn->mxk_lock);
3016                                 }
3017                                 if (msg_type != MXLND_MSG_PUT_DATA &&
3018                                     msg_type != MXLND_MSG_GET_DATA &&
3019                                     msg_type != MXLND_MSG_CONN_REQ &&
3020                                     msg_type != MXLND_MSG_CONN_ACK) {
3021                                         spin_lock(&conn->mxk_lock);
3022                                         conn->mxk_outstanding +=
3023                                                 tx->mxc_msg->mxm_credits;
3024                                         spin_unlock(&conn->mxk_lock);
3025                                 }
3026                                 if (msg_type != MXLND_MSG_CONN_REQ &&
3027                                     msg_type != MXLND_MSG_CONN_ACK) {
3028                                         /* remove from the pending list */
3029                                         mxlnd_deq_pending_ctx(tx);
3030                                 }
3031                                 mxlnd_put_idle_tx(tx);
3032                                 mxlnd_conn_decref(conn);
3033                         }
3034                 }
3035                 spin_lock(&conn->mxk_lock);
3036         }
3037 done_locked:
3038         spin_unlock(&conn->mxk_lock);
3039 done:
3040         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
3041         return found;
3042 }
3043
3044
3045 /**
3046  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
3047  * @ctx - the tx descriptor
3048  *
3049  * Determine which type of send request it was and start the next step, if needed,
3050  * or, if done, signal completion to LNET. After we are done, put back on the
3051  * idle tx list.
3052  */
3053 void
3054 mxlnd_handle_tx_completion(kmx_ctx_t *tx)
3055 {
3056         int             code    = tx->mxc_status.code;
3057         int             failed  = (code != MX_STATUS_SUCCESS || tx->mxc_errno != 0);
3058         kmx_msg_t       *msg    = tx->mxc_msg;
3059         kmx_peer_t      *peer   = tx->mxc_peer;
3060         kmx_conn_t      *conn   = tx->mxc_conn;
3061         u8              type    = tx->mxc_msg_type;
3062         int             credit  = mxlnd_tx_requires_credit(tx);
3063         u64             cookie  = tx->mxc_cookie;
3064
3065         CDEBUG(D_NET, "entering %s (0x%llx):\n",
3066                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
3067
3068         LASSERT (peer != NULL);
3069         LASSERT (conn != NULL);
3070
3071         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
3072                 LASSERT (type == msg->mxm_type);
3073         }
3074
3075         if (failed) {
3076                 if (tx->mxc_errno == 0) tx->mxc_errno = -EIO;
3077         } else {
3078                 spin_lock(&conn->mxk_lock);
3079                 conn->mxk_last_tx = cfs_time_current(); /* jiffies */
3080                 spin_unlock(&conn->mxk_lock);
3081         }
3082
3083         switch (type) {
3084
3085         case MXLND_MSG_GET_DATA:
3086                 spin_lock(&conn->mxk_lock);
3087                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3088                         conn->mxk_outstanding++;
3089                         conn->mxk_data_posted--;
3090                 }
3091                 spin_unlock(&conn->mxk_lock);
3092                 break;
3093
3094         case MXLND_MSG_PUT_DATA:
3095                 spin_lock(&conn->mxk_lock);
3096                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3097                         conn->mxk_data_posted--;
3098                 }
3099                 spin_unlock(&conn->mxk_lock);
3100                 break;
3101
3102         case MXLND_MSG_NOOP:
3103         case MXLND_MSG_PUT_REQ:
3104         case MXLND_MSG_PUT_ACK:
3105         case MXLND_MSG_GET_REQ:
3106         case MXLND_MSG_EAGER:
3107                 break;
3108
3109         case MXLND_MSG_CONN_ACK:
3110                 if (peer->mxp_incompatible) {
3111                         /* we sent our params, now close this conn */
3112                         mxlnd_conn_disconnect(conn, 0, 1);
3113                 }
3114         case MXLND_MSG_CONN_REQ:
3115                 if (failed) {
3116                         CNETERR("%s failed with %s (%d) (errno = %d) to %s\n",
3117                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
3118                                mx_strstatus(code), code, tx->mxc_errno,
3119                                libcfs_nid2str(tx->mxc_nid));
3120                         if (!peer->mxp_incompatible) {
3121                                 spin_lock(&conn->mxk_lock);
3122                                 if (code == MX_STATUS_BAD_SESSION)
3123                                         mxlnd_set_conn_status(conn,
3124                                                               MXLND_CONN_INIT);
3125                                 else
3126                                         mxlnd_set_conn_status(conn,
3127                                                               MXLND_CONN_FAIL);
3128                                 spin_unlock(&conn->mxk_lock);
3129                         }
3130                 }
3131                 break;
3132
3133         default:
3134                 CNETERR("Unknown msg type of %d\n", type);
3135                 LBUG();
3136         }
3137
3138         if (credit) {
3139                 spin_lock(&conn->mxk_lock);
3140                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
3141                         conn->mxk_ntx_posted--;
3142                 }
3143                 spin_unlock(&conn->mxk_lock);
3144         }
3145
3146         mxlnd_put_idle_tx(tx);
3147         mxlnd_conn_decref(conn);
3148
3149         mxlnd_check_sends(peer);
3150
3151         CDEBUG(D_NET, "leaving\n");
3152         return;
3153 }
3154
3155 /* Handle completion of MSG or DATA rx.
3156  * CONN_REQ and CONN_ACK are handled elsewhere. */
3157 void
3158 mxlnd_handle_rx_completion(kmx_ctx_t *rx)
3159 {
3160         int             ret             = 0;
3161         int             repost          = 1;
3162         int             credit          = 1;
3163         u32             nob             = rx->mxc_status.xfer_length;
3164         u64             bits            = rx->mxc_status.match_info;
3165         kmx_msg_t      *msg             = rx->mxc_msg;
3166         kmx_peer_t     *peer            = rx->mxc_peer;
3167         kmx_conn_t     *conn            = rx->mxc_conn;
3168         u8              type            = rx->mxc_msg_type;
3169         u64             seq             = bits;
3170         lnet_msg_t     *lntmsg[2];
3171         int             result          = 0;
3172         int             peer_ref        = 0;
3173         int             conn_ref        = 0;
3174
3175         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ,
3176          * failed GET reply */
3177
3178         /* NOTE peer may still be NULL if it is a new peer and
3179          *      conn may be NULL if this is a re-connect */
3180         if (likely(peer != NULL && conn != NULL)) {
3181                 /* we have a reference on the conn */
3182                 conn_ref = 1;
3183         } else if (peer != NULL && conn == NULL) {
3184                 /* we have a reference on the peer */
3185                 peer_ref = 1;
3186         } else if (peer == NULL && conn != NULL) {
3187                 /* fatal error */
3188                 CERROR("rx 0x%llx from %s has conn but no peer\n",
3189                        bits, libcfs_nid2str(rx->mxc_nid));
3190                 LBUG();
3191         } /* else peer and conn == NULL */
3192
3193         if (conn == NULL && peer != NULL) {
3194                 write_lock(&kmxlnd_data.kmx_global_lock);
3195                 conn = peer->mxp_conn;
3196                 if (conn) {
3197                         mxlnd_conn_addref(conn); /* conn takes ref... */
3198                         mxlnd_peer_decref(peer); /* from peer */
3199                         conn_ref = 1;
3200                         peer_ref = 0;
3201                 }
3202                 write_unlock(&kmxlnd_data.kmx_global_lock);
3203                 rx->mxc_conn = conn;
3204         }
3205
3206 #if MXLND_DEBUG
3207         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
3208 #endif
3209
3210         lntmsg[0] = NULL;
3211         lntmsg[1] = NULL;
3212
3213         if (rx->mxc_status.code != MX_STATUS_SUCCESS &&
3214             rx->mxc_status.code != MX_STATUS_TRUNCATED) {
3215                 CNETERR("rx from %s failed with %s (%d)\n",
3216                         libcfs_nid2str(rx->mxc_nid),
3217                         mx_strstatus(rx->mxc_status.code),
3218                         rx->mxc_status.code);
3219                 credit = 0;
3220                 goto cleanup;
3221         }
3222
3223         if (nob == 0) {
3224                 /* this may be a failed GET reply */
3225                 if (type == MXLND_MSG_GET_DATA) {
3226                         /* get the error (52-59) bits from the match bits */
3227                         ret = (u32) MXLND_ERROR_VAL(rx->mxc_status.match_info);
3228                         lntmsg[0] = rx->mxc_lntmsg[0];
3229                         result = -ret;
3230                         goto cleanup;
3231                 } else {
3232                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
3233                         CNETERR("rx from %s returned with 0 bytes\n",
3234                                 libcfs_nid2str(rx->mxc_nid));
3235                         goto cleanup;
3236                 }
3237         }
3238
3239         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
3240         if (type == MXLND_MSG_PUT_DATA) {
3241                 /* result = 0; */
3242                 lntmsg[0] = rx->mxc_lntmsg[0];
3243                 goto cleanup;
3244         } else if (type == MXLND_MSG_GET_DATA) {
3245                 /* result = 0; */
3246                 lntmsg[0] = rx->mxc_lntmsg[0];
3247                 lntmsg[1] = rx->mxc_lntmsg[1];
3248                 goto cleanup;
3249         }
3250
3251         ret = mxlnd_unpack_msg(msg, nob);
3252         if (ret != 0) {
3253                 CNETERR("Error %d unpacking rx from %s\n",
3254                         ret, libcfs_nid2str(rx->mxc_nid));
3255                 goto cleanup;
3256         }
3257         rx->mxc_nob = nob;
3258         type = msg->mxm_type;
3259
3260         if (rx->mxc_nid != msg->mxm_srcnid ||
3261             kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3262                 CNETERR("rx with mismatched NID (type %s) (my nid is "
3263                        "0x%llx and rx msg dst is 0x%llx)\n",
3264                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
3265                        msg->mxm_dstnid);
3266                 goto cleanup;
3267         }
3268
3269         if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
3270             msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
3271                 CNETERR("Stale rx from %s with type %s "
3272                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
3273                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
3274                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
3275                        msg->mxm_srcstamp, conn->mxk_incarnation,
3276                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
3277                 credit = 0;
3278                 goto cleanup;
3279         }
3280
3281         CDEBUG(D_NET, "Received %s with %d credits\n",
3282                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
3283
3284         LASSERT(peer != NULL && conn != NULL);
3285         if (msg->mxm_credits != 0) {
3286                 spin_lock(&conn->mxk_lock);
3287                 if (msg->mxm_srcstamp == conn->mxk_incarnation) {
3288                         if ((conn->mxk_credits + msg->mxm_credits) >
3289                              *kmxlnd_tunables.kmx_peercredits) {
3290                                 CNETERR("mxk_credits %d  mxm_credits %d\n",
3291                                         conn->mxk_credits, msg->mxm_credits);
3292                         }
3293                         conn->mxk_credits += msg->mxm_credits;
3294                         LASSERT(conn->mxk_credits >= 0);
3295                         LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_peercredits);
3296                 }
3297                 spin_unlock(&conn->mxk_lock);
3298         }
3299
3300         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
3301         switch (type) {
3302         case MXLND_MSG_NOOP:
3303                 break;
3304
3305         case MXLND_MSG_EAGER:
3306                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
3307                                         msg->mxm_srcnid, rx, 0);
3308                 repost = ret < 0;
3309                 break;
3310
3311         case MXLND_MSG_PUT_REQ:
3312                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
3313                                         msg->mxm_srcnid, rx, 1);
3314                 repost = ret < 0;
3315                 break;
3316
3317         case MXLND_MSG_PUT_ACK: {
3318                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
3319                 if (cookie > MXLND_MAX_COOKIE) {
3320                         CNETERR("NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3321                                 libcfs_nid2str(rx->mxc_nid));
3322                         result = -((u32) MXLND_ERROR_VAL(cookie));
3323                         lntmsg[0] = rx->mxc_lntmsg[0];
3324                 } else {
3325                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3326                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3327                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3328                 }
3329                 /* repost == 1 */
3330                 break;
3331         }
3332         case MXLND_MSG_GET_REQ:
3333                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3334                                         msg->mxm_srcnid, rx, 1);
3335                 repost = ret < 0;
3336                 break;
3337
3338         default:
3339                 CNETERR("Bad MXLND message type %x from %s\n", msg->mxm_type,
3340                         libcfs_nid2str(rx->mxc_nid));
3341                 ret = -EPROTO;
3342                 break;
3343         }
3344
3345         if (ret < 0) {
3346                 CDEBUG(D_NET, "setting PEER_CONN_FAILED\n");
3347                 spin_lock(&conn->mxk_lock);
3348                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3349                 spin_unlock(&conn->mxk_lock);
3350         }
3351
3352 cleanup:
3353         if (conn != NULL) {
3354                 spin_lock(&conn->mxk_lock);
3355                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3356                 spin_unlock(&conn->mxk_lock);
3357         }
3358
3359         if (repost) {
3360                 /* lnet_parse() failed, etc., repost now */
3361                 mxlnd_put_idle_rx(rx);
3362                 if (conn != NULL && credit == 1) {
3363                         if (type == MXLND_MSG_PUT_DATA ||
3364                             type == MXLND_MSG_EAGER ||
3365                             type == MXLND_MSG_PUT_REQ ||
3366                             type == MXLND_MSG_NOOP) {
3367                                 spin_lock(&conn->mxk_lock);
3368                                 conn->mxk_outstanding++;
3369                                 spin_unlock(&conn->mxk_lock);
3370                         }
3371                 }
3372                 if (conn_ref) mxlnd_conn_decref(conn);
3373                 LASSERT(peer_ref == 0);
3374         }
3375
3376         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3377                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3378         } else {
3379                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3380         }
3381
3382         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3383         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3384
3385         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3386
3387         return;
3388 }
3389
3390 void
3391 mxlnd_handle_connect_msg(kmx_peer_t *peer, u8 msg_type, mx_status_t status)
3392 {
3393         kmx_ctx_t       *tx     = NULL;
3394         kmx_msg_t       *txmsg  = NULL;
3395         kmx_conn_t      *conn   = peer->mxp_conn;
3396         u64             nic_id  = 0ULL;
3397         u32             ep_id   = 0;
3398         u32             sid     = 0;
3399         u8              type    = (msg_type == MXLND_MSG_ICON_REQ ?
3400                                    MXLND_MSG_CONN_REQ : MXLND_MSG_CONN_ACK);
3401
3402         /* a conn ref was taken when calling mx_iconnect(),
3403          * hold it until CONN_REQ or CONN_ACK completes */
3404
3405         CDEBUG(D_NET, "entering\n");
3406         if (status.code != MX_STATUS_SUCCESS) {
3407                 int send_bye    = (msg_type == MXLND_MSG_ICON_REQ ? 0 : 1);
3408
3409                 CNETERR("mx_iconnect() failed for %s with %s (%d) "
3410                         "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxp_ep_id = %d\n",
3411                         mxlnd_msgtype_to_str(msg_type),
3412                         mx_strstatus(status.code), status.code,
3413                         libcfs_nid2str(peer->mxp_nid),
3414                         peer->mxp_nid,
3415                         peer->mxp_nic_id,
3416                         peer->mxp_ep_id);
3417                 spin_lock(&conn->mxk_lock);
3418                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3419                 spin_unlock(&conn->mxk_lock);
3420
3421                 if (cfs_time_after(jiffies, peer->mxp_reconnect_time +
3422                                    MXLND_CONNECT_TIMEOUT)) {
3423                         CNETERR("timeout, calling conn_disconnect()\n");
3424                         mxlnd_conn_disconnect(conn, 0, send_bye);
3425                 }
3426
3427                 mxlnd_conn_decref(conn);
3428                 return;
3429         }
3430         mx_decompose_endpoint_addr2(status.source, &nic_id, &ep_id, &sid);
3431         write_lock(&kmxlnd_data.kmx_global_lock);
3432         spin_lock(&conn->mxk_lock);
3433         conn->mxk_epa = status.source;
3434         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) conn);
3435         if (msg_type == MXLND_MSG_ICON_ACK && likely(!peer->mxp_incompatible)) {
3436                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3437         }
3438         spin_unlock(&conn->mxk_lock);
3439         write_unlock(&kmxlnd_data.kmx_global_lock);
3440
3441         /* mx_iconnect() succeeded, reset delay to 0 */
3442         write_lock(&kmxlnd_data.kmx_global_lock);
3443         peer->mxp_reconnect_time = 0;
3444         peer->mxp_conn->mxk_sid = sid;
3445         write_unlock(&kmxlnd_data.kmx_global_lock);
3446
3447         /* marshal CONN_REQ or CONN_ACK msg */
3448         /* we are still using the conn ref from iconnect() - do not take another */
3449         tx = mxlnd_get_idle_tx();
3450         if (tx == NULL) {
3451                 CNETERR("Can't obtain %s tx for %s\n",
3452                        mxlnd_msgtype_to_str(type),
3453                        libcfs_nid2str(peer->mxp_nid));
3454                 spin_lock(&conn->mxk_lock);
3455                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3456                 spin_unlock(&conn->mxk_lock);
3457                 mxlnd_conn_decref(conn);
3458                 return;
3459         }
3460
3461         tx->mxc_peer = peer;
3462         tx->mxc_conn = conn;
3463         tx->mxc_deadline = jiffies + MXLND_CONNECT_TIMEOUT;
3464         CDEBUG(D_NET, "sending %s\n", mxlnd_msgtype_to_str(type));
3465         mxlnd_init_tx_msg (tx, type, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3466         txmsg = tx->mxc_msg;
3467         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_peercredits;
3468         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_MSG_SIZE;
3469         tx->mxc_match = mxlnd_create_match(tx, 0);
3470
3471         mxlnd_queue_tx(tx);
3472         return;
3473 }
3474
3475 /**
3476  * mxlnd_request_waitd - the MX request completion thread(s)
3477  * @arg - thread id (as a void *)
3478  *
3479  * This thread waits for a MX completion and then completes the request.
3480  * We will create one thread per CPU.
3481  */
3482 int
3483 mxlnd_request_waitd(void *arg)
3484 {
3485         long                    id              = (long) arg;
3486         char                    name[24];
3487         __u32                   result          = 0;
3488         mx_return_t             mxret           = MX_SUCCESS;
3489         mx_status_t             status;
3490         kmx_ctx_t              *ctx             = NULL;
3491         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3492         kmx_peer_t             *peer            = NULL;
3493         kmx_conn_t             *conn            = NULL;
3494 #if MXLND_POLLING
3495         int                     count           = 0;
3496 #endif
3497
3498         memset(name, 0, sizeof(name));
3499         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3500         cfs_daemonize(name);
3501
3502         memset(&status, 0, sizeof(status));
3503
3504         CDEBUG(D_NET, "%s starting\n", name);
3505
3506         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3507                 u8      msg_type        = 0;
3508
3509                 mxret = MX_SUCCESS;
3510                 result = 0;
3511 #if MXLND_POLLING
3512                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3513                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0ULL, 0ULL,
3514                                             &status, &result);
3515                 } else {
3516                         count = 0;
3517                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3518                                             0ULL, 0ULL, &status, &result);
3519                 }
3520 #else
3521                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3522                                     0ULL, 0ULL, &status, &result);
3523 #endif
3524                 if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown)))
3525                         break;
3526
3527                 if (result != 1) {
3528                         /* nothing completed... */
3529                         continue;
3530                 }
3531
3532                 CDEBUG(D_NET, "wait_any() returned with %s (%d) with "
3533                        "match_info 0x%llx and length %d\n",
3534                        mx_strstatus(status.code), status.code,
3535                        (u64) status.match_info, status.msg_length);
3536
3537                 if (status.code != MX_STATUS_SUCCESS) {
3538                         CNETERR("wait_any() failed with %s (%d) with "
3539                                 "match_info 0x%llx and length %d\n",
3540                                 mx_strstatus(status.code), status.code,
3541                                 (u64) status.match_info, status.msg_length);
3542                 }
3543
3544                 msg_type = MXLND_MSG_TYPE(status.match_info);
3545
3546                 /* This may be a mx_iconnect() request completing,
3547                  * check the bit mask for CONN_REQ and CONN_ACK */
3548                 if (msg_type == MXLND_MSG_ICON_REQ ||
3549                     msg_type == MXLND_MSG_ICON_ACK) {
3550                         peer = (kmx_peer_t*) status.context;
3551                         mxlnd_handle_connect_msg(peer, msg_type, status);
3552                         continue;
3553                 }
3554
3555                 /* This must be a tx or rx */
3556
3557                 /* NOTE: if this is a RX from the unexpected callback, it may
3558                  * have very little info. If we dropped it in unexpected_recv(),
3559                  * it will not have a context. If so, ignore it. */
3560                 ctx = (kmx_ctx_t *) status.context;
3561                 if (ctx != NULL) {
3562
3563                         req_type = ctx->mxc_type;
3564                         conn = ctx->mxc_conn; /* this may be NULL */
3565                         mxlnd_deq_pending_ctx(ctx);
3566
3567                         /* copy status to ctx->mxc_status */
3568                         ctx->mxc_status = status;
3569
3570                         switch (req_type) {
3571                         case MXLND_REQ_TX:
3572                                 mxlnd_handle_tx_completion(ctx);
3573                                 break;
3574                         case MXLND_REQ_RX:
3575                                 mxlnd_handle_rx_completion(ctx);
3576                                 break;
3577                         default:
3578                                 CNETERR("Unknown ctx type %d\n", req_type);
3579                                 LBUG();
3580                                 break;
3581                         }
3582
3583                         /* conn is always set except for the first CONN_REQ rx
3584                          * from a new peer */
3585                         if (status.code != MX_STATUS_SUCCESS && conn != NULL) {
3586                                 mxlnd_conn_disconnect(conn, 1, 1);
3587                         }
3588                 }
3589                 CDEBUG(D_NET, "waitd() completed task\n");
3590         }
3591         CDEBUG(D_NET, "%s stopping\n", name);
3592         mxlnd_thread_stop(id);
3593         return 0;
3594 }
3595
3596
3597 unsigned long
3598 mxlnd_check_timeouts(unsigned long now)
3599 {
3600         int             i               = 0;
3601         int             disconnect      = 0;
3602         unsigned long   next            = 0; /* jiffies */
3603         kmx_peer_t      *peer           = NULL;
3604         kmx_conn_t      *conn           = NULL;
3605         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3606
3607         read_lock(g_lock);
3608         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3609                 cfs_list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i],
3610                                         mxp_list) {
3611
3612                         if (unlikely(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3613                                 read_unlock(g_lock);
3614                                 return next;
3615                         }
3616
3617                         conn = peer->mxp_conn;
3618                         if (conn) {
3619                                 mxlnd_conn_addref(conn);
3620                         } else {
3621                                 continue;
3622                         }
3623
3624                         spin_lock(&conn->mxk_lock);
3625
3626                         /* if nothing pending (timeout == 0) or
3627                          * if conn is already disconnected,
3628                          * skip this conn */
3629                         if (conn->mxk_timeout == 0 ||
3630                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3631                                 spin_unlock(&conn->mxk_lock);
3632                                 mxlnd_conn_decref(conn);
3633                                 continue;
3634                         }
3635
3636                         /* we want to find the timeout that will occur first.
3637                          * if it is in the future, we will sleep until then.
3638                          * if it is in the past, then we will sleep one
3639                          * second and repeat the process. */
3640                         if ((next == 0) ||
3641                             (cfs_time_before(conn->mxk_timeout, next))) {
3642                                 next = conn->mxk_timeout;
3643                         }
3644
3645                         disconnect = 0;
3646
3647                         if (cfs_time_aftereq(now, conn->mxk_timeout))
3648                                 disconnect = 1;
3649                         spin_unlock(&conn->mxk_lock);
3650
3651                         if (disconnect)
3652                                 mxlnd_conn_disconnect(conn, 1, 1);
3653                         mxlnd_conn_decref(conn);
3654                 }
3655         }
3656         read_unlock(g_lock);
3657         if (next == 0)
3658                 next = now + MXLND_COMM_TIMEOUT;
3659
3660         return next;
3661 }
3662
3663 void
3664 mxlnd_passive_connect(kmx_connparams_t *cp)
3665 {
3666         int             ret             = 0;
3667         int             incompatible    = 0;
3668         u64             nic_id          = 0ULL;
3669         u32             ep_id           = 0;
3670         u32             sid             = 0;
3671         int             conn_ref        = 0;
3672         kmx_msg_t       *msg            = &cp->mxr_msg;
3673         kmx_peer_t      *peer           = cp->mxr_peer;
3674         kmx_conn_t      *conn           = NULL;
3675         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3676
3677         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3678
3679         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3680         if (ret != 0) {
3681                 if (peer) {
3682                         CNETERR("Error %d unpacking CONN_REQ from %s\n",
3683                                ret, libcfs_nid2str(peer->mxp_nid));
3684                 } else {
3685                         CNETERR("Error %d unpacking CONN_REQ from "
3686                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3687                 }
3688                 goto cleanup;
3689         }
3690         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3691                 CNETERR("Can't accept %s: bad dst nid %s\n",
3692                                 libcfs_nid2str(msg->mxm_srcnid),
3693                                 libcfs_nid2str(msg->mxm_dstnid));
3694                 goto cleanup;
3695         }
3696         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3697                 CNETERR("Can't accept %s: incompatible queue depth "
3698                             "%d (%d wanted)\n",
3699                                 libcfs_nid2str(msg->mxm_srcnid),
3700                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3701                                 *kmxlnd_tunables.kmx_peercredits);
3702                 incompatible = 1;
3703         }
3704         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3705                 CNETERR("Can't accept %s: incompatible EAGER size "
3706                             "%d (%d wanted)\n",
3707                                 libcfs_nid2str(msg->mxm_srcnid),
3708                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3709                                 (int) MXLND_MSG_SIZE);
3710                 incompatible = 1;
3711         }
3712
3713         if (peer == NULL) {
3714                 peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid, 0); /* adds peer ref */
3715                 if (peer == NULL) {
3716                         int             hash    = 0;
3717                         u32             board   = 0;
3718                         kmx_peer_t      *existing_peer    = NULL;
3719
3720                         hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3721
3722                         mx_nic_id_to_board_number(nic_id, &board);
3723
3724                         /* adds conn ref for peer and one for this function */
3725                         ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid,
3726                                                board, ep_id, 0ULL);
3727                         if (ret != 0) {
3728                                 goto cleanup;
3729                         }
3730                         peer->mxp_conn->mxk_sid = sid;
3731                         LASSERT(peer->mxp_ep_id == ep_id);
3732                         write_lock(g_lock);
3733                         existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3734                         if (existing_peer) {
3735                                 mxlnd_conn_decref(peer->mxp_conn);
3736                                 mxlnd_peer_decref(peer);
3737                                 peer = existing_peer;
3738                                 mxlnd_conn_addref(peer->mxp_conn);
3739                                 conn = peer->mxp_conn;
3740                         } else {
3741                                 cfs_list_add_tail(&peer->mxp_list,
3742                                                   &kmxlnd_data.kmx_peers[hash]);
3743                                 cfs_atomic_inc(&kmxlnd_data.kmx_npeers);
3744                         }
3745                         write_unlock(g_lock);
3746                 } else {
3747                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3748                         write_lock(g_lock);
3749                         mxlnd_peer_decref(peer); /* drop ref taken above */
3750                         write_unlock(g_lock);
3751                         if (ret != 0) {
3752                                 CNETERR("Cannot allocate mxp_conn\n");
3753                                 goto cleanup;
3754                         }
3755                 }
3756                 conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3757                 conn = peer->mxp_conn;
3758         } else { /* unexpected handler found peer */
3759                 kmx_conn_t      *old_conn       = peer->mxp_conn;
3760
3761                 if (sid != peer->mxp_conn->mxk_sid) {
3762                         /* do not call mx_disconnect() or send a BYE */
3763                         mxlnd_conn_disconnect(old_conn, 0, 0);
3764
3765                         /* This allocs a conn, points peer->mxp_conn to this one.
3766                         * The old conn is still on the peer->mxp_conns list.
3767                         * As the pending requests complete, they will call
3768                         * conn_decref() which will eventually free it. */
3769                         ret = mxlnd_conn_alloc(&conn, peer);
3770                         if (ret != 0) {
3771                                 CNETERR("Cannot allocate peer->mxp_conn\n");
3772                                 goto cleanup;
3773                         }
3774                         /* conn_alloc() adds one ref for the peer and one
3775                          * for this function */
3776                         conn_ref = 1;
3777
3778                         peer->mxp_conn->mxk_sid = sid;
3779                 } else {
3780                         /* same sid */
3781                         conn = peer->mxp_conn;
3782                 }
3783         }
3784         write_lock(g_lock);
3785         peer->mxp_incompatible = incompatible;
3786         write_unlock(g_lock);
3787         spin_lock(&conn->mxk_lock);
3788         conn->mxk_incarnation = msg->mxm_srcstamp;
3789         mxlnd_set_conn_status(conn, MXLND_CONN_WAIT);
3790         spin_unlock(&conn->mxk_lock);
3791
3792         /* handle_conn_ack() will create the CONN_ACK msg */
3793         mxlnd_iconnect(peer, (u8) MXLND_MSG_ICON_ACK);
3794
3795 cleanup:
3796         if (conn_ref) mxlnd_conn_decref(conn);
3797
3798         mxlnd_connparams_free(cp);
3799         return;
3800 }
3801
3802 void
3803 mxlnd_check_conn_ack(kmx_connparams_t *cp)
3804 {
3805         int             ret             = 0;
3806         int             incompatible    = 0;
3807         u64             nic_id          = 0ULL;
3808         u32             ep_id           = 0;
3809         u32             sid             = 0;
3810         kmx_msg_t       *msg            = &cp->mxr_msg;
3811         kmx_peer_t      *peer           = cp->mxr_peer;
3812         kmx_conn_t      *conn           = cp->mxr_conn;
3813
3814         mx_decompose_endpoint_addr2(cp->mxr_epa, &nic_id, &ep_id, &sid);
3815
3816         ret = mxlnd_unpack_msg(msg, cp->mxr_nob);
3817         if (ret != 0) {
3818                 if (peer) {
3819                         CNETERR("Error %d unpacking CONN_ACK from %s\n",
3820                                ret, libcfs_nid2str(peer->mxp_nid));
3821                 } else {
3822                         CNETERR("Error %d unpacking CONN_ACK from "
3823                                "unknown host with nic_id 0x%llx\n", ret, nic_id);
3824                 }
3825                 ret = -1;
3826                 incompatible = 1;
3827                 goto failed;
3828         }
3829         if (kmxlnd_data.kmx_ni->ni_nid != msg->mxm_dstnid) {
3830                 CNETERR("Can't accept CONN_ACK from %s: "
3831                        "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3832                         libcfs_nid2str(msg->mxm_dstnid));
3833                 ret = -1;
3834                 goto failed;
3835         }
3836         if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_peercredits) {
3837                 CNETERR("Can't accept CONN_ACK from %s: "
3838                        "incompatible queue depth %d (%d wanted)\n",
3839                         libcfs_nid2str(msg->mxm_srcnid),
3840                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3841                         *kmxlnd_tunables.kmx_peercredits);
3842                 incompatible = 1;
3843                 ret = -1;
3844                 goto failed;
3845         }
3846         if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_MSG_SIZE) {
3847                 CNETERR("Can't accept CONN_ACK from %s: "
3848                         "incompatible EAGER size %d (%d wanted)\n",
3849                         libcfs_nid2str(msg->mxm_srcnid),
3850                         msg->mxm_u.conn_req.mxcrm_eager_size,
3851                         (int) MXLND_MSG_SIZE);
3852                 incompatible = 1;
3853                 ret = -1;
3854                 goto failed;
3855         }
3856         write_lock(&kmxlnd_data.kmx_global_lock);
3857         peer->mxp_incompatible = incompatible;
3858         write_unlock(&kmxlnd_data.kmx_global_lock);
3859         spin_lock(&conn->mxk_lock);
3860         conn->mxk_credits = *kmxlnd_tunables.kmx_peercredits;
3861         conn->mxk_outstanding = 0;
3862         conn->mxk_incarnation = msg->mxm_srcstamp;
3863         conn->mxk_timeout = 0;
3864         if (!incompatible) {
3865                 CDEBUG(D_NET, "setting peer %s CONN_READY\n",
3866                        libcfs_nid2str(msg->mxm_srcnid));
3867                 mxlnd_set_conn_status(conn, MXLND_CONN_READY);
3868         }
3869         spin_unlock(&conn->mxk_lock);
3870
3871         if (!incompatible)
3872                 mxlnd_check_sends(peer);
3873
3874 failed:
3875         if (ret < 0) {
3876                 spin_lock(&conn->mxk_lock);
3877                 mxlnd_set_conn_status(conn, MXLND_CONN_FAIL);
3878                 spin_unlock(&conn->mxk_lock);
3879         }
3880
3881         if (incompatible) mxlnd_conn_disconnect(conn, 0, 0);
3882
3883         mxlnd_connparams_free(cp);
3884         return;
3885 }
3886
3887 int
3888 mxlnd_abort_msgs(void)
3889 {
3890         int                     count           = 0;
3891         cfs_list_t              *orphans        = &kmxlnd_data.kmx_orphan_msgs;
3892         spinlock_t              *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3893
3894         /* abort orphans */
3895         spin_lock(g_conn_lock);
3896         while (!cfs_list_empty(orphans)) {
3897                 kmx_ctx_t       *ctx     = NULL;
3898                 kmx_conn_t      *conn   = NULL;
3899
3900                 ctx = cfs_list_entry(orphans->next, kmx_ctx_t, mxc_list);
3901                 cfs_list_del_init(&ctx->mxc_list);
3902                 spin_unlock(g_conn_lock);
3903
3904                 ctx->mxc_errno = -ECONNABORTED;
3905                 conn = ctx->mxc_conn;
3906                 CDEBUG(D_NET, "aborting %s %s %s\n",
3907                        mxlnd_msgtype_to_str(ctx->mxc_msg_type),
3908                        ctx->mxc_type == MXLND_REQ_TX ? "(TX) to" : "(RX) from",
3909                        libcfs_nid2str(ctx->mxc_nid));
3910                 if (ctx->mxc_type == MXLND_REQ_TX) {
3911                         mxlnd_put_idle_tx(ctx); /* do not hold any locks */
3912                         if (conn) mxlnd_conn_decref(conn); /* for this tx */
3913                 } else {
3914                         ctx->mxc_state = MXLND_CTX_CANCELED;
3915                         mxlnd_handle_rx_completion(ctx);
3916                 }
3917
3918                 count++;
3919                 spin_lock(g_conn_lock);
3920         }
3921         spin_unlock(g_conn_lock);
3922
3923         return count;
3924 }
3925
3926 int
3927 mxlnd_free_conn_zombies(void)
3928 {
3929         int             count           = 0;
3930         cfs_list_t      *zombies        = &kmxlnd_data.kmx_conn_zombies;
3931         spinlock_t      *g_conn_lock    = &kmxlnd_data.kmx_conn_lock;
3932         rwlock_t        *g_lock         = &kmxlnd_data.kmx_global_lock;
3933
3934         /* cleanup any zombies */
3935         spin_lock(g_conn_lock);
3936         while (!cfs_list_empty(zombies)) {
3937                 kmx_conn_t      *conn   = NULL;
3938
3939                 conn = cfs_list_entry(zombies->next, kmx_conn_t, mxk_zombie);
3940                 cfs_list_del_init(&conn->mxk_zombie);
3941                 spin_unlock(g_conn_lock);
3942
3943                 write_lock(g_lock);
3944                 mxlnd_conn_free_locked(conn);
3945                 write_unlock(g_lock);
3946
3947                 count++;
3948                 spin_lock(g_conn_lock);
3949         }
3950         spin_unlock(g_conn_lock);
3951         CDEBUG(D_NET, "%s: freed %d zombies\n", __func__, count);
3952         return count;
3953 }
3954
3955 /**
3956  * mxlnd_connd - handles incoming connection requests
3957  * @arg - thread id (as a void *)
3958  *
3959  * This thread handles incoming connection requests
3960  */
3961 int
3962 mxlnd_connd(void *arg)
3963 {
3964         long                    id              = (long) arg;
3965
3966         cfs_daemonize("mxlnd_connd");
3967
3968         CDEBUG(D_NET, "connd starting\n");
3969
3970         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
3971                 int                ret             = 0;
3972                 kmx_connparams_t  *cp              = NULL;
3973                 spinlock_t        *g_conn_lock  = &kmxlnd_data.kmx_conn_lock;
3974                 cfs_list_t        *conn_reqs    = &kmxlnd_data.kmx_conn_reqs;
3975
3976                 ret = down_interruptible(&kmxlnd_data.kmx_conn_sem);
3977
3978                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
3979                         break;
3980
3981                 if (ret != 0)
3982                         continue;
3983
3984                 ret = mxlnd_abort_msgs();
3985                 ret += mxlnd_free_conn_zombies();
3986
3987                 spin_lock(g_conn_lock);
3988                 if (cfs_list_empty(conn_reqs)) {
3989                         if (ret == 0)
3990                                 CNETERR("connd woke up but did not find a "
3991                                         "kmx_connparams_t or zombie conn\n");
3992                         spin_unlock(g_conn_lock);
3993                         continue;
3994                 }
3995                 cp = cfs_list_entry(conn_reqs->next, kmx_connparams_t,
3996                                     mxr_list);
3997                 cfs_list_del_init(&cp->mxr_list);
3998                 spin_unlock(g_conn_lock);
3999
4000                 switch (MXLND_MSG_TYPE(cp->mxr_match)) {
4001                 case MXLND_MSG_CONN_REQ:
4002                         /* We have a connection request. Handle it. */
4003                         mxlnd_passive_connect(cp);
4004                         break;
4005                 case MXLND_MSG_CONN_ACK:
4006                         /* The peer is ready for messages */
4007                         mxlnd_check_conn_ack(cp);
4008                         break;
4009                 }
4010         }
4011
4012         mxlnd_free_conn_zombies();
4013
4014         CDEBUG(D_NET, "connd stopping\n");
4015         mxlnd_thread_stop(id);
4016         return 0;
4017 }
4018
4019 /**
4020  * mxlnd_timeoutd - enforces timeouts on messages
4021  * @arg - thread id (as a void *)
4022  *
4023  * This thread queries each peer for its earliest timeout. If a peer has timed out,
4024  * it calls mxlnd_conn_disconnect().
4025  *
4026  * After checking for timeouts, try progressing sends (call check_sends()).
4027  */
4028 int
4029 mxlnd_timeoutd(void *arg)
4030 {
4031         int             i       = 0;
4032         long            id      = (long) arg;
4033         unsigned long   now     = 0;
4034         unsigned long   next    = 0;
4035         unsigned long   delay   = CFS_HZ;
4036         kmx_peer_t     *peer    = NULL;
4037         kmx_peer_t     *temp    = NULL;
4038         kmx_conn_t     *conn    = NULL;
4039         rwlock_t   *g_lock  = &kmxlnd_data.kmx_global_lock;
4040
4041         cfs_daemonize("mxlnd_timeoutd");
4042
4043         CDEBUG(D_NET, "timeoutd starting\n");
4044
4045         while (!(cfs_atomic_read(&kmxlnd_data.kmx_shutdown))) {
4046
4047                 now = jiffies;
4048                 /* if the next timeout has not arrived, go back to sleep */
4049                 if (cfs_time_after(now, next)) {
4050                         next = mxlnd_check_timeouts(now);
4051                 }
4052
4053                 /* try to progress peers' txs */
4054                 write_lock(g_lock);
4055                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
4056                         cfs_list_t *peers = &kmxlnd_data.kmx_peers[i];
4057
4058                         /* NOTE we are safe against the removal of peer, but
4059                          * not against the removal of temp */
4060                         cfs_list_for_each_entry_safe(peer, temp, peers,
4061                                                      mxp_list) {
4062                                 if (cfs_atomic_read(&kmxlnd_data.kmx_shutdown))
4063                                         break;
4064                                 mxlnd_peer_addref(peer); /* add ref... */
4065                                 conn = peer->mxp_conn;
4066                                 if (conn && conn->mxk_status != MXLND_CONN_DISCONNECT) {
4067                                         mxlnd_conn_addref(conn); /* take ref... */
4068                                 } else {
4069                                         CDEBUG(D_NET, "ignoring %s\n",
4070                                                libcfs_nid2str(peer->mxp_nid));
4071                                         mxlnd_peer_decref(peer); /* ...to here */
4072                                         continue;
4073                                 }
4074
4075                                 if ((conn->mxk_status == MXLND_CONN_READY ||
4076                                     conn->mxk_status == MXLND_CONN_FAIL) &&
4077                                     cfs_time_after(now,
4078                                                    conn->mxk_last_tx +
4079                                                    CFS_HZ)) {
4080                                         write_unlock(g_lock);
4081                                         mxlnd_check_sends(peer);
4082                                         write_lock(g_lock);
4083                                 }
4084                                 mxlnd_conn_decref(conn); /* until here */
4085                                 mxlnd_peer_decref(peer); /* ...to here */
4086                         }
4087                 }
4088                 write_unlock(g_lock);
4089
4090                 mxlnd_sleep(delay);
4091         }
4092         CDEBUG(D_NET, "timeoutd stopping\n");
4093         mxlnd_thread_stop(id);
4094         return 0;
4095 }