Whamcloud - gitweb
b=16098
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see [sun.com URL with a
21  * copy of GPLv2].
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  *
33  * Copyright (C) 2006 Myricom, Inc.
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  *
39  * lnet/klnds/mxlnd/mxlnd.c
40  *
41  * Author: Eric Barton <eric@bartonsoftware.com>
42  * Author: Scott Atchley <atchley at myri.com>
43  */
44
45 #include "mxlnd.h"
46
47 inline void mxlnd_noop(char *s, ...)
48 {
49         return;
50 }
51
52 char *
53 mxlnd_ctxstate_to_str(int mxc_state)
54 {
55         switch (mxc_state) {
56         case MXLND_CTX_INIT:
57                 return "MXLND_CTX_INIT";
58         case MXLND_CTX_IDLE:
59                 return "MXLND_CTX_IDLE";
60         case MXLND_CTX_PREP:
61                 return "MXLND_CTX_PREP";
62         case MXLND_CTX_PENDING:
63                 return "MXLND_CTX_PENDING";
64         case MXLND_CTX_COMPLETED:
65                 return "MXLND_CTX_COMPLETED";
66         case MXLND_CTX_CANCELED:
67                 return "MXLND_CTX_CANCELED";
68         default:
69                 return "*unknown*";
70         }
71 }
72
73 char *
74 mxlnd_connstatus_to_str(int mxk_status)
75 {
76         switch (mxk_status) {
77         case MXLND_CONN_READY:
78                 return "MXLND_CONN_READY";
79         case MXLND_CONN_INIT:
80                 return "MXLND_CONN_INIT";
81         case MXLND_CONN_REQ:
82                 return "MXLND_CONN_REQ";
83         case MXLND_CONN_ACK:
84                 return "MXLND_CONN_ACK";
85         case MXLND_CONN_WAIT:
86                 return "MXLND_CONN_WAIT";
87         case MXLND_CONN_DISCONNECT:
88                 return "MXLND_CONN_DISCONNECT";
89         case MXLND_CONN_FAIL:
90                 return "MXLND_CONN_FAIL";
91         default:
92                 return "unknown";
93         }
94 }
95
96 char *
97 mxlnd_msgtype_to_str(int type) {
98         switch (type) {
99         case MXLND_MSG_EAGER:
100                 return "MXLND_MSG_EAGER";
101         case MXLND_MSG_CONN_REQ:
102                 return "MXLND_MSG_CONN_REQ";
103         case MXLND_MSG_CONN_ACK:
104                 return "MXLND_MSG_CONN_ACK";
105         case MXLND_MSG_NOOP:
106                 return "MXLND_MSG_NOOP";
107         case MXLND_MSG_PUT_REQ:
108                 return "MXLND_MSG_PUT_REQ";
109         case MXLND_MSG_PUT_ACK:
110                 return "MXLND_MSG_PUT_ACK";
111         case MXLND_MSG_PUT_DATA:
112                 return "MXLND_MSG_PUT_DATA";
113         case MXLND_MSG_GET_REQ:
114                 return "MXLND_MSG_GET_REQ";
115         case MXLND_MSG_GET_DATA:
116                 return "MXLND_MSG_GET_DATA";
117         default:
118                 return "unknown";
119         }
120 }
121
122 char *
123 mxlnd_lnetmsg_to_str(int type)
124 {
125         switch (type) {
126         case LNET_MSG_ACK:
127                 return "LNET_MSG_ACK";
128         case LNET_MSG_PUT:
129                 return "LNET_MSG_PUT";
130         case LNET_MSG_GET:
131                 return "LNET_MSG_GET";
132         case LNET_MSG_REPLY:
133                 return "LNET_MSG_REPLY";
134         case LNET_MSG_HELLO:
135                 return "LNET_MSG_HELLO";
136         default:
137                 LBUG();
138                 return "*unknown*";
139         }
140 }
141
142 static inline u64
143 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
144 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
145 {
146         u64 type        = (u64) ctx->mxc_msg_type;
147         u64 err         = (u64) error;
148         u64 match       = 0LL;
149
150         LASSERT(ctx->mxc_msg_type != 0);
151         LASSERT(ctx->mxc_cookie >> 52 == 0);
152         match = (type << 60) | (err << 52) | ctx->mxc_cookie;
153         return match;
154 }
155
156 static inline void
157 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
158 {
159         *msg_type = (u8) (match >> 60);
160         *error    = (u8) ((match >> 52) & 0xFF);
161         *cookie   = match & 0xFFFFFFFFFFFFFLL;
162         LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
163                 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
164                 *msg_type == MXLND_MSG_EAGER    ||
165                 *msg_type == MXLND_MSG_CONN_REQ ||
166                 *msg_type == MXLND_MSG_CONN_ACK ||
167                 *msg_type == MXLND_MSG_NOOP     ||
168                 *msg_type == MXLND_MSG_PUT_REQ  ||
169                 *msg_type == MXLND_MSG_PUT_ACK  ||
170                 *msg_type == MXLND_MSG_PUT_DATA ||
171                 *msg_type == MXLND_MSG_GET_REQ  ||
172                 *msg_type == MXLND_MSG_GET_DATA);
173         return;
174 }
175
176 struct kmx_ctx *
177 mxlnd_get_idle_rx(void)
178 {
179         struct list_head        *tmp    = NULL;
180         struct kmx_ctx          *rx     = NULL;
181
182         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
183
184         if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
185                 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
186                 return NULL;
187         }
188
189         tmp = &kmxlnd_data.kmx_rx_idle;
190         rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
191         list_del_init(&rx->mxc_list);
192         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
193
194 #if MXLND_DEBUG
195         if (rx->mxc_get != rx->mxc_put) {
196                 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
197                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
198                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
199                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
200                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
201                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
202                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
203                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
204                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
205                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
206         }
207 #endif
208         LASSERT (rx->mxc_get == rx->mxc_put);
209
210         rx->mxc_get++;
211
212         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
213         rx->mxc_state = MXLND_CTX_PREP;
214
215         return rx;
216 }
217
218 int
219 mxlnd_put_idle_rx(struct kmx_ctx *rx)
220 {
221         if (rx == NULL) {
222                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
223                 return -EINVAL;
224         } else if (rx->mxc_type != MXLND_REQ_RX) {
225                 CDEBUG(D_NETERROR, "called with tx\n");
226                 return -EINVAL;
227         }
228         LASSERT(rx->mxc_get == rx->mxc_put + 1);
229         mxlnd_ctx_init(rx);
230         rx->mxc_put++;
231         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
232         list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
233         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
234         return 0;
235 }
236
237 int
238 mxlnd_reduce_idle_rxs(__u32 count)
239 {
240         __u32                   i       = 0;
241         struct kmx_ctx          *rx     = NULL;
242
243         spin_lock(&kmxlnd_data.kmx_rxs_lock);
244         for (i = 0; i < count; i++) {
245                 rx = mxlnd_get_idle_rx();
246                 if (rx != NULL) {
247                         struct list_head *tmp = &rx->mxc_global_list;
248                         list_del_init(tmp);
249                         mxlnd_ctx_free(rx);
250                 } else {
251                         CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
252                         break;
253                 }
254         }
255         spin_unlock(&kmxlnd_data.kmx_rxs_lock);
256         return 0;
257 }
258
259 struct kmx_ctx *
260 mxlnd_get_idle_tx(void)
261 {
262         struct list_head        *tmp    = NULL;
263         struct kmx_ctx          *tx     = NULL;
264
265         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
266
267         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
268                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
269                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
270                 return NULL;
271         }
272
273         tmp = &kmxlnd_data.kmx_tx_idle;
274         tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
275         list_del_init(&tx->mxc_list);
276
277         /* Allocate a new completion cookie.  It might not be needed,
278          * but we've got a lock right now and we're unlikely to
279          * wrap... */
280         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
281         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
282                 kmxlnd_data.kmx_tx_next_cookie = 1;
283         }
284         kmxlnd_data.kmx_tx_used++;
285         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
286
287         LASSERT (tx->mxc_get == tx->mxc_put);
288
289         tx->mxc_get++;
290
291         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
292         LASSERT (tx->mxc_lntmsg[0] == NULL);
293         LASSERT (tx->mxc_lntmsg[1] == NULL);
294
295         tx->mxc_state = MXLND_CTX_PREP;
296
297         return tx;
298 }
299
300 int
301 mxlnd_put_idle_tx(struct kmx_ctx *tx)
302 {
303         //int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
304         int             result  = 0;
305         lnet_msg_t      *lntmsg[2];
306
307         if (tx == NULL) {
308                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
309                 return -EINVAL;
310         } else if (tx->mxc_type != MXLND_REQ_TX) {
311                 CDEBUG(D_NETERROR, "called with rx\n");
312                 return -EINVAL;
313         }
314         if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
315               tx->mxc_status.code == MX_STATUS_TRUNCATED))
316                 result = -EIO;
317
318         lntmsg[0] = tx->mxc_lntmsg[0];
319         lntmsg[1] = tx->mxc_lntmsg[1];
320
321         LASSERT(tx->mxc_get == tx->mxc_put + 1);
322         mxlnd_ctx_init(tx);
323         tx->mxc_put++;
324         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
325         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
326         kmxlnd_data.kmx_tx_used--;
327         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
328         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
329         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
330         return 0;
331 }
332
333 /**
334  * mxlnd_conn_free - free the conn
335  * @conn - a kmx_conn pointer
336  *
337  * The calling function should remove the conn from the conns list first
338  * then destroy it.
339  */
340 void
341 mxlnd_conn_free(struct kmx_conn *conn)
342 {
343         struct kmx_peer *peer   = conn->mxk_peer;
344
345         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
346         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
347                  list_empty (&conn->mxk_tx_free_queue) &&
348                  list_empty (&conn->mxk_pending));
349         if (!list_empty(&conn->mxk_list)) {
350                 spin_lock(&peer->mxp_lock);
351                 list_del_init(&conn->mxk_list);
352                 if (peer->mxp_conn == conn) {
353                         peer->mxp_conn = NULL;
354                         if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
355                                 mx_set_endpoint_addr_context(conn->mxk_epa,
356                                                              (void *) NULL);
357                         }
358                 }
359                 spin_unlock(&peer->mxp_lock);
360         }
361         mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
362         MXLND_FREE (conn, sizeof (*conn));
363         return;
364 }
365
366
367 void
368 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
369 {
370         int                     found   = 0;
371         struct kmx_ctx          *ctx    = NULL;
372         struct kmx_ctx          *next   = NULL;
373         mx_return_t             mxret   = MX_SUCCESS;
374         u32                     result  = 0;
375
376         do {
377                 found = 0;
378                 spin_lock(&conn->mxk_lock);
379                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
380                         /* we will delete all including txs */
381                         list_del_init(&ctx->mxc_list);
382                         if (ctx->mxc_type == MXLND_REQ_RX) {
383                                 found = 1;
384                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
385                                                   &ctx->mxc_mxreq,
386                                                   &result);
387                                 if (mxret != MX_SUCCESS) {
388                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
389                                 }
390                                 if (result == 1) {
391                                         ctx->mxc_status.code = -ECONNABORTED;
392                                         ctx->mxc_state = MXLND_CTX_CANCELED;
393                                         /* NOTE this calls lnet_finalize() and
394                                          * we cannot hold any locks when calling it.
395                                          * It also calls mxlnd_conn_decref(conn) */
396                                         spin_unlock(&conn->mxk_lock);
397                                         mxlnd_handle_rx_completion(ctx);
398                                         spin_lock(&conn->mxk_lock);
399                                 }
400                                 break;
401                         }
402                 }
403                 spin_unlock(&conn->mxk_lock);
404         }
405         while (found);
406
407         return;
408 }
409
410 /**
411  * mxlnd_conn_disconnect - shutdown a connection
412  * @conn - a kmx_conn pointer
413  *
414  * This function sets the status to DISCONNECT, completes queued
415  * txs with failure, calls mx_disconnect, which will complete
416  * pending txs and matched rxs with failure.
417  */
418 void
419 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
420 {
421         struct list_head        *tmp    = NULL;
422
423         spin_lock(&conn->mxk_lock);
424         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
425                 spin_unlock(&conn->mxk_lock);
426                 return;
427         }
428         conn->mxk_status = MXLND_CONN_DISCONNECT;
429         conn->mxk_timeout = 0;
430
431         while (!list_empty(&conn->mxk_tx_free_queue) ||
432                !list_empty(&conn->mxk_tx_credit_queue)) {
433
434                 struct kmx_ctx          *tx     = NULL;
435
436                 if (!list_empty(&conn->mxk_tx_free_queue)) {
437                         tmp = &conn->mxk_tx_free_queue;
438                 } else {
439                         tmp = &conn->mxk_tx_credit_queue;
440                 }
441
442                 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
443                 list_del_init(&tx->mxc_list);
444                 tx->mxc_status.code = -ECONNABORTED;
445                 spin_unlock(&conn->mxk_lock);
446                 mxlnd_put_idle_tx(tx);
447                 mxlnd_conn_decref(conn); /* for this tx */
448                 spin_lock(&conn->mxk_lock);
449         }
450
451         spin_unlock(&conn->mxk_lock);
452
453         /* cancel pending rxs */
454         mxlnd_conn_cancel_pending_rxs(conn);
455
456         if (kmxlnd_data.kmx_shutdown != 1) {
457
458                 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
459
460                 if (notify) {
461                         time_t          last_alive      = 0;
462                         unsigned long   last_msg        = 0;
463
464                         /* notify LNET that we are giving up on this peer */
465                         if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
466                                 last_msg = conn->mxk_last_rx;
467                         } else {
468                                 last_msg = conn->mxk_last_tx;
469                         }
470                         last_alive = cfs_time_current_sec() -
471                                      cfs_duration_sec(cfs_time_current() - last_msg);
472                         lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
473                 }
474         }
475         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
476
477         return;
478 }
479
480 /**
481  * mxlnd_conn_alloc - allocate and initialize a new conn struct
482  * @connp - address of a kmx_conn pointer
483  * @peer - owning kmx_peer
484  *
485  * Returns 0 on success and -ENOMEM on failure
486  */
487 int
488 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
489 {
490         struct kmx_conn *conn    = NULL;
491
492         LASSERT(peer != NULL);
493
494         MXLND_ALLOC(conn, sizeof (*conn));
495         if (conn == NULL) {
496                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
497                 return -ENOMEM;
498         }
499         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
500
501         memset(conn, 0, sizeof(*conn));
502
503         /* conn->mxk_incarnation = 0 - will be set by peer */
504         atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer 
505                                                    and one for the caller */
506         conn->mxk_peer = peer;
507         /* mxk_epa - to be set after mx_iconnect() */
508         INIT_LIST_HEAD(&conn->mxk_list);
509         spin_lock_init(&conn->mxk_lock);
510         /* conn->mxk_timeout = 0 */
511         conn->mxk_last_tx = jiffies;
512         conn->mxk_last_rx = conn->mxk_last_tx;
513         conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
514         /* mxk_outstanding = 0 */
515         conn->mxk_status = MXLND_CONN_INIT;
516         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
517         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
518         /* conn->mxk_ntx_msgs = 0 */
519         /* conn->mxk_ntx_data = 0 */
520         /* conn->mxk_ntx_posted = 0 */
521         /* conn->mxk_data_posted = 0 */
522         INIT_LIST_HEAD(&conn->mxk_pending);
523
524         *connp = conn;
525
526         mxlnd_peer_addref(peer);        /* add a ref for this conn */
527
528         /* add to front of peer's conns list */
529         list_add(&conn->mxk_list, &peer->mxp_conns);
530         peer->mxp_conn = conn;
531         return 0;
532 }
533
534 int
535 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
536 {
537         int ret = 0;
538         spin_lock(&peer->mxp_lock);
539         ret = mxlnd_conn_alloc_locked(connp, peer);
540         spin_unlock(&peer->mxp_lock);
541         return ret;
542 }
543
544 int
545 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
546 {
547         int             ret     = 0;
548         struct kmx_conn *conn   = ctx->mxc_conn;
549
550         ctx->mxc_state = MXLND_CTX_PENDING;
551         if (conn != NULL) {
552                 spin_lock(&conn->mxk_lock);
553                 if (conn->mxk_status >= MXLND_CONN_INIT) {
554                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
555                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
556                                 conn->mxk_timeout = ctx->mxc_deadline;
557                         }
558                 } else {
559                         ctx->mxc_state = MXLND_CTX_COMPLETED;
560                         ret = -1;
561                 }
562                 spin_unlock(&conn->mxk_lock);
563         }
564         return ret;
565 }
566
567 int
568 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
569 {
570         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
571                 ctx->mxc_state == MXLND_CTX_COMPLETED);
572         if (ctx->mxc_state != MXLND_CTX_PENDING &&
573             ctx->mxc_state != MXLND_CTX_COMPLETED) {
574                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", 
575                        mxlnd_ctxstate_to_str(ctx->mxc_state));
576         }
577         ctx->mxc_state = MXLND_CTX_COMPLETED;
578         if (!list_empty(&ctx->mxc_list)) {
579                 struct kmx_conn *conn = ctx->mxc_conn;
580                 struct kmx_ctx *next = NULL;
581                 LASSERT(conn != NULL);
582                 spin_lock(&conn->mxk_lock);
583                 list_del_init(&ctx->mxc_list);
584                 conn->mxk_timeout = 0;
585                 if (!list_empty(&conn->mxk_pending)) {
586                         next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
587                         conn->mxk_timeout = next->mxc_deadline;
588                 }
589                 spin_unlock(&conn->mxk_lock);
590         }
591         return 0;
592 }
593
594 /**
595  * mxlnd_peer_free - free the peer
596  * @peer - a kmx_peer pointer
597  *
598  * The calling function should decrement the rxs, drain the tx queues and
599  * remove the peer from the peers list first then destroy it.
600  */
601 void
602 mxlnd_peer_free(struct kmx_peer *peer)
603 {
604         CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
605
606         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
607
608         if (peer->mxp_host != NULL) {
609                 spin_lock(&peer->mxp_host->mxh_lock);
610                 peer->mxp_host->mxh_peer = NULL;
611                 spin_unlock(&peer->mxp_host->mxh_lock);
612         }
613         if (!list_empty(&peer->mxp_peers)) {
614                 /* assume we are locked */
615                 list_del_init(&peer->mxp_peers);
616         }
617
618         MXLND_FREE (peer, sizeof (*peer));
619         atomic_dec(&kmxlnd_data.kmx_npeers);
620         return;
621 }
622
623 void
624 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
625 {
626         u64             nic_id  = 0LL;
627         char            name[MX_MAX_HOSTNAME_LEN + 1];
628         mx_return_t     mxret   = MX_SUCCESS;
629
630         memset(name, 0, sizeof(name));
631         snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
632         mxret = mx_hostname_to_nic_id(name, &nic_id);
633         if (mxret == MX_SUCCESS) {
634                 peer->mxp_nic_id = nic_id;
635         } else {
636                 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
637                                    "with %s\n", name, mx_strerror(mxret));
638                 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
639                 if (mxret == MX_SUCCESS) {
640                         peer->mxp_nic_id = nic_id;
641                 } else {
642                         CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
643                                            "with %s\n", peer->mxp_host->mxh_hostname,
644                                            mx_strerror(mxret));
645                 }
646         }
647         return;
648 }
649
650 /**
651  * mxlnd_peer_alloc - allocate and initialize a new peer struct
652  * @peerp - address of a kmx_peer pointer
653  * @nid - LNET node id
654  *
655  * Returns 0 on success and -ENOMEM on failure
656  */
657 int
658 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
659 {
660         int                     i       = 0;
661         int                     ret     = 0;
662         u32                     addr    = LNET_NIDADDR(nid);
663         struct kmx_peer        *peer    = NULL;
664         struct kmx_host        *host    = NULL;
665
666         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
667
668         MXLND_ALLOC(peer, sizeof (*peer));
669         if (peer == NULL) {
670                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
671                 return -ENOMEM;
672         }
673         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
674
675         memset(peer, 0, sizeof(*peer));
676
677         list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
678                 if (addr == host->mxh_addr) {
679                         peer->mxp_host = host;
680                         spin_lock(&host->mxh_lock);
681                         host->mxh_peer = peer;
682                         spin_unlock(&host->mxh_lock);
683                         break;
684                 }
685         }
686         if (peer->mxp_host == NULL) {
687                 CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
688                 MXLND_FREE(peer, sizeof(*peer));
689                 return -ENXIO;
690         }
691
692         peer->mxp_nid = nid;
693         /* peer->mxp_incarnation */
694         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
695         mxlnd_peer_hostname_to_nic_id(peer);
696
697         INIT_LIST_HEAD(&peer->mxp_peers);
698         spin_lock_init(&peer->mxp_lock);
699         INIT_LIST_HEAD(&peer->mxp_conns);
700         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
701         if (ret != 0) {
702                 mxlnd_peer_decref(peer);
703                 return ret;
704         }
705
706         for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
707                 struct kmx_ctx   *rx     = NULL;
708                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
709                 if (ret != 0) {
710                         mxlnd_reduce_idle_rxs(i);
711                         mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
712                         mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
713                         mxlnd_peer_decref(peer);
714                         return ret;
715                 }
716                 spin_lock(&kmxlnd_data.kmx_rxs_lock);
717                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
718                 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
719                 rx->mxc_put = -1;
720                 mxlnd_put_idle_rx(rx);
721         }
722         /* peer->mxp_reconnect_time = 0 */
723         /* peer->mxp_incompatible = 0 */
724
725         *peerp = peer;
726         return 0;
727 }
728
729 /**
730  * mxlnd_nid_to_hash - hash the nid
731  * @nid - msg pointer
732  *
733  * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
734  */
735 static inline int
736 mxlnd_nid_to_hash(lnet_nid_t nid)
737 {
738         return (nid & MXLND_HASH_MASK) ^
739                ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
740 }
741
742 static inline struct kmx_peer *
743 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
744 {
745         int                     found   = 0;
746         int                     hash    = 0;
747         struct kmx_peer         *peer   = NULL;
748
749         hash = mxlnd_nid_to_hash(nid);
750
751         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
752                 if (peer->mxp_nid == nid) {
753                         found = 1;
754                         mxlnd_peer_addref(peer);
755                         break;
756                 }
757         }
758         return (found ? peer : NULL);
759 }
760
761 static inline struct kmx_peer *
762 mxlnd_find_peer_by_nid(lnet_nid_t nid)
763 {
764         struct kmx_peer *peer   = NULL;
765
766         read_lock(&kmxlnd_data.kmx_peers_lock);
767         peer = mxlnd_find_peer_by_nid_locked(nid);
768         read_unlock(&kmxlnd_data.kmx_peers_lock);
769         return peer;
770 }
771
772 static inline int
773 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
774 {
775         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
776                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
777                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
778                 tx->mxc_msg_type == MXLND_MSG_NOOP);
779 }
780
781 /**
782  * mxlnd_init_msg - set type and number of bytes
783  * @msg - msg pointer
784  * @type - of message
785  * @body_nob - bytes in msg body
786  */
787 static inline void
788 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
789 {
790         msg->mxm_type = type;
791         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
792 }
793
794 static inline void
795 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
796 {
797         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
798         struct kmx_msg  *msg    = NULL;
799
800         LASSERT (tx != NULL);
801         LASSERT (nob <= MXLND_EAGER_SIZE);
802
803         tx->mxc_nid = nid;
804         /* tx->mxc_peer should have already been set if we know it */
805         tx->mxc_msg_type = type;
806         tx->mxc_nseg = 1;
807         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
808         tx->mxc_seg.segment_length = nob;
809         tx->mxc_pin_type = MX_PIN_PHYSICAL;
810         //tx->mxc_state = MXLND_CTX_PENDING;
811
812         msg = tx->mxc_msg;
813         msg->mxm_type = type;
814         msg->mxm_nob  = nob;
815
816         return;
817 }
818
819 static inline __u32
820 mxlnd_cksum (void *ptr, int nob)
821 {
822         char  *c  = ptr;
823         __u32  sum = 0;
824
825         while (nob-- > 0)
826                 sum = ((sum << 1) | (sum >> 31)) + *c++;
827
828         /* ensure I don't return 0 (== no checksum) */
829         return (sum == 0) ? 1 : sum;
830 }
831
832 /**
833  * mxlnd_pack_msg - complete msg info
834  * @tx - msg to send
835  */
836 static inline void
837 mxlnd_pack_msg(struct kmx_ctx *tx)
838 {
839         struct kmx_msg  *msg    = tx->mxc_msg;
840
841         /* type and nob should already be set in init_msg() */
842         msg->mxm_magic    = MXLND_MSG_MAGIC;
843         msg->mxm_version  = MXLND_MSG_VERSION;
844         /*   mxm_type */
845         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
846          * return credits as well */
847         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
848             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
849                 spin_lock(&tx->mxc_conn->mxk_lock);
850                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
851                 tx->mxc_conn->mxk_outstanding = 0;
852                 spin_unlock(&tx->mxc_conn->mxk_lock);
853         } else {
854                 msg->mxm_credits  = 0;
855         }
856         /*   mxm_nob */
857         msg->mxm_cksum    = 0;
858         msg->mxm_srcnid   = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);
859         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
860         msg->mxm_dstnid   = tx->mxc_nid;
861         /* if it is a new peer, the dststamp will be 0 */
862         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
863         msg->mxm_seq      = tx->mxc_cookie;
864
865         if (*kmxlnd_tunables.kmx_cksum) {
866                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
867         }
868 }
869
870 int
871 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
872 {
873         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
874         __u32     msg_cksum     = 0;
875         int       flip          = 0;
876         int       msg_nob       = 0;
877
878         /* 6 bytes are enough to have received magic + version */
879         if (nob < 6) {
880                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
881                 return -EPROTO;
882         }
883
884         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
885                 flip = 0;
886         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
887                 flip = 1;
888         } else {
889                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
890                 return -EPROTO;
891         }
892
893         if (msg->mxm_version !=
894             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
895                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
896                 return -EPROTO;
897         }
898
899         if (nob < hdr_size) {
900                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
901                 return -EPROTO;
902         }
903
904         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
905         if (msg_nob > nob) {
906                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
907                 return -EPROTO;
908         }
909
910         /* checksum must be computed with mxm_cksum zero and BEFORE anything
911          * gets flipped */
912         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
913         msg->mxm_cksum = 0;
914         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
915                 CDEBUG(D_NETERROR, "Bad checksum\n");
916                 return -EPROTO;
917         }
918         msg->mxm_cksum = msg_cksum;
919
920         if (flip) {
921                 /* leave magic unflipped as a clue to peer endianness */
922                 __swab16s(&msg->mxm_version);
923                 CLASSERT (sizeof(msg->mxm_type) == 1);
924                 CLASSERT (sizeof(msg->mxm_credits) == 1);
925                 msg->mxm_nob = msg_nob;
926                 __swab64s(&msg->mxm_srcnid);
927                 __swab64s(&msg->mxm_srcstamp);
928                 __swab64s(&msg->mxm_dstnid);
929                 __swab64s(&msg->mxm_dststamp);
930                 __swab64s(&msg->mxm_seq);
931         }
932
933         if (msg->mxm_srcnid == LNET_NID_ANY) {
934                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
935                 return -EPROTO;
936         }
937
938         switch (msg->mxm_type) {
939         default:
940                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
941                 return -EPROTO;
942
943         case MXLND_MSG_NOOP:
944                 break;
945
946         case MXLND_MSG_EAGER:
947                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
948                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
949                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
950                         return -EPROTO;
951                 }
952                 break;
953
954         case MXLND_MSG_PUT_REQ:
955                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
956                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
957                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
958                         return -EPROTO;
959                 }
960                 if (flip)
961                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
962                 break;
963
964         case MXLND_MSG_PUT_ACK:
965                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
966                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
967                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
968                         return -EPROTO;
969                 }
970                 if (flip) {
971                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
972                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
973                 }
974                 break;
975
976         case MXLND_MSG_GET_REQ:
977                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
978                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
979                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
980                         return -EPROTO;
981                 }
982                 if (flip) {
983                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
984                 }
985                 break;
986
987         case MXLND_MSG_CONN_REQ:
988         case MXLND_MSG_CONN_ACK:
989                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
990                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
991                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
992                         return -EPROTO;
993                 }
994                 if (flip) {
995                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
996                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
997                 }
998                 break;
999         }
1000         return 0;
1001 }
1002
1003 /**
1004  * mxlnd_recv_msg
1005  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
1006  * @rx
1007  * @msg_type
1008  * @cookie
1009  * @length - length of incoming message
1010  * @pending - add to kmx_pending (0 is NO and 1 is YES)
1011  *
1012  * The caller gets the rx and sets nid, peer and conn if known.
1013  *
1014  * Returns 0 on success and -1 on failure
1015  */
1016 int
1017 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
1018 {
1019         int             ret     = 0;
1020         mx_return_t     mxret   = MX_SUCCESS;
1021         uint64_t        mask    = 0xF00FFFFFFFFFFFFFLL;
1022
1023         rx->mxc_msg_type = msg_type;
1024         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1025         rx->mxc_cookie = cookie;
1026         /* rx->mxc_match may already be set */
1027         /* rx->mxc_seg.segment_ptr is already set */
1028         rx->mxc_seg.segment_length = length;
1029         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1030         ret = mxlnd_q_pending_ctx(rx);
1031         if (ret == -1) {
1032                 /* the caller is responsible for calling conn_decref() if needed */
1033                 return -1;
1034         }
1035         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1036                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1037         if (mxret != MX_SUCCESS) {
1038                 mxlnd_deq_pending_ctx(rx);
1039                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n", 
1040                                    mx_strerror(mxret), (int) mxret);
1041                 return -1;
1042         }
1043         return 0;
1044 }
1045
1046
1047 /**
1048  * mxlnd_unexpected_recv - this is the callback function that will handle 
1049  *                         unexpected receives
1050  * @context - NULL, ignore
1051  * @source - the peer's mx_endpoint_addr_t
1052  * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1053  * @length - length of incoming message
1054  * @data_if_available - ignore
1055  *
1056  * If it is an eager-sized msg, we will call recv_msg() with the actual
1057  * length. If it is a large message, we will call recv_msg() with a
1058  * length of 0 bytes to drop it because we should never have a large,
1059  * unexpected message.
1060  *
1061  * NOTE - The MX library blocks until this function completes. Make it as fast as
1062  * possible. DO NOT allocate memory which can block!
1063  *
1064  * If we cannot get a rx or the conn is closed, drop the message on the floor
1065  * (i.e. recv 0 bytes and ignore).
1066  */
1067 mx_unexp_handler_action_t
1068 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1069                  uint64_t match_value, uint32_t length, void *data_if_available)
1070 {
1071         int             ret             = 0;
1072         struct kmx_ctx  *rx             = NULL;
1073         mx_ksegment_t   seg;
1074         u8              msg_type        = 0;
1075         u8              error           = 0;
1076         u64             cookie          = 0LL;
1077
1078         if (context != NULL) {
1079                 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1080         }
1081
1082 #if MXLND_DEBUG
1083         CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1084 #endif
1085
1086         rx = mxlnd_get_idle_rx();
1087         if (rx != NULL) {
1088                 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1089                 if (length <= MXLND_EAGER_SIZE) {
1090                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1091                 } else {
1092                         CDEBUG(D_NETERROR, "unexpected large receive with "
1093                                            "match_value=0x%llx length=%d\n",
1094                                            match_value, length);
1095                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1096                 }
1097
1098                 if (ret == 0) {
1099                         struct kmx_peer *peer   = NULL;
1100                         struct kmx_conn *conn   = NULL;
1101
1102                         /* NOTE to avoid a peer disappearing out from under us,
1103                          *      read lock the peers lock first */
1104                         read_lock(&kmxlnd_data.kmx_peers_lock);
1105                         mx_get_endpoint_addr_context(source, (void **) &peer);
1106                         if (peer != NULL) {
1107                                 mxlnd_peer_addref(peer); /* add a ref... */
1108                                 spin_lock(&peer->mxp_lock);
1109                                 conn = peer->mxp_conn;
1110                                 if (conn) {
1111                                         mxlnd_conn_addref(conn); /* add ref until rx completed */
1112                                         mxlnd_peer_decref(peer); /* and drop peer ref */
1113                                         rx->mxc_conn = conn;
1114                                 }
1115                                 spin_unlock(&peer->mxp_lock);
1116                                 rx->mxc_peer = peer;
1117                                 rx->mxc_nid = peer->mxp_nid;
1118                         }
1119                         read_unlock(&kmxlnd_data.kmx_peers_lock);
1120                 } else {
1121                         CDEBUG(D_NETERROR, "could not post receive\n");
1122                         mxlnd_put_idle_rx(rx);
1123                 }
1124         }
1125
1126         if (rx == NULL || ret != 0) {
1127                 if (rx == NULL) {
1128                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1129                 } else {
1130                         /* ret != 0 */
1131                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1132                 }
1133                 seg.segment_ptr = 0LL;
1134                 seg.segment_length = 0;
1135                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1136                           match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1137         }
1138
1139         return MX_RECV_CONTINUE;
1140 }
1141
1142
1143 int
1144 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1145 {
1146         int                      i      = 0;
1147         int                      ret    = -ENOENT;
1148         struct kmx_peer         *peer   = NULL;
1149
1150         read_lock(&kmxlnd_data.kmx_peers_lock);
1151         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1152                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1153                         if (index-- > 0)
1154                                 continue;
1155
1156                         *nidp = peer->mxp_nid;
1157                         *count = atomic_read(&peer->mxp_refcount);
1158                         ret = 0;
1159                         break;
1160                 }
1161         }
1162         read_unlock(&kmxlnd_data.kmx_peers_lock);
1163
1164         return ret;
1165 }
1166
1167 void
1168 mxlnd_del_peer_locked(struct kmx_peer *peer)
1169 {
1170         list_del_init(&peer->mxp_peers); /* remove from the global list */
1171         if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1172         mxlnd_peer_decref(peer); /* drop global list ref */
1173         return;
1174 }
1175
1176 int
1177 mxlnd_del_peer(lnet_nid_t nid)
1178 {
1179         int             i       = 0;
1180         int             ret     = 0;
1181         struct kmx_peer *peer   = NULL;
1182         struct kmx_peer *next   = NULL;
1183
1184         if (nid != LNET_NID_ANY) {
1185                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1186         }
1187         write_lock(&kmxlnd_data.kmx_peers_lock);
1188         if (nid != LNET_NID_ANY) {
1189                 if (peer == NULL) {
1190                         ret = -ENOENT;
1191                 } else {
1192                         mxlnd_peer_decref(peer); /* and drops it */
1193                         mxlnd_del_peer_locked(peer);
1194                 }
1195         } else { /* LNET_NID_ANY */
1196                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1197                         list_for_each_entry_safe(peer, next,
1198                                                  &kmxlnd_data.kmx_peers[i], mxp_peers) {
1199                                 mxlnd_del_peer_locked(peer);
1200                         }
1201                 }
1202         }
1203         write_unlock(&kmxlnd_data.kmx_peers_lock);
1204
1205         return ret;
1206 }
1207
1208 struct kmx_conn *
1209 mxlnd_get_conn_by_idx(int index)
1210 {
1211         int                      i      = 0;
1212         struct kmx_peer         *peer   = NULL;
1213         struct kmx_conn         *conn   = NULL;
1214
1215         read_lock(&kmxlnd_data.kmx_peers_lock);
1216         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1217                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1218                         spin_lock(&peer->mxp_lock);
1219                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1220                                 if (index-- > 0) {
1221                                         continue;
1222                                 }
1223
1224                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1225                                 spin_unlock(&peer->mxp_lock);
1226                                 read_unlock(&kmxlnd_data.kmx_peers_lock);
1227                                 return conn;
1228                         }
1229                         spin_unlock(&peer->mxp_lock);
1230                 }
1231         }
1232         read_unlock(&kmxlnd_data.kmx_peers_lock);
1233
1234         return NULL;
1235 }
1236
1237 void
1238 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1239 {
1240         struct kmx_conn *conn   = NULL;
1241         struct kmx_conn *next   = NULL;
1242
1243         spin_lock(&peer->mxp_lock);
1244         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1245                 mxlnd_conn_disconnect(conn, 0 , 0);
1246         }
1247         spin_unlock(&peer->mxp_lock);
1248         return;
1249 }
1250
1251 int
1252 mxlnd_close_matching_conns(lnet_nid_t nid)
1253 {
1254         int             i       = 0;
1255         int             ret     = 0;
1256         struct kmx_peer *peer   = NULL;
1257
1258         read_lock(&kmxlnd_data.kmx_peers_lock);
1259         if (nid != LNET_NID_ANY) {
1260                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1261                 if (peer == NULL) {
1262                         ret = -ENOENT;
1263                 } else {
1264                         mxlnd_close_matching_conns_locked(peer);
1265                         mxlnd_peer_decref(peer); /* and drops it here */
1266                 }
1267         } else { /* LNET_NID_ANY */
1268                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1269                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1270                                 mxlnd_close_matching_conns_locked(peer);
1271                 }
1272         }
1273         read_unlock(&kmxlnd_data.kmx_peers_lock);
1274
1275         return ret;
1276 }
1277
1278 /**
1279  * mxlnd_ctl - modify MXLND parameters
1280  * @ni - LNET interface handle
1281  * @cmd - command to change
1282  * @arg - the ioctl data
1283  *
1284  * Not implemented yet.
1285  */
1286 int
1287 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1288 {
1289         struct libcfs_ioctl_data *data  = arg;
1290         int                       ret   = -EINVAL;
1291
1292         LASSERT (ni == kmxlnd_data.kmx_ni);
1293
1294         switch (cmd) {
1295         case IOC_LIBCFS_GET_PEER: {
1296                 lnet_nid_t      nid     = 0;
1297                 int             count   = 0;
1298
1299                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1300                 data->ioc_nid    = nid;
1301                 data->ioc_count  = count;
1302                 break;
1303         }
1304         case IOC_LIBCFS_DEL_PEER: {
1305                 ret = mxlnd_del_peer(data->ioc_nid);
1306                 break;
1307         }
1308         case IOC_LIBCFS_GET_CONN: {
1309                 struct kmx_conn *conn = NULL;
1310
1311                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1312                 if (conn == NULL) {
1313                         ret = -ENOENT;
1314                 } else {
1315                         ret = 0;
1316                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1317                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1318                 }
1319                 break;
1320         }
1321         case IOC_LIBCFS_CLOSE_CONNECTION: {
1322                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1323                 break;
1324         }
1325         default:
1326                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1327                 break;
1328         }
1329
1330         return ret;
1331 }
1332
1333 /**
1334  * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1335  * @tx
1336  *
1337  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1338  */
1339 void
1340 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1341 {
1342         u8                      msg_type        = tx->mxc_msg_type;
1343         //struct kmx_peer         *peer           = tx->mxc_peer;
1344         struct kmx_conn         *conn           = tx->mxc_conn;
1345
1346         LASSERT (msg_type != 0);
1347         LASSERT (tx->mxc_nid != 0);
1348         LASSERT (tx->mxc_peer != NULL);
1349         LASSERT (tx->mxc_conn != NULL);
1350
1351         tx->mxc_incarnation = conn->mxk_incarnation;
1352
1353         if (msg_type != MXLND_MSG_PUT_DATA &&
1354             msg_type != MXLND_MSG_GET_DATA) {
1355                 /* msg style tx */
1356                 if (mxlnd_tx_requires_credit(tx)) {
1357                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1358                         conn->mxk_ntx_msgs++;
1359                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1360                            msg_type == MXLND_MSG_CONN_ACK) {
1361                         /* put conn msgs at the front of the queue */
1362                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1363                 } else {
1364                         /* PUT_ACK, PUT_NAK */
1365                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1366                         conn->mxk_ntx_msgs++;
1367                 }
1368         } else {
1369                 /* data style tx */
1370                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1371                 conn->mxk_ntx_data++;
1372         }
1373
1374         return;
1375 }
1376
1377 /**
1378  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1379  * @tx
1380  *
1381  * Add the tx to the peer's msg or data queue
1382  */
1383 static inline void
1384 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1385 {
1386         LASSERT(tx->mxc_peer != NULL);
1387         LASSERT(tx->mxc_conn != NULL);
1388         spin_lock(&tx->mxc_conn->mxk_lock);
1389         mxlnd_peer_queue_tx_locked(tx);
1390         spin_unlock(&tx->mxc_conn->mxk_lock);
1391
1392         return;
1393 }
1394
1395 /**
1396  * mxlnd_queue_tx - add the tx to the global tx queue
1397  * @tx
1398  *
1399  * Add the tx to the global queue and up the tx_queue_sem
1400  */
1401 void
1402 mxlnd_queue_tx(struct kmx_ctx *tx)
1403 {
1404         struct kmx_peer *peer   = tx->mxc_peer;
1405         LASSERT (tx->mxc_nid != 0);
1406
1407         if (peer != NULL) {
1408                 if (peer->mxp_incompatible &&
1409                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1410                         /* let this fail now */
1411                         tx->mxc_status.code = -ECONNABORTED;
1412                         mxlnd_conn_decref(peer->mxp_conn);
1413                         mxlnd_put_idle_tx(tx);
1414                         return;
1415                 }
1416                 if (tx->mxc_conn == NULL) {
1417                         int             ret     = 0;
1418                         struct kmx_conn *conn   = NULL;
1419
1420                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1421                         if (ret != 0) {
1422                                 tx->mxc_status.code = ret;
1423                                 mxlnd_put_idle_tx(tx);
1424                                 goto done;
1425                         }
1426                         tx->mxc_conn = conn;
1427                         mxlnd_peer_decref(peer); /* and takes it from peer */
1428                 }
1429                 LASSERT(tx->mxc_conn != NULL);
1430                 mxlnd_peer_queue_tx(tx);
1431                 mxlnd_check_sends(peer);
1432         } else {
1433                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1434                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1435                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1436                 up(&kmxlnd_data.kmx_tx_queue_sem);
1437         }
1438 done:
1439         return;
1440 }
1441
1442 int
1443 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1444 {
1445         int             i                       = 0;
1446         int             sum                     = 0;
1447         int             old_sum                 = 0;
1448         int             nseg                    = 0;
1449         int             first_iov               = -1;
1450         int             first_iov_offset        = 0;
1451         int             first_found             = 0;
1452         int             last_iov                = -1;
1453         int             last_iov_length         = 0;
1454         mx_ksegment_t  *seg                     = NULL;
1455
1456         if (niov == 0) return 0;
1457         LASSERT(iov != NULL);
1458
1459         for (i = 0; i < niov; i++) {
1460                 sum = old_sum + (u32) iov[i].iov_len;
1461                 if (!first_found && (sum > offset)) {
1462                         first_iov = i;
1463                         first_iov_offset = offset - old_sum;
1464                         first_found = 1;
1465                         sum = (u32) iov[i].iov_len - first_iov_offset;
1466                         old_sum = 0;
1467                 }
1468                 if (sum >= nob) {
1469                         last_iov = i;
1470                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1471                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1472                         break;
1473                 }
1474                 old_sum = sum;
1475         }
1476         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1477         nseg = last_iov - first_iov + 1;
1478         LASSERT(nseg > 0);
1479
1480         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1481         if (seg == NULL) {
1482                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1483                 return -1;
1484         }
1485         memset(seg, 0, nseg * sizeof(*seg));
1486         ctx->mxc_nseg = nseg;
1487         sum = 0;
1488         for (i = 0; i < nseg; i++) {
1489                 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1490                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1491                 if (i == 0) {
1492                         seg[i].segment_ptr += (u64) first_iov_offset;
1493                         seg[i].segment_length -= (u32) first_iov_offset;
1494                 }
1495                 if (i == (nseg - 1)) {
1496                         seg[i].segment_length = (u32) last_iov_length;
1497                 }
1498                 sum += seg[i].segment_length;
1499         }
1500         ctx->mxc_seg_list = seg;
1501         ctx->mxc_pin_type = MX_PIN_KERNEL;
1502 #ifdef MX_PIN_FULLPAGES
1503         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1504 #endif
1505         LASSERT(nob == sum);
1506         return 0;
1507 }
1508
1509 int
1510 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1511 {
1512         int             i                       = 0;
1513         int             sum                     = 0;
1514         int             old_sum                 = 0;
1515         int             nseg                    = 0;
1516         int             first_kiov              = -1;
1517         int             first_kiov_offset       = 0;
1518         int             first_found             = 0;
1519         int             last_kiov               = -1;
1520         int             last_kiov_length        = 0;
1521         mx_ksegment_t  *seg                     = NULL;
1522
1523         if (niov == 0) return 0;
1524         LASSERT(kiov != NULL);
1525
1526         for (i = 0; i < niov; i++) {
1527                 sum = old_sum + kiov[i].kiov_len;
1528                 if (i == 0) sum -= kiov[i].kiov_offset;
1529                 if (!first_found && (sum > offset)) {
1530                         first_kiov = i;
1531                         first_kiov_offset = offset - old_sum;
1532                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1533                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1534                         first_found = 1;
1535                         sum = kiov[i].kiov_len - first_kiov_offset;
1536                         old_sum = 0;
1537                 }
1538                 if (sum >= nob) {
1539                         last_kiov = i;
1540                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1541                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1542                         break;
1543                 }
1544                 old_sum = sum;
1545         }
1546         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1547         nseg = last_kiov - first_kiov + 1;
1548         LASSERT(nseg > 0);
1549
1550         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1551         if (seg == NULL) {
1552                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1553                 return -1;
1554         }
1555         memset(seg, 0, niov * sizeof(*seg));
1556         ctx->mxc_nseg = niov;
1557         sum = 0;
1558         for (i = 0; i < niov; i++) {
1559                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1560                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1561                 if (i == 0) {
1562                         seg[i].segment_ptr += (u64) first_kiov_offset;
1563                         /* we have to add back the original kiov_offset */
1564                         seg[i].segment_length -= first_kiov_offset +
1565                                                  kiov[first_kiov].kiov_offset;
1566                 }
1567                 if (i == (nseg - 1)) {
1568                         seg[i].segment_length = last_kiov_length;
1569                 }
1570                 sum += seg[i].segment_length;
1571         }
1572         ctx->mxc_seg_list = seg;
1573         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1574 #ifdef MX_PIN_FULLPAGES
1575         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1576 #endif
1577         LASSERT(nob == sum);
1578         return 0;
1579 }
1580
1581 void
1582 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1583 {
1584         LASSERT(type == MXLND_MSG_PUT_ACK);
1585         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1586         tx->mxc_cookie = cookie;
1587         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1588         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
1589         tx->mxc_match = mxlnd_create_match(tx, status);
1590
1591         mxlnd_queue_tx(tx);
1592 }
1593
1594
1595 /**
1596  * mxlnd_send_data - get tx, map [k]iov, queue tx
1597  * @ni
1598  * @lntmsg
1599  * @peer
1600  * @msg_type
1601  * @cookie
1602  *
1603  * This setups the DATA send for PUT or GET.
1604  *
1605  * On success, it queues the tx, on failure it calls lnet_finalize()
1606  */
1607 void
1608 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1609 {
1610         int                     ret             = 0;
1611         lnet_process_id_t       target          = lntmsg->msg_target;
1612         unsigned int            niov            = lntmsg->msg_niov;
1613         struct iovec           *iov             = lntmsg->msg_iov;
1614         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1615         unsigned int            offset          = lntmsg->msg_offset;
1616         unsigned int            nob             = lntmsg->msg_len;
1617         struct kmx_ctx         *tx              = NULL;
1618
1619         LASSERT(lntmsg != NULL);
1620         LASSERT(peer != NULL);
1621         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1622         LASSERT((cookie>>52) == 0);
1623
1624         tx = mxlnd_get_idle_tx();
1625         if (tx == NULL) {
1626                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1627                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1628                         libcfs_nid2str(target.nid));
1629                 goto failed_0;
1630         }
1631         tx->mxc_nid = target.nid;
1632         /* NOTE called when we have a ref on the conn, get one for this tx */
1633         mxlnd_conn_addref(peer->mxp_conn);
1634         tx->mxc_peer = peer;
1635         tx->mxc_conn = peer->mxp_conn;
1636         tx->mxc_msg_type = msg_type;
1637         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1638         tx->mxc_state = MXLND_CTX_PENDING;
1639         tx->mxc_lntmsg[0] = lntmsg;
1640         tx->mxc_cookie = cookie;
1641         tx->mxc_match = mxlnd_create_match(tx, 0);
1642
1643         /* This setups up the mx_ksegment_t to send the DATA payload  */
1644         if (nob == 0) {
1645                 /* do not setup the segments */
1646                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1647                                    "to %s?\n", libcfs_nid2str(target.nid));
1648                 ret = 0;
1649         } else if (kiov == NULL) {
1650                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1651         } else {
1652                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1653         }
1654         if (ret != 0) {
1655                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", 
1656                                    libcfs_nid2str(target.nid));
1657                 tx->mxc_status.code = -EIO;
1658                 goto failed_1;
1659         }
1660         mxlnd_queue_tx(tx);
1661         return;
1662
1663 failed_1:
1664         mxlnd_conn_decref(peer->mxp_conn);
1665         mxlnd_put_idle_tx(tx);
1666         return;
1667
1668 failed_0:
1669         CDEBUG(D_NETERROR, "no tx avail\n");
1670         lnet_finalize(ni, lntmsg, -EIO);
1671         return;
1672 }
1673
1674 /**
1675  * mxlnd_recv_data - map [k]iov, post rx
1676  * @ni
1677  * @lntmsg
1678  * @rx
1679  * @msg_type
1680  * @cookie
1681  *
1682  * This setups the DATA receive for PUT or GET.
1683  *
1684  * On success, it returns 0, on failure it returns -1
1685  */
1686 int
1687 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1688 {
1689         int                     ret             = 0;
1690         lnet_process_id_t       target          = lntmsg->msg_target;
1691         unsigned int            niov            = lntmsg->msg_niov;
1692         struct iovec           *iov             = lntmsg->msg_iov;
1693         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1694         unsigned int            offset          = lntmsg->msg_offset;
1695         unsigned int            nob             = lntmsg->msg_len;
1696         mx_return_t             mxret           = MX_SUCCESS;
1697
1698         /* above assumes MXLND_MSG_PUT_DATA */
1699         if (msg_type == MXLND_MSG_GET_DATA) {
1700                 niov = lntmsg->msg_md->md_niov;
1701                 iov = lntmsg->msg_md->md_iov.iov;
1702                 kiov = lntmsg->msg_md->md_iov.kiov;
1703                 offset = 0;
1704                 nob = lntmsg->msg_md->md_length;
1705         }
1706
1707         LASSERT(lntmsg != NULL);
1708         LASSERT(rx != NULL);
1709         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1710         LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1711
1712         rx->mxc_msg_type = msg_type;
1713         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1714         rx->mxc_state = MXLND_CTX_PENDING;
1715         rx->mxc_nid = target.nid;
1716         /* if posting a GET_DATA, we may not yet know the peer */
1717         if (rx->mxc_peer != NULL) {
1718                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1719         }
1720         rx->mxc_lntmsg[0] = lntmsg;
1721         rx->mxc_cookie = cookie;
1722         rx->mxc_match = mxlnd_create_match(rx, 0);
1723         /* This setups up the mx_ksegment_t to receive the DATA payload  */
1724         if (kiov == NULL) {
1725                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1726         } else {
1727                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1728         }
1729         if (msg_type == MXLND_MSG_GET_DATA) {
1730                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1731                 if (rx->mxc_lntmsg[1] == NULL) {
1732                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1733                                            libcfs_nid2str(target.nid));
1734                         ret = -1;
1735                 }
1736         }
1737         if (ret != 0) {
1738                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1739                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1740                        libcfs_nid2str(target.nid));
1741                 return -1;
1742         }
1743         ret = mxlnd_q_pending_ctx(rx);
1744         if (ret == -1) {
1745                 return -1;
1746         }
1747         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1748         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1749                           rx->mxc_seg_list, rx->mxc_nseg,
1750                           rx->mxc_pin_type, rx->mxc_match,
1751                           0xF00FFFFFFFFFFFFFLL, (void *) rx,
1752                           &rx->mxc_mxreq);
1753         if (mxret != MX_SUCCESS) {
1754                 if (rx->mxc_conn != NULL) {
1755                         mxlnd_deq_pending_ctx(rx);
1756                 }
1757                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1758                                    (int) mxret, libcfs_nid2str(target.nid));
1759                 return -1;
1760         }
1761
1762         return 0;
1763 }
1764
1765 /**
1766  * mxlnd_send - the LND required send function
1767  * @ni
1768  * @private
1769  * @lntmsg
1770  *
1771  * This must not block. Since we may not have a peer struct for the receiver,
1772  * it will append send messages on a global tx list. We will then up the
1773  * tx_queued's semaphore to notify it of the new send. 
1774  */
1775 int
1776 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1777 {
1778         int                     ret             = 0;
1779         int                     type            = lntmsg->msg_type;
1780         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
1781         lnet_process_id_t       target          = lntmsg->msg_target;
1782         lnet_nid_t              nid             = target.nid;
1783         int                     target_is_router = lntmsg->msg_target_is_router;
1784         int                     routing         = lntmsg->msg_routing;
1785         unsigned int            payload_niov    = lntmsg->msg_niov;
1786         struct iovec           *payload_iov     = lntmsg->msg_iov;
1787         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
1788         unsigned int            payload_offset  = lntmsg->msg_offset;
1789         unsigned int            payload_nob     = lntmsg->msg_len;
1790         struct kmx_ctx         *tx              = NULL;
1791         struct kmx_msg         *txmsg           = NULL;
1792         struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */
1793         struct kmx_ctx         *rx_data         = NULL;
1794         struct kmx_conn        *conn            = NULL;
1795         int                     nob             = 0;
1796         uint32_t                length          = 0;
1797         struct kmx_peer         *peer           = NULL;
1798
1799         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1800                        payload_nob, payload_niov, libcfs_id2str(target));
1801
1802         LASSERT (payload_nob == 0 || payload_niov > 0);
1803         LASSERT (payload_niov <= LNET_MAX_IOV);
1804         /* payload is either all vaddrs or all pages */
1805         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1806
1807         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1808
1809         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1810          * to a new peer, use the nid */
1811         peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1812         if (peer != NULL) {
1813                 if (unlikely(peer->mxp_incompatible)) {
1814                         mxlnd_peer_decref(peer); /* drop ref taken above */
1815                 } else {
1816                         spin_lock(&peer->mxp_lock);
1817                         conn = peer->mxp_conn;
1818                         if (conn) {
1819                                 mxlnd_conn_addref(conn);
1820                                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1821                         }
1822                         spin_unlock(&peer->mxp_lock);
1823                 }
1824         }
1825         if (conn == NULL && peer != NULL) {
1826                 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1827                        peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1828         }
1829
1830         switch (type) {
1831         case LNET_MSG_ACK:
1832                 LASSERT (payload_nob == 0);
1833                 break;
1834
1835         case LNET_MSG_REPLY:
1836         case LNET_MSG_PUT:
1837                 /* Is the payload small enough not to need DATA? */
1838                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1839                 if (nob <= MXLND_EAGER_SIZE)
1840                         break;                  /* send EAGER */
1841
1842                 tx = mxlnd_get_idle_tx();
1843                 if (unlikely(tx == NULL)) {
1844                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1845                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1846                                libcfs_nid2str(nid));
1847                         if (conn) mxlnd_conn_decref(conn);
1848                         return -ENOMEM;
1849                 }
1850
1851                 /* the peer may be NULL */
1852                 tx->mxc_peer = peer;
1853                 tx->mxc_conn = conn; /* may be NULL */
1854                 /* we added a conn ref above */
1855                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1856                 txmsg = tx->mxc_msg;
1857                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1858                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1859                 tx->mxc_match = mxlnd_create_match(tx, 0);
1860
1861                 /* we must post a receive _before_ sending the request.
1862                  * we need to determine how much to receive, it will be either
1863                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1864
1865                 rx = mxlnd_get_idle_rx();
1866                 if (unlikely(rx == NULL)) {
1867                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1868                                            libcfs_nid2str(nid));
1869                         mxlnd_put_idle_tx(tx);
1870                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1871                         return -ENOMEM;
1872                 }
1873                 rx->mxc_nid = nid;
1874                 rx->mxc_peer = peer;
1875                 /* conn may be NULL but unlikely since the first msg is always small */
1876                 /* NOTE no need to lock peer before adding conn ref since we took
1877                  * a conn ref for the tx (it cannot be freed between there and here ) */
1878                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1879                 rx->mxc_conn = conn;
1880                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1881                 rx->mxc_cookie = tx->mxc_cookie;
1882                 rx->mxc_match = mxlnd_create_match(rx, 0);
1883
1884                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1885                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1886                 if (unlikely(ret != 0)) {
1887                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1888                                            libcfs_nid2str(nid));
1889                         rx->mxc_lntmsg[0] = NULL;
1890                         mxlnd_put_idle_rx(rx);
1891                         mxlnd_put_idle_tx(tx);
1892                         if (conn) {
1893                                 mxlnd_conn_decref(conn); /* for the rx... */
1894                                 mxlnd_conn_decref(conn); /* and for the tx */
1895                         }
1896                         return -EHOSTUNREACH;
1897                 }
1898
1899                 mxlnd_queue_tx(tx);
1900                 return 0;
1901
1902         case LNET_MSG_GET:
1903                 if (routing || target_is_router)
1904                         break;                  /* send EAGER */
1905
1906                 /* is the REPLY message too small for DATA? */
1907                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1908                 if (nob <= MXLND_EAGER_SIZE)
1909                         break;                  /* send EAGER */
1910
1911                 /* get tx (we need the cookie) , post rx for incoming DATA, 
1912                  * then post GET_REQ tx */
1913                 tx = mxlnd_get_idle_tx();
1914                 if (unlikely(tx == NULL)) {
1915                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1916                                            libcfs_nid2str(nid));
1917                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1918                         return -ENOMEM;
1919                 }
1920                 rx_data = mxlnd_get_idle_rx();
1921                 if (unlikely(rx_data == NULL)) {
1922                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1923                                            libcfs_nid2str(nid));
1924                         mxlnd_put_idle_tx(tx);
1925                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1926                         return -ENOMEM;
1927                 }
1928                 rx_data->mxc_peer = peer;
1929                 /* NOTE no need to lock peer before adding conn ref since we took
1930                  * a conn ref for the tx (it cannot be freed between there and here ) */
1931                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1932                 rx_data->mxc_conn = conn; /* may be NULL */
1933
1934                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1935                 if (unlikely(ret != 0)) {
1936                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1937                                            libcfs_nid2str(nid));
1938                         mxlnd_put_idle_rx(rx_data);
1939                         mxlnd_put_idle_tx(tx);
1940                         if (conn) {
1941                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1942                                 mxlnd_conn_decref(conn); /* and for the tx */
1943                         }
1944                         return -EIO;
1945                 }
1946
1947                 tx->mxc_peer = peer;
1948                 tx->mxc_conn = conn; /* may be NULL */
1949                 /* conn ref taken above */
1950                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1951                 txmsg = tx->mxc_msg;
1952                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1953                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1954                 tx->mxc_match = mxlnd_create_match(tx, 0);
1955
1956                 mxlnd_queue_tx(tx);
1957                 return 0;
1958
1959         default:
1960                 LBUG();
1961                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1962                 return -EIO;
1963         }
1964
1965         /* send EAGER */
1966
1967         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1968                 <= MXLND_EAGER_SIZE);
1969
1970         tx = mxlnd_get_idle_tx();
1971         if (unlikely(tx == NULL)) {
1972                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1973                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1974                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1975                 return -ENOMEM;
1976         }
1977
1978         tx->mxc_peer = peer;
1979         tx->mxc_conn = conn; /* may be NULL */
1980         /* conn ref taken above */
1981         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1982         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1983         tx->mxc_match = mxlnd_create_match(tx, 0);
1984
1985         txmsg = tx->mxc_msg;
1986         txmsg->mxm_u.eager.mxem_hdr = *hdr;
1987
1988         if (payload_kiov != NULL)
1989                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1990                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1991                             payload_niov, payload_kiov, payload_offset, payload_nob);
1992         else
1993                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1994                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1995                             payload_niov, payload_iov, payload_offset, payload_nob);
1996
1997         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1998         mxlnd_queue_tx(tx);
1999         return 0;
2000 }
2001
2002 /**
2003  * mxlnd_recv - the LND required recv function
2004  * @ni
2005  * @private
2006  * @lntmsg
2007  * @delayed
2008  * @niov
2009  * @kiov
2010  * @offset
2011  * @mlen
2012  * @rlen
2013  *
2014  * This must not block.
2015  */
2016 int
2017 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
2018              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
2019              unsigned int offset, unsigned int mlen, unsigned int rlen)
2020 {
2021         int                     ret             = 0;
2022         int                     nob             = 0;
2023         int                     len             = 0;
2024         struct kmx_ctx          *rx             = private;
2025         struct kmx_msg          *rxmsg          = rx->mxc_msg;
2026         lnet_nid_t               nid            = rx->mxc_nid;
2027         struct kmx_ctx          *tx             = NULL;
2028         struct kmx_msg          *txmsg          = NULL;
2029         struct kmx_peer         *peer           = rx->mxc_peer;
2030         struct kmx_conn         *conn           = peer->mxp_conn;
2031         u64                      cookie         = 0LL;
2032         int                      msg_type       = rxmsg->mxm_type;
2033         int                      repost         = 1;
2034         int                      credit         = 0;
2035         int                      finalize       = 0;
2036
2037         LASSERT (mlen <= rlen);
2038         /* Either all pages or all vaddrs */
2039         LASSERT (!(kiov != NULL && iov != NULL));
2040         LASSERT (peer != NULL);
2041
2042         /* conn_addref(conn) already taken for the primary rx */
2043
2044         switch (msg_type) {
2045         case MXLND_MSG_EAGER:
2046                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2047                 len = rx->mxc_status.xfer_length;
2048                 if (unlikely(nob > len)) {
2049                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2050                                            libcfs_nid2str(nid), nob, len);
2051                         ret = -EPROTO;
2052                         break;
2053                 }
2054
2055                 if (kiov != NULL)
2056                         lnet_copy_flat2kiov(niov, kiov, offset,
2057                                 MXLND_EAGER_SIZE, rxmsg,
2058                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2059                                 mlen);
2060                 else
2061                         lnet_copy_flat2iov(niov, iov, offset,
2062                                 MXLND_EAGER_SIZE, rxmsg,
2063                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2064                                 mlen);
2065                 finalize = 1;
2066                 credit = 1;
2067                 break;
2068
2069         case MXLND_MSG_PUT_REQ:
2070                 /* we are going to reuse the rx, store the needed info */
2071                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2072
2073                 /* get tx, post rx, send PUT_ACK */
2074
2075                 tx = mxlnd_get_idle_tx();
2076                 if (unlikely(tx == NULL)) {
2077                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2078                         /* Not replying will break the connection */
2079                         ret = -ENOMEM;
2080                         break;
2081                 }
2082                 if (unlikely(mlen == 0)) {
2083                         finalize = 1;
2084                         tx->mxc_peer = peer;
2085                         tx->mxc_conn = conn;
2086                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2087                         /* repost = 1 */
2088                         break;
2089                 }
2090
2091                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2092                 tx->mxc_peer = peer;
2093                 tx->mxc_conn = conn;
2094                 /* no need to lock peer first since we already have a ref */
2095                 mxlnd_conn_addref(conn); /* for the tx */
2096                 txmsg = tx->mxc_msg;
2097                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2098                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2099                 tx->mxc_cookie = cookie;
2100                 tx->mxc_match = mxlnd_create_match(tx, 0);
2101
2102                 /* we must post a receive _before_ sending the PUT_ACK */
2103                 mxlnd_ctx_init(rx);
2104                 rx->mxc_state = MXLND_CTX_PREP;
2105                 rx->mxc_peer = peer;
2106                 rx->mxc_conn = conn;
2107                 /* do not take another ref for this rx, it is already taken */
2108                 rx->mxc_nid = peer->mxp_nid;
2109                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, 
2110                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2111
2112                 if (unlikely(ret != 0)) {
2113                         /* Notify peer that it's over */
2114                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", 
2115                                            libcfs_nid2str(nid), ret);
2116                         mxlnd_ctx_init(tx);
2117                         tx->mxc_state = MXLND_CTX_PREP;
2118                         tx->mxc_peer = peer;
2119                         tx->mxc_conn = conn;
2120                         /* finalize = 0, let the PUT_ACK tx finalize this */
2121                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2122                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2123                         /* conn ref already taken above */
2124                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2125                         /* repost = 1 */
2126                         break;
2127                 }
2128
2129                 mxlnd_queue_tx(tx);
2130                 /* do not return a credit until after PUT_DATA returns */
2131                 repost = 0;
2132                 break;
2133
2134         case MXLND_MSG_GET_REQ:
2135                 if (likely(lntmsg != NULL)) {
2136                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2137                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2138                 } else {
2139                         /* GET didn't match anything */
2140                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2141                          * We have to embed the error code in the match bits.
2142                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2143                         u64             cookie  = rxmsg->mxm_u.get_req.mxgrm_cookie;
2144
2145                         tx = mxlnd_get_idle_tx();
2146                         if (unlikely(tx == NULL)) {
2147                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2148                                                    libcfs_nid2str(nid));
2149                                 ret = -ENOMEM;
2150                                 break;
2151                         }
2152                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2153                         tx->mxc_state = MXLND_CTX_PENDING;
2154                         tx->mxc_nid = nid;
2155                         tx->mxc_peer = peer;
2156                         tx->mxc_conn = conn;
2157                         /* no need to lock peer first since we already have a ref */
2158                         mxlnd_conn_addref(conn); /* for this tx */
2159                         tx->mxc_cookie = cookie;
2160                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2161                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2162                         mxlnd_queue_tx(tx);
2163                 }
2164                 /* finalize lntmsg after tx completes */
2165                 break;
2166
2167         default:
2168                 LBUG();
2169         }
2170
2171         if (repost) {
2172                 /* we received a message, increment peer's outstanding credits */
2173                 if (credit == 1) {
2174                         spin_lock(&conn->mxk_lock);
2175                         conn->mxk_outstanding++;
2176                         spin_unlock(&conn->mxk_lock);
2177                 }
2178                 /* we are done with the rx */
2179                 mxlnd_put_idle_rx(rx);
2180                 mxlnd_conn_decref(conn);
2181         }
2182
2183         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); 
2184
2185         /* we received a credit, see if we can use it to send a msg */
2186         if (credit) mxlnd_check_sends(peer);
2187
2188         return ret;
2189 }
2190
2191 void
2192 mxlnd_sleep(unsigned long timeout)
2193 {
2194         set_current_state(TASK_INTERRUPTIBLE);
2195         schedule_timeout(timeout);
2196         return;
2197 }
2198
2199 /**
2200  * mxlnd_tx_queued - the generic send queue thread
2201  * @arg - thread id (as a void *)
2202  *
2203  * This thread moves send messages from the global tx_queue to the owning
2204  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2205  * it to the global peer list.
2206  */
2207 int
2208 mxlnd_tx_queued(void *arg)
2209 {
2210         long                    id      = (long) arg;
2211         int                     ret     = 0;
2212         int                     found   = 0;
2213         struct kmx_ctx         *tx      = NULL;
2214         struct kmx_peer        *peer    = NULL;
2215         struct list_head       *tmp_tx  = NULL;
2216
2217         cfs_daemonize("mxlnd_tx_queued");
2218         //cfs_block_allsigs();
2219
2220         while (!kmxlnd_data.kmx_shutdown) {
2221                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2222                 if (kmxlnd_data.kmx_shutdown)
2223                         break;
2224                 if (ret != 0) // Should we check for -EINTR?
2225                         continue;
2226                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2227                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2228                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2229                         continue;
2230                 }
2231                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2232                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2233                 list_del_init(&tx->mxc_list);
2234                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2235
2236                 found = 0;
2237                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
2238                 if (peer != NULL) {
2239                         tx->mxc_peer = peer;
2240                         spin_lock(&peer->mxp_lock);
2241                         if (peer->mxp_conn == NULL) {
2242                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2243                                 if (ret != 0) {
2244                                         /* out of memory, give up and fail tx */
2245                                         tx->mxc_status.code = -ENOMEM;
2246                                         spin_unlock(&peer->mxp_lock);
2247                                         mxlnd_peer_decref(peer);
2248                                         mxlnd_put_idle_tx(tx);
2249                                         continue;
2250                                 }
2251                         }
2252                         tx->mxc_conn = peer->mxp_conn;
2253                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2254                         spin_unlock(&peer->mxp_lock);
2255                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2256                         mxlnd_queue_tx(tx);
2257                         found = 1;
2258                 }
2259                 if (found == 0) {
2260                         int              hash   = 0;
2261                         struct kmx_peer *peer = NULL;
2262                         struct kmx_peer *old = NULL;
2263
2264                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2265
2266                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2267                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2268                         /* create peer */
2269                         /* adds conn ref for this function */
2270                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2271                         if (ret != 0) {
2272                                 /* finalize message */
2273                                 tx->mxc_status.code = ret;
2274                                 mxlnd_put_idle_tx(tx);
2275                                 continue;
2276                         }
2277                         tx->mxc_peer = peer;
2278                         tx->mxc_conn = peer->mxp_conn;
2279                         /* this tx will keep the conn ref taken in peer_alloc() */
2280
2281                         /* add peer to global peer list, but look to see
2282                          * if someone already created it after we released
2283                          * the read lock */
2284                         write_lock(&kmxlnd_data.kmx_peers_lock);
2285                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2286                                 if (old->mxp_nid == peer->mxp_nid) {
2287                                         /* somebody beat us here, we created a duplicate */
2288                                         found = 1;
2289                                         break;
2290                                 }
2291                         }
2292
2293                         if (found == 0) {
2294                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2295                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2296                         } else {
2297                                 tx->mxc_peer = old;
2298                                 spin_lock(&old->mxp_lock);
2299                                 tx->mxc_conn = old->mxp_conn;
2300                                 /* FIXME can conn be NULL? */
2301                                 LASSERT(old->mxp_conn != NULL);
2302                                 mxlnd_conn_addref(old->mxp_conn);
2303                                 spin_unlock(&old->mxp_lock);
2304                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2305                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2306                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2307                                 mxlnd_peer_decref(peer);
2308                         }
2309                         write_unlock(&kmxlnd_data.kmx_peers_lock);
2310
2311                         mxlnd_queue_tx(tx);
2312                 }
2313         }
2314         mxlnd_thread_stop(id);
2315         return 0;
2316 }
2317
2318 /* When calling this, we must not have the peer lock. */
2319 void
2320 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2321 {
2322         mx_return_t             mxret   = MX_SUCCESS;
2323         mx_request_t            request;
2324         struct kmx_conn         *conn   = peer->mxp_conn;
2325
2326         /* NOTE we are holding a conn ref every time we call this function,
2327          * we do not need to lock the peer before taking another ref */
2328         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2329
2330         LASSERT(mask == MXLND_MASK_ICON_REQ ||
2331                 mask == MXLND_MASK_ICON_ACK);
2332
2333         if (peer->mxp_reconnect_time == 0) {
2334                 peer->mxp_reconnect_time = jiffies;
2335         }
2336
2337         if (peer->mxp_nic_id == 0LL) {
2338                 mxlnd_peer_hostname_to_nic_id(peer);
2339                 if (peer->mxp_nic_id == 0LL) {
2340                         /* not mapped yet, return */
2341                         spin_lock(&conn->mxk_lock);
2342                         conn->mxk_status = MXLND_CONN_INIT;
2343                         spin_unlock(&conn->mxk_lock);
2344                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2345                                 /* give up and notify LNET */
2346                                 mxlnd_conn_disconnect(conn, 0, 1);
2347                                 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2348                                                                             function... */
2349                                 mxlnd_conn_decref(peer->mxp_conn); /* which we no 
2350                                                                       longer need */
2351                         }
2352                         mxlnd_conn_decref(conn);
2353                         return;
2354                 }
2355         }
2356
2357         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2358                             peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
2359                             (void *) peer, &request);
2360         if (unlikely(mxret != MX_SUCCESS)) {
2361                 spin_lock(&conn->mxk_lock);
2362                 conn->mxk_status = MXLND_CONN_FAIL;
2363                 spin_unlock(&conn->mxk_lock);
2364                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2365                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2366                 mxlnd_conn_decref(conn);
2367         }
2368         return;
2369 }
2370
2371 #define MXLND_STATS 0
2372
2373 int
2374 mxlnd_check_sends(struct kmx_peer *peer)
2375 {
2376         int                     ret             = 0;
2377         int                     found           = 0;
2378         mx_return_t             mxret           = MX_SUCCESS;
2379         struct kmx_ctx          *tx             = NULL;
2380         struct kmx_conn         *conn           = NULL;
2381         u8                      msg_type        = 0;
2382         int                     credit          = 0;
2383         int                     status          = 0;
2384         int                     ntx_posted      = 0;
2385         int                     credits         = 0;
2386 #if MXLND_STATS
2387         static unsigned long    last            = 0;
2388 #endif
2389
2390         if (unlikely(peer == NULL)) {
2391                 LASSERT(peer != NULL);
2392                 return -1;
2393         }
2394         spin_lock(&peer->mxp_lock);
2395         conn = peer->mxp_conn;
2396         /* NOTE take a ref for the duration of this function since it is called
2397          * when there might not be any queued txs for this peer */
2398         if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2399         spin_unlock(&peer->mxp_lock);
2400
2401         /* do not add another ref for this tx */
2402
2403         if (conn == NULL) {
2404                 /* we do not have any conns */
2405                 return -1;
2406         }
2407
2408 #if MXLND_STATS
2409         if (time_after(jiffies, last)) {
2410                 last = jiffies + HZ;
2411                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2412                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2413                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2414                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2415                               conn->mxk_ntx_data, conn->mxk_data_posted);
2416         }
2417 #endif
2418
2419         /* cache peer state for asserts */
2420         spin_lock(&conn->mxk_lock);
2421         ntx_posted = conn->mxk_ntx_posted;
2422         credits = conn->mxk_credits;
2423         spin_unlock(&conn->mxk_lock);
2424
2425         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2426         LASSERT(ntx_posted >= 0);
2427
2428         LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2429         LASSERT(credits >= 0);
2430
2431         /* check number of queued msgs, ignore data */
2432         spin_lock(&conn->mxk_lock);
2433         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2434                 /* check if any txs queued that could return credits... */
2435                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2436                         /* if not, send a NOOP */
2437                         tx = mxlnd_get_idle_tx();
2438                         if (likely(tx != NULL)) {
2439                                 tx->mxc_peer = peer;
2440                                 tx->mxc_conn = peer->mxp_conn;
2441                                 mxlnd_conn_addref(conn); /* for this tx */
2442                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2443                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2444                                 mxlnd_peer_queue_tx_locked(tx);
2445                                 found = 1;
2446                                 goto done_locked;
2447                         }
2448                 }
2449         }
2450         spin_unlock(&conn->mxk_lock);
2451
2452         /* if the peer is not ready, try to connect */
2453         spin_lock(&conn->mxk_lock);
2454         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2455             conn->mxk_status == MXLND_CONN_FAIL ||
2456             conn->mxk_status == MXLND_CONN_REQ)) {
2457                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2458                 conn->mxk_status = MXLND_CONN_WAIT;
2459                 spin_unlock(&conn->mxk_lock);
2460                 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2461                 goto done;
2462         }
2463         spin_unlock(&conn->mxk_lock);
2464
2465         spin_lock(&conn->mxk_lock);
2466         while (!list_empty(&conn->mxk_tx_free_queue) ||
2467                !list_empty(&conn->mxk_tx_credit_queue)) {
2468                 /* We have something to send. If we have a queued tx that does not
2469                  * require a credit (free), choose it since its completion will 
2470                  * return a credit (here or at the peer), complete a DATA or 
2471                  * CONN_REQ or CONN_ACK. */
2472                 struct list_head *tmp_tx = NULL;
2473                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2474                         tmp_tx = &conn->mxk_tx_free_queue;
2475                 } else {
2476                         tmp_tx = &conn->mxk_tx_credit_queue;
2477                 }
2478                 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2479
2480                 msg_type = tx->mxc_msg_type;
2481
2482                 /* don't try to send a rx */
2483                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2484
2485                 /* ensure that it is a valid msg type */
2486                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2487                         msg_type == MXLND_MSG_CONN_ACK ||
2488                         msg_type == MXLND_MSG_NOOP     ||
2489                         msg_type == MXLND_MSG_EAGER    ||
2490                         msg_type == MXLND_MSG_PUT_REQ  ||
2491                         msg_type == MXLND_MSG_PUT_ACK  ||
2492                         msg_type == MXLND_MSG_PUT_DATA ||
2493                         msg_type == MXLND_MSG_GET_REQ  ||
2494                         msg_type == MXLND_MSG_GET_DATA);
2495                 LASSERT(tx->mxc_peer == peer);
2496                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2497
2498                 credit = mxlnd_tx_requires_credit(tx);
2499                 if (credit) {
2500
2501                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2502                                 CDEBUG(D_NET, "%s: posted enough\n",
2503                                               libcfs_nid2str(peer->mxp_nid));
2504                                 goto done_locked;
2505                         }
2506
2507                         if (conn->mxk_credits == 0) {
2508                                 CDEBUG(D_NET, "%s: no credits\n",
2509                                               libcfs_nid2str(peer->mxp_nid));
2510                                 goto done_locked;
2511                         }
2512
2513                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2514                             conn->mxk_outstanding == 0) {  /* giving back credits */
2515                                 CDEBUG(D_NET, "%s: not using last credit\n",
2516                                               libcfs_nid2str(peer->mxp_nid));
2517                                 goto done_locked;
2518                         }
2519                 }
2520
2521                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2522                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2523                                 msg_type == MXLND_MSG_CONN_ACK)) {
2524                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2525                                              mxlnd_connstatus_to_str(conn->mxk_status),
2526                                              tx->mxc_cookie,
2527                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2528                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2529                                         list_del_init(&tx->mxc_list);
2530                                         tx->mxc_status.code = -ECONNABORTED;
2531                                         mxlnd_put_idle_tx(tx);
2532                                         mxlnd_conn_decref(conn);
2533                                 }
2534                                 goto done_locked;
2535                         }
2536                 }
2537
2538                 list_del_init(&tx->mxc_list);
2539
2540                 /* handle credits, etc now while we have the lock to avoid races */
2541                 if (credit) {
2542                         conn->mxk_credits--;
2543                         conn->mxk_ntx_posted++;
2544                 }
2545                 if (msg_type != MXLND_MSG_PUT_DATA &&
2546                     msg_type != MXLND_MSG_GET_DATA) {
2547                         if (msg_type != MXLND_MSG_CONN_REQ &&
2548                             msg_type != MXLND_MSG_CONN_ACK) {
2549                                 conn->mxk_ntx_msgs--;
2550                         }
2551                 }
2552                 if (tx->mxc_incarnation == 0 &&
2553                     conn->mxk_incarnation != 0) {
2554                         tx->mxc_incarnation = conn->mxk_incarnation;
2555                 }
2556                 spin_unlock(&conn->mxk_lock);
2557
2558                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER 
2559                  * or (2) there is a non-DATA msg that can return credits in the 
2560                  * queue, then drop this duplicate NOOP */
2561                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2562                         spin_lock(&conn->mxk_lock);
2563                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2564                             (conn->mxk_ntx_msgs >= 1)) {
2565                                 conn->mxk_credits++;
2566                                 conn->mxk_ntx_posted--;
2567                                 spin_unlock(&conn->mxk_lock);
2568                                 /* redundant NOOP */
2569                                 mxlnd_put_idle_tx(tx);
2570                                 mxlnd_conn_decref(conn);
2571                                 CDEBUG(D_NET, "%s: redundant noop\n",
2572                                               libcfs_nid2str(peer->mxp_nid));
2573                                 found = 1;
2574                                 goto done;
2575                         }
2576                         spin_unlock(&conn->mxk_lock);
2577                 }
2578
2579                 found = 1;
2580                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2581                     (msg_type != MXLND_MSG_GET_DATA))) {
2582                         mxlnd_pack_msg(tx);
2583                 }
2584
2585                 //ret = -ECONNABORTED;
2586                 mxret = MX_SUCCESS;
2587
2588                 spin_lock(&conn->mxk_lock);
2589                 status = conn->mxk_status;
2590                 spin_unlock(&conn->mxk_lock);
2591
2592                 if (likely((status == MXLND_CONN_READY) ||
2593                     (msg_type == MXLND_MSG_CONN_REQ) ||
2594                     (msg_type == MXLND_MSG_CONN_ACK))) {
2595                         ret = 0;
2596                         if (msg_type != MXLND_MSG_CONN_REQ &&
2597                             msg_type != MXLND_MSG_CONN_ACK) {
2598                                 /* add to the pending list */
2599                                 ret = mxlnd_q_pending_ctx(tx);
2600                                 if (ret == -1) {
2601                                         /* FIXME the conn is disconnected, now what? */
2602                                 }
2603                         } else {
2604                                 /* CONN_REQ/ACK */
2605                                 tx->mxc_state = MXLND_CTX_PENDING;
2606                         }
2607
2608                         if (ret == 0) {
2609                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2610                                     msg_type != MXLND_MSG_GET_DATA)) {
2611                                         /* send a msg style tx */
2612                                         LASSERT(tx->mxc_nseg == 1);
2613                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2614                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2615                                                mxlnd_msgtype_to_str(msg_type),
2616                                                tx->mxc_cookie);
2617                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2618                                                           &tx->mxc_seg,
2619                                                           tx->mxc_nseg,
2620                                                           tx->mxc_pin_type,
2621                                                           conn->mxk_epa,
2622                                                           tx->mxc_match,
2623                                                           (void *) tx,
2624                                                           &tx->mxc_mxreq);
2625                                 } else {
2626                                         /* send a DATA tx */
2627                                         spin_lock(&conn->mxk_lock);
2628                                         conn->mxk_ntx_data--;
2629                                         conn->mxk_data_posted++;
2630                                         spin_unlock(&conn->mxk_lock);
2631                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2632                                                mxlnd_msgtype_to_str(msg_type),
2633                                                tx->mxc_cookie);
2634                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2635                                                           tx->mxc_seg_list,
2636                                                           tx->mxc_nseg,
2637                                                           tx->mxc_pin_type,
2638                                                           conn->mxk_epa,
2639                                                           tx->mxc_match,
2640                                                           (void *) tx,
2641                                                           &tx->mxc_mxreq);
2642                                 }
2643                         } else {
2644                                 mxret = MX_CONNECTION_FAILED;
2645                         }
2646                         if (likely(mxret == MX_SUCCESS)) {
2647                                 ret = 0;
2648                         } else {
2649                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2650                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
2651                                        libcfs_nid2str(peer->mxp_nid));
2652                                 /* NOTE mx_kisend() only fails if there are not enough 
2653                                 * resources. Do not change the connection status. */
2654                                 if (mxret == MX_NO_RESOURCES) {
2655                                         tx->mxc_status.code = -ENOMEM;
2656                                 } else {
2657                                         tx->mxc_status.code = -ECONNABORTED;
2658                                 }
2659                                 if (credit) {
2660                                         spin_lock(&conn->mxk_lock);
2661                                         conn->mxk_ntx_posted--;
2662                                         conn->mxk_credits++;
2663                                         spin_unlock(&conn->mxk_lock);
2664                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2665                                         msg_type == MXLND_MSG_GET_DATA) {
2666                                         spin_lock(&conn->mxk_lock);
2667                                         conn->mxk_data_posted--;
2668                                         spin_unlock(&conn->mxk_lock);
2669                                 }
2670                                 if (msg_type != MXLND_MSG_PUT_DATA &&
2671                                     msg_type != MXLND_MSG_GET_DATA &&
2672                                     msg_type != MXLND_MSG_CONN_REQ &&
2673                                     msg_type != MXLND_MSG_CONN_ACK) {
2674                                         spin_lock(&conn->mxk_lock);
2675                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2676                                         spin_unlock(&conn->mxk_lock);
2677                                 }
2678                                 if (msg_type != MXLND_MSG_CONN_REQ &&
2679                                     msg_type != MXLND_MSG_CONN_ACK) {
2680                                         /* remove from the pending list */
2681                                         mxlnd_deq_pending_ctx(tx);
2682                                 }
2683                                 mxlnd_put_idle_tx(tx);
2684                                 mxlnd_conn_decref(conn);
2685                         }
2686                 }
2687                 spin_lock(&conn->mxk_lock);
2688         }
2689 done_locked:
2690         spin_unlock(&conn->mxk_lock);
2691 done:
2692         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2693         return found;
2694 }
2695
2696
2697 /**
2698  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2699  * @ctx - the tx descriptor
2700  *
2701  * Determine which type of send request it was and start the next step, if needed,
2702  * or, if done, signal completion to LNET. After we are done, put back on the
2703  * idle tx list.
2704  */
2705 void
2706 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2707 {
2708         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2709         struct kmx_msg  *msg    = tx->mxc_msg;
2710         struct kmx_peer *peer   = tx->mxc_peer;
2711         struct kmx_conn *conn   = tx->mxc_conn;
2712         u8              type    = tx->mxc_msg_type;
2713         int             credit  = mxlnd_tx_requires_credit(tx);
2714         u64             cookie  = tx->mxc_cookie;
2715
2716         CDEBUG(D_NET, "entering %s (0x%llx):\n",
2717                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2718
2719         if (unlikely(conn == NULL)) {
2720                 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2721                 conn = peer->mxp_conn;
2722                 if (conn != NULL) {
2723                         /* do not add a ref for the tx, it was set before sending */
2724                         tx->mxc_conn = conn;
2725                         tx->mxc_peer = conn->mxk_peer;
2726                 }
2727         }
2728         LASSERT (peer != NULL);
2729         LASSERT (conn != NULL);
2730
2731         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2732                 LASSERT (type == msg->mxm_type);
2733         }
2734
2735         if (failed) {
2736                 tx->mxc_status.code = -EIO;
2737         } else {
2738                 spin_lock(&conn->mxk_lock);
2739                 conn->mxk_last_tx = jiffies;
2740                 spin_unlock(&conn->mxk_lock);
2741         }
2742
2743         switch (type) {
2744
2745         case MXLND_MSG_GET_DATA:
2746                 spin_lock(&conn->mxk_lock);
2747                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2748                         conn->mxk_outstanding++;
2749                         conn->mxk_data_posted--;
2750                 }
2751                 spin_unlock(&conn->mxk_lock);
2752                 break;
2753
2754         case MXLND_MSG_PUT_DATA:
2755                 spin_lock(&conn->mxk_lock);
2756                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2757                         conn->mxk_data_posted--;
2758                 }
2759                 spin_unlock(&conn->mxk_lock);
2760                 break;
2761
2762         case MXLND_MSG_NOOP:
2763         case MXLND_MSG_PUT_REQ:
2764         case MXLND_MSG_PUT_ACK:
2765         case MXLND_MSG_GET_REQ:
2766         case MXLND_MSG_EAGER:
2767         //case MXLND_MSG_NAK:
2768                 break;
2769
2770         case MXLND_MSG_CONN_ACK:
2771                 if (peer->mxp_incompatible) {
2772                         /* we sent our params, now close this conn */
2773                         mxlnd_conn_disconnect(conn, 0, 1);
2774                 }
2775         case MXLND_MSG_CONN_REQ:
2776                 if (failed) {
2777                         CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2778                                "failed with %s (%d) to %s\n",
2779                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2780                                mx_strstatus(tx->mxc_status.code),
2781                                tx->mxc_status.code,
2782                                libcfs_nid2str(tx->mxc_nid));
2783                         if (!peer->mxp_incompatible) {
2784                                 spin_lock(&conn->mxk_lock);
2785                                 conn->mxk_status = MXLND_CONN_FAIL;
2786                                 spin_unlock(&conn->mxk_lock);
2787                         }
2788                 }
2789                 break;
2790
2791         default:
2792                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2793                 LBUG();
2794         }
2795
2796         if (credit) {
2797                 spin_lock(&conn->mxk_lock);
2798                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2799                         conn->mxk_ntx_posted--;
2800                 }
2801                 spin_unlock(&conn->mxk_lock);
2802         }
2803
2804         CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2805         mxlnd_put_idle_tx(tx);
2806         mxlnd_conn_decref(conn);
2807
2808         mxlnd_check_sends(peer);
2809
2810         return;
2811 }
2812
2813 void
2814 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2815 {
2816         int                     ret             = 0;
2817         int                     repost          = 1;
2818         int                     credit          = 1;
2819         u32                     nob             = rx->mxc_status.xfer_length;
2820         u64                     bits            = rx->mxc_status.match_info;
2821         struct kmx_msg         *msg             = rx->mxc_msg;
2822         struct kmx_peer        *peer            = rx->mxc_peer;
2823         struct kmx_conn        *conn            = rx->mxc_conn;
2824         u8                      type            = rx->mxc_msg_type;
2825         u64                     seq             = 0LL;
2826         lnet_msg_t             *lntmsg[2];
2827         int                     result          = 0;
2828         u64                     nic_id          = 0LL;
2829         u32                     ep_id           = 0;
2830         int                     peer_ref        = 0;
2831         int                     conn_ref        = 0;
2832         int                     incompatible    = 0;
2833
2834         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, 
2835          * failed GET reply, CONN_REQ, or a CONN_ACK */
2836
2837         /* NOTE peer may still be NULL if it is a new peer and
2838          *      conn may be NULL if this is a re-connect */
2839         if (likely(peer != NULL && conn != NULL)) {
2840                 /* we have a reference on the conn */
2841                 conn_ref = 1;
2842         } else if (peer != NULL && conn == NULL) {
2843                 /* we have a reference on the peer */
2844                 peer_ref = 1;
2845         } else if (peer == NULL && conn != NULL) {
2846                 /* fatal error */
2847                 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2848                 LBUG();
2849         } /* else peer and conn == NULL */
2850
2851 #if 0
2852         if (peer == NULL || conn == NULL) {
2853                 /* if the peer was disconnected, the peer may exist but
2854                  * not have any valid conns */
2855                 decref = 0; /* no peer means no ref was taken for this rx */
2856         }
2857 #endif
2858
2859         if (conn == NULL && peer != NULL) {
2860                 spin_lock(&peer->mxp_lock);
2861                 conn = peer->mxp_conn;
2862                 if (conn) {
2863                         mxlnd_conn_addref(conn); /* conn takes ref... */
2864                         mxlnd_peer_decref(peer); /* from peer */
2865                         conn_ref = 1;
2866                         peer_ref = 0;
2867                 }
2868                 spin_unlock(&peer->mxp_lock);
2869                 rx->mxc_conn = conn;
2870         }
2871
2872 #if MXLND_DEBUG
2873         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2874 #endif
2875
2876         lntmsg[0] = NULL;
2877         lntmsg[1] = NULL;
2878
2879         if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2880                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2881                                    libcfs_nid2str(rx->mxc_nid),
2882                                    mx_strstatus(rx->mxc_status.code),
2883                                    (int) rx->mxc_status.code);
2884                 credit = 0;
2885                 goto cleanup;
2886         }
2887
2888         if (nob == 0) {
2889                 /* this may be a failed GET reply */
2890                 if (type == MXLND_MSG_GET_DATA) {
2891                         bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
2892                         ret = (u32) (bits>>52);
2893                         lntmsg[0] = rx->mxc_lntmsg[0];
2894                         result = -ret;
2895                         goto cleanup;
2896                 } else {
2897                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
2898                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2899                                            libcfs_nid2str(rx->mxc_nid));
2900                         goto cleanup;
2901                 }
2902         }
2903
2904         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2905         if (type == MXLND_MSG_PUT_DATA) {
2906                 result = rx->mxc_status.code;
2907                 lntmsg[0] = rx->mxc_lntmsg[0];
2908                 goto cleanup;
2909         } else if (type == MXLND_MSG_GET_DATA) {
2910                 result = rx->mxc_status.code;
2911                 lntmsg[0] = rx->mxc_lntmsg[0];
2912                 lntmsg[1] = rx->mxc_lntmsg[1];
2913                 goto cleanup;
2914         }
2915
2916         ret = mxlnd_unpack_msg(msg, nob);
2917         if (ret != 0) {
2918                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2919                                    ret, libcfs_nid2str(rx->mxc_nid));
2920                 goto cleanup;
2921         }
2922         rx->mxc_nob = nob;
2923         type = msg->mxm_type;
2924         seq = msg->mxm_seq;
2925
2926         if (type != MXLND_MSG_CONN_REQ &&
2927             (!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) ||
2928              !lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) {
2929                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2930                        "0x%llx and rx msg dst is 0x%llx)\n",
2931                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2932                        msg->mxm_dstnid);
2933                 goto cleanup;
2934         }
2935
2936         if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2937                 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2938                     msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2939                         if (conn != NULL) {
2940                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2941                                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2942                                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2943                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2944                                        msg->mxm_srcstamp, conn->mxk_incarnation,
2945                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2946                         } else {
2947                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2948                                        "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2949                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2950                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2951                         }
2952                         credit = 0;
2953                         goto cleanup;
2954                 }
2955         }
2956
2957         CDEBUG(D_NET, "Received %s with %d credits\n",
2958                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
2959
2960         if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2961             msg->mxm_type != MXLND_MSG_CONN_ACK) {
2962                 LASSERT(peer != NULL);
2963                 LASSERT(conn != NULL);
2964                 if (msg->mxm_credits != 0) {
2965                         spin_lock(&conn->mxk_lock);
2966                         if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2967                                 if ((conn->mxk_credits + msg->mxm_credits) > 
2968                                      *kmxlnd_tunables.kmx_credits) {
2969                                         CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
2970                                                conn->mxk_credits, msg->mxm_credits);
2971                                 }
2972                                 conn->mxk_credits += msg->mxm_credits;
2973                                 LASSERT(conn->mxk_credits >= 0);
2974                                 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2975                         }
2976                         spin_unlock(&conn->mxk_lock);
2977                 }
2978         }
2979
2980         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
2981         switch (type) {
2982         case MXLND_MSG_NOOP:
2983                 break;
2984
2985         case MXLND_MSG_EAGER:
2986                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2987                                         msg->mxm_srcnid, rx, 0);
2988                 repost = ret < 0;
2989                 break;
2990
2991         case MXLND_MSG_PUT_REQ:
2992                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2993                                         msg->mxm_srcnid, rx, 1);
2994                 repost = ret < 0;
2995                 break;
2996
2997         case MXLND_MSG_PUT_ACK: {
2998                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
2999                 if (cookie > MXLND_MAX_COOKIE) {
3000                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
3001                                            libcfs_nid2str(rx->mxc_nid));
3002                         result = -((cookie >> 52) & 0xff);
3003                         lntmsg[0] = rx->mxc_lntmsg[0];
3004                 } else {
3005                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
3006                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
3007                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
3008                 }
3009                 /* repost == 1 */
3010                 break;
3011         }
3012         case MXLND_MSG_GET_REQ:
3013                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
3014                                         msg->mxm_srcnid, rx, 1);
3015                 repost = ret < 0;
3016                 break;
3017
3018         case MXLND_MSG_CONN_REQ:
3019                 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3020                         CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3021                                         libcfs_nid2str(msg->mxm_srcnid),
3022                                         libcfs_nid2str(msg->mxm_dstnid));
3023                         goto cleanup;
3024                 }
3025                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3026                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3027                                     "%d (%d wanted)\n",
3028                                         libcfs_nid2str(msg->mxm_srcnid),
3029                                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3030                                         *kmxlnd_tunables.kmx_credits);
3031                         incompatible = 1;
3032                 }
3033                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3034                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3035                                     "%d (%d wanted)\n",
3036                                         libcfs_nid2str(msg->mxm_srcnid),
3037                                         msg->mxm_u.conn_req.mxcrm_eager_size,
3038                                         (int) MXLND_EAGER_SIZE);
3039                         incompatible = 1;
3040                 }
3041                 if (peer == NULL) {
3042                         peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3043                         if (peer == NULL) {
3044                                 int             hash    = 0;
3045                                 struct kmx_peer *existing_peer    = NULL;
3046                                 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3047
3048                                 mx_decompose_endpoint_addr(rx->mxc_status.source,
3049                                                            &nic_id, &ep_id);
3050                                 rx->mxc_nid = msg->mxm_srcnid;
3051
3052                                 /* adds conn ref for peer and one for this function */
3053                                 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
3054                                 if (ret != 0) {
3055                                         goto cleanup;
3056                                 }
3057                                 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
3058                                 write_lock(&kmxlnd_data.kmx_peers_lock);
3059                                 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3060                                 if (existing_peer) {
3061                                         mxlnd_conn_decref(peer->mxp_conn);
3062                                         mxlnd_peer_decref(peer);
3063                                         peer = existing_peer;
3064                                         mxlnd_conn_addref(peer->mxp_conn);
3065                                 } else {
3066                                         list_add_tail(&peer->mxp_peers,
3067                                                       &kmxlnd_data.kmx_peers[hash]);
3068                                         write_unlock(&kmxlnd_data.kmx_peers_lock);
3069                                         atomic_inc(&kmxlnd_data.kmx_npeers);
3070                                 }
3071                         } else {
3072                                 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3073                                 mxlnd_peer_decref(peer); /* drop ref taken above */
3074                                 if (ret != 0) {
3075                                         CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3076                                         goto cleanup;
3077                                 }
3078                         }
3079                         conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3080                         conn = peer->mxp_conn;
3081                 } else {
3082                         struct kmx_conn *old_conn       = conn;
3083
3084                         /* do not call mx_disconnect() */
3085                         mxlnd_conn_disconnect(old_conn, 0, 0);
3086
3087                         /* the ref for this rx was taken on the old_conn */
3088                         mxlnd_conn_decref(old_conn);
3089
3090                         /* This allocs a conn, points peer->mxp_conn to this one.
3091                          * The old conn is still on the peer->mxp_conns list.
3092                          * As the pending requests complete, they will call
3093                          * conn_decref() which will eventually free it. */
3094                         ret = mxlnd_conn_alloc(&conn, peer);
3095                         if (ret != 0) {
3096                                 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3097                                 goto cleanup;
3098                         }
3099                         /* conn_alloc() adds one ref for the peer and one for this function */
3100                         conn_ref = 1;
3101                 }
3102                 spin_lock(&peer->mxp_lock);
3103                 peer->mxp_incarnation = msg->mxm_srcstamp;
3104                 peer->mxp_incompatible = incompatible;
3105                 spin_unlock(&peer->mxp_lock);
3106                 spin_lock(&conn->mxk_lock);
3107                 conn->mxk_incarnation = msg->mxm_srcstamp;
3108                 conn->mxk_status = MXLND_CONN_WAIT;
3109                 spin_unlock(&conn->mxk_lock);
3110
3111                 /* handle_conn_ack() will create the CONN_ACK msg */
3112                 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
3113
3114                 break;
3115
3116         case MXLND_MSG_CONN_ACK:
3117                 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3118                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3119                                "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3120                                 libcfs_nid2str(msg->mxm_dstnid));
3121                         ret = -1;
3122                         goto failed;
3123                 }
3124                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3125                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3126                                "incompatible queue depth %d (%d wanted)\n",
3127                                 libcfs_nid2str(msg->mxm_srcnid),
3128                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3129                                 *kmxlnd_tunables.kmx_credits);
3130                         spin_lock(&conn->mxk_lock);
3131                         conn->mxk_status = MXLND_CONN_FAIL;
3132                         spin_unlock(&conn->mxk_lock);
3133                         incompatible = 1;
3134                         ret = -1;
3135                 }
3136                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3137                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3138                                "incompatible EAGER size %d (%d wanted)\n",
3139                                 libcfs_nid2str(msg->mxm_srcnid),
3140                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3141                                 (int) MXLND_EAGER_SIZE);
3142                         spin_lock(&conn->mxk_lock);
3143                         conn->mxk_status = MXLND_CONN_FAIL;
3144                         spin_unlock(&conn->mxk_lock);
3145                         incompatible = 1;
3146                         ret = -1;
3147                 }
3148                 spin_lock(&peer->mxp_lock);
3149                 peer->mxp_incarnation = msg->mxm_srcstamp;
3150                 peer->mxp_incompatible = incompatible;
3151                 spin_unlock(&peer->mxp_lock);
3152                 spin_lock(&conn->mxk_lock);
3153                 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3154                 conn->mxk_outstanding = 0;
3155                 conn->mxk_incarnation = msg->mxm_srcstamp;
3156                 conn->mxk_timeout = 0;
3157                 if (!incompatible) {
3158                         conn->mxk_status = MXLND_CONN_READY;
3159                 }
3160                 spin_unlock(&conn->mxk_lock);
3161                 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3162                 break;
3163
3164         default:
3165                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3166                                 libcfs_nid2str(rx->mxc_nid));
3167                 ret = -EPROTO;
3168                 break;
3169         }
3170
3171 failed:
3172         if (ret < 0) {
3173                 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3174                 spin_lock(&conn->mxk_lock);
3175                 conn->mxk_status = MXLND_CONN_FAIL;
3176                 spin_unlock(&conn->mxk_lock);
3177         }
3178
3179 cleanup:
3180         if (conn != NULL) {
3181                 spin_lock(&conn->mxk_lock);
3182                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3183                 spin_unlock(&conn->mxk_lock);
3184         }
3185
3186         if (repost) {
3187                 /* lnet_parse() failed, etc., repost now */
3188                 mxlnd_put_idle_rx(rx);
3189                 if (conn != NULL && credit == 1) {
3190                         if (type == MXLND_MSG_PUT_DATA) {
3191                                 spin_lock(&conn->mxk_lock);
3192                                 conn->mxk_outstanding++;
3193                                 spin_unlock(&conn->mxk_lock);
3194                         } else if (type != MXLND_MSG_GET_DATA &&
3195                                   (type == MXLND_MSG_EAGER ||
3196                                    type == MXLND_MSG_PUT_REQ ||
3197                                    type == MXLND_MSG_NOOP)) {
3198                                 spin_lock(&conn->mxk_lock);
3199                                 conn->mxk_outstanding++;
3200                                 spin_unlock(&conn->mxk_lock);
3201                         }
3202                 }
3203                 if (conn_ref) mxlnd_conn_decref(conn);
3204                 LASSERT(peer_ref == 0);
3205         }
3206
3207         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3208                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3209         } else {
3210                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3211         }
3212
3213         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3214         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3215
3216         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3217
3218         return;
3219 }
3220
3221
3222
3223 void
3224 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3225 {
3226         struct kmx_ctx  *tx     = NULL;
3227         struct kmx_msg  *txmsg   = NULL;
3228         struct kmx_conn *conn   = peer->mxp_conn;
3229
3230         /* a conn ref was taken when calling mx_iconnect(), 
3231          * hold it until CONN_REQ or CONN_ACK completes */
3232
3233         CDEBUG(D_NET, "entering\n");
3234         if (status.code != MX_STATUS_SUCCESS) {
3235                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3236                         mx_strstatus(status.code), status.code,
3237                         libcfs_nid2str(peer->mxp_nid));
3238                 spin_lock(&conn->mxk_lock);
3239                 conn->mxk_status = MXLND_CONN_FAIL;
3240                 spin_unlock(&conn->mxk_lock);
3241
3242                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3243                         struct kmx_conn *new_conn       = NULL;
3244                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3245                         mxlnd_conn_disconnect(conn, 0, 1);
3246                         mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3247                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3248                         spin_lock(&peer->mxp_lock);
3249                         peer->mxp_reconnect_time = 0;
3250                         spin_unlock(&peer->mxp_lock);
3251                 }
3252
3253                 mxlnd_conn_decref(conn);
3254                 return;
3255         }
3256
3257         spin_lock(&conn->mxk_lock);
3258         conn->mxk_epa = status.source;
3259         spin_unlock(&conn->mxk_lock);
3260         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3261          *      we should not need to lock the peer */
3262         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3263
3264         /* mx_iconnect() succeeded, reset delay to 0 */
3265         spin_lock(&peer->mxp_lock);
3266         peer->mxp_reconnect_time = 0;
3267         spin_unlock(&peer->mxp_lock);
3268
3269         /* marshal CONN_REQ msg */
3270         /* we are still using the conn ref from iconnect() - do not take another */
3271         tx = mxlnd_get_idle_tx();
3272         if (tx == NULL) {
3273                 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3274                                    libcfs_nid2str(peer->mxp_nid));
3275                 spin_lock(&conn->mxk_lock);
3276                 conn->mxk_status = MXLND_CONN_FAIL;
3277                 spin_unlock(&conn->mxk_lock);
3278                 mxlnd_conn_decref(conn);
3279                 return;
3280         }
3281
3282         tx->mxc_peer = peer;
3283         tx->mxc_conn = conn;
3284         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3285         txmsg = tx->mxc_msg;
3286         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3287         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3288         tx->mxc_match = mxlnd_create_match(tx, 0);
3289
3290         CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3291         mxlnd_queue_tx(tx);
3292         return;
3293 }
3294
3295 void
3296 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3297 {
3298         struct kmx_ctx  *tx     = NULL;
3299         struct kmx_msg  *txmsg   = NULL;
3300         struct kmx_conn *conn   = peer->mxp_conn;
3301
3302         /* a conn ref was taken when calling mx_iconnect(), 
3303          * hold it until CONN_REQ or CONN_ACK completes */
3304
3305         CDEBUG(D_NET, "entering\n");
3306         if (status.code != MX_STATUS_SUCCESS) {
3307                 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3308                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
3309                         mx_strstatus(status.code), status.code,
3310                         libcfs_nid2str(peer->mxp_nid),
3311                         peer->mxp_nid,
3312                         peer->mxp_nic_id,
3313                         peer->mxp_host->mxh_ep_id);
3314                 spin_lock(&conn->mxk_lock);
3315                 conn->mxk_status = MXLND_CONN_FAIL;
3316                 spin_unlock(&conn->mxk_lock);
3317
3318                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3319                         struct kmx_conn *new_conn       = NULL;
3320                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3321                         mxlnd_conn_disconnect(conn, 0, 1);
3322                         mxlnd_conn_alloc(&new_conn, peer); /* adds ref for 
3323                                                               this function... */
3324                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3325                         spin_lock(&peer->mxp_lock);
3326                         peer->mxp_reconnect_time = 0;
3327                         spin_unlock(&peer->mxp_lock);
3328                 }
3329
3330                 mxlnd_conn_decref(conn);
3331                 return;
3332         }
3333         spin_lock(&conn->mxk_lock);
3334         conn->mxk_epa = status.source;
3335         if (likely(!peer->mxp_incompatible)) {
3336                 conn->mxk_status = MXLND_CONN_READY;
3337         }
3338         spin_unlock(&conn->mxk_lock);
3339         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3340          *      we should not have to lock the peer */
3341         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3342
3343         /* mx_iconnect() succeeded, reset delay to 0 */
3344         spin_lock(&peer->mxp_lock);
3345         peer->mxp_reconnect_time = 0;
3346         spin_unlock(&peer->mxp_lock);
3347
3348         /* marshal CONN_ACK msg */
3349         tx = mxlnd_get_idle_tx();
3350         if (tx == NULL) {
3351                 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3352                                    libcfs_nid2str(peer->mxp_nid));
3353                 spin_lock(&conn->mxk_lock);
3354                 conn->mxk_status = MXLND_CONN_FAIL;
3355                 spin_unlock(&conn->mxk_lock);
3356                 mxlnd_conn_decref(conn);
3357                 return;
3358         }
3359
3360         tx->mxc_peer = peer;
3361         tx->mxc_conn = conn;
3362         CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3363         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3364         txmsg = tx->mxc_msg;
3365         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3366         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3367         tx->mxc_match = mxlnd_create_match(tx, 0);
3368
3369         mxlnd_queue_tx(tx);
3370         return;
3371 }
3372
3373 /**
3374  * mxlnd_request_waitd - the MX request completion thread(s)
3375  * @arg - thread id (as a void *)
3376  *
3377  * This thread waits for a MX completion and then completes the request.
3378  * We will create one thread per CPU.
3379  */
3380 int
3381 mxlnd_request_waitd(void *arg)
3382 {
3383         long                    id              = (long) arg;
3384         char                    name[24];
3385         __u32                   result          = 0;
3386         mx_return_t             mxret           = MX_SUCCESS;
3387         mx_status_t             status;
3388         struct kmx_ctx         *ctx             = NULL;
3389         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3390         struct kmx_peer        *peer            = NULL;
3391         struct kmx_conn        *conn            = NULL;
3392 #if MXLND_POLLING
3393         int                     count           = 0;
3394 #endif
3395
3396         memset(name, 0, sizeof(name));
3397         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3398         cfs_daemonize(name);
3399         //cfs_block_allsigs();
3400
3401         memset(&status, 0, sizeof(status));
3402
3403         CDEBUG(D_NET, "%s starting\n", name);
3404
3405         while (!kmxlnd_data.kmx_shutdown) {
3406                 mxret = MX_SUCCESS;
3407                 result = 0;
3408 #if MXLND_POLLING
3409                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3410                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
3411                                             &status, &result);
3412                 } else {
3413                         count = 0;
3414                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3415                                             0LL, 0LL, &status, &result);
3416                 }
3417 #else
3418                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3419                                     0LL, 0LL, &status, &result);
3420 #endif
3421                 if (unlikely(kmxlnd_data.kmx_shutdown))
3422                         break;
3423
3424                 if (result != 1) {
3425                         /* nothing completed... */
3426                         continue;
3427                 }
3428
3429                 if (status.code != MX_STATUS_SUCCESS) {
3430                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3431                                "match_info 0x%llx and length %d\n",
3432                                mx_strstatus(status.code), status.code,
3433                                (u64) status.match_info, status.msg_length);
3434                 }
3435
3436                 /* This may be a mx_iconnect() request completing,
3437                  * check the bit mask for CONN_REQ and CONN_ACK */
3438                 if (status.match_info == MXLND_MASK_ICON_REQ ||
3439                     status.match_info == MXLND_MASK_ICON_ACK) {
3440                         peer = (struct kmx_peer*) status.context;
3441                         if (status.match_info == MXLND_MASK_ICON_REQ) {
3442                                 mxlnd_handle_conn_req(peer, status);
3443                         } else {
3444                                 mxlnd_handle_conn_ack(peer, status);
3445                         }
3446                         continue;
3447                 }
3448
3449                 /* This must be a tx or rx */
3450
3451                 /* NOTE: if this is a RX from the unexpected callback, it may
3452                  * have very little info. If we dropped it in unexpected_recv(),
3453                  * it will not have a context. If so, ignore it. */
3454                 ctx = (struct kmx_ctx *) status.context;
3455                 if (ctx != NULL) {
3456
3457                         req_type = ctx->mxc_type;
3458                         conn = ctx->mxc_conn; /* this may be NULL */
3459                         mxlnd_deq_pending_ctx(ctx);
3460
3461                         /* copy status to ctx->mxc_status */
3462                         memcpy(&ctx->mxc_status, &status, sizeof(status));
3463
3464                         switch (req_type) {
3465                         case MXLND_REQ_TX:
3466                                 mxlnd_handle_tx_completion(ctx);
3467                                 break;
3468                         case MXLND_REQ_RX:
3469                                 mxlnd_handle_rx_completion(ctx);
3470                                 break;
3471                         default:
3472                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3473                                 LBUG();
3474                                 break;
3475                         }
3476
3477                         /* FIXME may need to reconsider this */
3478                         /* conn is always set except for the first CONN_REQ rx
3479                          * from a new peer */
3480                         if (!(status.code == MX_STATUS_SUCCESS ||
3481                               status.code == MX_STATUS_TRUNCATED) &&
3482                               conn != NULL) {
3483                                 mxlnd_conn_disconnect(conn, 1, 1);
3484                         }
3485                 }
3486                 CDEBUG(D_NET, "waitd() completed task\n");
3487         }
3488         CDEBUG(D_NET, "%s stopping\n", name);
3489         mxlnd_thread_stop(id);
3490         return 0;
3491 }
3492
3493
3494 unsigned long
3495 mxlnd_check_timeouts(unsigned long now)
3496 {
3497         int                     i               = 0;
3498         int                     disconnect      = 0;
3499         unsigned long           next            = 0;
3500         struct  kmx_peer        *peer           = NULL;
3501         struct  kmx_conn        *conn           = NULL;
3502
3503         read_lock(&kmxlnd_data.kmx_peers_lock);
3504         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3505                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3506
3507                         if (unlikely(kmxlnd_data.kmx_shutdown)) {
3508                                 read_unlock(&kmxlnd_data.kmx_peers_lock);
3509                                 return next;
3510                         }
3511
3512                         spin_lock(&peer->mxp_lock);
3513                         conn = peer->mxp_conn;
3514                         if (conn) {
3515                                 mxlnd_conn_addref(conn);
3516                                 spin_unlock(&peer->mxp_lock);
3517                         } else {
3518                                 spin_unlock(&peer->mxp_lock);
3519                                 continue;
3520                         }
3521
3522                         spin_lock(&conn->mxk_lock);
3523
3524                         /* if nothing pending (timeout == 0) or
3525                          * if conn is already disconnected,
3526                          * skip this conn */
3527                         if (conn->mxk_timeout == 0 ||
3528                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3529                                 spin_unlock(&conn->mxk_lock);
3530                                 mxlnd_conn_decref(conn);
3531                                 continue;
3532                         }
3533
3534                         /* we want to find the timeout that will occur first.
3535                          * if it is in the future, we will sleep until then.
3536                          * if it is in the past, then we will sleep one
3537                          * second and repeat the process. */
3538                         if ((next == 0) || (conn->mxk_timeout < next)) {
3539                                 next = conn->mxk_timeout;
3540                         }
3541
3542                         disconnect = 0;
3543
3544                         if (time_after_eq(now, conn->mxk_timeout))  {
3545                                 disconnect = 1;
3546                         }
3547                         spin_unlock(&conn->mxk_lock);
3548
3549                         if (disconnect) {
3550                                 mxlnd_conn_disconnect(conn, 1, 1);
3551                         }
3552                         mxlnd_conn_decref(conn);
3553                 }
3554         }
3555         read_unlock(&kmxlnd_data.kmx_peers_lock);
3556         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3557
3558         return next;
3559 }
3560
3561 /**
3562  * mxlnd_timeoutd - enforces timeouts on messages
3563  * @arg - thread id (as a void *)
3564  *
3565  * This thread queries each peer for its earliest timeout. If a peer has timed out,
3566  * it calls mxlnd_conn_disconnect().
3567  *
3568  * After checking for timeouts, try progressing sends (call check_sends()).
3569  */
3570 int
3571 mxlnd_timeoutd(void *arg)
3572 {
3573         int                     i       = 0;
3574         long                    id      = (long) arg;
3575         unsigned long           now     = 0;
3576         unsigned long           next    = 0;
3577         unsigned long           delay   = HZ;
3578         struct kmx_peer        *peer    = NULL;
3579         struct kmx_conn        *conn    = NULL;
3580
3581         cfs_daemonize("mxlnd_timeoutd");
3582         //cfs_block_allsigs();
3583
3584         CDEBUG(D_NET, "timeoutd starting\n");
3585
3586         while (!kmxlnd_data.kmx_shutdown) {
3587
3588                 now = jiffies;
3589                 /* if the next timeout has not arrived, go back to sleep */
3590                 if (time_after(now, next)) {
3591                         next = mxlnd_check_timeouts(now);
3592                 }
3593
3594                read_lock(&kmxlnd_data.kmx_peers_lock);
3595                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3596                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3597                                 spin_lock(&peer->mxp_lock);
3598                                 conn = peer->mxp_conn;
3599                                 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3600                                 spin_unlock(&peer->mxp_lock);
3601
3602                                 if (conn == NULL)
3603                                         continue;
3604
3605                                 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3606                                     time_after(now, conn->mxk_last_tx + HZ)) {
3607                                         mxlnd_check_sends(peer);
3608                                 }
3609                                 mxlnd_conn_decref(conn); /* until here */
3610                         }
3611                 }
3612                 read_unlock(&kmxlnd_data.kmx_peers_lock);
3613
3614                 mxlnd_sleep(delay);
3615         }
3616         CDEBUG(D_NET, "timeoutd stopping\n");
3617         mxlnd_thread_stop(id);
3618         return 0;
3619 }