Whamcloud - gitweb
- mxlnd updates from upstream.
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2004 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  * Copyright (C) 2006 Myricom, Inc.
7  *   Author: Myricom, Inc. <help at myri.com>
8  *
9  *   This file is part of Lustre, http://www.lustre.org.
10  *
11  *   Lustre is free software; you can redistribute it and/or
12  *   modify it under the terms of version 2 of the GNU General Public
13  *   License as published by the Free Software Foundation.
14  *
15  *   Lustre is distributed in the hope that it will be useful,
16  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *   GNU General Public License for more details.
19  *
20  *   You should have received a copy of the GNU General Public License
21  *   along with Lustre; if not, write to the Free Software
22  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23  */
24
25 #include "mxlnd.h"
26
27 inline void mxlnd_noop(char *s, ...)
28 {
29         return;
30 }
31
32 char *
33 mxlnd_ctxstate_to_str(int mxc_state)
34 {
35         switch (mxc_state) {
36         case MXLND_CTX_INIT:
37                 return "MXLND_CTX_INIT";
38         case MXLND_CTX_IDLE:
39                 return "MXLND_CTX_IDLE";
40         case MXLND_CTX_PREP:
41                 return "MXLND_CTX_PREP";
42         case MXLND_CTX_PENDING:
43                 return "MXLND_CTX_PENDING";
44         case MXLND_CTX_COMPLETED:
45                 return "MXLND_CTX_COMPLETED";
46         case MXLND_CTX_CANCELED:
47                 return "MXLND_CTX_CANCELED";
48         default:
49                 return "*unknown*";
50         }
51 }
52
53 char *
54 mxlnd_connstatus_to_str(int mxk_status)
55 {
56         switch (mxk_status) {
57         case MXLND_CONN_READY:
58                 return "MXLND_CONN_READY";
59         case MXLND_CONN_INIT:
60                 return "MXLND_CONN_INIT";
61         case MXLND_CONN_REQ:
62                 return "MXLND_CONN_REQ";
63         case MXLND_CONN_ACK:
64                 return "MXLND_CONN_ACK";
65         case MXLND_CONN_WAIT:
66                 return "MXLND_CONN_WAIT";
67         case MXLND_CONN_DISCONNECT:
68                 return "MXLND_CONN_DISCONNECT";
69         case MXLND_CONN_FAIL:
70                 return "MXLND_CONN_FAIL";
71         default:
72                 return "unknown";
73         }
74 }
75
76 char *
77 mxlnd_msgtype_to_str(int type) {
78         switch (type) {
79         case MXLND_MSG_EAGER:
80                 return "MXLND_MSG_EAGER";
81         case MXLND_MSG_CONN_REQ:
82                 return "MXLND_MSG_CONN_REQ";
83         case MXLND_MSG_CONN_ACK:
84                 return "MXLND_MSG_CONN_ACK";
85         case MXLND_MSG_NOOP:
86                 return "MXLND_MSG_NOOP";
87         case MXLND_MSG_PUT_REQ:
88                 return "MXLND_MSG_PUT_REQ";
89         case MXLND_MSG_PUT_ACK:
90                 return "MXLND_MSG_PUT_ACK";
91         case MXLND_MSG_PUT_DATA:
92                 return "MXLND_MSG_PUT_DATA";
93         case MXLND_MSG_GET_REQ:
94                 return "MXLND_MSG_GET_REQ";
95         case MXLND_MSG_GET_DATA:
96                 return "MXLND_MSG_GET_DATA";
97         default:
98                 return "unknown";
99         }
100 }
101
102 char *
103 mxlnd_lnetmsg_to_str(int type)
104 {
105         switch (type) {
106         case LNET_MSG_ACK:
107                 return "LNET_MSG_ACK";
108         case LNET_MSG_PUT:
109                 return "LNET_MSG_PUT";
110         case LNET_MSG_GET:
111                 return "LNET_MSG_GET";
112         case LNET_MSG_REPLY:
113                 return "LNET_MSG_REPLY";
114         case LNET_MSG_HELLO:
115                 return "LNET_MSG_HELLO";
116         default:
117                 LBUG();
118                 return "*unknown*";
119         }
120 }
121
122 static inline u64
123 //mxlnd_create_match(u8 msg_type, u8 error, u64 cookie)
124 mxlnd_create_match(struct kmx_ctx *ctx, u8 error)
125 {
126         u64 type        = (u64) ctx->mxc_msg_type;
127         u64 err         = (u64) error;
128         u64 match       = 0LL;
129
130         LASSERT(ctx->mxc_msg_type != 0);
131         LASSERT(ctx->mxc_cookie >> 52 == 0);
132         match = (type << 60) | (err << 52) | ctx->mxc_cookie;
133         return match;
134 }
135
136 static inline void
137 mxlnd_parse_match(u64 match, u8 *msg_type, u8 *error, u64 *cookie)
138 {
139         *msg_type = (u8) (match >> 60);
140         *error    = (u8) ((match >> 52) & 0xFF);
141         *cookie   = match & 0xFFFFFFFFFFFFFLL;
142         LASSERT(match == (MXLND_MASK_ICON_REQ & 0xF000000000000000LL) ||
143                 match == (MXLND_MASK_ICON_ACK & 0xF000000000000000LL) ||
144                 *msg_type == MXLND_MSG_EAGER    ||
145                 *msg_type == MXLND_MSG_CONN_REQ ||
146                 *msg_type == MXLND_MSG_CONN_ACK ||
147                 *msg_type == MXLND_MSG_NOOP     ||
148                 *msg_type == MXLND_MSG_PUT_REQ  ||
149                 *msg_type == MXLND_MSG_PUT_ACK  ||
150                 *msg_type == MXLND_MSG_PUT_DATA ||
151                 *msg_type == MXLND_MSG_GET_REQ  ||
152                 *msg_type == MXLND_MSG_GET_DATA);
153         return;
154 }
155
156 struct kmx_ctx *
157 mxlnd_get_idle_rx(void)
158 {
159         struct list_head        *tmp    = NULL;
160         struct kmx_ctx          *rx     = NULL;
161
162         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
163
164         if (list_empty (&kmxlnd_data.kmx_rx_idle)) {
165                 spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
166                 return NULL;
167         }
168
169         tmp = &kmxlnd_data.kmx_rx_idle;
170         rx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
171         list_del_init(&rx->mxc_list);
172         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
173
174 #if MXLND_DEBUG
175         if (rx->mxc_get != rx->mxc_put) {
176                 CDEBUG(D_NETERROR, "*** RX get (%lld) != put (%lld) ***\n", rx->mxc_get, rx->mxc_put);
177                 CDEBUG(D_NETERROR, "*** incarnation= %lld ***\n", rx->mxc_incarnation);
178                 CDEBUG(D_NETERROR, "*** deadline= %ld ***\n", rx->mxc_deadline);
179                 CDEBUG(D_NETERROR, "*** state= %s ***\n", mxlnd_ctxstate_to_str(rx->mxc_state));
180                 CDEBUG(D_NETERROR, "*** listed?= %d ***\n", !list_empty(&rx->mxc_list));
181                 CDEBUG(D_NETERROR, "*** nid= 0x%llx ***\n", rx->mxc_nid);
182                 CDEBUG(D_NETERROR, "*** peer= 0x%p ***\n", rx->mxc_peer);
183                 CDEBUG(D_NETERROR, "*** msg_type= %s ***\n", mxlnd_msgtype_to_str(rx->mxc_msg_type));
184                 CDEBUG(D_NETERROR, "*** cookie= 0x%llx ***\n", rx->mxc_cookie);
185                 CDEBUG(D_NETERROR, "*** nob= %d ***\n", rx->mxc_nob);
186         }
187 #endif
188         LASSERT (rx->mxc_get == rx->mxc_put);
189
190         rx->mxc_get++;
191
192         LASSERT (rx->mxc_state == MXLND_CTX_IDLE);
193         rx->mxc_state = MXLND_CTX_PREP;
194
195         return rx;
196 }
197
198 int
199 mxlnd_put_idle_rx(struct kmx_ctx *rx)
200 {
201         if (rx == NULL) {
202                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
203                 return -EINVAL;
204         } else if (rx->mxc_type != MXLND_REQ_RX) {
205                 CDEBUG(D_NETERROR, "called with tx\n");
206                 return -EINVAL;
207         }
208         LASSERT(rx->mxc_get == rx->mxc_put + 1);
209         mxlnd_ctx_init(rx);
210         rx->mxc_put++;
211         spin_lock(&kmxlnd_data.kmx_rx_idle_lock);
212         list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
213         spin_unlock(&kmxlnd_data.kmx_rx_idle_lock);
214         return 0;
215 }
216
217 int
218 mxlnd_reduce_idle_rxs(__u32 count)
219 {
220         __u32                   i       = 0;
221         struct kmx_ctx          *rx     = NULL;
222
223         spin_lock(&kmxlnd_data.kmx_rxs_lock);
224         for (i = 0; i < count; i++) {
225                 rx = mxlnd_get_idle_rx();
226                 if (rx != NULL) {
227                         struct list_head *tmp = &rx->mxc_global_list;
228                         list_del_init(tmp);
229                         mxlnd_ctx_free(rx);
230                 } else {
231                         CDEBUG(D_NETERROR, "only reduced %d out of %d rxs\n", i, count);
232                         break;
233                 }
234         }
235         spin_unlock(&kmxlnd_data.kmx_rxs_lock);
236         return 0;
237 }
238
239 struct kmx_ctx *
240 mxlnd_get_idle_tx(void)
241 {
242         struct list_head        *tmp    = NULL;
243         struct kmx_ctx          *tx     = NULL;
244
245         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
246
247         if (list_empty (&kmxlnd_data.kmx_tx_idle)) {
248                 CDEBUG(D_NETERROR, "%d txs in use\n", kmxlnd_data.kmx_tx_used);
249                 spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
250                 return NULL;
251         }
252
253         tmp = &kmxlnd_data.kmx_tx_idle;
254         tx = list_entry (tmp->next, struct kmx_ctx, mxc_list);
255         list_del_init(&tx->mxc_list);
256
257         /* Allocate a new completion cookie.  It might not be needed,
258          * but we've got a lock right now and we're unlikely to
259          * wrap... */
260         tx->mxc_cookie = kmxlnd_data.kmx_tx_next_cookie++;
261         if (kmxlnd_data.kmx_tx_next_cookie > MXLND_MAX_COOKIE) {
262                 kmxlnd_data.kmx_tx_next_cookie = 1;
263         }
264         kmxlnd_data.kmx_tx_used++;
265         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
266
267         LASSERT (tx->mxc_get == tx->mxc_put);
268
269         tx->mxc_get++;
270
271         LASSERT (tx->mxc_state == MXLND_CTX_IDLE);
272         LASSERT (tx->mxc_lntmsg[0] == NULL);
273         LASSERT (tx->mxc_lntmsg[1] == NULL);
274
275         tx->mxc_state = MXLND_CTX_PREP;
276
277         return tx;
278 }
279
280 int
281 mxlnd_put_idle_tx(struct kmx_ctx *tx)
282 {
283         //int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS && tx->mxc_status.code != MX_STATUS_TRUNCATED);
284         int             result  = 0;
285         lnet_msg_t      *lntmsg[2];
286
287         if (tx == NULL) {
288                 CDEBUG(D_NETERROR, "called with NULL pointer\n");
289                 return -EINVAL;
290         } else if (tx->mxc_type != MXLND_REQ_TX) {
291                 CDEBUG(D_NETERROR, "called with rx\n");
292                 return -EINVAL;
293         }
294         if (!(tx->mxc_status.code == MX_STATUS_SUCCESS ||
295               tx->mxc_status.code == MX_STATUS_TRUNCATED))
296                 result = -EIO;
297
298         lntmsg[0] = tx->mxc_lntmsg[0];
299         lntmsg[1] = tx->mxc_lntmsg[1];
300
301         LASSERT(tx->mxc_get == tx->mxc_put + 1);
302         mxlnd_ctx_init(tx);
303         tx->mxc_put++;
304         spin_lock(&kmxlnd_data.kmx_tx_idle_lock);
305         list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
306         kmxlnd_data.kmx_tx_used--;
307         spin_unlock(&kmxlnd_data.kmx_tx_idle_lock);
308         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
309         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
310         return 0;
311 }
312
313 /**
314  * mxlnd_conn_free - free the conn
315  * @conn - a kmx_conn pointer
316  *
317  * The calling function should remove the conn from the conns list first
318  * then destroy it.
319  */
320 void
321 mxlnd_conn_free(struct kmx_conn *conn)
322 {
323         struct kmx_peer *peer   = conn->mxk_peer;
324
325         CDEBUG(D_NET, "freeing conn 0x%p *****\n", conn);
326         LASSERT (list_empty (&conn->mxk_tx_credit_queue) &&
327                  list_empty (&conn->mxk_tx_free_queue) &&
328                  list_empty (&conn->mxk_pending));
329         if (!list_empty(&conn->mxk_list)) {
330                 spin_lock(&peer->mxp_lock);
331                 list_del_init(&conn->mxk_list);
332                 if (peer->mxp_conn == conn) {
333                         peer->mxp_conn = NULL;
334                         if (!(conn->mxk_epa.stuff[0] == 0 && conn->mxk_epa.stuff[1] == 0)) {
335                                 mx_set_endpoint_addr_context(conn->mxk_epa,
336                                                              (void *) NULL);
337                         }
338                 }
339                 spin_unlock(&peer->mxp_lock);
340         }
341         mxlnd_peer_decref(conn->mxk_peer); /* drop conn's ref to peer */
342         MXLND_FREE (conn, sizeof (*conn));
343         return;
344 }
345
346
347 void
348 mxlnd_conn_cancel_pending_rxs(struct kmx_conn *conn)
349 {
350         int                     found   = 0;
351         struct kmx_ctx          *ctx    = NULL;
352         struct kmx_ctx          *next   = NULL;
353         mx_return_t             mxret   = MX_SUCCESS;
354         u32                     result  = 0;
355
356         do {
357                 found = 0;
358                 spin_lock(&conn->mxk_lock);
359                 list_for_each_entry_safe(ctx, next, &conn->mxk_pending, mxc_list) {
360                         /* we will delete all including txs */
361                         list_del_init(&ctx->mxc_list);
362                         if (ctx->mxc_type == MXLND_REQ_RX) {
363                                 found = 1;
364                                 mxret = mx_cancel(kmxlnd_data.kmx_endpt,
365                                                   &ctx->mxc_mxreq,
366                                                   &result);
367                                 if (mxret != MX_SUCCESS) {
368                                         CDEBUG(D_NETERROR, "mx_cancel() returned %s (%d)\n", mx_strerror(mxret), mxret);
369                                 }
370                                 if (result == 1) {
371                                         ctx->mxc_status.code = -ECONNABORTED;
372                                         ctx->mxc_state = MXLND_CTX_CANCELED;
373                                         /* NOTE this calls lnet_finalize() and
374                                          * we cannot hold any locks when calling it.
375                                          * It also calls mxlnd_conn_decref(conn) */
376                                         spin_unlock(&conn->mxk_lock);
377                                         mxlnd_handle_rx_completion(ctx);
378                                         spin_lock(&conn->mxk_lock);
379                                 }
380                                 break;
381                         }
382                 }
383                 spin_unlock(&conn->mxk_lock);
384         }
385         while (found);
386
387         return;
388 }
389
390 /**
391  * mxlnd_conn_disconnect - shutdown a connection
392  * @conn - a kmx_conn pointer
393  *
394  * This function sets the status to DISCONNECT, completes queued
395  * txs with failure, calls mx_disconnect, which will complete
396  * pending txs and matched rxs with failure.
397  */
398 void
399 mxlnd_conn_disconnect(struct kmx_conn *conn, int mx_dis, int notify)
400 {
401         struct list_head        *tmp    = NULL;
402
403         spin_lock(&conn->mxk_lock);
404         if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
405                 spin_unlock(&conn->mxk_lock);
406                 return;
407         }
408         conn->mxk_status = MXLND_CONN_DISCONNECT;
409         conn->mxk_timeout = 0;
410
411         while (!list_empty(&conn->mxk_tx_free_queue) ||
412                !list_empty(&conn->mxk_tx_credit_queue)) {
413
414                 struct kmx_ctx          *tx     = NULL;
415
416                 if (!list_empty(&conn->mxk_tx_free_queue)) {
417                         tmp = &conn->mxk_tx_free_queue;
418                 } else {
419                         tmp = &conn->mxk_tx_credit_queue;
420                 }
421
422                 tx = list_entry(tmp->next, struct kmx_ctx, mxc_list);
423                 list_del_init(&tx->mxc_list);
424                 tx->mxc_status.code = -ECONNABORTED;
425                 spin_unlock(&conn->mxk_lock);
426                 mxlnd_put_idle_tx(tx);
427                 mxlnd_conn_decref(conn); /* for this tx */
428                 spin_lock(&conn->mxk_lock);
429         }
430
431         spin_unlock(&conn->mxk_lock);
432
433         /* cancel pending rxs */
434         mxlnd_conn_cancel_pending_rxs(conn);
435
436         if (kmxlnd_data.kmx_shutdown != 1) {
437
438                 if (mx_dis) mx_disconnect(kmxlnd_data.kmx_endpt, conn->mxk_epa);
439
440                 if (notify) {
441                         time_t          last_alive      = 0;
442                         unsigned long   last_msg        = 0;
443
444                         /* notify LNET that we are giving up on this peer */
445                         if (time_after(conn->mxk_last_rx, conn->mxk_last_tx)) {
446                                 last_msg = conn->mxk_last_rx;
447                         } else {
448                                 last_msg = conn->mxk_last_tx;
449                         }
450                         last_alive = cfs_time_current_sec() -
451                                      cfs_duration_sec(cfs_time_current() - last_msg);
452                         lnet_notify(kmxlnd_data.kmx_ni, conn->mxk_peer->mxp_nid, 0, last_alive);
453                 }
454         }
455         mxlnd_conn_decref(conn); /* drop the owning peer's reference */
456
457         return;
458 }
459
460 /**
461  * mxlnd_conn_alloc - allocate and initialize a new conn struct
462  * @connp - address of a kmx_conn pointer
463  * @peer - owning kmx_peer
464  *
465  * Returns 0 on success and -ENOMEM on failure
466  */
467 int
468 mxlnd_conn_alloc_locked(struct kmx_conn **connp, struct kmx_peer *peer)
469 {
470         struct kmx_conn *conn    = NULL;
471
472         LASSERT(peer != NULL);
473
474         MXLND_ALLOC(conn, sizeof (*conn));
475         if (conn == NULL) {
476                 CDEBUG(D_NETERROR, "Cannot allocate conn\n");
477                 return -ENOMEM;
478         }
479         CDEBUG(D_NET, "allocated conn 0x%p for peer 0x%p\n", conn, peer);
480
481         memset(conn, 0, sizeof(*conn));
482
483         /* conn->mxk_incarnation = 0 - will be set by peer */
484         atomic_set(&conn->mxk_refcount, 2);     /* ref for owning peer 
485                                                    and one for the caller */
486         conn->mxk_peer = peer;
487         /* mxk_epa - to be set after mx_iconnect() */
488         INIT_LIST_HEAD(&conn->mxk_list);
489         spin_lock_init(&conn->mxk_lock);
490         /* conn->mxk_timeout = 0 */
491         conn->mxk_last_tx = jiffies;
492         conn->mxk_last_rx = conn->mxk_last_tx;
493         conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
494         /* mxk_outstanding = 0 */
495         conn->mxk_status = MXLND_CONN_INIT;
496         INIT_LIST_HEAD(&conn->mxk_tx_credit_queue);
497         INIT_LIST_HEAD(&conn->mxk_tx_free_queue);
498         /* conn->mxk_ntx_msgs = 0 */
499         /* conn->mxk_ntx_data = 0 */
500         /* conn->mxk_ntx_posted = 0 */
501         /* conn->mxk_data_posted = 0 */
502         INIT_LIST_HEAD(&conn->mxk_pending);
503
504         *connp = conn;
505
506         mxlnd_peer_addref(peer);        /* add a ref for this conn */
507
508         /* add to front of peer's conns list */
509         list_add(&conn->mxk_list, &peer->mxp_conns);
510         peer->mxp_conn = conn;
511         return 0;
512 }
513
514 int
515 mxlnd_conn_alloc(struct kmx_conn **connp, struct kmx_peer *peer)
516 {
517         int ret = 0;
518         spin_lock(&peer->mxp_lock);
519         ret = mxlnd_conn_alloc_locked(connp, peer);
520         spin_unlock(&peer->mxp_lock);
521         return ret;
522 }
523
524 int
525 mxlnd_q_pending_ctx(struct kmx_ctx *ctx)
526 {
527         int             ret     = 0;
528         struct kmx_conn *conn   = ctx->mxc_conn;
529
530         ctx->mxc_state = MXLND_CTX_PENDING;
531         if (conn != NULL) {
532                 spin_lock(&conn->mxk_lock);
533                 if (conn->mxk_status >= MXLND_CONN_INIT) {
534                         list_add_tail(&ctx->mxc_list, &conn->mxk_pending);
535                         if (conn->mxk_timeout == 0 || ctx->mxc_deadline < conn->mxk_timeout) {
536                                 conn->mxk_timeout = ctx->mxc_deadline;
537                         }
538                 } else {
539                         ctx->mxc_state = MXLND_CTX_COMPLETED;
540                         ret = -1;
541                 }
542                 spin_unlock(&conn->mxk_lock);
543         }
544         return ret;
545 }
546
547 int
548 mxlnd_deq_pending_ctx(struct kmx_ctx *ctx)
549 {
550         LASSERT(ctx->mxc_state == MXLND_CTX_PENDING ||
551                 ctx->mxc_state == MXLND_CTX_COMPLETED);
552         if (ctx->mxc_state != MXLND_CTX_PENDING &&
553             ctx->mxc_state != MXLND_CTX_COMPLETED) {
554                 CDEBUG(D_NETERROR, "deq ctx->mxc_state = %s\n", 
555                        mxlnd_ctxstate_to_str(ctx->mxc_state));
556         }
557         ctx->mxc_state = MXLND_CTX_COMPLETED;
558         if (!list_empty(&ctx->mxc_list)) {
559                 struct kmx_conn *conn = ctx->mxc_conn;
560                 struct kmx_ctx *next = NULL;
561                 LASSERT(conn != NULL);
562                 spin_lock(&conn->mxk_lock);
563                 list_del_init(&ctx->mxc_list);
564                 conn->mxk_timeout = 0;
565                 if (!list_empty(&conn->mxk_pending)) {
566                         next = list_entry(conn->mxk_pending.next, struct kmx_ctx, mxc_list);
567                         conn->mxk_timeout = next->mxc_deadline;
568                 }
569                 spin_unlock(&conn->mxk_lock);
570         }
571         return 0;
572 }
573
574 /**
575  * mxlnd_peer_free - free the peer
576  * @peer - a kmx_peer pointer
577  *
578  * The calling function should decrement the rxs, drain the tx queues and
579  * remove the peer from the peers list first then destroy it.
580  */
581 void
582 mxlnd_peer_free(struct kmx_peer *peer)
583 {
584         CDEBUG(D_NET, "freeing peer 0x%p\n", peer);
585
586         LASSERT (atomic_read(&peer->mxp_refcount) == 0);
587
588         if (peer->mxp_host != NULL) {
589                 spin_lock(&peer->mxp_host->mxh_lock);
590                 peer->mxp_host->mxh_peer = NULL;
591                 spin_unlock(&peer->mxp_host->mxh_lock);
592         }
593         if (!list_empty(&peer->mxp_peers)) {
594                 /* assume we are locked */
595                 list_del_init(&peer->mxp_peers);
596         }
597
598         MXLND_FREE (peer, sizeof (*peer));
599         atomic_dec(&kmxlnd_data.kmx_npeers);
600         return;
601 }
602
603 void
604 mxlnd_peer_hostname_to_nic_id(struct kmx_peer *peer)
605 {
606         u64             nic_id  = 0LL;
607         char            name[MX_MAX_HOSTNAME_LEN + 1];
608         mx_return_t     mxret   = MX_SUCCESS;
609
610         memset(name, 0, sizeof(name));
611         snprintf(name, sizeof(name), "%s:%d", peer->mxp_host->mxh_hostname, peer->mxp_host->mxh_board);
612         mxret = mx_hostname_to_nic_id(name, &nic_id);
613         if (mxret == MX_SUCCESS) {
614                 peer->mxp_nic_id = nic_id;
615         } else {
616                 CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
617                                    "with %s\n", name, mx_strerror(mxret));
618                 mxret = mx_hostname_to_nic_id(peer->mxp_host->mxh_hostname, &nic_id);
619                 if (mxret == MX_SUCCESS) {
620                         peer->mxp_nic_id = nic_id;
621                 } else {
622                         CDEBUG(D_NETERROR, "mx_hostname_to_nic_id() failed for %s "
623                                            "with %s\n", peer->mxp_host->mxh_hostname,
624                                            mx_strerror(mxret));
625                 }
626         }
627         return;
628 }
629
630 /**
631  * mxlnd_peer_alloc - allocate and initialize a new peer struct
632  * @peerp - address of a kmx_peer pointer
633  * @nid - LNET node id
634  *
635  * Returns 0 on success and -ENOMEM on failure
636  */
637 int
638 mxlnd_peer_alloc(struct kmx_peer **peerp, lnet_nid_t nid)
639 {
640         int                     i       = 0;
641         int                     ret     = 0;
642         u32                     addr    = LNET_NIDADDR(nid);
643         struct kmx_peer        *peer    = NULL;
644         struct kmx_host        *host    = NULL;
645
646         LASSERT (nid != LNET_NID_ANY && nid != 0LL);
647
648         MXLND_ALLOC(peer, sizeof (*peer));
649         if (peer == NULL) {
650                 CDEBUG(D_NETERROR, "Cannot allocate peer for NID 0x%llx\n", nid);
651                 return -ENOMEM;
652         }
653         CDEBUG(D_NET, "allocated peer 0x%p for NID 0x%llx\n", peer, nid);
654
655         memset(peer, 0, sizeof(*peer));
656
657         list_for_each_entry(host, &kmxlnd_data.kmx_hosts, mxh_list) {
658                 if (addr == host->mxh_addr) {
659                         peer->mxp_host = host;
660                         spin_lock(&host->mxh_lock);
661                         host->mxh_peer = peer;
662                         spin_unlock(&host->mxh_lock);
663                         break;
664                 }
665         }
666         if (peer->mxp_host == NULL) {
667                 CDEBUG(D_NETERROR, "unknown host for NID 0x%llx\n", nid);
668                 MXLND_FREE(peer, sizeof(*peer));
669                 return -ENXIO;
670         }
671
672         peer->mxp_nid = nid;
673         /* peer->mxp_incarnation */
674         atomic_set(&peer->mxp_refcount, 1);     /* ref for kmx_peers list */
675         mxlnd_peer_hostname_to_nic_id(peer);
676
677         INIT_LIST_HEAD(&peer->mxp_peers);
678         spin_lock_init(&peer->mxp_lock);
679         INIT_LIST_HEAD(&peer->mxp_conns);
680         ret = mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds 2nd conn ref here... */
681         if (ret != 0) {
682                 mxlnd_peer_decref(peer);
683                 return ret;
684         }
685
686         for (i = 0; i < *kmxlnd_tunables.kmx_credits - 1; i++) {
687                 struct kmx_ctx   *rx     = NULL;
688                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
689                 if (ret != 0) {
690                         mxlnd_reduce_idle_rxs(i);
691                         mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref... */
692                         mxlnd_conn_decref(peer->mxp_conn); /* drop this function's ref */
693                         mxlnd_peer_decref(peer);
694                         return ret;
695                 }
696                 spin_lock(&kmxlnd_data.kmx_rxs_lock);
697                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
698                 spin_unlock(&kmxlnd_data.kmx_rxs_lock);
699                 rx->mxc_put = -1;
700                 mxlnd_put_idle_rx(rx);
701         }
702         /* peer->mxp_reconnect_time = 0 */
703         /* peer->mxp_incompatible = 0 */
704
705         *peerp = peer;
706         return 0;
707 }
708
709 /**
710  * mxlnd_nid_to_hash - hash the nid
711  * @nid - msg pointer
712  *
713  * Takes the u64 nid and XORs the lowest N bits by the next lowest N bits.
714  */
715 static inline int
716 mxlnd_nid_to_hash(lnet_nid_t nid)
717 {
718         return (nid & MXLND_HASH_MASK) ^
719                ((nid & (MXLND_HASH_MASK << MXLND_HASH_BITS)) >> MXLND_HASH_BITS);
720 }
721
722 static inline struct kmx_peer *
723 mxlnd_find_peer_by_nid_locked(lnet_nid_t nid)
724 {
725         int                     found   = 0;
726         int                     hash    = 0;
727         struct kmx_peer         *peer   = NULL;
728
729         hash = mxlnd_nid_to_hash(nid);
730
731         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
732                 if (peer->mxp_nid == nid) {
733                         found = 1;
734                         mxlnd_peer_addref(peer);
735                         break;
736                 }
737         }
738         return (found ? peer : NULL);
739 }
740
741 static inline struct kmx_peer *
742 mxlnd_find_peer_by_nid(lnet_nid_t nid)
743 {
744         struct kmx_peer *peer   = NULL;
745
746         read_lock(&kmxlnd_data.kmx_peers_lock);
747         peer = mxlnd_find_peer_by_nid_locked(nid);
748         read_unlock(&kmxlnd_data.kmx_peers_lock);
749         return peer;
750 }
751
752 static inline int
753 mxlnd_tx_requires_credit(struct kmx_ctx *tx)
754 {
755         return (tx->mxc_msg_type == MXLND_MSG_EAGER ||
756                 tx->mxc_msg_type == MXLND_MSG_GET_REQ ||
757                 tx->mxc_msg_type == MXLND_MSG_PUT_REQ ||
758                 tx->mxc_msg_type == MXLND_MSG_NOOP);
759 }
760
761 /**
762  * mxlnd_init_msg - set type and number of bytes
763  * @msg - msg pointer
764  * @type - of message
765  * @body_nob - bytes in msg body
766  */
767 static inline void
768 mxlnd_init_msg(kmx_msg_t *msg, u8 type, int body_nob)
769 {
770         msg->mxm_type = type;
771         msg->mxm_nob  = offsetof(kmx_msg_t, mxm_u) + body_nob;
772 }
773
774 static inline void
775 mxlnd_init_tx_msg (struct kmx_ctx *tx, u8 type, int body_nob, lnet_nid_t nid)
776 {
777         int             nob     = offsetof (kmx_msg_t, mxm_u) + body_nob;
778         struct kmx_msg  *msg    = NULL;
779
780         LASSERT (tx != NULL);
781         LASSERT (nob <= MXLND_EAGER_SIZE);
782
783         tx->mxc_nid = nid;
784         /* tx->mxc_peer should have already been set if we know it */
785         tx->mxc_msg_type = type;
786         tx->mxc_nseg = 1;
787         /* tx->mxc_seg.segment_ptr is already pointing to mxc_page */
788         tx->mxc_seg.segment_length = nob;
789         tx->mxc_pin_type = MX_PIN_PHYSICAL;
790         //tx->mxc_state = MXLND_CTX_PENDING;
791
792         msg = tx->mxc_msg;
793         msg->mxm_type = type;
794         msg->mxm_nob  = nob;
795
796         return;
797 }
798
799 static inline __u32
800 mxlnd_cksum (void *ptr, int nob)
801 {
802         char  *c  = ptr;
803         __u32  sum = 0;
804
805         while (nob-- > 0)
806                 sum = ((sum << 1) | (sum >> 31)) + *c++;
807
808         /* ensure I don't return 0 (== no checksum) */
809         return (sum == 0) ? 1 : sum;
810 }
811
812 /**
813  * mxlnd_pack_msg - complete msg info
814  * @tx - msg to send
815  */
816 static inline void
817 mxlnd_pack_msg(struct kmx_ctx *tx)
818 {
819         struct kmx_msg  *msg    = tx->mxc_msg;
820
821         /* type and nob should already be set in init_msg() */
822         msg->mxm_magic    = MXLND_MSG_MAGIC;
823         msg->mxm_version  = MXLND_MSG_VERSION;
824         /*   mxm_type */
825         /* don't use mxlnd_tx_requires_credit() since we want PUT_ACK to
826          * return credits as well */
827         if (tx->mxc_msg_type != MXLND_MSG_CONN_REQ &&
828             tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
829                 spin_lock(&tx->mxc_conn->mxk_lock);
830                 msg->mxm_credits  = tx->mxc_conn->mxk_outstanding;
831                 tx->mxc_conn->mxk_outstanding = 0;
832                 spin_unlock(&tx->mxc_conn->mxk_lock);
833         } else {
834                 msg->mxm_credits  = 0;
835         }
836         /*   mxm_nob */
837         msg->mxm_cksum    = 0;
838         msg->mxm_srcnid   = lnet_ptlcompat_srcnid(kmxlnd_data.kmx_ni->ni_nid, tx->mxc_nid);
839         msg->mxm_srcstamp = kmxlnd_data.kmx_incarnation;
840         msg->mxm_dstnid   = tx->mxc_nid;
841         /* if it is a new peer, the dststamp will be 0 */
842         msg->mxm_dststamp = tx->mxc_conn->mxk_incarnation;
843         msg->mxm_seq      = tx->mxc_cookie;
844
845         if (*kmxlnd_tunables.kmx_cksum) {
846                 msg->mxm_cksum = mxlnd_cksum(msg, msg->mxm_nob);
847         }
848 }
849
850 int
851 mxlnd_unpack_msg(kmx_msg_t *msg, int nob)
852 {
853         const int hdr_size      = offsetof(kmx_msg_t, mxm_u);
854         __u32     msg_cksum     = 0;
855         int       flip          = 0;
856         int       msg_nob       = 0;
857
858         /* 6 bytes are enough to have received magic + version */
859         if (nob < 6) {
860                 CDEBUG(D_NETERROR, "not enough bytes for magic + hdr: %d\n", nob);
861                 return -EPROTO;
862         }
863
864         if (msg->mxm_magic == MXLND_MSG_MAGIC) {
865                 flip = 0;
866         } else if (msg->mxm_magic == __swab32(MXLND_MSG_MAGIC)) {
867                 flip = 1;
868         } else {
869                 CDEBUG(D_NETERROR, "Bad magic: %08x\n", msg->mxm_magic);
870                 return -EPROTO;
871         }
872
873         if (msg->mxm_version !=
874             (flip ? __swab16(MXLND_MSG_VERSION) : MXLND_MSG_VERSION)) {
875                 CDEBUG(D_NETERROR, "Bad version: %d\n", msg->mxm_version);
876                 return -EPROTO;
877         }
878
879         if (nob < hdr_size) {
880                 CDEBUG(D_NETERROR, "not enough for a header: %d\n", nob);
881                 return -EPROTO;
882         }
883
884         msg_nob = flip ? __swab32(msg->mxm_nob) : msg->mxm_nob;
885         if (msg_nob > nob) {
886                 CDEBUG(D_NETERROR, "Short message: got %d, wanted %d\n", nob, msg_nob);
887                 return -EPROTO;
888         }
889
890         /* checksum must be computed with mxm_cksum zero and BEFORE anything
891          * gets flipped */
892         msg_cksum = flip ? __swab32(msg->mxm_cksum) : msg->mxm_cksum;
893         msg->mxm_cksum = 0;
894         if (msg_cksum != 0 && msg_cksum != mxlnd_cksum(msg, msg_nob)) {
895                 CDEBUG(D_NETERROR, "Bad checksum\n");
896                 return -EPROTO;
897         }
898         msg->mxm_cksum = msg_cksum;
899
900         if (flip) {
901                 /* leave magic unflipped as a clue to peer endianness */
902                 __swab16s(&msg->mxm_version);
903                 CLASSERT (sizeof(msg->mxm_type) == 1);
904                 CLASSERT (sizeof(msg->mxm_credits) == 1);
905                 msg->mxm_nob = msg_nob;
906                 __swab64s(&msg->mxm_srcnid);
907                 __swab64s(&msg->mxm_srcstamp);
908                 __swab64s(&msg->mxm_dstnid);
909                 __swab64s(&msg->mxm_dststamp);
910                 __swab64s(&msg->mxm_seq);
911         }
912
913         if (msg->mxm_srcnid == LNET_NID_ANY) {
914                 CDEBUG(D_NETERROR, "Bad src nid: %s\n", libcfs_nid2str(msg->mxm_srcnid));
915                 return -EPROTO;
916         }
917
918         switch (msg->mxm_type) {
919         default:
920                 CDEBUG(D_NETERROR, "Unknown message type %x\n", msg->mxm_type);
921                 return -EPROTO;
922
923         case MXLND_MSG_NOOP:
924                 break;
925
926         case MXLND_MSG_EAGER:
927                 if (msg_nob < offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0])) {
928                         CDEBUG(D_NETERROR, "Short EAGER: %d(%d)\n", msg_nob,
929                                (int)offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[0]));
930                         return -EPROTO;
931                 }
932                 break;
933
934         case MXLND_MSG_PUT_REQ:
935                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_req)) {
936                         CDEBUG(D_NETERROR, "Short PUT_REQ: %d(%d)\n", msg_nob,
937                                (int)(hdr_size + sizeof(msg->mxm_u.put_req)));
938                         return -EPROTO;
939                 }
940                 if (flip)
941                         __swab64s(&msg->mxm_u.put_req.mxprm_cookie);
942                 break;
943
944         case MXLND_MSG_PUT_ACK:
945                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.put_ack)) {
946                         CDEBUG(D_NETERROR, "Short PUT_ACK: %d(%d)\n", msg_nob,
947                                (int)(hdr_size + sizeof(msg->mxm_u.put_ack)));
948                         return -EPROTO;
949                 }
950                 if (flip) {
951                         __swab64s(&msg->mxm_u.put_ack.mxpam_src_cookie);
952                         __swab64s(&msg->mxm_u.put_ack.mxpam_dst_cookie);
953                 }
954                 break;
955
956         case MXLND_MSG_GET_REQ:
957                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.get_req)) {
958                         CDEBUG(D_NETERROR, "Short GET_REQ: %d(%d)\n", msg_nob,
959                                (int)(hdr_size + sizeof(msg->mxm_u.get_req)));
960                         return -EPROTO;
961                 }
962                 if (flip) {
963                         __swab64s(&msg->mxm_u.get_req.mxgrm_cookie);
964                 }
965                 break;
966
967         case MXLND_MSG_CONN_REQ:
968         case MXLND_MSG_CONN_ACK:
969                 if (msg_nob < hdr_size + sizeof(msg->mxm_u.conn_req)) {
970                         CDEBUG(D_NETERROR, "Short connreq/ack: %d(%d)\n", msg_nob,
971                                (int)(hdr_size + sizeof(msg->mxm_u.conn_req)));
972                         return -EPROTO;
973                 }
974                 if (flip) {
975                         __swab32s(&msg->mxm_u.conn_req.mxcrm_queue_depth);
976                         __swab32s(&msg->mxm_u.conn_req.mxcrm_eager_size);
977                 }
978                 break;
979         }
980         return 0;
981 }
982
983 /**
984  * mxlnd_recv_msg
985  * @lntmsg - the LNET msg that this is continuing. If EAGER, then NULL.
986  * @rx
987  * @msg_type
988  * @cookie
989  * @length - length of incoming message
990  * @pending - add to kmx_pending (0 is NO and 1 is YES)
991  *
992  * The caller gets the rx and sets nid, peer and conn if known.
993  *
994  * Returns 0 on success and -1 on failure
995  */
996 int
997 mxlnd_recv_msg(lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie, u32 length)
998 {
999         int             ret     = 0;
1000         mx_return_t     mxret   = MX_SUCCESS;
1001         uint64_t        mask    = 0xF00FFFFFFFFFFFFFLL;
1002
1003         rx->mxc_msg_type = msg_type;
1004         rx->mxc_lntmsg[0] = lntmsg; /* may be NULL if EAGER */
1005         rx->mxc_cookie = cookie;
1006         /* rx->mxc_match may already be set */
1007         /* rx->mxc_seg.segment_ptr is already set */
1008         rx->mxc_seg.segment_length = length;
1009         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1010         ret = mxlnd_q_pending_ctx(rx);
1011         if (ret == -1) {
1012                 /* the caller is responsible for calling conn_decref() if needed */
1013                 return -1;
1014         }
1015         mxret = mx_kirecv(kmxlnd_data.kmx_endpt, &rx->mxc_seg, 1, MX_PIN_PHYSICAL,
1016                           cookie, mask, (void *) rx, &rx->mxc_mxreq);
1017         if (mxret != MX_SUCCESS) {
1018                 mxlnd_deq_pending_ctx(rx);
1019                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %s (%d)\n", 
1020                                    mx_strerror(mxret), (int) mxret);
1021                 return -1;
1022         }
1023         return 0;
1024 }
1025
1026
1027 /**
1028  * mxlnd_unexpected_recv - this is the callback function that will handle 
1029  *                         unexpected receives
1030  * @context - NULL, ignore
1031  * @source - the peer's mx_endpoint_addr_t
1032  * @match_value - the msg's bit, should be MXLND_MASK_EAGER
1033  * @length - length of incoming message
1034  * @data_if_available - ignore
1035  *
1036  * If it is an eager-sized msg, we will call recv_msg() with the actual
1037  * length. If it is a large message, we will call recv_msg() with a
1038  * length of 0 bytes to drop it because we should never have a large,
1039  * unexpected message.
1040  *
1041  * NOTE - The MX library blocks until this function completes. Make it as fast as
1042  * possible. DO NOT allocate memory which can block!
1043  *
1044  * If we cannot get a rx or the conn is closed, drop the message on the floor
1045  * (i.e. recv 0 bytes and ignore).
1046  */
1047 mx_unexp_handler_action_t
1048 mxlnd_unexpected_recv(void *context, mx_endpoint_addr_t source,
1049                  uint64_t match_value, uint32_t length, void *data_if_available)
1050 {
1051         int             ret             = 0;
1052         struct kmx_ctx  *rx             = NULL;
1053         mx_ksegment_t   seg;
1054         u8              msg_type        = 0;
1055         u8              error           = 0;
1056         u64             cookie          = 0LL;
1057
1058         if (context != NULL) {
1059                 CDEBUG(D_NETERROR, "unexpected receive with non-NULL context\n");
1060         }
1061
1062 #if MXLND_DEBUG
1063         CDEBUG(D_NET, "unexpected_recv() bits=0x%llx length=%d\n", match_value, length);
1064 #endif
1065
1066         rx = mxlnd_get_idle_rx();
1067         if (rx != NULL) {
1068                 mxlnd_parse_match(match_value, &msg_type, &error, &cookie);
1069                 if (length <= MXLND_EAGER_SIZE) {
1070                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, length);
1071                 } else {
1072                         CDEBUG(D_NETERROR, "unexpected large receive with "
1073                                            "match_value=0x%llx length=%d\n",
1074                                            match_value, length);
1075                         ret = mxlnd_recv_msg(NULL, rx, msg_type, match_value, 0);
1076                 }
1077
1078                 if (ret == 0) {
1079                         struct kmx_peer *peer   = NULL;
1080                         struct kmx_conn *conn   = NULL;
1081
1082                         /* NOTE to avoid a peer disappearing out from under us,
1083                          *      read lock the peers lock first */
1084                         read_lock(&kmxlnd_data.kmx_peers_lock);
1085                         mx_get_endpoint_addr_context(source, (void **) &peer);
1086                         if (peer != NULL) {
1087                                 mxlnd_peer_addref(peer); /* add a ref... */
1088                                 spin_lock(&peer->mxp_lock);
1089                                 conn = peer->mxp_conn;
1090                                 if (conn) {
1091                                         mxlnd_conn_addref(conn); /* add ref until rx completed */
1092                                         mxlnd_peer_decref(peer); /* and drop peer ref */
1093                                         rx->mxc_conn = conn;
1094                                 }
1095                                 spin_unlock(&peer->mxp_lock);
1096                                 rx->mxc_peer = peer;
1097                                 rx->mxc_nid = peer->mxp_nid;
1098                         }
1099                         read_unlock(&kmxlnd_data.kmx_peers_lock);
1100                 } else {
1101                         CDEBUG(D_NETERROR, "could not post receive\n");
1102                         mxlnd_put_idle_rx(rx);
1103                 }
1104         }
1105
1106         if (rx == NULL || ret != 0) {
1107                 if (rx == NULL) {
1108                         CDEBUG(D_NETERROR, "no idle rxs available - dropping rx\n");
1109                 } else {
1110                         /* ret != 0 */
1111                         CDEBUG(D_NETERROR, "disconnected peer - dropping rx\n");
1112                 }
1113                 seg.segment_ptr = 0LL;
1114                 seg.segment_length = 0;
1115                 mx_kirecv(kmxlnd_data.kmx_endpt, &seg, 1, MX_PIN_PHYSICAL,
1116                           match_value, 0xFFFFFFFFFFFFFFFFLL, NULL, NULL);
1117         }
1118
1119         return MX_RECV_CONTINUE;
1120 }
1121
1122
1123 int
1124 mxlnd_get_peer_info(int index, lnet_nid_t *nidp, int *count)
1125 {
1126         int                      i      = 0;
1127         int                      ret    = -ENOENT;
1128         struct kmx_peer         *peer   = NULL;
1129
1130         read_lock(&kmxlnd_data.kmx_peers_lock);
1131         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1132                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1133                         if (index-- > 0)
1134                                 continue;
1135
1136                         *nidp = peer->mxp_nid;
1137                         *count = atomic_read(&peer->mxp_refcount);
1138                         ret = 0;
1139                         break;
1140                 }
1141         }
1142         read_unlock(&kmxlnd_data.kmx_peers_lock);
1143
1144         return ret;
1145 }
1146
1147 void
1148 mxlnd_del_peer_locked(struct kmx_peer *peer)
1149 {
1150         list_del_init(&peer->mxp_peers); /* remove from the global list */
1151         if (peer->mxp_conn) mxlnd_conn_disconnect(peer->mxp_conn, 1, 0);
1152         mxlnd_peer_decref(peer); /* drop global list ref */
1153         return;
1154 }
1155
1156 int
1157 mxlnd_del_peer(lnet_nid_t nid)
1158 {
1159         int             i       = 0;
1160         int             ret     = 0;
1161         struct kmx_peer *peer   = NULL;
1162         struct kmx_peer *next   = NULL;
1163
1164         if (nid != LNET_NID_ANY) {
1165                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1166         }
1167         write_lock(&kmxlnd_data.kmx_peers_lock);
1168         if (nid != LNET_NID_ANY) {
1169                 if (peer == NULL) {
1170                         ret = -ENOENT;
1171                 } else {
1172                         mxlnd_peer_decref(peer); /* and drops it */
1173                         mxlnd_del_peer_locked(peer);
1174                 }
1175         } else { /* LNET_NID_ANY */
1176                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1177                         list_for_each_entry_safe(peer, next,
1178                                                  &kmxlnd_data.kmx_peers[i], mxp_peers) {
1179                                 mxlnd_del_peer_locked(peer);
1180                         }
1181                 }
1182         }
1183         write_unlock(&kmxlnd_data.kmx_peers_lock);
1184
1185         return ret;
1186 }
1187
1188 struct kmx_conn *
1189 mxlnd_get_conn_by_idx(int index)
1190 {
1191         int                      i      = 0;
1192         struct kmx_peer         *peer   = NULL;
1193         struct kmx_conn         *conn   = NULL;
1194
1195         read_lock(&kmxlnd_data.kmx_peers_lock);
1196         for (i = 0; i < MXLND_HASH_SIZE; i++) {
1197                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
1198                         spin_lock(&peer->mxp_lock);
1199                         list_for_each_entry(conn, &peer->mxp_conns, mxk_list) {
1200                                 if (index-- > 0) {
1201                                         continue;
1202                                 }
1203
1204                                 mxlnd_conn_addref(conn); /* add ref here, dec in ctl() */
1205                                 spin_unlock(&peer->mxp_lock);
1206                                 read_unlock(&kmxlnd_data.kmx_peers_lock);
1207                                 return conn;
1208                         }
1209                         spin_unlock(&peer->mxp_lock);
1210                 }
1211         }
1212         read_unlock(&kmxlnd_data.kmx_peers_lock);
1213
1214         return NULL;
1215 }
1216
1217 void
1218 mxlnd_close_matching_conns_locked(struct kmx_peer *peer)
1219 {
1220         struct kmx_conn *conn   = NULL;
1221         struct kmx_conn *next   = NULL;
1222
1223         spin_lock(&peer->mxp_lock);
1224         list_for_each_entry_safe(conn, next, &peer->mxp_conns, mxk_list) {
1225                 mxlnd_conn_disconnect(conn, 0 , 0);
1226         }
1227         spin_unlock(&peer->mxp_lock);
1228         return;
1229 }
1230
1231 int
1232 mxlnd_close_matching_conns(lnet_nid_t nid)
1233 {
1234         int             i       = 0;
1235         int             ret     = 0;
1236         struct kmx_peer *peer   = NULL;
1237
1238         read_lock(&kmxlnd_data.kmx_peers_lock);
1239         if (nid != LNET_NID_ANY) {
1240                 peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1241                 if (peer == NULL) {
1242                         ret = -ENOENT;
1243                 } else {
1244                         mxlnd_close_matching_conns_locked(peer);
1245                         mxlnd_peer_decref(peer); /* and drops it here */
1246                 }
1247         } else { /* LNET_NID_ANY */
1248                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
1249                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers)
1250                                 mxlnd_close_matching_conns_locked(peer);
1251                 }
1252         }
1253         read_unlock(&kmxlnd_data.kmx_peers_lock);
1254
1255         return ret;
1256 }
1257
1258 /**
1259  * mxlnd_ctl - modify MXLND parameters
1260  * @ni - LNET interface handle
1261  * @cmd - command to change
1262  * @arg - the ioctl data
1263  *
1264  * Not implemented yet.
1265  */
1266 int
1267 mxlnd_ctl(lnet_ni_t *ni, unsigned int cmd, void *arg)
1268 {
1269         struct libcfs_ioctl_data *data  = arg;
1270         int                       ret   = -EINVAL;
1271
1272         LASSERT (ni == kmxlnd_data.kmx_ni);
1273
1274         switch (cmd) {
1275         case IOC_LIBCFS_GET_PEER: {
1276                 lnet_nid_t      nid     = 0;
1277                 int             count   = 0;
1278
1279                 ret = mxlnd_get_peer_info(data->ioc_count, &nid, &count);
1280                 data->ioc_nid    = nid;
1281                 data->ioc_count  = count;
1282                 break;
1283         }
1284         case IOC_LIBCFS_DEL_PEER: {
1285                 ret = mxlnd_del_peer(data->ioc_nid);
1286                 break;
1287         }
1288         case IOC_LIBCFS_GET_CONN: {
1289                 struct kmx_conn *conn = NULL;
1290
1291                 conn = mxlnd_get_conn_by_idx(data->ioc_count);
1292                 if (conn == NULL) {
1293                         ret = -ENOENT;
1294                 } else {
1295                         ret = 0;
1296                         data->ioc_nid = conn->mxk_peer->mxp_nid;
1297                         mxlnd_conn_decref(conn); /* dec ref taken in get_conn_by_idx() */
1298                 }
1299                 break;
1300         }
1301         case IOC_LIBCFS_CLOSE_CONNECTION: {
1302                 ret = mxlnd_close_matching_conns(data->ioc_nid);
1303                 break;
1304         }
1305         default:
1306                 CDEBUG(D_NETERROR, "unknown ctl(%d)\n", cmd);
1307                 break;
1308         }
1309
1310         return ret;
1311 }
1312
1313 /**
1314  * mxlnd_peer_queue_tx_locked - add the tx to the global tx queue
1315  * @tx
1316  *
1317  * Add the tx to the peer's msg or data queue. The caller has locked the peer.
1318  */
1319 void
1320 mxlnd_peer_queue_tx_locked(struct kmx_ctx *tx)
1321 {
1322         u8                      msg_type        = tx->mxc_msg_type;
1323         //struct kmx_peer         *peer           = tx->mxc_peer;
1324         struct kmx_conn         *conn           = tx->mxc_conn;
1325
1326         LASSERT (msg_type != 0);
1327         LASSERT (tx->mxc_nid != 0);
1328         LASSERT (tx->mxc_peer != NULL);
1329         LASSERT (tx->mxc_conn != NULL);
1330
1331         tx->mxc_incarnation = conn->mxk_incarnation;
1332
1333         if (msg_type != MXLND_MSG_PUT_DATA &&
1334             msg_type != MXLND_MSG_GET_DATA) {
1335                 /* msg style tx */
1336                 if (mxlnd_tx_requires_credit(tx)) {
1337                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_credit_queue);
1338                         conn->mxk_ntx_msgs++;
1339                 } else if (msg_type == MXLND_MSG_CONN_REQ ||
1340                            msg_type == MXLND_MSG_CONN_ACK) {
1341                         /* put conn msgs at the front of the queue */
1342                         list_add(&tx->mxc_list, &conn->mxk_tx_free_queue);
1343                 } else {
1344                         /* PUT_ACK, PUT_NAK */
1345                         list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1346                         conn->mxk_ntx_msgs++;
1347                 }
1348         } else {
1349                 /* data style tx */
1350                 list_add_tail(&tx->mxc_list, &conn->mxk_tx_free_queue);
1351                 conn->mxk_ntx_data++;
1352         }
1353
1354         return;
1355 }
1356
1357 /**
1358  * mxlnd_peer_queue_tx - add the tx to the global tx queue
1359  * @tx
1360  *
1361  * Add the tx to the peer's msg or data queue
1362  */
1363 static inline void
1364 mxlnd_peer_queue_tx(struct kmx_ctx *tx)
1365 {
1366         LASSERT(tx->mxc_peer != NULL);
1367         LASSERT(tx->mxc_conn != NULL);
1368         spin_lock(&tx->mxc_conn->mxk_lock);
1369         mxlnd_peer_queue_tx_locked(tx);
1370         spin_unlock(&tx->mxc_conn->mxk_lock);
1371
1372         return;
1373 }
1374
1375 /**
1376  * mxlnd_queue_tx - add the tx to the global tx queue
1377  * @tx
1378  *
1379  * Add the tx to the global queue and up the tx_queue_sem
1380  */
1381 void
1382 mxlnd_queue_tx(struct kmx_ctx *tx)
1383 {
1384         struct kmx_peer *peer   = tx->mxc_peer;
1385         LASSERT (tx->mxc_nid != 0);
1386
1387         if (peer != NULL) {
1388                 if (peer->mxp_incompatible &&
1389                     tx->mxc_msg_type != MXLND_MSG_CONN_ACK) {
1390                         /* let this fail now */
1391                         tx->mxc_status.code = -ECONNABORTED;
1392                         mxlnd_conn_decref(peer->mxp_conn);
1393                         mxlnd_put_idle_tx(tx);
1394                         return;
1395                 }
1396                 if (tx->mxc_conn == NULL) {
1397                         int             ret     = 0;
1398                         struct kmx_conn *conn   = NULL;
1399
1400                         ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref for tx... */
1401                         if (ret != 0) {
1402                                 tx->mxc_status.code = ret;
1403                                 mxlnd_put_idle_tx(tx);
1404                                 goto done;
1405                         }
1406                         tx->mxc_conn = conn;
1407                         mxlnd_peer_decref(peer); /* and takes it from peer */
1408                 }
1409                 LASSERT(tx->mxc_conn != NULL);
1410                 mxlnd_peer_queue_tx(tx);
1411                 mxlnd_check_sends(peer);
1412         } else {
1413                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
1414                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_queue);
1415                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
1416                 up(&kmxlnd_data.kmx_tx_queue_sem);
1417         }
1418 done:
1419         return;
1420 }
1421
1422 int
1423 mxlnd_setup_iov(struct kmx_ctx *ctx, u32 niov, struct iovec *iov, u32 offset, u32 nob)
1424 {
1425         int             i                       = 0;
1426         int             sum                     = 0;
1427         int             old_sum                 = 0;
1428         int             nseg                    = 0;
1429         int             first_iov               = -1;
1430         int             first_iov_offset        = 0;
1431         int             first_found             = 0;
1432         int             last_iov                = -1;
1433         int             last_iov_length         = 0;
1434         mx_ksegment_t  *seg                     = NULL;
1435
1436         if (niov == 0) return 0;
1437         LASSERT(iov != NULL);
1438
1439         for (i = 0; i < niov; i++) {
1440                 sum = old_sum + (u32) iov[i].iov_len;
1441                 if (!first_found && (sum > offset)) {
1442                         first_iov = i;
1443                         first_iov_offset = offset - old_sum;
1444                         first_found = 1;
1445                         sum = (u32) iov[i].iov_len - first_iov_offset;
1446                         old_sum = 0;
1447                 }
1448                 if (sum >= nob) {
1449                         last_iov = i;
1450                         last_iov_length = (u32) iov[i].iov_len - (sum - nob);
1451                         if (first_iov == last_iov) last_iov_length -= first_iov_offset;
1452                         break;
1453                 }
1454                 old_sum = sum;
1455         }
1456         LASSERT(first_iov >= 0 && last_iov >= first_iov);
1457         nseg = last_iov - first_iov + 1;
1458         LASSERT(nseg > 0);
1459
1460         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1461         if (seg == NULL) {
1462                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1463                 return -1;
1464         }
1465         memset(seg, 0, nseg * sizeof(*seg));
1466         ctx->mxc_nseg = nseg;
1467         sum = 0;
1468         for (i = 0; i < nseg; i++) {
1469                 seg[i].segment_ptr = MX_KVA_TO_U64(iov[first_iov + i].iov_base);
1470                 seg[i].segment_length = (u32) iov[first_iov + i].iov_len;
1471                 if (i == 0) {
1472                         seg[i].segment_ptr += (u64) first_iov_offset;
1473                         seg[i].segment_length -= (u32) first_iov_offset;
1474                 }
1475                 if (i == (nseg - 1)) {
1476                         seg[i].segment_length = (u32) last_iov_length;
1477                 }
1478                 sum += seg[i].segment_length;
1479         }
1480         ctx->mxc_seg_list = seg;
1481         ctx->mxc_pin_type = MX_PIN_KERNEL;
1482 #ifdef MX_PIN_FULLPAGES
1483         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1484 #endif
1485         LASSERT(nob == sum);
1486         return 0;
1487 }
1488
1489 int
1490 mxlnd_setup_kiov(struct kmx_ctx *ctx, u32 niov, lnet_kiov_t *kiov, u32 offset, u32 nob)
1491 {
1492         int             i                       = 0;
1493         int             sum                     = 0;
1494         int             old_sum                 = 0;
1495         int             nseg                    = 0;
1496         int             first_kiov              = -1;
1497         int             first_kiov_offset       = 0;
1498         int             first_found             = 0;
1499         int             last_kiov               = -1;
1500         int             last_kiov_length        = 0;
1501         mx_ksegment_t  *seg                     = NULL;
1502
1503         if (niov == 0) return 0;
1504         LASSERT(kiov != NULL);
1505
1506         for (i = 0; i < niov; i++) {
1507                 sum = old_sum + kiov[i].kiov_len;
1508                 if (i == 0) sum -= kiov[i].kiov_offset;
1509                 if (!first_found && (sum > offset)) {
1510                         first_kiov = i;
1511                         first_kiov_offset = offset - old_sum;
1512                         //if (i == 0) first_kiov_offset + kiov[i].kiov_offset;
1513                         if (i == 0) first_kiov_offset = kiov[i].kiov_offset;
1514                         first_found = 1;
1515                         sum = kiov[i].kiov_len - first_kiov_offset;
1516                         old_sum = 0;
1517                 }
1518                 if (sum >= nob) {
1519                         last_kiov = i;
1520                         last_kiov_length = kiov[i].kiov_len - (sum - nob);
1521                         if (first_kiov == last_kiov) last_kiov_length -= first_kiov_offset;
1522                         break;
1523                 }
1524                 old_sum = sum;
1525         }
1526         LASSERT(first_kiov >= 0 && last_kiov >= first_kiov);
1527         nseg = last_kiov - first_kiov + 1;
1528         LASSERT(nseg > 0);
1529
1530         MXLND_ALLOC (seg, nseg * sizeof(*seg));
1531         if (seg == NULL) {
1532                 CDEBUG(D_NETERROR, "MXLND_ALLOC() failed\n");
1533                 return -1;
1534         }
1535         memset(seg, 0, niov * sizeof(*seg));
1536         ctx->mxc_nseg = niov;
1537         sum = 0;
1538         for (i = 0; i < niov; i++) {
1539                 seg[i].segment_ptr = lnet_page2phys(kiov[first_kiov + i].kiov_page);
1540                 seg[i].segment_length = kiov[first_kiov + i].kiov_len;
1541                 if (i == 0) {
1542                         seg[i].segment_ptr += (u64) first_kiov_offset;
1543                         /* we have to add back the original kiov_offset */
1544                         seg[i].segment_length -= first_kiov_offset +
1545                                                  kiov[first_kiov].kiov_offset;
1546                 }
1547                 if (i == (nseg - 1)) {
1548                         seg[i].segment_length = last_kiov_length;
1549                 }
1550                 sum += seg[i].segment_length;
1551         }
1552         ctx->mxc_seg_list = seg;
1553         ctx->mxc_pin_type = MX_PIN_PHYSICAL;
1554 #ifdef MX_PIN_FULLPAGES
1555         ctx->mxc_pin_type |= MX_PIN_FULLPAGES;
1556 #endif
1557         LASSERT(nob == sum);
1558         return 0;
1559 }
1560
1561 void
1562 mxlnd_send_nak(struct kmx_ctx *tx, lnet_nid_t nid, int type, int status, __u64 cookie)
1563 {
1564         LASSERT(type == MXLND_MSG_PUT_ACK);
1565         mxlnd_init_tx_msg(tx, type, sizeof(kmx_putack_msg_t), tx->mxc_nid);
1566         tx->mxc_cookie = cookie;
1567         tx->mxc_msg->mxm_u.put_ack.mxpam_src_cookie = cookie;
1568         tx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie = ((u64) status << 52); /* error code */
1569         tx->mxc_match = mxlnd_create_match(tx, status);
1570
1571         mxlnd_queue_tx(tx);
1572 }
1573
1574
1575 /**
1576  * mxlnd_send_data - get tx, map [k]iov, queue tx
1577  * @ni
1578  * @lntmsg
1579  * @peer
1580  * @msg_type
1581  * @cookie
1582  *
1583  * This setups the DATA send for PUT or GET.
1584  *
1585  * On success, it queues the tx, on failure it calls lnet_finalize()
1586  */
1587 void
1588 mxlnd_send_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_peer *peer, u8 msg_type, u64 cookie)
1589 {
1590         int                     ret             = 0;
1591         lnet_process_id_t       target          = lntmsg->msg_target;
1592         unsigned int            niov            = lntmsg->msg_niov;
1593         struct iovec           *iov             = lntmsg->msg_iov;
1594         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1595         unsigned int            offset          = lntmsg->msg_offset;
1596         unsigned int            nob             = lntmsg->msg_len;
1597         struct kmx_ctx         *tx              = NULL;
1598
1599         LASSERT(lntmsg != NULL);
1600         LASSERT(peer != NULL);
1601         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1602         LASSERT((cookie>>52) == 0);
1603
1604         tx = mxlnd_get_idle_tx();
1605         if (tx == NULL) {
1606                 CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1607                         msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1608                         libcfs_nid2str(target.nid));
1609                 goto failed_0;
1610         }
1611         tx->mxc_nid = target.nid;
1612         /* NOTE called when we have a ref on the conn, get one for this tx */
1613         mxlnd_conn_addref(peer->mxp_conn);
1614         tx->mxc_peer = peer;
1615         tx->mxc_conn = peer->mxp_conn;
1616         tx->mxc_msg_type = msg_type;
1617         tx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1618         tx->mxc_state = MXLND_CTX_PENDING;
1619         tx->mxc_lntmsg[0] = lntmsg;
1620         tx->mxc_cookie = cookie;
1621         tx->mxc_match = mxlnd_create_match(tx, 0);
1622
1623         /* This setups up the mx_ksegment_t to send the DATA payload  */
1624         if (nob == 0) {
1625                 /* do not setup the segments */
1626                 CDEBUG(D_NETERROR, "nob = 0; why didn't we use an EAGER reply "
1627                                    "to %s?\n", libcfs_nid2str(target.nid));
1628                 ret = 0;
1629         } else if (kiov == NULL) {
1630                 ret = mxlnd_setup_iov(tx, niov, iov, offset, nob);
1631         } else {
1632                 ret = mxlnd_setup_kiov(tx, niov, kiov, offset, nob);
1633         }
1634         if (ret != 0) {
1635                 CDEBUG(D_NETERROR, "Can't setup send DATA for %s\n", 
1636                                    libcfs_nid2str(target.nid));
1637                 tx->mxc_status.code = -EIO;
1638                 goto failed_1;
1639         }
1640         mxlnd_queue_tx(tx);
1641         return;
1642
1643 failed_1:
1644         mxlnd_conn_decref(peer->mxp_conn);
1645         mxlnd_put_idle_tx(tx);
1646         return;
1647
1648 failed_0:
1649         CDEBUG(D_NETERROR, "no tx avail\n");
1650         lnet_finalize(ni, lntmsg, -EIO);
1651         return;
1652 }
1653
1654 /**
1655  * mxlnd_recv_data - map [k]iov, post rx
1656  * @ni
1657  * @lntmsg
1658  * @rx
1659  * @msg_type
1660  * @cookie
1661  *
1662  * This setups the DATA receive for PUT or GET.
1663  *
1664  * On success, it returns 0, on failure it returns -1
1665  */
1666 int
1667 mxlnd_recv_data(lnet_ni_t *ni, lnet_msg_t *lntmsg, struct kmx_ctx *rx, u8 msg_type, u64 cookie)
1668 {
1669         int                     ret             = 0;
1670         lnet_process_id_t       target          = lntmsg->msg_target;
1671         unsigned int            niov            = lntmsg->msg_niov;
1672         struct iovec           *iov             = lntmsg->msg_iov;
1673         lnet_kiov_t            *kiov            = lntmsg->msg_kiov;
1674         unsigned int            offset          = lntmsg->msg_offset;
1675         unsigned int            nob             = lntmsg->msg_len;
1676         mx_return_t             mxret           = MX_SUCCESS;
1677
1678         /* above assumes MXLND_MSG_PUT_DATA */
1679         if (msg_type == MXLND_MSG_GET_DATA) {
1680                 niov = lntmsg->msg_md->md_niov;
1681                 iov = lntmsg->msg_md->md_iov.iov;
1682                 kiov = lntmsg->msg_md->md_iov.kiov;
1683                 offset = 0;
1684                 nob = lntmsg->msg_md->md_length;
1685         }
1686
1687         LASSERT(lntmsg != NULL);
1688         LASSERT(rx != NULL);
1689         LASSERT(msg_type == MXLND_MSG_PUT_DATA || msg_type == MXLND_MSG_GET_DATA);
1690         LASSERT((cookie>>52) == 0); /* ensure top 12 bits are 0 */
1691
1692         rx->mxc_msg_type = msg_type;
1693         rx->mxc_deadline = jiffies + MXLND_COMM_TIMEOUT;
1694         rx->mxc_state = MXLND_CTX_PENDING;
1695         rx->mxc_nid = target.nid;
1696         /* if posting a GET_DATA, we may not yet know the peer */
1697         if (rx->mxc_peer != NULL) {
1698                 rx->mxc_conn = rx->mxc_peer->mxp_conn;
1699         }
1700         rx->mxc_lntmsg[0] = lntmsg;
1701         rx->mxc_cookie = cookie;
1702         rx->mxc_match = mxlnd_create_match(rx, 0);
1703         /* This setups up the mx_ksegment_t to receive the DATA payload  */
1704         if (kiov == NULL) {
1705                 ret = mxlnd_setup_iov(rx, niov, iov, offset, nob);
1706         } else {
1707                 ret = mxlnd_setup_kiov(rx, niov, kiov, offset, nob);
1708         }
1709         if (msg_type == MXLND_MSG_GET_DATA) {
1710                 rx->mxc_lntmsg[1] = lnet_create_reply_msg(kmxlnd_data.kmx_ni, lntmsg);
1711                 if (rx->mxc_lntmsg[1] == NULL) {
1712                         CDEBUG(D_NETERROR, "Can't create reply for GET -> %s\n",
1713                                            libcfs_nid2str(target.nid));
1714                         ret = -1;
1715                 }
1716         }
1717         if (ret != 0) {
1718                 CDEBUG(D_NETERROR, "Can't setup %s rx for %s\n",
1719                        msg_type == MXLND_MSG_PUT_DATA ? "PUT_DATA" : "GET_DATA",
1720                        libcfs_nid2str(target.nid));
1721                 return -1;
1722         }
1723         ret = mxlnd_q_pending_ctx(rx);
1724         if (ret == -1) {
1725                 return -1;
1726         }
1727         CDEBUG(D_NET, "receiving %s 0x%llx\n", mxlnd_msgtype_to_str(msg_type), rx->mxc_cookie);
1728         mxret = mx_kirecv(kmxlnd_data.kmx_endpt,
1729                           rx->mxc_seg_list, rx->mxc_nseg,
1730                           rx->mxc_pin_type, rx->mxc_match,
1731                           0xF00FFFFFFFFFFFFFLL, (void *) rx,
1732                           &rx->mxc_mxreq);
1733         if (mxret != MX_SUCCESS) {
1734                 if (rx->mxc_conn != NULL) {
1735                         mxlnd_deq_pending_ctx(rx);
1736                 }
1737                 CDEBUG(D_NETERROR, "mx_kirecv() failed with %d for %s\n",
1738                                    (int) mxret, libcfs_nid2str(target.nid));
1739                 return -1;
1740         }
1741
1742         return 0;
1743 }
1744
1745 /**
1746  * mxlnd_send - the LND required send function
1747  * @ni
1748  * @private
1749  * @lntmsg
1750  *
1751  * This must not block. Since we may not have a peer struct for the receiver,
1752  * it will append send messages on a global tx list. We will then up the
1753  * tx_queued's semaphore to notify it of the new send. 
1754  */
1755 int
1756 mxlnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
1757 {
1758         int                     ret             = 0;
1759         int                     type            = lntmsg->msg_type;
1760         lnet_hdr_t             *hdr             = &lntmsg->msg_hdr;
1761         lnet_process_id_t       target          = lntmsg->msg_target;
1762         lnet_nid_t              nid             = target.nid;
1763         int                     target_is_router = lntmsg->msg_target_is_router;
1764         int                     routing         = lntmsg->msg_routing;
1765         unsigned int            payload_niov    = lntmsg->msg_niov;
1766         struct iovec           *payload_iov     = lntmsg->msg_iov;
1767         lnet_kiov_t            *payload_kiov    = lntmsg->msg_kiov;
1768         unsigned int            payload_offset  = lntmsg->msg_offset;
1769         unsigned int            payload_nob     = lntmsg->msg_len;
1770         struct kmx_ctx         *tx              = NULL;
1771         struct kmx_msg         *txmsg           = NULL;
1772         struct kmx_ctx         *rx              = (struct kmx_ctx *) private; /* for REPLY */
1773         struct kmx_ctx         *rx_data         = NULL;
1774         struct kmx_conn        *conn            = NULL;
1775         int                     nob             = 0;
1776         uint32_t                length          = 0;
1777         struct kmx_peer         *peer           = NULL;
1778
1779         CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
1780                        payload_nob, payload_niov, libcfs_id2str(target));
1781
1782         LASSERT (payload_nob == 0 || payload_niov > 0);
1783         LASSERT (payload_niov <= LNET_MAX_IOV);
1784         /* payload is either all vaddrs or all pages */
1785         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1786
1787         /* private is used on LNET_GET_REPLY only, NULL for all other cases */
1788
1789         /* NOTE we may not know the peer if it is the very first PUT_REQ or GET_REQ
1790          * to a new peer, use the nid */
1791         peer = mxlnd_find_peer_by_nid(nid); /* adds peer ref */
1792         if (peer != NULL) {
1793                 if (unlikely(peer->mxp_incompatible)) {
1794                         mxlnd_peer_decref(peer); /* drop ref taken above */
1795                 } else {
1796                         spin_lock(&peer->mxp_lock);
1797                         conn = peer->mxp_conn;
1798                         if (conn) {
1799                                 mxlnd_conn_addref(conn);
1800                                 mxlnd_peer_decref(peer); /* drop peer ref taken above */
1801                         }
1802                         spin_unlock(&peer->mxp_lock);
1803                 }
1804         }
1805         if (conn == NULL && peer != NULL) {
1806                 CDEBUG(D_NETERROR, "conn==NULL peer=0x%p nid=0x%llx payload_nob=%d type=%s\n",
1807                        peer, nid, payload_nob, mxlnd_lnetmsg_to_str(type));
1808         }
1809
1810         switch (type) {
1811         case LNET_MSG_ACK:
1812                 LASSERT (payload_nob == 0);
1813                 break;
1814
1815         case LNET_MSG_REPLY:
1816         case LNET_MSG_PUT:
1817                 /* Is the payload small enough not to need DATA? */
1818                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob]);
1819                 if (nob <= MXLND_EAGER_SIZE)
1820                         break;                  /* send EAGER */
1821
1822                 tx = mxlnd_get_idle_tx();
1823                 if (unlikely(tx == NULL)) {
1824                         CDEBUG(D_NETERROR, "Can't allocate %s tx for %s\n",
1825                                type == LNET_MSG_PUT ? "PUT" : "REPLY",
1826                                libcfs_nid2str(nid));
1827                         if (conn) mxlnd_conn_decref(conn);
1828                         return -ENOMEM;
1829                 }
1830
1831                 /* the peer may be NULL */
1832                 tx->mxc_peer = peer;
1833                 tx->mxc_conn = conn; /* may be NULL */
1834                 /* we added a conn ref above */
1835                 mxlnd_init_tx_msg (tx, MXLND_MSG_PUT_REQ, sizeof(kmx_putreq_msg_t), nid);
1836                 txmsg = tx->mxc_msg;
1837                 txmsg->mxm_u.put_req.mxprm_hdr = *hdr;
1838                 txmsg->mxm_u.put_req.mxprm_cookie = tx->mxc_cookie;
1839                 tx->mxc_match = mxlnd_create_match(tx, 0);
1840
1841                 /* we must post a receive _before_ sending the request.
1842                  * we need to determine how much to receive, it will be either
1843                  * a put_ack or a put_nak. The put_ack is larger, so use it. */
1844
1845                 rx = mxlnd_get_idle_rx();
1846                 if (unlikely(rx == NULL)) {
1847                         CDEBUG(D_NETERROR, "Can't allocate rx for PUT_ACK for %s\n",
1848                                            libcfs_nid2str(nid));
1849                         mxlnd_put_idle_tx(tx);
1850                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1851                         return -ENOMEM;
1852                 }
1853                 rx->mxc_nid = nid;
1854                 rx->mxc_peer = peer;
1855                 /* conn may be NULL but unlikely since the first msg is always small */
1856                 /* NOTE no need to lock peer before adding conn ref since we took
1857                  * a conn ref for the tx (it cannot be freed between there and here ) */
1858                 if (conn) mxlnd_conn_addref(conn); /* for this rx */
1859                 rx->mxc_conn = conn;
1860                 rx->mxc_msg_type = MXLND_MSG_PUT_ACK;
1861                 rx->mxc_cookie = tx->mxc_cookie;
1862                 rx->mxc_match = mxlnd_create_match(rx, 0);
1863
1864                 length = offsetof(kmx_msg_t, mxm_u) + sizeof(kmx_putack_msg_t);
1865                 ret = mxlnd_recv_msg(lntmsg, rx, MXLND_MSG_PUT_ACK, rx->mxc_match, length);
1866                 if (unlikely(ret != 0)) {
1867                         CDEBUG(D_NETERROR, "recv_msg() failed for PUT_ACK for %s\n",
1868                                            libcfs_nid2str(nid));
1869                         rx->mxc_lntmsg[0] = NULL;
1870                         mxlnd_put_idle_rx(rx);
1871                         mxlnd_put_idle_tx(tx);
1872                         if (conn) {
1873                                 mxlnd_conn_decref(conn); /* for the rx... */
1874                                 mxlnd_conn_decref(conn); /* and for the tx */
1875                         }
1876                         return -EHOSTUNREACH;
1877                 }
1878
1879                 mxlnd_queue_tx(tx);
1880                 return 0;
1881
1882         case LNET_MSG_GET:
1883                 if (routing || target_is_router)
1884                         break;                  /* send EAGER */
1885
1886                 /* is the REPLY message too small for DATA? */
1887                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[lntmsg->msg_md->md_length]);
1888                 if (nob <= MXLND_EAGER_SIZE)
1889                         break;                  /* send EAGER */
1890
1891                 /* get tx (we need the cookie) , post rx for incoming DATA, 
1892                  * then post GET_REQ tx */
1893                 tx = mxlnd_get_idle_tx();
1894                 if (unlikely(tx == NULL)) {
1895                         CDEBUG(D_NETERROR, "Can't allocate GET tx for %s\n",
1896                                            libcfs_nid2str(nid));
1897                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1898                         return -ENOMEM;
1899                 }
1900                 rx_data = mxlnd_get_idle_rx();
1901                 if (unlikely(rx_data == NULL)) {
1902                         CDEBUG(D_NETERROR, "Can't allocate DATA rx for %s\n",
1903                                            libcfs_nid2str(nid));
1904                         mxlnd_put_idle_tx(tx);
1905                         if (conn) mxlnd_conn_decref(conn); /* for the ref taken above */
1906                         return -ENOMEM;
1907                 }
1908                 rx_data->mxc_peer = peer;
1909                 /* NOTE no need to lock peer before adding conn ref since we took
1910                  * a conn ref for the tx (it cannot be freed between there and here ) */
1911                 if (conn) mxlnd_conn_addref(conn); /* for the rx_data */
1912                 rx_data->mxc_conn = conn; /* may be NULL */
1913
1914                 ret = mxlnd_recv_data(ni, lntmsg, rx_data, MXLND_MSG_GET_DATA, tx->mxc_cookie);
1915                 if (unlikely(ret != 0)) {
1916                         CDEBUG(D_NETERROR, "Can't setup GET sink for %s\n",
1917                                            libcfs_nid2str(nid));
1918                         mxlnd_put_idle_rx(rx_data);
1919                         mxlnd_put_idle_tx(tx);
1920                         if (conn) {
1921                                 mxlnd_conn_decref(conn); /* for the rx_data... */
1922                                 mxlnd_conn_decref(conn); /* and for the tx */
1923                         }
1924                         return -EIO;
1925                 }
1926
1927                 tx->mxc_peer = peer;
1928                 tx->mxc_conn = conn; /* may be NULL */
1929                 /* conn ref taken above */
1930                 mxlnd_init_tx_msg(tx, MXLND_MSG_GET_REQ, sizeof(kmx_getreq_msg_t), nid);
1931                 txmsg = tx->mxc_msg;
1932                 txmsg->mxm_u.get_req.mxgrm_hdr = *hdr;
1933                 txmsg->mxm_u.get_req.mxgrm_cookie = tx->mxc_cookie;
1934                 tx->mxc_match = mxlnd_create_match(tx, 0);
1935
1936                 mxlnd_queue_tx(tx);
1937                 return 0;
1938
1939         default:
1940                 LBUG();
1941                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1942                 return -EIO;
1943         }
1944
1945         /* send EAGER */
1946
1947         LASSERT (offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[payload_nob])
1948                 <= MXLND_EAGER_SIZE);
1949
1950         tx = mxlnd_get_idle_tx();
1951         if (unlikely(tx == NULL)) {
1952                 CDEBUG(D_NETERROR, "Can't send %s to %s: tx descs exhausted\n",
1953                                    mxlnd_lnetmsg_to_str(type), libcfs_nid2str(nid));
1954                 if (conn) mxlnd_conn_decref(conn); /* drop ref taken above */
1955                 return -ENOMEM;
1956         }
1957
1958         tx->mxc_peer = peer;
1959         tx->mxc_conn = conn; /* may be NULL */
1960         /* conn ref taken above */
1961         nob = offsetof(kmx_eager_msg_t, mxem_payload[payload_nob]);
1962         mxlnd_init_tx_msg (tx, MXLND_MSG_EAGER, nob, nid);
1963         tx->mxc_match = mxlnd_create_match(tx, 0);
1964
1965         txmsg = tx->mxc_msg;
1966         txmsg->mxm_u.eager.mxem_hdr = *hdr;
1967
1968         if (payload_kiov != NULL)
1969                 lnet_copy_kiov2flat(MXLND_EAGER_SIZE, txmsg,
1970                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1971                             payload_niov, payload_kiov, payload_offset, payload_nob);
1972         else
1973                 lnet_copy_iov2flat(MXLND_EAGER_SIZE, txmsg,
1974                             offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
1975                             payload_niov, payload_iov, payload_offset, payload_nob);
1976
1977         tx->mxc_lntmsg[0] = lntmsg;              /* finalise lntmsg on completion */
1978         mxlnd_queue_tx(tx);
1979         return 0;
1980 }
1981
1982 /**
1983  * mxlnd_recv - the LND required recv function
1984  * @ni
1985  * @private
1986  * @lntmsg
1987  * @delayed
1988  * @niov
1989  * @kiov
1990  * @offset
1991  * @mlen
1992  * @rlen
1993  *
1994  * This must not block.
1995  */
1996 int
1997 mxlnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
1998              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
1999              unsigned int offset, unsigned int mlen, unsigned int rlen)
2000 {
2001         int                     ret             = 0;
2002         int                     nob             = 0;
2003         int                     len             = 0;
2004         struct kmx_ctx          *rx             = private;
2005         struct kmx_msg          *rxmsg          = rx->mxc_msg;
2006         lnet_nid_t               nid            = rx->mxc_nid;
2007         struct kmx_ctx          *tx             = NULL;
2008         struct kmx_msg          *txmsg          = NULL;
2009         struct kmx_peer         *peer           = rx->mxc_peer;
2010         struct kmx_conn         *conn           = peer->mxp_conn;
2011         u64                      cookie         = 0LL;
2012         int                      msg_type       = rxmsg->mxm_type;
2013         int                      repost         = 1;
2014         int                      credit         = 0;
2015         int                      finalize       = 0;
2016
2017         LASSERT (mlen <= rlen);
2018         /* Either all pages or all vaddrs */
2019         LASSERT (!(kiov != NULL && iov != NULL));
2020         LASSERT (peer != NULL);
2021
2022         /* conn_addref(conn) already taken for the primary rx */
2023
2024         switch (msg_type) {
2025         case MXLND_MSG_EAGER:
2026                 nob = offsetof(kmx_msg_t, mxm_u.eager.mxem_payload[rlen]);
2027                 len = rx->mxc_status.xfer_length;
2028                 if (unlikely(nob > len)) {
2029                         CDEBUG(D_NETERROR, "Eager message from %s too big: %d(%d)\n",
2030                                            libcfs_nid2str(nid), nob, len);
2031                         ret = -EPROTO;
2032                         break;
2033                 }
2034
2035                 if (kiov != NULL)
2036                         lnet_copy_flat2kiov(niov, kiov, offset,
2037                                 MXLND_EAGER_SIZE, rxmsg,
2038                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2039                                 mlen);
2040                 else
2041                         lnet_copy_flat2iov(niov, iov, offset,
2042                                 MXLND_EAGER_SIZE, rxmsg,
2043                                 offsetof(kmx_msg_t, mxm_u.eager.mxem_payload),
2044                                 mlen);
2045                 finalize = 1;
2046                 credit = 1;
2047                 break;
2048
2049         case MXLND_MSG_PUT_REQ:
2050                 /* we are going to reuse the rx, store the needed info */
2051                 cookie = rxmsg->mxm_u.put_req.mxprm_cookie;
2052
2053                 /* get tx, post rx, send PUT_ACK */
2054
2055                 tx = mxlnd_get_idle_tx();
2056                 if (unlikely(tx == NULL)) {
2057                         CDEBUG(D_NETERROR, "Can't allocate tx for %s\n", libcfs_nid2str(nid));
2058                         /* Not replying will break the connection */
2059                         ret = -ENOMEM;
2060                         break;
2061                 }
2062                 if (unlikely(mlen == 0)) {
2063                         finalize = 1;
2064                         tx->mxc_peer = peer;
2065                         tx->mxc_conn = conn;
2066                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, 0, cookie);
2067                         /* repost = 1 */
2068                         break;
2069                 }
2070
2071                 mxlnd_init_tx_msg(tx, MXLND_MSG_PUT_ACK, sizeof(kmx_putack_msg_t), nid);
2072                 tx->mxc_peer = peer;
2073                 tx->mxc_conn = conn;
2074                 /* no need to lock peer first since we already have a ref */
2075                 mxlnd_conn_addref(conn); /* for the tx */
2076                 txmsg = tx->mxc_msg;
2077                 txmsg->mxm_u.put_ack.mxpam_src_cookie = cookie;
2078                 txmsg->mxm_u.put_ack.mxpam_dst_cookie = tx->mxc_cookie;
2079                 tx->mxc_cookie = cookie;
2080                 tx->mxc_match = mxlnd_create_match(tx, 0);
2081
2082                 /* we must post a receive _before_ sending the PUT_ACK */
2083                 mxlnd_ctx_init(rx);
2084                 rx->mxc_state = MXLND_CTX_PREP;
2085                 rx->mxc_peer = peer;
2086                 rx->mxc_conn = conn;
2087                 /* do not take another ref for this rx, it is already taken */
2088                 rx->mxc_nid = peer->mxp_nid;
2089                 ret = mxlnd_recv_data(ni, lntmsg, rx, MXLND_MSG_PUT_DATA, 
2090                                       txmsg->mxm_u.put_ack.mxpam_dst_cookie);
2091
2092                 if (unlikely(ret != 0)) {
2093                         /* Notify peer that it's over */
2094                         CDEBUG(D_NETERROR, "Can't setup PUT_DATA rx for %s: %d\n", 
2095                                            libcfs_nid2str(nid), ret);
2096                         mxlnd_ctx_init(tx);
2097                         tx->mxc_state = MXLND_CTX_PREP;
2098                         tx->mxc_peer = peer;
2099                         tx->mxc_conn = conn;
2100                         /* finalize = 0, let the PUT_ACK tx finalize this */
2101                         tx->mxc_lntmsg[0] = rx->mxc_lntmsg[0];
2102                         tx->mxc_lntmsg[1] = rx->mxc_lntmsg[1];
2103                         /* conn ref already taken above */
2104                         mxlnd_send_nak(tx, nid, MXLND_MSG_PUT_ACK, ret, cookie);
2105                         /* repost = 1 */
2106                         break;
2107                 }
2108
2109                 mxlnd_queue_tx(tx);
2110                 /* do not return a credit until after PUT_DATA returns */
2111                 repost = 0;
2112                 break;
2113
2114         case MXLND_MSG_GET_REQ:
2115                 if (likely(lntmsg != NULL)) {
2116                         mxlnd_send_data(ni, lntmsg, rx->mxc_peer, MXLND_MSG_GET_DATA,
2117                                         rx->mxc_msg->mxm_u.get_req.mxgrm_cookie);
2118                 } else {
2119                         /* GET didn't match anything */
2120                         /* The initiator has a rx mapped to [k]iov. We cannot send a nak.
2121                          * We have to embed the error code in the match bits.
2122                          * Send the error in bits 52-59 and the cookie in bits 0-51 */
2123                         u64             cookie  = rxmsg->mxm_u.get_req.mxgrm_cookie;
2124
2125                         tx = mxlnd_get_idle_tx();
2126                         if (unlikely(tx == NULL)) {
2127                                 CDEBUG(D_NETERROR, "Can't get tx for GET NAK for %s\n",
2128                                                    libcfs_nid2str(nid));
2129                                 ret = -ENOMEM;
2130                                 break;
2131                         }
2132                         tx->mxc_msg_type = MXLND_MSG_GET_DATA;
2133                         tx->mxc_state = MXLND_CTX_PENDING;
2134                         tx->mxc_nid = nid;
2135                         tx->mxc_peer = peer;
2136                         tx->mxc_conn = conn;
2137                         /* no need to lock peer first since we already have a ref */
2138                         mxlnd_conn_addref(conn); /* for this tx */
2139                         tx->mxc_cookie = cookie;
2140                         tx->mxc_match = mxlnd_create_match(tx, ENODATA);
2141                         tx->mxc_pin_type = MX_PIN_PHYSICAL;
2142                         mxlnd_queue_tx(tx);
2143                 }
2144                 /* finalize lntmsg after tx completes */
2145                 break;
2146
2147         default:
2148                 LBUG();
2149         }
2150
2151         if (repost) {
2152                 /* we received a message, increment peer's outstanding credits */
2153                 if (credit == 1) {
2154                         spin_lock(&conn->mxk_lock);
2155                         conn->mxk_outstanding++;
2156                         spin_unlock(&conn->mxk_lock);
2157                 }
2158                 /* we are done with the rx */
2159                 mxlnd_put_idle_rx(rx);
2160                 mxlnd_conn_decref(conn);
2161         }
2162
2163         if (finalize == 1) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg, 0); 
2164
2165         /* we received a credit, see if we can use it to send a msg */
2166         if (credit) mxlnd_check_sends(peer);
2167
2168         return ret;
2169 }
2170
2171 void
2172 mxlnd_sleep(unsigned long timeout)
2173 {
2174         set_current_state(TASK_INTERRUPTIBLE);
2175         schedule_timeout(timeout);
2176         return;
2177 }
2178
2179 /**
2180  * mxlnd_tx_queued - the generic send queue thread
2181  * @arg - thread id (as a void *)
2182  *
2183  * This thread moves send messages from the global tx_queue to the owning
2184  * peer's tx_[msg|data]_queue. If the peer does not exist, it creates one and adds
2185  * it to the global peer list.
2186  */
2187 int
2188 mxlnd_tx_queued(void *arg)
2189 {
2190         long                    id      = (long) arg;
2191         int                     ret     = 0;
2192         int                     found   = 0;
2193         struct kmx_ctx         *tx      = NULL;
2194         struct kmx_peer        *peer    = NULL;
2195         struct list_head       *tmp_tx  = NULL;
2196
2197         cfs_daemonize("mxlnd_tx_queued");
2198         //cfs_block_allsigs();
2199
2200         while (!kmxlnd_data.kmx_shutdown) {
2201                 ret = down_interruptible(&kmxlnd_data.kmx_tx_queue_sem);
2202                 if (kmxlnd_data.kmx_shutdown)
2203                         break;
2204                 if (ret != 0) // Should we check for -EINTR?
2205                         continue;
2206                 spin_lock(&kmxlnd_data.kmx_tx_queue_lock);
2207                 if (list_empty (&kmxlnd_data.kmx_tx_queue)) {
2208                         spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2209                         continue;
2210                 }
2211                 tmp_tx = &kmxlnd_data.kmx_tx_queue;
2212                 tx = list_entry (tmp_tx->next, struct kmx_ctx, mxc_list);
2213                 list_del_init(&tx->mxc_list);
2214                 spin_unlock(&kmxlnd_data.kmx_tx_queue_lock);
2215
2216                 found = 0;
2217                 peer = mxlnd_find_peer_by_nid(tx->mxc_nid); /* adds peer ref */
2218                 if (peer != NULL) {
2219                         tx->mxc_peer = peer;
2220                         spin_lock(&peer->mxp_lock);
2221                         if (peer->mxp_conn == NULL) {
2222                                 ret = mxlnd_conn_alloc_locked(&peer->mxp_conn, peer);
2223                                 if (ret != 0) {
2224                                         /* out of memory, give up and fail tx */
2225                                         tx->mxc_status.code = -ENOMEM;
2226                                         spin_unlock(&peer->mxp_lock);
2227                                         mxlnd_peer_decref(peer);
2228                                         mxlnd_put_idle_tx(tx);
2229                                         continue;
2230                                 }
2231                         }
2232                         tx->mxc_conn = peer->mxp_conn;
2233                         mxlnd_conn_addref(tx->mxc_conn); /* for this tx */
2234                         spin_unlock(&peer->mxp_lock);
2235                         mxlnd_peer_decref(peer); /* drop peer ref taken above */
2236                         mxlnd_queue_tx(tx);
2237                         found = 1;
2238                 }
2239                 if (found == 0) {
2240                         int              hash   = 0;
2241                         struct kmx_peer *peer = NULL;
2242                         struct kmx_peer *old = NULL;
2243
2244                         hash = mxlnd_nid_to_hash(tx->mxc_nid);
2245
2246                         LASSERT(tx->mxc_msg_type != MXLND_MSG_PUT_DATA &&
2247                                 tx->mxc_msg_type != MXLND_MSG_GET_DATA);
2248                         /* create peer */
2249                         /* adds conn ref for this function */
2250                         ret = mxlnd_peer_alloc(&peer, tx->mxc_nid);
2251                         if (ret != 0) {
2252                                 /* finalize message */
2253                                 tx->mxc_status.code = ret;
2254                                 mxlnd_put_idle_tx(tx);
2255                                 continue;
2256                         }
2257                         tx->mxc_peer = peer;
2258                         tx->mxc_conn = peer->mxp_conn;
2259                         /* this tx will keep the conn ref taken in peer_alloc() */
2260
2261                         /* add peer to global peer list, but look to see
2262                          * if someone already created it after we released
2263                          * the read lock */
2264                         write_lock(&kmxlnd_data.kmx_peers_lock);
2265                         list_for_each_entry(old, &kmxlnd_data.kmx_peers[hash], mxp_peers) {
2266                                 if (old->mxp_nid == peer->mxp_nid) {
2267                                         /* somebody beat us here, we created a duplicate */
2268                                         found = 1;
2269                                         break;
2270                                 }
2271                         }
2272
2273                         if (found == 0) {
2274                                 list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
2275                                 atomic_inc(&kmxlnd_data.kmx_npeers);
2276                         } else {
2277                                 tx->mxc_peer = old;
2278                                 spin_lock(&old->mxp_lock);
2279                                 tx->mxc_conn = old->mxp_conn;
2280                                 /* FIXME can conn be NULL? */
2281                                 LASSERT(old->mxp_conn != NULL);
2282                                 mxlnd_conn_addref(old->mxp_conn);
2283                                 spin_unlock(&old->mxp_lock);
2284                                 mxlnd_reduce_idle_rxs(*kmxlnd_tunables.kmx_credits - 1);
2285                                 mxlnd_conn_decref(peer->mxp_conn); /* drop ref taken above.. */
2286                                 mxlnd_conn_decref(peer->mxp_conn); /* drop peer's ref */
2287                                 mxlnd_peer_decref(peer);
2288                         }
2289                         write_unlock(&kmxlnd_data.kmx_peers_lock);
2290
2291                         mxlnd_queue_tx(tx);
2292                 }
2293         }
2294         mxlnd_thread_stop(id);
2295         return 0;
2296 }
2297
2298 /* When calling this, we must not have the peer lock. */
2299 void
2300 mxlnd_iconnect(struct kmx_peer *peer, u64 mask)
2301 {
2302         mx_return_t             mxret   = MX_SUCCESS;
2303         mx_request_t            request;
2304         struct kmx_conn         *conn   = peer->mxp_conn;
2305
2306         /* NOTE we are holding a conn ref every time we call this function,
2307          * we do not need to lock the peer before taking another ref */
2308         mxlnd_conn_addref(conn); /* hold until CONN_REQ or CONN_ACK completes */
2309
2310         LASSERT(mask == MXLND_MASK_ICON_REQ ||
2311                 mask == MXLND_MASK_ICON_ACK);
2312
2313         if (peer->mxp_reconnect_time == 0) {
2314                 peer->mxp_reconnect_time = jiffies;
2315         }
2316
2317         if (peer->mxp_nic_id == 0LL) {
2318                 mxlnd_peer_hostname_to_nic_id(peer);
2319                 if (peer->mxp_nic_id == 0LL) {
2320                         /* not mapped yet, return */
2321                         spin_lock(&conn->mxk_lock);
2322                         conn->mxk_status = MXLND_CONN_INIT;
2323                         spin_unlock(&conn->mxk_lock);
2324                         if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
2325                                 /* give up and notify LNET */
2326                                 mxlnd_conn_disconnect(conn, 0, 1);
2327                                 mxlnd_conn_alloc(&peer->mxp_conn, peer); /* adds ref for this
2328                                                                             function... */
2329                                 mxlnd_conn_decref(peer->mxp_conn); /* which we no 
2330                                                                       longer need */
2331                         }
2332                         mxlnd_conn_decref(conn);
2333                         return;
2334                 }
2335         }
2336
2337         mxret = mx_iconnect(kmxlnd_data.kmx_endpt, peer->mxp_nic_id,
2338                             peer->mxp_host->mxh_ep_id, MXLND_MSG_MAGIC, mask,
2339                             (void *) peer, &request);
2340         if (unlikely(mxret != MX_SUCCESS)) {
2341                 spin_lock(&conn->mxk_lock);
2342                 conn->mxk_status = MXLND_CONN_FAIL;
2343                 spin_unlock(&conn->mxk_lock);
2344                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
2345                        mx_strerror(mxret), mxret, libcfs_nid2str(peer->mxp_nid));
2346                 mxlnd_conn_decref(conn);
2347         }
2348         return;
2349 }
2350
2351 #define MXLND_STATS 0
2352
2353 int
2354 mxlnd_check_sends(struct kmx_peer *peer)
2355 {
2356         int                     ret             = 0;
2357         int                     found           = 0;
2358         mx_return_t             mxret           = MX_SUCCESS;
2359         struct kmx_ctx          *tx             = NULL;
2360         struct kmx_conn         *conn           = NULL;
2361         u8                      msg_type        = 0;
2362         int                     credit          = 0;
2363         int                     status          = 0;
2364         int                     ntx_posted      = 0;
2365         int                     credits         = 0;
2366 #if MXLND_STATS
2367         static unsigned long    last            = 0;
2368 #endif
2369
2370         if (unlikely(peer == NULL)) {
2371                 LASSERT(peer != NULL);
2372                 return -1;
2373         }
2374         spin_lock(&peer->mxp_lock);
2375         conn = peer->mxp_conn;
2376         /* NOTE take a ref for the duration of this function since it is called
2377          * when there might not be any queued txs for this peer */
2378         if (conn) mxlnd_conn_addref(conn); /* for duration of this function */
2379         spin_unlock(&peer->mxp_lock);
2380
2381         /* do not add another ref for this tx */
2382
2383         if (conn == NULL) {
2384                 /* we do not have any conns */
2385                 return -1;
2386         }
2387
2388 #if MXLND_STATS
2389         if (time_after(jiffies, last)) {
2390                 last = jiffies + HZ;
2391                 CDEBUG(D_NET, "status= %s credits= %d outstanding= %d ntx_msgs= %d "
2392                               "ntx_posted= %d ntx_data= %d data_posted= %d\n",
2393                               mxlnd_connstatus_to_str(conn->mxk_status), conn->mxk_credits,
2394                               conn->mxk_outstanding, conn->mxk_ntx_msgs, conn->mxk_ntx_posted,
2395                               conn->mxk_ntx_data, conn->mxk_data_posted);
2396         }
2397 #endif
2398
2399         /* cache peer state for asserts */
2400         spin_lock(&conn->mxk_lock);
2401         ntx_posted = conn->mxk_ntx_posted;
2402         credits = conn->mxk_credits;
2403         spin_unlock(&conn->mxk_lock);
2404
2405         LASSERT(ntx_posted <= *kmxlnd_tunables.kmx_credits);
2406         LASSERT(ntx_posted >= 0);
2407
2408         LASSERT(credits <= *kmxlnd_tunables.kmx_credits);
2409         LASSERT(credits >= 0);
2410
2411         /* check number of queued msgs, ignore data */
2412         spin_lock(&conn->mxk_lock);
2413         if (conn->mxk_outstanding >= MXLND_CREDIT_HIGHWATER) {
2414                 /* check if any txs queued that could return credits... */
2415                 if (list_empty(&conn->mxk_tx_credit_queue) || conn->mxk_ntx_msgs == 0) {
2416                         /* if not, send a NOOP */
2417                         tx = mxlnd_get_idle_tx();
2418                         if (likely(tx != NULL)) {
2419                                 tx->mxc_peer = peer;
2420                                 tx->mxc_conn = peer->mxp_conn;
2421                                 mxlnd_conn_addref(conn); /* for this tx */
2422                                 mxlnd_init_tx_msg (tx, MXLND_MSG_NOOP, 0, peer->mxp_nid);
2423                                 tx->mxc_match = mxlnd_create_match(tx, 0);
2424                                 mxlnd_peer_queue_tx_locked(tx);
2425                                 found = 1;
2426                                 goto done_locked;
2427                         }
2428                 }
2429         }
2430         spin_unlock(&conn->mxk_lock);
2431
2432         /* if the peer is not ready, try to connect */
2433         spin_lock(&conn->mxk_lock);
2434         if (unlikely(conn->mxk_status == MXLND_CONN_INIT ||
2435             conn->mxk_status == MXLND_CONN_FAIL ||
2436             conn->mxk_status == MXLND_CONN_REQ)) {
2437                 CDEBUG(D_NET, "status=%s\n", mxlnd_connstatus_to_str(conn->mxk_status));
2438                 conn->mxk_status = MXLND_CONN_WAIT;
2439                 spin_unlock(&conn->mxk_lock);
2440                 mxlnd_iconnect(peer, MXLND_MASK_ICON_REQ);
2441                 goto done;
2442         }
2443         spin_unlock(&conn->mxk_lock);
2444
2445         spin_lock(&conn->mxk_lock);
2446         while (!list_empty(&conn->mxk_tx_free_queue) ||
2447                !list_empty(&conn->mxk_tx_credit_queue)) {
2448                 /* We have something to send. If we have a queued tx that does not
2449                  * require a credit (free), choose it since its completion will 
2450                  * return a credit (here or at the peer), complete a DATA or 
2451                  * CONN_REQ or CONN_ACK. */
2452                 struct list_head *tmp_tx = NULL;
2453                 if (!list_empty(&conn->mxk_tx_free_queue)) {
2454                         tmp_tx = &conn->mxk_tx_free_queue;
2455                 } else {
2456                         tmp_tx = &conn->mxk_tx_credit_queue;
2457                 }
2458                 tx = list_entry(tmp_tx->next, struct kmx_ctx, mxc_list);
2459
2460                 msg_type = tx->mxc_msg_type;
2461
2462                 /* don't try to send a rx */
2463                 LASSERT(tx->mxc_type == MXLND_REQ_TX);
2464
2465                 /* ensure that it is a valid msg type */
2466                 LASSERT(msg_type == MXLND_MSG_CONN_REQ ||
2467                         msg_type == MXLND_MSG_CONN_ACK ||
2468                         msg_type == MXLND_MSG_NOOP     ||
2469                         msg_type == MXLND_MSG_EAGER    ||
2470                         msg_type == MXLND_MSG_PUT_REQ  ||
2471                         msg_type == MXLND_MSG_PUT_ACK  ||
2472                         msg_type == MXLND_MSG_PUT_DATA ||
2473                         msg_type == MXLND_MSG_GET_REQ  ||
2474                         msg_type == MXLND_MSG_GET_DATA);
2475                 LASSERT(tx->mxc_peer == peer);
2476                 LASSERT(tx->mxc_nid == peer->mxp_nid);
2477
2478                 credit = mxlnd_tx_requires_credit(tx);
2479                 if (credit) {
2480
2481                         if (conn->mxk_ntx_posted == *kmxlnd_tunables.kmx_credits) {
2482                                 CDEBUG(D_NET, "%s: posted enough\n",
2483                                               libcfs_nid2str(peer->mxp_nid));
2484                                 goto done_locked;
2485                         }
2486
2487                         if (conn->mxk_credits == 0) {
2488                                 CDEBUG(D_NET, "%s: no credits\n",
2489                                               libcfs_nid2str(peer->mxp_nid));
2490                                 goto done_locked;
2491                         }
2492
2493                         if (conn->mxk_credits == 1 &&      /* last credit reserved for */
2494                             conn->mxk_outstanding == 0) {  /* giving back credits */
2495                                 CDEBUG(D_NET, "%s: not using last credit\n",
2496                                               libcfs_nid2str(peer->mxp_nid));
2497                                 goto done_locked;
2498                         }
2499                 }
2500
2501                 if (unlikely(conn->mxk_status != MXLND_CONN_READY)) {
2502                         if ( ! (msg_type == MXLND_MSG_CONN_REQ ||
2503                                 msg_type == MXLND_MSG_CONN_ACK)) {
2504                                 CDEBUG(D_NET, "peer status is %s for tx 0x%llx (%s)\n",
2505                                              mxlnd_connstatus_to_str(conn->mxk_status),
2506                                              tx->mxc_cookie,
2507                                              mxlnd_msgtype_to_str(tx->mxc_msg_type));
2508                                 if (conn->mxk_status == MXLND_CONN_DISCONNECT) {
2509                                         list_del_init(&tx->mxc_list);
2510                                         tx->mxc_status.code = -ECONNABORTED;
2511                                         mxlnd_put_idle_tx(tx);
2512                                         mxlnd_conn_decref(conn);
2513                                 }
2514                                 goto done_locked;
2515                         }
2516                 }
2517
2518                 list_del_init(&tx->mxc_list);
2519
2520                 /* handle credits, etc now while we have the lock to avoid races */
2521                 if (credit) {
2522                         conn->mxk_credits--;
2523                         conn->mxk_ntx_posted++;
2524                 }
2525                 if (msg_type != MXLND_MSG_PUT_DATA &&
2526                     msg_type != MXLND_MSG_GET_DATA) {
2527                         if (msg_type != MXLND_MSG_CONN_REQ &&
2528                             msg_type != MXLND_MSG_CONN_ACK) {
2529                                 conn->mxk_ntx_msgs--;
2530                         }
2531                 }
2532                 if (tx->mxc_incarnation == 0 &&
2533                     conn->mxk_incarnation != 0) {
2534                         tx->mxc_incarnation = conn->mxk_incarnation;
2535                 }
2536                 spin_unlock(&conn->mxk_lock);
2537
2538                 /* if this is a NOOP and (1) mxp_conn->mxk_outstanding < CREDIT_HIGHWATER 
2539                  * or (2) there is a non-DATA msg that can return credits in the 
2540                  * queue, then drop this duplicate NOOP */
2541                 if (unlikely(msg_type == MXLND_MSG_NOOP)) {
2542                         spin_lock(&conn->mxk_lock);
2543                         if ((conn->mxk_outstanding < MXLND_CREDIT_HIGHWATER) ||
2544                             (conn->mxk_ntx_msgs >= 1)) {
2545                                 conn->mxk_credits++;
2546                                 conn->mxk_ntx_posted--;
2547                                 spin_unlock(&conn->mxk_lock);
2548                                 /* redundant NOOP */
2549                                 mxlnd_put_idle_tx(tx);
2550                                 mxlnd_conn_decref(conn);
2551                                 CDEBUG(D_NET, "%s: redundant noop\n",
2552                                               libcfs_nid2str(peer->mxp_nid));
2553                                 found = 1;
2554                                 goto done;
2555                         }
2556                         spin_unlock(&conn->mxk_lock);
2557                 }
2558
2559                 found = 1;
2560                 if (likely((msg_type != MXLND_MSG_PUT_DATA) &&
2561                     (msg_type != MXLND_MSG_GET_DATA))) {
2562                         mxlnd_pack_msg(tx);
2563                 }
2564
2565                 //ret = -ECONNABORTED;
2566                 mxret = MX_SUCCESS;
2567
2568                 spin_lock(&conn->mxk_lock);
2569                 status = conn->mxk_status;
2570                 spin_unlock(&conn->mxk_lock);
2571
2572                 if (likely((status == MXLND_CONN_READY) ||
2573                     (msg_type == MXLND_MSG_CONN_REQ) ||
2574                     (msg_type == MXLND_MSG_CONN_ACK))) {
2575                         ret = 0;
2576                         if (msg_type != MXLND_MSG_CONN_REQ &&
2577                             msg_type != MXLND_MSG_CONN_ACK) {
2578                                 /* add to the pending list */
2579                                 ret = mxlnd_q_pending_ctx(tx);
2580                                 if (ret == -1) {
2581                                         /* FIXME the conn is disconnected, now what? */
2582                                 }
2583                         } else {
2584                                 /* CONN_REQ/ACK */
2585                                 tx->mxc_state = MXLND_CTX_PENDING;
2586                         }
2587
2588                         if (ret == 0) {
2589                                 if (likely(msg_type != MXLND_MSG_PUT_DATA &&
2590                                     msg_type != MXLND_MSG_GET_DATA)) {
2591                                         /* send a msg style tx */
2592                                         LASSERT(tx->mxc_nseg == 1);
2593                                         LASSERT(tx->mxc_pin_type == MX_PIN_PHYSICAL);
2594                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2595                                                mxlnd_msgtype_to_str(msg_type),
2596                                                tx->mxc_cookie);
2597                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2598                                                           &tx->mxc_seg,
2599                                                           tx->mxc_nseg,
2600                                                           tx->mxc_pin_type,
2601                                                           conn->mxk_epa,
2602                                                           tx->mxc_match,
2603                                                           (void *) tx,
2604                                                           &tx->mxc_mxreq);
2605                                 } else {
2606                                         /* send a DATA tx */
2607                                         spin_lock(&conn->mxk_lock);
2608                                         conn->mxk_ntx_data--;
2609                                         conn->mxk_data_posted++;
2610                                         spin_unlock(&conn->mxk_lock);
2611                                         CDEBUG(D_NET, "sending %s 0x%llx\n",
2612                                                mxlnd_msgtype_to_str(msg_type),
2613                                                tx->mxc_cookie);
2614                                         mxret = mx_kisend(kmxlnd_data.kmx_endpt,
2615                                                           tx->mxc_seg_list,
2616                                                           tx->mxc_nseg,
2617                                                           tx->mxc_pin_type,
2618                                                           conn->mxk_epa,
2619                                                           tx->mxc_match,
2620                                                           (void *) tx,
2621                                                           &tx->mxc_mxreq);
2622                                 }
2623                         } else {
2624                                 mxret = MX_CONNECTION_FAILED;
2625                         }
2626                         if (likely(mxret == MX_SUCCESS)) {
2627                                 ret = 0;
2628                         } else {
2629                                 CDEBUG(D_NETERROR, "mx_kisend() failed with %s (%d) "
2630                                        "sending to %s\n", mx_strerror(mxret), (int) mxret,
2631                                        libcfs_nid2str(peer->mxp_nid));
2632                                 /* NOTE mx_kisend() only fails if there are not enough 
2633                                 * resources. Do not change the connection status. */
2634                                 if (mxret == MX_NO_RESOURCES) {
2635                                         tx->mxc_status.code = -ENOMEM;
2636                                 } else {
2637                                         tx->mxc_status.code = -ECONNABORTED;
2638                                 }
2639                                 if (credit) {
2640                                         spin_lock(&conn->mxk_lock);
2641                                         conn->mxk_ntx_posted--;
2642                                         conn->mxk_credits++;
2643                                         spin_unlock(&conn->mxk_lock);
2644                                 } else if (msg_type == MXLND_MSG_PUT_DATA ||
2645                                         msg_type == MXLND_MSG_GET_DATA) {
2646                                         spin_lock(&conn->mxk_lock);
2647                                         conn->mxk_data_posted--;
2648                                         spin_unlock(&conn->mxk_lock);
2649                                 }
2650                                 if (msg_type != MXLND_MSG_PUT_DATA &&
2651                                     msg_type != MXLND_MSG_GET_DATA &&
2652                                     msg_type != MXLND_MSG_CONN_REQ &&
2653                                     msg_type != MXLND_MSG_CONN_ACK) {
2654                                         spin_lock(&conn->mxk_lock);
2655                                         conn->mxk_outstanding += tx->mxc_msg->mxm_credits;
2656                                         spin_unlock(&conn->mxk_lock);
2657                                 }
2658                                 if (msg_type != MXLND_MSG_CONN_REQ &&
2659                                     msg_type != MXLND_MSG_CONN_ACK) {
2660                                         /* remove from the pending list */
2661                                         mxlnd_deq_pending_ctx(tx);
2662                                 }
2663                                 mxlnd_put_idle_tx(tx);
2664                                 mxlnd_conn_decref(conn);
2665                         }
2666                 }
2667                 spin_lock(&conn->mxk_lock);
2668         }
2669 done_locked:
2670         spin_unlock(&conn->mxk_lock);
2671 done:
2672         mxlnd_conn_decref(conn); /* drop ref taken at start of function */
2673         return found;
2674 }
2675
2676
2677 /**
2678  * mxlnd_handle_tx_completion - a tx completed, progress or complete the msg
2679  * @ctx - the tx descriptor
2680  *
2681  * Determine which type of send request it was and start the next step, if needed,
2682  * or, if done, signal completion to LNET. After we are done, put back on the
2683  * idle tx list.
2684  */
2685 void
2686 mxlnd_handle_tx_completion(struct kmx_ctx *tx)
2687 {
2688         int             failed  = (tx->mxc_status.code != MX_STATUS_SUCCESS);
2689         struct kmx_msg  *msg    = tx->mxc_msg;
2690         struct kmx_peer *peer   = tx->mxc_peer;
2691         struct kmx_conn *conn   = tx->mxc_conn;
2692         u8              type    = tx->mxc_msg_type;
2693         int             credit  = mxlnd_tx_requires_credit(tx);
2694         u64             cookie  = tx->mxc_cookie;
2695
2696         CDEBUG(D_NET, "entering %s (0x%llx):\n",
2697                       mxlnd_msgtype_to_str(tx->mxc_msg_type), cookie);
2698
2699         if (unlikely(conn == NULL)) {
2700                 mx_get_endpoint_addr_context(tx->mxc_status.source, (void **) &peer);
2701                 conn = peer->mxp_conn;
2702                 if (conn != NULL) {
2703                         /* do not add a ref for the tx, it was set before sending */
2704                         tx->mxc_conn = conn;
2705                         tx->mxc_peer = conn->mxk_peer;
2706                 }
2707         }
2708         LASSERT (peer != NULL);
2709         LASSERT (conn != NULL);
2710
2711         if (type != MXLND_MSG_PUT_DATA && type != MXLND_MSG_GET_DATA) {
2712                 LASSERT (type == msg->mxm_type);
2713         }
2714
2715         if (failed) {
2716                 tx->mxc_status.code = -EIO;
2717         } else {
2718                 spin_lock(&conn->mxk_lock);
2719                 conn->mxk_last_tx = jiffies;
2720                 spin_unlock(&conn->mxk_lock);
2721         }
2722
2723         switch (type) {
2724
2725         case MXLND_MSG_GET_DATA:
2726                 spin_lock(&conn->mxk_lock);
2727                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2728                         conn->mxk_outstanding++;
2729                         conn->mxk_data_posted--;
2730                 }
2731                 spin_unlock(&conn->mxk_lock);
2732                 break;
2733
2734         case MXLND_MSG_PUT_DATA:
2735                 spin_lock(&conn->mxk_lock);
2736                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2737                         conn->mxk_data_posted--;
2738                 }
2739                 spin_unlock(&conn->mxk_lock);
2740                 break;
2741
2742         case MXLND_MSG_NOOP:
2743         case MXLND_MSG_PUT_REQ:
2744         case MXLND_MSG_PUT_ACK:
2745         case MXLND_MSG_GET_REQ:
2746         case MXLND_MSG_EAGER:
2747         //case MXLND_MSG_NAK:
2748                 break;
2749
2750         case MXLND_MSG_CONN_ACK:
2751                 if (peer->mxp_incompatible) {
2752                         /* we sent our params, now close this conn */
2753                         mxlnd_conn_disconnect(conn, 0, 1);
2754                 }
2755         case MXLND_MSG_CONN_REQ:
2756                 if (failed) {
2757                         CDEBUG(D_NETERROR, "handle_tx_completion(): %s "
2758                                "failed with %s (%d) to %s\n",
2759                                type == MXLND_MSG_CONN_REQ ? "CONN_REQ" : "CONN_ACK",
2760                                mx_strstatus(tx->mxc_status.code),
2761                                tx->mxc_status.code,
2762                                libcfs_nid2str(tx->mxc_nid));
2763                         if (!peer->mxp_incompatible) {
2764                                 spin_lock(&conn->mxk_lock);
2765                                 conn->mxk_status = MXLND_CONN_FAIL;
2766                                 spin_unlock(&conn->mxk_lock);
2767                         }
2768                 }
2769                 break;
2770
2771         default:
2772                 CDEBUG(D_NETERROR, "Unknown msg type of %d\n", type);
2773                 LBUG();
2774         }
2775
2776         if (credit) {
2777                 spin_lock(&conn->mxk_lock);
2778                 if (conn->mxk_incarnation == tx->mxc_incarnation) {
2779                         conn->mxk_ntx_posted--;
2780                 }
2781                 spin_unlock(&conn->mxk_lock);
2782         }
2783
2784         CDEBUG(D_NET, "leaving mxlnd_handle_tx_completion()\n");
2785         mxlnd_put_idle_tx(tx);
2786         mxlnd_conn_decref(conn);
2787
2788         mxlnd_check_sends(peer);
2789
2790         return;
2791 }
2792
2793 void
2794 mxlnd_handle_rx_completion(struct kmx_ctx *rx)
2795 {
2796         int                     ret             = 0;
2797         int                     repost          = 1;
2798         int                     credit          = 1;
2799         u32                     nob             = rx->mxc_status.xfer_length;
2800         u64                     bits            = rx->mxc_status.match_info;
2801         struct kmx_msg         *msg             = rx->mxc_msg;
2802         struct kmx_peer        *peer            = rx->mxc_peer;
2803         struct kmx_conn        *conn            = rx->mxc_conn;
2804         u8                      type            = rx->mxc_msg_type;
2805         u64                     seq             = 0LL;
2806         lnet_msg_t             *lntmsg[2];
2807         int                     result          = 0;
2808         u64                     nic_id          = 0LL;
2809         u32                     ep_id           = 0;
2810         int                     peer_ref        = 0;
2811         int                     conn_ref        = 0;
2812         int                     incompatible    = 0;
2813
2814         /* NOTE We may only know the peer's nid if it is a PUT_REQ, GET_REQ, 
2815          * failed GET reply, CONN_REQ, or a CONN_ACK */
2816
2817         /* NOTE peer may still be NULL if it is a new peer and
2818          *      conn may be NULL if this is a re-connect */
2819         if (likely(peer != NULL && conn != NULL)) {
2820                 /* we have a reference on the conn */
2821                 conn_ref = 1;
2822         } else if (peer != NULL && conn == NULL) {
2823                 /* we have a reference on the peer */
2824                 peer_ref = 1;
2825         } else if (peer == NULL && conn != NULL) {
2826                 /* fatal error */
2827                 CDEBUG(D_NETERROR, "rx has conn but no peer\n");
2828                 LBUG();
2829         } /* else peer and conn == NULL */
2830
2831 #if 0
2832         if (peer == NULL || conn == NULL) {
2833                 /* if the peer was disconnected, the peer may exist but
2834                  * not have any valid conns */
2835                 decref = 0; /* no peer means no ref was taken for this rx */
2836         }
2837 #endif
2838
2839         if (conn == NULL && peer != NULL) {
2840                 spin_lock(&peer->mxp_lock);
2841                 conn = peer->mxp_conn;
2842                 if (conn) {
2843                         mxlnd_conn_addref(conn); /* conn takes ref... */
2844                         mxlnd_peer_decref(peer); /* from peer */
2845                         conn_ref = 1;
2846                         peer_ref = 0;
2847                 }
2848                 spin_unlock(&peer->mxp_lock);
2849                 rx->mxc_conn = conn;
2850         }
2851
2852 #if MXLND_DEBUG
2853         CDEBUG(D_NET, "receiving msg bits=0x%llx nob=%d peer=0x%p\n", bits, nob, peer);
2854 #endif
2855
2856         lntmsg[0] = NULL;
2857         lntmsg[1] = NULL;
2858
2859         if (rx->mxc_status.code != MX_STATUS_SUCCESS) {
2860                 CDEBUG(D_NETERROR, "rx from %s failed with %s (%d)\n",
2861                                    libcfs_nid2str(rx->mxc_nid),
2862                                    mx_strstatus(rx->mxc_status.code),
2863                                    (int) rx->mxc_status.code);
2864                 credit = 0;
2865                 goto cleanup;
2866         }
2867
2868         if (nob == 0) {
2869                 /* this may be a failed GET reply */
2870                 if (type == MXLND_MSG_GET_DATA) {
2871                         bits = rx->mxc_status.match_info & 0x0FF0000000000000LL;
2872                         ret = (u32) (bits>>52);
2873                         lntmsg[0] = rx->mxc_lntmsg[0];
2874                         result = -ret;
2875                         goto cleanup;
2876                 } else {
2877                         /* we had a rx complete with 0 bytes (no hdr, nothing) */
2878                         CDEBUG(D_NETERROR, "rx from %s returned with 0 bytes\n",
2879                                            libcfs_nid2str(rx->mxc_nid));
2880                         goto cleanup;
2881                 }
2882         }
2883
2884         /* NOTE PUT_DATA and GET_DATA do not have mxc_msg, do not call unpack() */
2885         if (type == MXLND_MSG_PUT_DATA) {
2886                 result = rx->mxc_status.code;
2887                 lntmsg[0] = rx->mxc_lntmsg[0];
2888                 goto cleanup;
2889         } else if (type == MXLND_MSG_GET_DATA) {
2890                 result = rx->mxc_status.code;
2891                 lntmsg[0] = rx->mxc_lntmsg[0];
2892                 lntmsg[1] = rx->mxc_lntmsg[1];
2893                 goto cleanup;
2894         }
2895
2896         ret = mxlnd_unpack_msg(msg, nob);
2897         if (ret != 0) {
2898                 CDEBUG(D_NETERROR, "Error %d unpacking rx from %s\n",
2899                                    ret, libcfs_nid2str(rx->mxc_nid));
2900                 goto cleanup;
2901         }
2902         rx->mxc_nob = nob;
2903         type = msg->mxm_type;
2904         seq = msg->mxm_seq;
2905
2906         if (type != MXLND_MSG_CONN_REQ &&
2907             (!lnet_ptlcompat_matchnid(rx->mxc_nid, msg->mxm_srcnid) ||
2908              !lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid))) {
2909                 CDEBUG(D_NETERROR, "rx with mismatched NID (type %s) (my nid is "
2910                        "0x%llx and rx msg dst is 0x%llx)\n",
2911                        mxlnd_msgtype_to_str(type), kmxlnd_data.kmx_ni->ni_nid,
2912                        msg->mxm_dstnid);
2913                 goto cleanup;
2914         }
2915
2916         if (type != MXLND_MSG_CONN_REQ && type != MXLND_MSG_CONN_ACK) {
2917                 if ((conn != NULL && msg->mxm_srcstamp != conn->mxk_incarnation) ||
2918                     msg->mxm_dststamp != kmxlnd_data.kmx_incarnation) {
2919                         if (conn != NULL) {
2920                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2921                                        "(mxm_srcstamp (%lld) != mxk_incarnation (%lld) "
2922                                        "|| mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2923                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2924                                        msg->mxm_srcstamp, conn->mxk_incarnation,
2925                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2926                         } else {
2927                                 CDEBUG(D_NETERROR, "Stale rx from %s with type %s "
2928                                        "mxm_dststamp (%lld) != kmx_incarnation (%lld))\n",
2929                                        libcfs_nid2str(rx->mxc_nid), mxlnd_msgtype_to_str(type),
2930                                        msg->mxm_dststamp, kmxlnd_data.kmx_incarnation);
2931                         }
2932                         credit = 0;
2933                         goto cleanup;
2934                 }
2935         }
2936
2937         CDEBUG(D_NET, "Received %s with %d credits\n",
2938                       mxlnd_msgtype_to_str(type), msg->mxm_credits);
2939
2940         if (msg->mxm_type != MXLND_MSG_CONN_REQ &&
2941             msg->mxm_type != MXLND_MSG_CONN_ACK) {
2942                 LASSERT(peer != NULL);
2943                 LASSERT(conn != NULL);
2944                 if (msg->mxm_credits != 0) {
2945                         spin_lock(&conn->mxk_lock);
2946                         if (msg->mxm_srcstamp == conn->mxk_incarnation) {
2947                                 if ((conn->mxk_credits + msg->mxm_credits) > 
2948                                      *kmxlnd_tunables.kmx_credits) {
2949                                         CDEBUG(D_NETERROR, "mxk_credits %d  mxm_credits %d\n",
2950                                                conn->mxk_credits, msg->mxm_credits);
2951                                 }
2952                                 conn->mxk_credits += msg->mxm_credits;
2953                                 LASSERT(conn->mxk_credits >= 0);
2954                                 LASSERT(conn->mxk_credits <= *kmxlnd_tunables.kmx_credits);
2955                         }
2956                         spin_unlock(&conn->mxk_lock);
2957                 }
2958         }
2959
2960         CDEBUG(D_NET, "switch %s for rx (0x%llx)\n", mxlnd_msgtype_to_str(type), seq);
2961         switch (type) {
2962         case MXLND_MSG_NOOP:
2963                 break;
2964
2965         case MXLND_MSG_EAGER:
2966                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.eager.mxem_hdr,
2967                                         msg->mxm_srcnid, rx, 0);
2968                 repost = ret < 0;
2969                 break;
2970
2971         case MXLND_MSG_PUT_REQ:
2972                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.put_req.mxprm_hdr,
2973                                         msg->mxm_srcnid, rx, 1);
2974                 repost = ret < 0;
2975                 break;
2976
2977         case MXLND_MSG_PUT_ACK: {
2978                 u64  cookie = (u64) msg->mxm_u.put_ack.mxpam_dst_cookie;
2979                 if (cookie > MXLND_MAX_COOKIE) {
2980                         CDEBUG(D_NETERROR, "NAK for msg_type %d from %s\n", rx->mxc_msg_type,
2981                                            libcfs_nid2str(rx->mxc_nid));
2982                         result = -((cookie >> 52) & 0xff);
2983                         lntmsg[0] = rx->mxc_lntmsg[0];
2984                 } else {
2985                         mxlnd_send_data(kmxlnd_data.kmx_ni, rx->mxc_lntmsg[0],
2986                                         rx->mxc_peer, MXLND_MSG_PUT_DATA,
2987                                         rx->mxc_msg->mxm_u.put_ack.mxpam_dst_cookie);
2988                 }
2989                 /* repost == 1 */
2990                 break;
2991         }
2992         case MXLND_MSG_GET_REQ:
2993                 ret = lnet_parse(kmxlnd_data.kmx_ni, &msg->mxm_u.get_req.mxgrm_hdr,
2994                                         msg->mxm_srcnid, rx, 1);
2995                 repost = ret < 0;
2996                 break;
2997
2998         case MXLND_MSG_CONN_REQ:
2999                 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3000                         CDEBUG(D_NETERROR, "Can't accept %s: bad dst nid %s\n",
3001                                         libcfs_nid2str(msg->mxm_srcnid),
3002                                         libcfs_nid2str(msg->mxm_dstnid));
3003                         goto cleanup;
3004                 }
3005                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3006                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible queue depth "
3007                                     "%d (%d wanted)\n",
3008                                         libcfs_nid2str(msg->mxm_srcnid),
3009                                         msg->mxm_u.conn_req.mxcrm_queue_depth,
3010                                         *kmxlnd_tunables.kmx_credits);
3011                         incompatible = 1;
3012                 }
3013                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3014                         CDEBUG(D_NETERROR, "Can't accept %s: incompatible EAGER size "
3015                                     "%d (%d wanted)\n",
3016                                         libcfs_nid2str(msg->mxm_srcnid),
3017                                         msg->mxm_u.conn_req.mxcrm_eager_size,
3018                                         (int) MXLND_EAGER_SIZE);
3019                         incompatible = 1;
3020                 }
3021                 if (peer == NULL) {
3022                         peer = mxlnd_find_peer_by_nid(msg->mxm_srcnid); /* adds peer ref */
3023                         if (peer == NULL) {
3024                                 int             hash    = 0;
3025                                 struct kmx_peer *existing_peer    = NULL;
3026                                 hash = mxlnd_nid_to_hash(msg->mxm_srcnid);
3027
3028                                 mx_decompose_endpoint_addr(rx->mxc_status.source,
3029                                                            &nic_id, &ep_id);
3030                                 rx->mxc_nid = msg->mxm_srcnid;
3031
3032                                 /* adds conn ref for peer and one for this function */
3033                                 ret = mxlnd_peer_alloc(&peer, msg->mxm_srcnid);
3034                                 if (ret != 0) {
3035                                         goto cleanup;
3036                                 }
3037                                 LASSERT(peer->mxp_host->mxh_ep_id == ep_id);
3038                                 write_lock(&kmxlnd_data.kmx_peers_lock);
3039                                 existing_peer = mxlnd_find_peer_by_nid_locked(msg->mxm_srcnid);
3040                                 if (existing_peer) {
3041                                         mxlnd_conn_decref(peer->mxp_conn);
3042                                         mxlnd_peer_decref(peer);
3043                                         peer = existing_peer;
3044                                         mxlnd_conn_addref(peer->mxp_conn);
3045                                 } else {
3046                                         list_add_tail(&peer->mxp_peers,
3047                                                       &kmxlnd_data.kmx_peers[hash]);
3048                                         write_unlock(&kmxlnd_data.kmx_peers_lock);
3049                                         atomic_inc(&kmxlnd_data.kmx_npeers);
3050                                 }
3051                         } else {
3052                                 ret = mxlnd_conn_alloc(&conn, peer); /* adds 2nd ref */
3053                                 mxlnd_peer_decref(peer); /* drop ref taken above */
3054                                 if (ret != 0) {
3055                                         CDEBUG(D_NETERROR, "Cannot allocate mxp_conn\n");
3056                                         goto cleanup;
3057                                 }
3058                         }
3059                         conn_ref = 1; /* peer/conn_alloc() added ref for this function */
3060                         conn = peer->mxp_conn;
3061                 } else {
3062                         struct kmx_conn *old_conn       = conn;
3063
3064                         /* do not call mx_disconnect() */
3065                         mxlnd_conn_disconnect(old_conn, 0, 0);
3066
3067                         /* the ref for this rx was taken on the old_conn */
3068                         mxlnd_conn_decref(old_conn);
3069
3070                         /* This allocs a conn, points peer->mxp_conn to this one.
3071                          * The old conn is still on the peer->mxp_conns list.
3072                          * As the pending requests complete, they will call
3073                          * conn_decref() which will eventually free it. */
3074                         ret = mxlnd_conn_alloc(&conn, peer);
3075                         if (ret != 0) {
3076                                 CDEBUG(D_NETERROR, "Cannot allocate peer->mxp_conn\n");
3077                                 goto cleanup;
3078                         }
3079                         /* conn_alloc() adds one ref for the peer and one for this function */
3080                         conn_ref = 1;
3081                 }
3082                 spin_lock(&peer->mxp_lock);
3083                 peer->mxp_incarnation = msg->mxm_srcstamp;
3084                 peer->mxp_incompatible = incompatible;
3085                 spin_unlock(&peer->mxp_lock);
3086                 spin_lock(&conn->mxk_lock);
3087                 conn->mxk_incarnation = msg->mxm_srcstamp;
3088                 conn->mxk_status = MXLND_CONN_WAIT;
3089                 spin_unlock(&conn->mxk_lock);
3090
3091                 /* handle_conn_ack() will create the CONN_ACK msg */
3092                 mxlnd_iconnect(peer, MXLND_MASK_ICON_ACK);
3093
3094                 break;
3095
3096         case MXLND_MSG_CONN_ACK:
3097                 if (!lnet_ptlcompat_matchnid(kmxlnd_data.kmx_ni->ni_nid, msg->mxm_dstnid)) {
3098                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3099                                "bad dst nid %s\n", libcfs_nid2str(msg->mxm_srcnid),
3100                                 libcfs_nid2str(msg->mxm_dstnid));
3101                         ret = -1;
3102                         goto failed;
3103                 }
3104                 if (msg->mxm_u.conn_req.mxcrm_queue_depth != *kmxlnd_tunables.kmx_credits) {
3105                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3106                                "incompatible queue depth %d (%d wanted)\n",
3107                                 libcfs_nid2str(msg->mxm_srcnid),
3108                                 msg->mxm_u.conn_req.mxcrm_queue_depth,
3109                                 *kmxlnd_tunables.kmx_credits);
3110                         spin_lock(&conn->mxk_lock);
3111                         conn->mxk_status = MXLND_CONN_FAIL;
3112                         spin_unlock(&conn->mxk_lock);
3113                         incompatible = 1;
3114                         ret = -1;
3115                 }
3116                 if (msg->mxm_u.conn_req.mxcrm_eager_size != MXLND_EAGER_SIZE) {
3117                         CDEBUG(D_NETERROR, "Can't accept CONN_ACK from %s: "
3118                                "incompatible EAGER size %d (%d wanted)\n",
3119                                 libcfs_nid2str(msg->mxm_srcnid),
3120                                 msg->mxm_u.conn_req.mxcrm_eager_size,
3121                                 (int) MXLND_EAGER_SIZE);
3122                         spin_lock(&conn->mxk_lock);
3123                         conn->mxk_status = MXLND_CONN_FAIL;
3124                         spin_unlock(&conn->mxk_lock);
3125                         incompatible = 1;
3126                         ret = -1;
3127                 }
3128                 spin_lock(&peer->mxp_lock);
3129                 peer->mxp_incarnation = msg->mxm_srcstamp;
3130                 peer->mxp_incompatible = incompatible;
3131                 spin_unlock(&peer->mxp_lock);
3132                 spin_lock(&conn->mxk_lock);
3133                 conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
3134                 conn->mxk_outstanding = 0;
3135                 conn->mxk_incarnation = msg->mxm_srcstamp;
3136                 conn->mxk_timeout = 0;
3137                 if (!incompatible) {
3138                         conn->mxk_status = MXLND_CONN_READY;
3139                 }
3140                 spin_unlock(&conn->mxk_lock);
3141                 if (incompatible) mxlnd_conn_disconnect(conn, 0, 1);
3142                 break;
3143
3144         default:
3145                 CDEBUG(D_NETERROR, "Bad MXLND message type %x from %s\n", msg->mxm_type,
3146                                 libcfs_nid2str(rx->mxc_nid));
3147                 ret = -EPROTO;
3148                 break;
3149         }
3150
3151 failed:
3152         if (ret < 0) {
3153                 MXLND_PRINT("setting PEER_CONN_FAILED\n");
3154                 spin_lock(&conn->mxk_lock);
3155                 conn->mxk_status = MXLND_CONN_FAIL;
3156                 spin_unlock(&conn->mxk_lock);
3157         }
3158
3159 cleanup:
3160         if (conn != NULL) {
3161                 spin_lock(&conn->mxk_lock);
3162                 conn->mxk_last_rx = cfs_time_current(); /* jiffies */
3163                 spin_unlock(&conn->mxk_lock);
3164         }
3165
3166         if (repost) {
3167                 /* lnet_parse() failed, etc., repost now */
3168                 mxlnd_put_idle_rx(rx);
3169                 if (conn != NULL && credit == 1) {
3170                         if (type == MXLND_MSG_PUT_DATA) {
3171                                 spin_lock(&conn->mxk_lock);
3172                                 conn->mxk_outstanding++;
3173                                 spin_unlock(&conn->mxk_lock);
3174                         } else if (type != MXLND_MSG_GET_DATA &&
3175                                   (type == MXLND_MSG_EAGER ||
3176                                    type == MXLND_MSG_PUT_REQ ||
3177                                    type == MXLND_MSG_NOOP)) {
3178                                 spin_lock(&conn->mxk_lock);
3179                                 conn->mxk_outstanding++;
3180                                 spin_unlock(&conn->mxk_lock);
3181                         }
3182                 }
3183                 if (conn_ref) mxlnd_conn_decref(conn);
3184                 LASSERT(peer_ref == 0);
3185         }
3186
3187         if (type == MXLND_MSG_PUT_DATA || type == MXLND_MSG_GET_DATA) {
3188                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", bits);
3189         } else {
3190                 CDEBUG(D_NET, "leaving for rx (0x%llx)\n", seq);
3191         }
3192
3193         if (lntmsg[0] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[0], result);
3194         if (lntmsg[1] != NULL) lnet_finalize(kmxlnd_data.kmx_ni, lntmsg[1], result);
3195
3196         if (conn != NULL && credit == 1) mxlnd_check_sends(peer);
3197
3198         return;
3199 }
3200
3201
3202
3203 void
3204 mxlnd_handle_conn_req(struct kmx_peer *peer, mx_status_t status)
3205 {
3206         struct kmx_ctx  *tx     = NULL;
3207         struct kmx_msg  *txmsg   = NULL;
3208         struct kmx_conn *conn   = peer->mxp_conn;
3209
3210         /* a conn ref was taken when calling mx_iconnect(), 
3211          * hold it until CONN_REQ or CONN_ACK completes */
3212
3213         CDEBUG(D_NET, "entering\n");
3214         if (status.code != MX_STATUS_SUCCESS) {
3215                 CDEBUG(D_NETERROR, "mx_iconnect() failed with %s (%d) to %s\n",
3216                         mx_strstatus(status.code), status.code,
3217                         libcfs_nid2str(peer->mxp_nid));
3218                 spin_lock(&conn->mxk_lock);
3219                 conn->mxk_status = MXLND_CONN_FAIL;
3220                 spin_unlock(&conn->mxk_lock);
3221
3222                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3223                         struct kmx_conn *new_conn       = NULL;
3224                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3225                         mxlnd_conn_disconnect(conn, 0, 1);
3226                         mxlnd_conn_alloc(&new_conn, peer); /* adds a ref for this function */
3227                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3228                         spin_lock(&peer->mxp_lock);
3229                         peer->mxp_reconnect_time = 0;
3230                         spin_unlock(&peer->mxp_lock);
3231                 }
3232
3233                 mxlnd_conn_decref(conn);
3234                 return;
3235         }
3236
3237         spin_lock(&conn->mxk_lock);
3238         conn->mxk_epa = status.source;
3239         spin_unlock(&conn->mxk_lock);
3240         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3241          *      we should not need to lock the peer */
3242         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3243
3244         /* mx_iconnect() succeeded, reset delay to 0 */
3245         spin_lock(&peer->mxp_lock);
3246         peer->mxp_reconnect_time = 0;
3247         spin_unlock(&peer->mxp_lock);
3248
3249         /* marshal CONN_REQ msg */
3250         /* we are still using the conn ref from iconnect() - do not take another */
3251         tx = mxlnd_get_idle_tx();
3252         if (tx == NULL) {
3253                 CDEBUG(D_NETERROR, "Can't allocate CONN_REQ tx for %s\n",
3254                                    libcfs_nid2str(peer->mxp_nid));
3255                 spin_lock(&conn->mxk_lock);
3256                 conn->mxk_status = MXLND_CONN_FAIL;
3257                 spin_unlock(&conn->mxk_lock);
3258                 mxlnd_conn_decref(conn);
3259                 return;
3260         }
3261
3262         tx->mxc_peer = peer;
3263         tx->mxc_conn = conn;
3264         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_REQ, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3265         txmsg = tx->mxc_msg;
3266         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3267         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3268         tx->mxc_match = mxlnd_create_match(tx, 0);
3269
3270         CDEBUG(D_NET, "sending MXLND_MSG_CONN_REQ\n");
3271         mxlnd_queue_tx(tx);
3272         return;
3273 }
3274
3275 void
3276 mxlnd_handle_conn_ack(struct kmx_peer *peer, mx_status_t status)
3277 {
3278         struct kmx_ctx  *tx     = NULL;
3279         struct kmx_msg  *txmsg   = NULL;
3280         struct kmx_conn *conn   = peer->mxp_conn;
3281
3282         /* a conn ref was taken when calling mx_iconnect(), 
3283          * hold it until CONN_REQ or CONN_ACK completes */
3284
3285         CDEBUG(D_NET, "entering\n");
3286         if (status.code != MX_STATUS_SUCCESS) {
3287                 CDEBUG(D_NETERROR, "mx_iconnect() failed for CONN_ACK with %s (%d) "
3288                        "to %s mxp_nid = 0x%llx mxp_nic_id = 0x%0llx mxh_ep_id = %d\n",
3289                         mx_strstatus(status.code), status.code,
3290                         libcfs_nid2str(peer->mxp_nid),
3291                         peer->mxp_nid,
3292                         peer->mxp_nic_id,
3293                         peer->mxp_host->mxh_ep_id);
3294                 spin_lock(&conn->mxk_lock);
3295                 conn->mxk_status = MXLND_CONN_FAIL;
3296                 spin_unlock(&conn->mxk_lock);
3297
3298                 if (time_after(jiffies, peer->mxp_reconnect_time + MXLND_WAIT_TIMEOUT)) {
3299                         struct kmx_conn *new_conn       = NULL;
3300                         CDEBUG(D_NETERROR, "timeout, calling conn_disconnect()\n");
3301                         mxlnd_conn_disconnect(conn, 0, 1);
3302                         mxlnd_conn_alloc(&new_conn, peer); /* adds ref for 
3303                                                               this function... */
3304                         mxlnd_conn_decref(new_conn); /* which we no longer need */
3305                         spin_lock(&peer->mxp_lock);
3306                         peer->mxp_reconnect_time = 0;
3307                         spin_unlock(&peer->mxp_lock);
3308                 }
3309
3310                 mxlnd_conn_decref(conn);
3311                 return;
3312         }
3313         spin_lock(&conn->mxk_lock);
3314         conn->mxk_epa = status.source;
3315         if (likely(!peer->mxp_incompatible)) {
3316                 conn->mxk_status = MXLND_CONN_READY;
3317         }
3318         spin_unlock(&conn->mxk_lock);
3319         /* NOTE we are holding a ref on the conn which has a ref on the peer,
3320          *      we should not have to lock the peer */
3321         mx_set_endpoint_addr_context(conn->mxk_epa, (void *) peer);
3322
3323         /* mx_iconnect() succeeded, reset delay to 0 */
3324         spin_lock(&peer->mxp_lock);
3325         peer->mxp_reconnect_time = 0;
3326         spin_unlock(&peer->mxp_lock);
3327
3328         /* marshal CONN_ACK msg */
3329         tx = mxlnd_get_idle_tx();
3330         if (tx == NULL) {
3331                 CDEBUG(D_NETERROR, "Can't allocate CONN_ACK tx for %s\n",
3332                                    libcfs_nid2str(peer->mxp_nid));
3333                 spin_lock(&conn->mxk_lock);
3334                 conn->mxk_status = MXLND_CONN_FAIL;
3335                 spin_unlock(&conn->mxk_lock);
3336                 mxlnd_conn_decref(conn);
3337                 return;
3338         }
3339
3340         tx->mxc_peer = peer;
3341         tx->mxc_conn = conn;
3342         CDEBUG(D_NET, "sending MXLND_MSG_CONN_ACK\n");
3343         mxlnd_init_tx_msg (tx, MXLND_MSG_CONN_ACK, sizeof(kmx_connreq_msg_t), peer->mxp_nid);
3344         txmsg = tx->mxc_msg;
3345         txmsg->mxm_u.conn_req.mxcrm_queue_depth = *kmxlnd_tunables.kmx_credits;
3346         txmsg->mxm_u.conn_req.mxcrm_eager_size = MXLND_EAGER_SIZE;
3347         tx->mxc_match = mxlnd_create_match(tx, 0);
3348
3349         mxlnd_queue_tx(tx);
3350         return;
3351 }
3352
3353 /**
3354  * mxlnd_request_waitd - the MX request completion thread(s)
3355  * @arg - thread id (as a void *)
3356  *
3357  * This thread waits for a MX completion and then completes the request.
3358  * We will create one thread per CPU.
3359  */
3360 int
3361 mxlnd_request_waitd(void *arg)
3362 {
3363         long                    id              = (long) arg;
3364         char                    name[24];
3365         __u32                   result          = 0;
3366         mx_return_t             mxret           = MX_SUCCESS;
3367         mx_status_t             status;
3368         struct kmx_ctx         *ctx             = NULL;
3369         enum kmx_req_state      req_type        = MXLND_REQ_TX;
3370         struct kmx_peer        *peer            = NULL;
3371         struct kmx_conn        *conn            = NULL;
3372 #if MXLND_POLLING
3373         int                     count           = 0;
3374 #endif
3375
3376         memset(name, 0, sizeof(name));
3377         snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", id);
3378         cfs_daemonize(name);
3379         //cfs_block_allsigs();
3380
3381         memset(&status, 0, sizeof(status));
3382
3383         CDEBUG(D_NET, "%s starting\n", name);
3384
3385         while (!kmxlnd_data.kmx_shutdown) {
3386                 mxret = MX_SUCCESS;
3387                 result = 0;
3388 #if MXLND_POLLING
3389                 if (id == 0 && count++ < *kmxlnd_tunables.kmx_polling) {
3390                         mxret = mx_test_any(kmxlnd_data.kmx_endpt, 0LL, 0LL,
3391                                             &status, &result);
3392                 } else {
3393                         count = 0;
3394                         mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3395                                             0LL, 0LL, &status, &result);
3396                 }
3397 #else
3398                 mxret = mx_wait_any(kmxlnd_data.kmx_endpt, MXLND_WAIT_TIMEOUT,
3399                                     0LL, 0LL, &status, &result);
3400 #endif
3401                 if (unlikely(kmxlnd_data.kmx_shutdown))
3402                         break;
3403
3404                 if (result != 1) {
3405                         /* nothing completed... */
3406                         continue;
3407                 }
3408
3409                 if (status.code != MX_STATUS_SUCCESS) {
3410                         CDEBUG(D_NETERROR, "wait_any() failed with %s (%d) with "
3411                                "match_info 0x%llx and length %d\n",
3412                                mx_strstatus(status.code), status.code,
3413                                (u64) status.match_info, status.msg_length);
3414                 }
3415
3416                 /* This may be a mx_iconnect() request completing,
3417                  * check the bit mask for CONN_REQ and CONN_ACK */
3418                 if (status.match_info == MXLND_MASK_ICON_REQ ||
3419                     status.match_info == MXLND_MASK_ICON_ACK) {
3420                         peer = (struct kmx_peer*) status.context;
3421                         if (status.match_info == MXLND_MASK_ICON_REQ) {
3422                                 mxlnd_handle_conn_req(peer, status);
3423                         } else {
3424                                 mxlnd_handle_conn_ack(peer, status);
3425                         }
3426                         continue;
3427                 }
3428
3429                 /* This must be a tx or rx */
3430
3431                 /* NOTE: if this is a RX from the unexpected callback, it may
3432                  * have very little info. If we dropped it in unexpected_recv(),
3433                  * it will not have a context. If so, ignore it. */
3434                 ctx = (struct kmx_ctx *) status.context;
3435                 if (ctx != NULL) {
3436
3437                         req_type = ctx->mxc_type;
3438                         conn = ctx->mxc_conn; /* this may be NULL */
3439                         mxlnd_deq_pending_ctx(ctx);
3440
3441                         /* copy status to ctx->mxc_status */
3442                         memcpy(&ctx->mxc_status, &status, sizeof(status));
3443
3444                         switch (req_type) {
3445                         case MXLND_REQ_TX:
3446                                 mxlnd_handle_tx_completion(ctx);
3447                                 break;
3448                         case MXLND_REQ_RX:
3449                                 mxlnd_handle_rx_completion(ctx);
3450                                 break;
3451                         default:
3452                                 CDEBUG(D_NETERROR, "Unknown ctx type %d\n", req_type);
3453                                 LBUG();
3454                                 break;
3455                         }
3456
3457                         /* FIXME may need to reconsider this */
3458                         /* conn is always set except for the first CONN_REQ rx
3459                          * from a new peer */
3460                         if (!(status.code == MX_STATUS_SUCCESS ||
3461                               status.code == MX_STATUS_TRUNCATED) &&
3462                               conn != NULL) {
3463                                 mxlnd_conn_disconnect(conn, 1, 1);
3464                         }
3465                 }
3466                 CDEBUG(D_NET, "waitd() completed task\n");
3467         }
3468         CDEBUG(D_NET, "%s stopping\n", name);
3469         mxlnd_thread_stop(id);
3470         return 0;
3471 }
3472
3473
3474 unsigned long
3475 mxlnd_check_timeouts(unsigned long now)
3476 {
3477         int                     i               = 0;
3478         int                     disconnect      = 0;
3479         unsigned long           next            = 0;
3480         struct  kmx_peer        *peer           = NULL;
3481         struct  kmx_conn        *conn           = NULL;
3482
3483         read_lock(&kmxlnd_data.kmx_peers_lock);
3484         for (i = 0; i < MXLND_HASH_SIZE; i++) {
3485                 list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3486
3487                         if (unlikely(kmxlnd_data.kmx_shutdown)) {
3488                                 read_unlock(&kmxlnd_data.kmx_peers_lock);
3489                                 return next;
3490                         }
3491
3492                         spin_lock(&peer->mxp_lock);
3493                         conn = peer->mxp_conn;
3494                         if (conn) {
3495                                 mxlnd_conn_addref(conn);
3496                                 spin_unlock(&peer->mxp_lock);
3497                         } else {
3498                                 spin_unlock(&peer->mxp_lock);
3499                                 continue;
3500                         }
3501
3502                         spin_lock(&conn->mxk_lock);
3503
3504                         /* if nothing pending (timeout == 0) or
3505                          * if conn is already disconnected,
3506                          * skip this conn */
3507                         if (conn->mxk_timeout == 0 ||
3508                             conn->mxk_status == MXLND_CONN_DISCONNECT) {
3509                                 spin_unlock(&conn->mxk_lock);
3510                                 mxlnd_conn_decref(conn);
3511                                 continue;
3512                         }
3513
3514                         /* we want to find the timeout that will occur first.
3515                          * if it is in the future, we will sleep until then.
3516                          * if it is in the past, then we will sleep one
3517                          * second and repeat the process. */
3518                         if ((next == 0) || (conn->mxk_timeout < next)) {
3519                                 next = conn->mxk_timeout;
3520                         }
3521
3522                         disconnect = 0;
3523
3524                         if (time_after_eq(now, conn->mxk_timeout))  {
3525                                 disconnect = 1;
3526                         }
3527                         spin_unlock(&conn->mxk_lock);
3528
3529                         if (disconnect) {
3530                                 mxlnd_conn_disconnect(conn, 1, 1);
3531                         }
3532                         mxlnd_conn_decref(conn);
3533                 }
3534         }
3535         read_unlock(&kmxlnd_data.kmx_peers_lock);
3536         if (next == 0) next = now + MXLND_COMM_TIMEOUT;
3537
3538         return next;
3539 }
3540
3541 /**
3542  * mxlnd_timeoutd - enforces timeouts on messages
3543  * @arg - thread id (as a void *)
3544  *
3545  * This thread queries each peer for its earliest timeout. If a peer has timed out,
3546  * it calls mxlnd_conn_disconnect().
3547  *
3548  * After checking for timeouts, try progressing sends (call check_sends()).
3549  */
3550 int
3551 mxlnd_timeoutd(void *arg)
3552 {
3553         int                     i       = 0;
3554         long                    id      = (long) arg;
3555         unsigned long           now     = 0;
3556         unsigned long           next    = 0;
3557         unsigned long           delay   = HZ;
3558         struct kmx_peer        *peer    = NULL;
3559         struct kmx_conn        *conn    = NULL;
3560
3561         cfs_daemonize("mxlnd_timeoutd");
3562         //cfs_block_allsigs();
3563
3564         CDEBUG(D_NET, "timeoutd starting\n");
3565
3566         while (!kmxlnd_data.kmx_shutdown) {
3567
3568                 now = jiffies;
3569                 /* if the next timeout has not arrived, go back to sleep */
3570                 if (time_after(now, next)) {
3571                         next = mxlnd_check_timeouts(now);
3572                 }
3573
3574                read_lock(&kmxlnd_data.kmx_peers_lock);
3575                 for (i = 0; i < MXLND_HASH_SIZE; i++) {
3576                         list_for_each_entry(peer, &kmxlnd_data.kmx_peers[i], mxp_peers) {
3577                                 spin_lock(&peer->mxp_lock);
3578                                 conn = peer->mxp_conn;
3579                                 if (conn) mxlnd_conn_addref(conn); /* take ref... */
3580                                 spin_unlock(&peer->mxp_lock);
3581
3582                                 if (conn == NULL)
3583                                         continue;
3584
3585                                 if (conn->mxk_status != MXLND_CONN_DISCONNECT &&
3586                                     time_after(now, conn->mxk_last_tx + HZ)) {
3587                                         mxlnd_check_sends(peer);
3588                                 }
3589                                 mxlnd_conn_decref(conn); /* until here */
3590                         }
3591                 }
3592                 read_unlock(&kmxlnd_data.kmx_peers_lock);
3593
3594                 mxlnd_sleep(delay);
3595         }
3596         CDEBUG(D_NET, "timeoutd stopping\n");
3597         mxlnd_thread_stop(id);
3598         return 0;
3599 }