Whamcloud - gitweb
b=20500
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * GPL HEADER START
5  *
6  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7  *
8  * This program is free software; you can redistribute it and/or modify
9  * it under the terms of the GNU General Public License version 2 only,
10  * as published by the Free Software Foundation.
11  *
12  * This program is distributed in the hope that it will be useful, but
13  * WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
15  * General Public License version 2 for more details (a copy is included
16  * in the LICENSE file that accompanied this code).
17  *
18  * You should have received a copy of the GNU General Public License
19  * version 2 along with this program; If not, see
20  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
21  *
22  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23  * CA 95054 USA or visit www.sun.com if you need additional information or
24  * have any questions.
25  *
26  * GPL HEADER END
27  */
28 /*
29  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
30  * Use is subject to license terms.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 lnd_t the_kmxlnd = {
47         .lnd_type       = MXLND,
48         .lnd_startup    = mxlnd_startup,
49         .lnd_shutdown   = mxlnd_shutdown,
50         .lnd_ctl        = mxlnd_ctl,
51         .lnd_send       = mxlnd_send,
52         .lnd_recv       = mxlnd_recv,
53 };
54
55 kmx_data_t               kmxlnd_data;
56
57 /**
58  * Free ctx struct
59  * \param ctx  a kmx_peer pointer
60  *
61  * The calling function should remove the ctx from the ctx list first
62  * then free it.
63  */
64 void
65 mxlnd_ctx_free(struct kmx_ctx *ctx)
66 {
67         if (ctx == NULL) return;
68
69         if (ctx->mxc_page != NULL) {
70                 __free_page(ctx->mxc_page);
71                 write_lock(&kmxlnd_data.kmx_global_lock);
72                 kmxlnd_data.kmx_mem_used -= MXLND_EAGER_SIZE;
73                 write_unlock(&kmxlnd_data.kmx_global_lock);
74         }
75
76         if (ctx->mxc_seg_list != NULL) {
77                 LASSERT(ctx->mxc_nseg > 0);
78                 MXLND_FREE(ctx->mxc_seg_list, ctx->mxc_nseg * sizeof(mx_ksegment_t));
79         }
80
81         MXLND_FREE (ctx, sizeof (*ctx));
82         return;
83 }
84
85 /**
86  * Allocate and initialize a new ctx struct
87  * \param ctxp  address of a kmx_ctx pointer
88  *
89  * Returns 0 on success and -EINVAL, -ENOMEM on failure
90  */
91 int
92 mxlnd_ctx_alloc(struct kmx_ctx **ctxp, enum kmx_req_type type)
93 {
94         int             ret     = 0;
95         struct kmx_ctx  *ctx    = NULL;
96
97         if (ctxp == NULL) return -EINVAL;
98
99         MXLND_ALLOC(ctx, sizeof (*ctx));
100         if (ctx == NULL) {
101                 CDEBUG(D_NETERROR, "Cannot allocate ctx\n");
102                 return -ENOMEM;
103         }
104         memset(ctx, 0, sizeof(*ctx));
105         spin_lock_init(&ctx->mxc_lock);
106
107         ctx->mxc_type = type;
108         ctx->mxc_page = alloc_page (GFP_KERNEL);
109         if (ctx->mxc_page == NULL) {
110                 CDEBUG(D_NETERROR, "Can't allocate page\n");
111                 ret = -ENOMEM;
112                 goto failed;
113         }
114         write_lock(&kmxlnd_data.kmx_global_lock);
115         kmxlnd_data.kmx_mem_used += MXLND_EAGER_SIZE;
116         write_unlock(&kmxlnd_data.kmx_global_lock);
117         ctx->mxc_msg = (struct kmx_msg *)((char *)page_address(ctx->mxc_page));
118         ctx->mxc_seg.segment_ptr = MX_PA_TO_U64(lnet_page2phys(ctx->mxc_page));
119         ctx->mxc_state = MXLND_CTX_IDLE;
120
121         *ctxp = ctx;
122         return 0;
123
124 failed:
125         mxlnd_ctx_free(ctx);
126         return ret;
127 }
128
129 /**
130  * Reset ctx struct to the default values
131  * \param ctx  a kmx_ctx pointer
132  */
133 void
134 mxlnd_ctx_init(struct kmx_ctx *ctx)
135 {
136         if (ctx == NULL) return;
137
138         /* do not change mxc_type */
139         ctx->mxc_incarnation = 0;
140         ctx->mxc_deadline = 0;
141         ctx->mxc_state = MXLND_CTX_IDLE;
142         /* ignore mxc_global_list */
143         if (ctx->mxc_list.next != NULL && !list_empty(&ctx->mxc_list)) {
144                 if (ctx->mxc_peer != NULL) spin_lock(&ctx->mxc_lock);
145                 list_del_init(&ctx->mxc_list);
146                 if (ctx->mxc_peer != NULL) spin_unlock(&ctx->mxc_lock);
147         }
148         /* ignore mxc_rx_list */
149         /* ignore mxc_lock */
150         ctx->mxc_nid = 0;
151         ctx->mxc_peer = NULL;
152         ctx->mxc_conn = NULL;
153         /* ignore mxc_msg */
154         /* ignore mxc_page */
155         ctx->mxc_lntmsg[0] = NULL;
156         ctx->mxc_lntmsg[1] = NULL;
157         ctx->mxc_msg_type = 0;
158         ctx->mxc_cookie = 0LL;
159         ctx->mxc_match = 0LL;
160         /* ctx->mxc_seg.segment_ptr points to mxc_page */
161         ctx->mxc_seg.segment_length = 0;
162         if (ctx->mxc_seg_list != NULL) {
163                 LASSERT(ctx->mxc_nseg > 0);
164                 MXLND_FREE(ctx->mxc_seg_list, ctx->mxc_nseg * sizeof(mx_ksegment_t));
165         }
166         ctx->mxc_seg_list = NULL;
167         ctx->mxc_nseg = 0;
168         ctx->mxc_nob = 0;
169         ctx->mxc_mxreq = NULL;
170         memset(&ctx->mxc_status, 0, sizeof(mx_status_t));
171         /* ctx->mxc_get */
172         /* ctx->mxc_put */
173
174         ctx->mxc_msg->mxm_type = 0;
175         ctx->mxc_msg->mxm_credits = 0;
176         ctx->mxc_msg->mxm_nob = 0;
177         ctx->mxc_msg->mxm_seq = 0;
178
179         return;
180 }
181
182 /**
183  * Free kmx_txs and associated pages
184  *
185  * Called from mxlnd_shutdown()
186  */
187 void
188 mxlnd_free_txs(void)
189 {
190         struct kmx_ctx          *tx     = NULL;
191         struct kmx_ctx          *next   = NULL;
192
193         list_for_each_entry_safe(tx, next, &kmxlnd_data.kmx_txs, mxc_global_list) {
194                 list_del_init(&tx->mxc_global_list);
195                 mxlnd_ctx_free(tx);
196         }
197         return;
198 }
199
200 /**
201  * Allocate tx descriptors then stash on txs and idle tx lists
202  *
203  * Called from mxlnd_startup()
204  * returns 0 on success, else -ENOMEM
205  */
206 int
207 mxlnd_init_txs(void)
208 {
209         int             ret     = 0;
210         int             i       = 0;
211         struct kmx_ctx  *tx      = NULL;
212
213         for (i = 0; i < *kmxlnd_tunables.kmx_ntx; i++) {
214                 ret = mxlnd_ctx_alloc(&tx, MXLND_REQ_TX);
215                 if (ret != 0) {
216                         mxlnd_free_txs();
217                         return ret;
218                 }
219                 mxlnd_ctx_init(tx);
220                 /* in startup(), no locks required */
221                 list_add_tail(&tx->mxc_global_list, &kmxlnd_data.kmx_txs);
222                 list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
223         }
224         return 0;
225 }
226
227 /**
228  * Free initial kmx_rx descriptors and associated pages
229  *
230  * Called from mxlnd_shutdown()
231  */
232 void
233 mxlnd_free_rxs(void)
234 {
235         struct kmx_ctx          *rx     = NULL;
236         struct kmx_ctx          *next   = NULL;
237
238         list_for_each_entry_safe(rx, next, &kmxlnd_data.kmx_rxs, mxc_global_list) {
239                 list_del_init(&rx->mxc_global_list);
240                 mxlnd_ctx_free(rx);
241         }
242         return;
243 }
244
245 /**
246  * Allocate initial rx descriptors 
247  *
248  * Called from startup(). We create MXLND_MAX_PEERS plus MXLND_NTX
249  * rx descriptors. We create one for each potential peer to handle 
250  * the initial connect request. We create on for each tx in case the 
251  * send requires a non-eager receive.
252  *
253  * Returns 0 on success, else -ENOMEM
254  */
255 int
256 mxlnd_init_rxs(void)
257 {
258         int             ret     = 0;
259         int             i       = 0;
260         struct kmx_ctx  *rx      = NULL;
261
262         for (i = 0; i < (*kmxlnd_tunables.kmx_ntx + *kmxlnd_tunables.kmx_max_peers); i++) {
263                 ret = mxlnd_ctx_alloc(&rx, MXLND_REQ_RX);
264                 if (ret != 0) {
265                         mxlnd_free_rxs();
266                         return ret;
267                 }
268                 mxlnd_ctx_init(rx);
269                 /* in startup(), no locks required */
270                 list_add_tail(&rx->mxc_global_list, &kmxlnd_data.kmx_rxs);
271                 list_add_tail(&rx->mxc_list, &kmxlnd_data.kmx_rx_idle);
272         }
273         return 0;
274 }
275
276 /**
277  * Free peers
278  *
279  * Called from mxlnd_shutdown()
280  */
281 void
282 mxlnd_free_peers(void)
283 {
284         int                      i      = 0;
285         struct kmx_peer         *peer   = NULL;
286         struct kmx_peer         *next   = NULL;
287
288         for (i = 0; i < MXLND_HASH_SIZE; i++) {
289                 list_for_each_entry_safe(peer, next, &kmxlnd_data.kmx_peers[i], mxp_peers) {
290                         list_del_init(&peer->mxp_peers);
291                         if (peer->mxp_conn) mxlnd_conn_decref(peer->mxp_conn);
292                         mxlnd_peer_decref(peer);
293                 }
294         }
295 }
296
297 /**
298  * Open the endpoint, set our ID, register the EAGER callback
299  * \param ni  the network interface
300  *
301  * Returns 0 on success, -1 on failure
302  */
303 int
304 mxlnd_init_mx(lnet_ni_t *ni)
305 {
306         int                     ret     = 0;
307         int                     hash    = 0;
308         mx_return_t             mxret;
309         mx_endpoint_addr_t      epa;
310         u32                     board   = *kmxlnd_tunables.kmx_board;
311         u32                     ep_id   = *kmxlnd_tunables.kmx_ep_id;
312         u64                     nic_id  = 0LL;
313         char                    *ifname = NULL;
314         __u32                   ip;
315         __u32                   netmask;
316         int                     up      = 0;
317         struct kmx_peer         *peer   = NULL;
318
319         mxret = mx_init();
320         if (mxret != MX_SUCCESS) {
321                 CERROR("mx_init() failed with %s (%d)\n", mx_strerror(mxret), mxret);
322                 return -1;
323         }
324
325         if (ni->ni_interfaces[0] != NULL) {
326                 /* Use the IPoMX interface specified in 'networks=' */
327
328                 CLASSERT (LNET_MAX_INTERFACES > 1);
329                 if (ni->ni_interfaces[1] != NULL) {
330                         CERROR("Multiple interfaces not supported\n");
331                         goto failed_with_init;
332                 }
333
334                 ifname = ni->ni_interfaces[0];
335         } else {
336                 ifname = *kmxlnd_tunables.kmx_default_ipif;
337         }
338
339         ret = libcfs_ipif_query(ifname, &up, &ip, &netmask);
340         if (ret != 0) {
341                 CERROR("Can't query IPoMX interface %s: %d\n",
342                        ifname, ret);
343                 goto failed_with_init;
344         }
345
346         if (!up) {
347                 CERROR("Can't query IPoMX interface %s: it's down\n",
348                        ifname);
349                 goto failed_with_init;
350         }
351
352         mxret = mx_open_endpoint(board, ep_id, MXLND_MSG_MAGIC,
353                                  NULL, 0, &kmxlnd_data.kmx_endpt);
354         if (mxret != MX_SUCCESS) {
355                 CERROR("mx_open_endpoint() failed with %d\n", mxret);
356                 goto failed_with_init;
357         }
358
359         mx_get_endpoint_addr(kmxlnd_data.kmx_endpt, &epa);
360         mx_decompose_endpoint_addr(epa, &nic_id, &ep_id);
361
362         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
363         CDEBUG(D_NET, "My NID is 0x%llx\n", ni->ni_nid);
364
365         ret = mxlnd_peer_alloc(&peer, ni->ni_nid, board, ep_id, nic_id);
366         if (ret != 0) {
367                 goto failed_with_endpoint;
368         }
369         peer->mxp_conn->mxk_epa = epa;
370
371         peer->mxp_incarnation = kmxlnd_data.kmx_incarnation;
372         peer->mxp_incompatible = 0;
373         spin_lock(&peer->mxp_conn->mxk_lock);
374         peer->mxp_conn->mxk_credits = *kmxlnd_tunables.kmx_credits;
375         peer->mxp_conn->mxk_outstanding = 0;
376         peer->mxp_conn->mxk_incarnation = kmxlnd_data.kmx_incarnation;
377         peer->mxp_conn->mxk_timeout = 0;
378         peer->mxp_conn->mxk_status = MXLND_CONN_READY;
379         spin_unlock(&peer->mxp_conn->mxk_lock);
380         mx_set_endpoint_addr_context(peer->mxp_conn->mxk_epa, (void *) peer);
381
382         hash = mxlnd_nid_to_hash(ni->ni_nid);
383         list_add_tail(&peer->mxp_peers, &kmxlnd_data.kmx_peers[hash]);
384         atomic_inc(&kmxlnd_data.kmx_npeers);
385
386         mxlnd_conn_decref(peer->mxp_conn); /* drop 2nd ref taken in peer_alloc */
387
388         kmxlnd_data.kmx_localhost = peer;
389
390         /* this will catch all unexpected receives. */
391         mxret = mx_register_unexp_handler(kmxlnd_data.kmx_endpt,
392                                           (mx_unexp_handler_t) mxlnd_unexpected_recv,
393                                           NULL);
394         if (mxret != MX_SUCCESS) {
395                 CERROR("mx_register_unexp_callback() failed with %s\n", 
396                          mx_strerror(mxret));
397                 goto failed_with_peer;
398         }
399         mxret = mx_set_request_timeout(kmxlnd_data.kmx_endpt, NULL, MXLND_COMM_TIMEOUT/HZ*1000);
400         if (mxret != MX_SUCCESS) {
401                 CERROR("mx_set_request_timeout() failed with %s\n", 
402                         mx_strerror(mxret));
403                 goto failed_with_peer;
404         }
405         return 0;
406
407 failed_with_peer:
408         mxlnd_conn_decref(peer->mxp_conn);
409         mxlnd_conn_decref(peer->mxp_conn);
410         mxlnd_peer_decref(peer);
411 failed_with_endpoint:
412         mx_close_endpoint(kmxlnd_data.kmx_endpt);
413 failed_with_init:
414         mx_finalize();
415         return -1;
416 }
417
418
419 /**
420  * Spawn a kernel thread with this function
421  * \param fn  function pointer
422  * \param arg pointer to the parameter data
423  *
424  * Returns 0 on success and a negative value on failure
425  */
426 int
427 mxlnd_thread_start(int (*fn)(void *arg), void *arg)
428 {
429         int     pid = 0;
430         int     i   = (int) ((long) arg);
431
432         atomic_inc(&kmxlnd_data.kmx_nthreads);
433         init_completion(&kmxlnd_data.kmx_completions[i]);
434
435         pid = kernel_thread (fn, arg, 0);
436         if (pid < 0) {
437                 CERROR("kernel_thread() failed with %d\n", pid);
438                 atomic_dec(&kmxlnd_data.kmx_nthreads);
439         }
440         return pid;
441 }
442
443 /**
444  * Decrement thread counter
445  *
446  * The thread returns 0 when it detects shutdown.
447  * We are simply decrementing the thread counter.
448  */
449 void
450 mxlnd_thread_stop(long id)
451 {
452         int     i       = (int) id;
453         atomic_dec (&kmxlnd_data.kmx_nthreads);
454         complete(&kmxlnd_data.kmx_completions[i]);
455 }
456
457 /**
458  * Stop IO, clean up state
459  * \param ni LNET interface handle
460  *
461  * No calls to the LND should be made after calling this function.
462  */
463 void
464 mxlnd_shutdown (lnet_ni_t *ni)
465 {
466         int     i               = 0;
467         int     nthreads        = 2 + *kmxlnd_tunables.kmx_n_waitd;
468
469         LASSERT (ni == kmxlnd_data.kmx_ni);
470         LASSERT (ni->ni_data == &kmxlnd_data);
471         CDEBUG(D_NET, "in shutdown()\n");
472
473         CDEBUG(D_MALLOC, "before MXLND cleanup: libcfs_kmemory %d "
474                          "kmx_mem_used %ld\n", atomic_read (&libcfs_kmemory), 
475                          kmxlnd_data.kmx_mem_used);
476
477         switch (kmxlnd_data.kmx_init) {
478
479         case MXLND_INIT_ALL:
480
481                 CDEBUG(D_NET, "setting shutdown = 1\n");
482                 /* set shutdown and wakeup request_waitds */
483                 kmxlnd_data.kmx_shutdown = 1;
484                 mb();
485                 mx_wakeup(kmxlnd_data.kmx_endpt);
486                 up(&kmxlnd_data.kmx_tx_queue_sem);
487                 mxlnd_sleep(2 * HZ);
488
489                 read_lock(&kmxlnd_data.kmx_global_lock);
490                 mxlnd_close_matching_conns(LNET_NID_ANY);
491                 read_unlock(&kmxlnd_data.kmx_global_lock);
492
493                 /* fall through */
494
495         case MXLND_INIT_THREADS:
496
497                 CDEBUG(D_NET, "waiting on threads\n");
498                 /* wait for threads to complete */
499                 for (i = 0; i < nthreads; i++) {
500                         wait_for_completion(&kmxlnd_data.kmx_completions[i]);
501                 }
502                 LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
503
504                 CDEBUG(D_NET, "freeing completions\n");
505                 MXLND_FREE(kmxlnd_data.kmx_completions, 
506                             MXLND_NCOMPLETIONS * sizeof(struct completion));
507
508                 /* fall through */
509
510         case MXLND_INIT_MX:
511
512                 CDEBUG(D_NET, "stopping mx\n");
513
514                 /* wakeup waiters if they missed the above.
515                  * close endpoint to stop all traffic.
516                  * this will cancel and cleanup all requests, etc. */
517
518                 mx_wakeup(kmxlnd_data.kmx_endpt);
519                 mx_close_endpoint(kmxlnd_data.kmx_endpt);
520                 mx_finalize();
521
522                 /* fall through */
523
524         case MXLND_INIT_RXS:
525
526                 CDEBUG(D_NET, "freeing rxs\n");
527
528                 /* free all rxs and associated pages */
529                 mxlnd_free_rxs();
530
531                 /* fall through */
532
533         case MXLND_INIT_TXS:
534
535                 CDEBUG(D_NET, "freeing txs\n");
536
537                 /* free all txs and associated pages */
538                 mxlnd_free_txs();
539
540                 /* fall through */
541
542         case MXLND_INIT_DATA:
543
544                 CDEBUG(D_NET, "freeing peers\n");
545
546                 /* free peer list */
547                 mxlnd_free_peers();
548
549                 /* fall through */
550
551         case MXLND_INIT_NOTHING:
552                 break;
553         }
554         CDEBUG(D_NET, "shutdown complete\n");
555
556         CDEBUG(D_MALLOC, "after MXLND cleanup: libcfs_kmemory %d "
557                          "kmx_mem_used %ld\n", atomic_read (&libcfs_kmemory), 
558                          kmxlnd_data.kmx_mem_used);
559
560         kmxlnd_data.kmx_init = MXLND_INIT_NOTHING;
561         PORTAL_MODULE_UNUSE;
562         return;
563 }
564
565 /**
566  * Initialize state, open an endpoint, start IO
567  * \param ni LNET interface handle
568  *
569  * Initialize state, open an endpoint, start monitoring threads.
570  * Should only be called once.
571  */
572 int
573 mxlnd_startup (lnet_ni_t *ni)
574 {
575         int             i               = 0;
576         int             ret             = 0;
577         int             nthreads        = 2; /* for timeoutd and tx_queued */
578         struct timeval  tv;
579
580         LASSERT (ni->ni_lnd == &the_kmxlnd);
581
582         if (kmxlnd_data.kmx_init != MXLND_INIT_NOTHING) {
583                 CERROR("Only 1 instance supported\n");
584                 return -EPERM;
585         }
586         CDEBUG(D_MALLOC, "before MXLND startup: libcfs_kmemory %d "
587                          "kmx_mem_used %ld\n", atomic_read (&libcfs_kmemory), 
588                          kmxlnd_data.kmx_mem_used);
589
590         /* reserve 1/2 of tx for connect request messages */
591         ni->ni_maxtxcredits = *kmxlnd_tunables.kmx_ntx / 2;
592         ni->ni_peertxcredits = *kmxlnd_tunables.kmx_credits;
593         if (ni->ni_maxtxcredits < ni->ni_peertxcredits)
594                 ni->ni_maxtxcredits = ni->ni_peertxcredits;
595
596         PORTAL_MODULE_USE;
597         memset (&kmxlnd_data, 0, sizeof (kmxlnd_data));
598
599         kmxlnd_data.kmx_ni = ni;
600         ni->ni_data = &kmxlnd_data;
601
602         do_gettimeofday(&tv);
603         kmxlnd_data.kmx_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
604         CDEBUG(D_NET, "my incarnation is %lld\n", kmxlnd_data.kmx_incarnation);
605
606         rwlock_init (&kmxlnd_data.kmx_global_lock);
607         spin_lock_init (&kmxlnd_data.kmx_mem_lock);
608
609         INIT_LIST_HEAD (&kmxlnd_data.kmx_conn_req);
610         spin_lock_init (&kmxlnd_data.kmx_conn_lock);
611         sema_init(&kmxlnd_data.kmx_conn_sem, 0);
612
613         for (i = 0; i < MXLND_HASH_SIZE; i++) {
614                 INIT_LIST_HEAD (&kmxlnd_data.kmx_peers[i]);
615         }
616         //rwlock_init (&kmxlnd_data.kmx_peers_lock);
617
618         INIT_LIST_HEAD (&kmxlnd_data.kmx_txs);
619         INIT_LIST_HEAD (&kmxlnd_data.kmx_tx_idle);
620         spin_lock_init (&kmxlnd_data.kmx_tx_idle_lock);
621         kmxlnd_data.kmx_tx_next_cookie = 1;
622         INIT_LIST_HEAD (&kmxlnd_data.kmx_tx_queue);
623         spin_lock_init (&kmxlnd_data.kmx_tx_queue_lock);
624         sema_init(&kmxlnd_data.kmx_tx_queue_sem, 0);
625
626         INIT_LIST_HEAD (&kmxlnd_data.kmx_rxs);
627         spin_lock_init (&kmxlnd_data.kmx_rxs_lock);
628         INIT_LIST_HEAD (&kmxlnd_data.kmx_rx_idle);
629         spin_lock_init (&kmxlnd_data.kmx_rx_idle_lock);
630
631         kmxlnd_data.kmx_init = MXLND_INIT_DATA;
632         /*****************************************************/
633
634         ret = mxlnd_init_txs();
635         if (ret != 0) {
636                 CERROR("Can't alloc tx descs: %d\n", ret);
637                 goto failed;
638         }
639         kmxlnd_data.kmx_init = MXLND_INIT_TXS;
640         /*****************************************************/
641
642         ret = mxlnd_init_rxs();
643         if (ret != 0) {
644                 CERROR("Can't alloc rx descs: %d\n", ret);
645                 goto failed;
646         }
647         kmxlnd_data.kmx_init = MXLND_INIT_RXS;
648         /*****************************************************/
649
650         ret = mxlnd_init_mx(ni);
651         if (ret != 0) {
652                 CERROR("Can't init mx\n");
653                 goto failed;
654         }
655
656         kmxlnd_data.kmx_init = MXLND_INIT_MX;
657         /*****************************************************/
658
659         /* start threads */
660
661         nthreads += *kmxlnd_tunables.kmx_n_waitd;
662         MXLND_ALLOC (kmxlnd_data.kmx_completions,
663                      nthreads * sizeof(struct completion));
664         if (kmxlnd_data.kmx_completions == NULL) {
665                 CERROR("failed to alloc kmxlnd_data.kmx_completions\n");
666                 goto failed;
667         }
668         memset(kmxlnd_data.kmx_completions, 0, 
669                nthreads * sizeof(struct completion));
670
671         {
672                 CDEBUG(D_NET, "using %d %s in mx_wait_any()\n",
673                         *kmxlnd_tunables.kmx_n_waitd, 
674                         *kmxlnd_tunables.kmx_n_waitd == 1 ? "thread" : "threads");
675
676                 for (i = 0; i < *kmxlnd_tunables.kmx_n_waitd; i++) {
677                         ret = mxlnd_thread_start(mxlnd_request_waitd, (void*)((long)i));
678                         if (ret < 0) {
679                                 CERROR("Starting mxlnd_request_waitd[%d] failed with %d\n", i, ret);
680                                 kmxlnd_data.kmx_shutdown = 1;
681                                 mx_wakeup(kmxlnd_data.kmx_endpt);
682                                 for (--i; i >= 0; i--) {
683                                         wait_for_completion(&kmxlnd_data.kmx_completions[i]);
684                                 }
685                                 LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
686                                 MXLND_FREE(kmxlnd_data.kmx_completions, 
687                                         MXLND_NCOMPLETIONS * sizeof(struct completion));
688
689                                 goto failed;
690                         }
691                 }
692                 ret = mxlnd_thread_start(mxlnd_tx_queued, (void*)((long)i++));
693                 if (ret < 0) {
694                         CERROR("Starting mxlnd_tx_queued failed with %d\n", ret);
695                         kmxlnd_data.kmx_shutdown = 1;
696                         mx_wakeup(kmxlnd_data.kmx_endpt);
697                         for (--i; i >= 0; i--) {
698                                 wait_for_completion(&kmxlnd_data.kmx_completions[i]);
699                         }
700                         LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
701                         MXLND_FREE(kmxlnd_data.kmx_completions, 
702                                 MXLND_NCOMPLETIONS * sizeof(struct completion));
703                         goto failed;
704                 }
705                 ret = mxlnd_thread_start(mxlnd_timeoutd, (void*)((long)i++));
706                 if (ret < 0) {
707                         CERROR("Starting mxlnd_timeoutd failed with %d\n", ret);
708                         kmxlnd_data.kmx_shutdown = 1;
709                         mx_wakeup(kmxlnd_data.kmx_endpt);
710                         up(&kmxlnd_data.kmx_tx_queue_sem);
711                         for (--i; i >= 0; i--) {
712                                 wait_for_completion(&kmxlnd_data.kmx_completions[i]);
713                         }
714                         LASSERT(atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
715                         MXLND_FREE(kmxlnd_data.kmx_completions, 
716                                 MXLND_NCOMPLETIONS * sizeof(struct completion));
717                         goto failed;
718                 }
719         }
720
721         kmxlnd_data.kmx_init = MXLND_INIT_THREADS;
722         /*****************************************************/
723
724         kmxlnd_data.kmx_init = MXLND_INIT_ALL;
725         CDEBUG(D_MALLOC, "startup complete (kmx_mem_used %ld)\n", kmxlnd_data.kmx_mem_used);
726
727         return 0;
728 failed:
729         CERROR("mxlnd_startup failed\n");
730         mxlnd_shutdown(ni);
731         return (-ENETDOWN);
732 }
733
734 static int mxlnd_init(void)
735 {
736         lnet_register_lnd(&the_kmxlnd);
737         return 0;
738 }
739
740 static void mxlnd_exit(void)
741 {
742         lnet_unregister_lnd(&the_kmxlnd);
743         return;
744 }
745
746 module_init(mxlnd_init);
747 module_exit(mxlnd_exit);
748
749 MODULE_LICENSE("GPL");
750 MODULE_AUTHOR("Myricom, Inc. - help@myri.com");
751 MODULE_DESCRIPTION("Kernel MyrinetExpress LND");
752 MODULE_VERSION("0.6.0");