Whamcloud - gitweb
LU-1346 libcfs: cleanup libcfs primitive (linux-prim.h)
[fs/lustre-release.git] / lnet / klnds / mxlnd / mxlnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, Intel Corporation.
31  *
32  * Copyright (C) 2006 Myricom, Inc.
33  */
34 /*
35  * This file is part of Lustre, http://www.lustre.org/
36  * Lustre is a trademark of Sun Microsystems, Inc.
37  *
38  * lnet/klnds/mxlnd/mxlnd.c
39  *
40  * Author: Eric Barton <eric@bartonsoftware.com>
41  * Author: Scott Atchley <atchley at myri.com>
42  */
43
44 #include "mxlnd.h"
45
46 lnd_t the_kmxlnd = {
47         .lnd_type       = MXLND,
48         .lnd_startup    = mxlnd_startup,
49         .lnd_shutdown   = mxlnd_shutdown,
50         .lnd_ctl        = mxlnd_ctl,
51         .lnd_send       = mxlnd_send,
52         .lnd_recv       = mxlnd_recv,
53 };
54
55 kmx_data_t               kmxlnd_data;
56
57 void
58 mxlnd_free_pages(kmx_pages_t *p)
59 {
60         int     npages = p->mxg_npages;
61         int     i;
62
63         CDEBUG(D_MALLOC, "freeing %d pages\n", npages);
64
65         for (i = 0; i < npages; i++) {
66                 if (p->mxg_pages[i] != NULL) {
67                         __free_page(p->mxg_pages[i]);
68                         spin_lock(&kmxlnd_data.kmx_mem_lock);
69                         kmxlnd_data.kmx_mem_used -= PAGE_SIZE;
70                         spin_unlock(&kmxlnd_data.kmx_mem_lock);
71                 }
72         }
73
74         MXLND_FREE(p, offsetof(kmx_pages_t, mxg_pages[npages]));
75 }
76
77 int
78 mxlnd_alloc_pages(kmx_pages_t **pp, int npages)
79 {
80         kmx_pages_t    *p       = NULL;
81         int             i       = 0;
82
83         CDEBUG(D_MALLOC, "allocing %d pages\n", npages);
84
85         MXLND_ALLOC(p, offsetof(kmx_pages_t, mxg_pages[npages]));
86         if (p == NULL) {
87                 CERROR("Can't allocate descriptor for %d pages\n", npages);
88                 return -ENOMEM;
89         }
90
91         memset(p, 0, offsetof(kmx_pages_t, mxg_pages[npages]));
92         p->mxg_npages = npages;
93
94         for (i = 0; i < npages; i++) {
95                 p->mxg_pages[i] = alloc_page(GFP_KERNEL);
96                 if (p->mxg_pages[i] == NULL) {
97                         CERROR("Can't allocate page %d of %d\n", i, npages);
98                         mxlnd_free_pages(p);
99                         return -ENOMEM;
100                 }
101                 spin_lock(&kmxlnd_data.kmx_mem_lock);
102                 kmxlnd_data.kmx_mem_used += PAGE_SIZE;
103                 spin_unlock(&kmxlnd_data.kmx_mem_lock);
104         }
105
106         *pp = p;
107         return 0;
108 }
109
110 /**
111  * mxlnd_ctx_init - reset ctx struct to the default values
112  * @ctx - a kmx_ctx pointer
113  */
114 void
115 mxlnd_ctx_init(kmx_ctx_t *ctx)
116 {
117         if (ctx == NULL) return;
118
119         /* do not change mxc_type */
120         ctx->mxc_incarnation = 0;
121         ctx->mxc_deadline = 0;
122         ctx->mxc_state = MXLND_CTX_IDLE;
123         if (!cfs_list_empty(&ctx->mxc_list))
124                 cfs_list_del_init(&ctx->mxc_list);
125         /* ignore mxc_rx_list */
126         if (ctx->mxc_type == MXLND_REQ_TX) {
127                 ctx->mxc_nid = 0;
128                 ctx->mxc_peer = NULL;
129                 ctx->mxc_conn = NULL;
130         }
131         /* ignore mxc_msg */
132         ctx->mxc_lntmsg[0] = NULL;
133         ctx->mxc_lntmsg[1] = NULL;
134         ctx->mxc_msg_type = 0;
135         ctx->mxc_cookie = 0LL;
136         ctx->mxc_match = 0LL;
137         /* ctx->mxc_seg.segment_ptr points to backing page */
138         ctx->mxc_seg.segment_length = 0;
139         if (ctx->mxc_seg_list != NULL) {
140                 LASSERT(ctx->mxc_nseg > 0);
141                 MXLND_FREE(ctx->mxc_seg_list, ctx->mxc_nseg * sizeof(mx_ksegment_t));
142         }
143         ctx->mxc_seg_list = NULL;
144         ctx->mxc_nseg = 0;
145         ctx->mxc_nob = 0;
146         memset(&ctx->mxc_mxreq, 0, sizeof(mx_request_t));
147         memset(&ctx->mxc_status, 0, sizeof(mx_status_t));
148         ctx->mxc_errno = 0;
149         /* ctx->mxc_get */
150         /* ctx->mxc_put */
151
152         ctx->mxc_msg->mxm_type = 0;
153         ctx->mxc_msg->mxm_credits = 0;
154         ctx->mxc_msg->mxm_nob = 0;
155
156         return;
157 }
158
159 /**
160  * mxlnd_free_txs - free kmx_txs and associated pages
161  *
162  * Called from mxlnd_shutdown()
163  */
164 void
165 mxlnd_free_txs(void)
166 {
167         int             i       = 0;
168         kmx_ctx_t       *tx     = NULL;
169
170         if (kmxlnd_data.kmx_tx_pages) {
171                 for (i = 0; i < MXLND_TX_MSGS(); i++) {
172                         tx = &kmxlnd_data.kmx_txs[i];
173                         if (tx->mxc_seg_list != NULL) {
174                                 LASSERT(tx->mxc_nseg > 0);
175                                 MXLND_FREE(tx->mxc_seg_list,
176                                            tx->mxc_nseg *
177                                            sizeof(*tx->mxc_seg_list));
178                         }
179                 }
180                 MXLND_FREE(kmxlnd_data.kmx_txs,
181                             MXLND_TX_MSGS() * sizeof(kmx_ctx_t));
182                 mxlnd_free_pages(kmxlnd_data.kmx_tx_pages);
183         }
184
185         return;
186 }
187
188 /**
189  * mxlnd_init_txs - allocate tx descriptors then stash on txs and idle tx lists
190  *
191  * Called from mxlnd_startup()
192  * returns 0 on success, else -ENOMEM
193  */
194 int
195 mxlnd_init_txs(void)
196 {
197         int             ret     = 0;
198         int             i       = 0;
199         int             ipage   = 0;
200         int             offset  = 0;
201         void           *addr    = NULL;
202         kmx_ctx_t      *tx      = NULL;
203         kmx_pages_t    *pages   = NULL;
204         struct page    *page    = NULL;
205
206         /* pre-mapped messages are not bigger than 1 page */
207         CLASSERT(MXLND_MSG_SIZE <= PAGE_SIZE);
208
209         /* No fancy arithmetic when we do the buffer calculations */
210         CLASSERT (PAGE_SIZE % MXLND_MSG_SIZE == 0);
211
212         ret = mxlnd_alloc_pages(&pages, MXLND_TX_MSG_PAGES());
213         if (ret != 0) {
214                 CERROR("Can't allocate tx pages\n");
215                 return -ENOMEM;
216         }
217         kmxlnd_data.kmx_tx_pages = pages;
218
219         MXLND_ALLOC(kmxlnd_data.kmx_txs, MXLND_TX_MSGS() * sizeof(kmx_ctx_t));
220         if (&kmxlnd_data.kmx_txs == NULL) {
221                 CERROR("Can't allocate %d tx descriptors\n", MXLND_TX_MSGS());
222                 mxlnd_free_pages(pages);
223                 return -ENOMEM;
224         }
225
226         memset(kmxlnd_data.kmx_txs, 0, MXLND_TX_MSGS() * sizeof(kmx_ctx_t));
227
228         for (i = 0; i < MXLND_TX_MSGS(); i++) {
229
230                 tx = &kmxlnd_data.kmx_txs[i];
231                 tx->mxc_type = MXLND_REQ_TX;
232
233                 CFS_INIT_LIST_HEAD(&tx->mxc_list);
234
235                 /* map mxc_msg to page */
236                 page = pages->mxg_pages[ipage];
237                 addr = page_address(page);
238                 LASSERT(addr != NULL);
239                 tx->mxc_msg = (kmx_msg_t *)(addr + offset);
240                 tx->mxc_seg.segment_ptr = MX_PA_TO_U64(virt_to_phys(tx->mxc_msg));
241
242                 mxlnd_ctx_init(tx);
243
244                 offset += MXLND_MSG_SIZE;
245                 LASSERT (offset <= PAGE_SIZE);
246
247                 if (offset == PAGE_SIZE) {
248                         offset = 0;
249                         ipage++;
250                         LASSERT (ipage <= MXLND_TX_MSG_PAGES());
251                 }
252
253                 /* in startup(), no locks required */
254                 cfs_list_add_tail(&tx->mxc_list, &kmxlnd_data.kmx_tx_idle);
255         }
256
257         return 0;
258 }
259
260 /**
261  * mxlnd_free_peers - free peers
262  *
263  * Called from mxlnd_shutdown()
264  */
265 void
266 mxlnd_free_peers(void)
267 {
268         int             i      = 0;
269         int             count  = 0;
270         kmx_peer_t     *peer   = NULL;
271         kmx_peer_t     *next   = NULL;
272
273         for (i = 0; i < MXLND_HASH_SIZE; i++) {
274                 cfs_list_for_each_entry_safe(peer, next,
275                                              &kmxlnd_data.kmx_peers[i],
276                                              mxp_list) {
277                         cfs_list_del_init(&peer->mxp_list);
278                         if (peer->mxp_conn) mxlnd_conn_decref(peer->mxp_conn);
279                         mxlnd_peer_decref(peer);
280                         count++;
281                 }
282         }
283         CDEBUG(D_NET, "%s: freed %d peers\n", __func__, count);
284 }
285
286 /**
287  * mxlnd_init_mx - open the endpoint, set our ID, register the EAGER callback
288  * @ni - the network interface
289  *
290  * Returns 0 on success, -1 on failure
291  */
292 int
293 mxlnd_init_mx(lnet_ni_t *ni)
294 {
295         int                     ret     = 0;
296         mx_return_t             mxret;
297         u32                     board   = *kmxlnd_tunables.kmx_board;
298         u32                     ep_id   = *kmxlnd_tunables.kmx_ep_id;
299         u64                     nic_id  = 0LL;
300         char                    *ifname = NULL;
301         __u32                   ip;
302         __u32                   netmask;
303         int                     if_up   = 0;
304
305         mxret = mx_init();
306         if (mxret != MX_SUCCESS) {
307                 CERROR("mx_init() failed with %s (%d)\n", mx_strerror(mxret), mxret);
308                 return -1;
309         }
310
311         if (ni->ni_interfaces[0] != NULL) {
312                 /* Use the IPoMX interface specified in 'networks=' */
313
314                 CLASSERT (LNET_MAX_INTERFACES > 1);
315                 if (ni->ni_interfaces[1] != NULL) {
316                         CERROR("Multiple interfaces not supported\n");
317                         goto failed_with_init;
318                 }
319
320                 ifname = ni->ni_interfaces[0];
321         } else {
322                 ifname = *kmxlnd_tunables.kmx_default_ipif;
323         }
324
325         ret = libcfs_ipif_query(ifname, &if_up, &ip, &netmask);
326         if (ret != 0) {
327                 CERROR("Can't query IPoMX interface %s: %d\n",
328                        ifname, ret);
329                 goto failed_with_init;
330         }
331
332         if (!if_up) {
333                 CERROR("Can't query IPoMX interface %s: it's down\n",
334                        ifname);
335                 goto failed_with_init;
336         }
337
338         mxret = mx_open_endpoint(board, ep_id, MXLND_MSG_MAGIC,
339                                  NULL, 0, &kmxlnd_data.kmx_endpt);
340         if (mxret != MX_SUCCESS) {
341                 CERROR("mx_open_endpoint() failed with %d\n", mxret);
342                 goto failed_with_init;
343         }
344
345         mx_get_endpoint_addr(kmxlnd_data.kmx_endpt, &kmxlnd_data.kmx_epa);
346         mx_decompose_endpoint_addr(kmxlnd_data.kmx_epa, &nic_id, &ep_id);
347         mxret = mx_connect(kmxlnd_data.kmx_endpt, nic_id, ep_id,
348                            MXLND_MSG_MAGIC, MXLND_CONNECT_TIMEOUT/HZ*1000,
349                            &kmxlnd_data.kmx_epa);
350         if (mxret != MX_SUCCESS) {
351                 CNETERR("unable to connect to myself (%s)\n", mx_strerror(mxret));
352                 goto failed_with_endpoint;
353         }
354
355         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ip);
356         CDEBUG(D_NET, "My NID is 0x%llx\n", ni->ni_nid);
357
358         /* this will catch all unexpected receives. */
359         mxret = mx_register_unexp_handler(kmxlnd_data.kmx_endpt,
360                                           (mx_unexp_handler_t) mxlnd_unexpected_recv,
361                                           NULL);
362         if (mxret != MX_SUCCESS) {
363                 CERROR("mx_register_unexp_callback() failed with %s\n",
364                          mx_strerror(mxret));
365                 goto failed_with_endpoint;
366         }
367         mxret = mx_set_request_timeout(kmxlnd_data.kmx_endpt, NULL,
368                                        MXLND_COMM_TIMEOUT/HZ*1000);
369         if (mxret != MX_SUCCESS) {
370                 CERROR("mx_set_request_timeout() failed with %s\n",
371                         mx_strerror(mxret));
372                 goto failed_with_endpoint;
373         }
374         return 0;
375
376 failed_with_endpoint:
377         mx_close_endpoint(kmxlnd_data.kmx_endpt);
378 failed_with_init:
379         mx_finalize();
380         return -1;
381 }
382
383
384 /**
385  * mxlnd_thread_start - spawn a kernel thread with this function
386  * @fn - function pointer
387  * @arg - pointer to the parameter data
388  * @name - name of new thread
389  *
390  * Returns 0 on success and a negative value on failure
391  */
392 int
393 mxlnd_thread_start(int (*fn)(void *arg), void *arg, char *name)
394 {
395         cfs_task *task;
396         int     i   = (int) ((long) arg);
397
398         cfs_atomic_inc(&kmxlnd_data.kmx_nthreads);
399         init_completion(&kmxlnd_data.kmx_completions[i]);
400
401         task = kthread_run(fn, arg, name);
402         if (IS_ERR(task)) {
403                 CERROR("cfs_create_thread() failed with %d\n", PTR_ERR(task));
404                 cfs_atomic_dec(&kmxlnd_data.kmx_nthreads);
405         }
406         return PTR_ERR(task);
407 }
408
409 /**
410  * mxlnd_thread_stop - decrement thread counter
411  *
412  * The thread returns 0 when it detects shutdown.
413  * We are simply decrementing the thread counter.
414  */
415 void
416 mxlnd_thread_stop(long id)
417 {
418         int     i       = (int) id;
419         cfs_atomic_dec (&kmxlnd_data.kmx_nthreads);
420         complete(&kmxlnd_data.kmx_completions[i]);
421 }
422
423 /**
424  * mxlnd_shutdown - stop IO, clean up state
425  * @ni - LNET interface handle
426  *
427  * No calls to the LND should be made after calling this function.
428  */
429 void
430 mxlnd_shutdown (lnet_ni_t *ni)
431 {
432         int                     i               = 0;
433         int                     nthreads        = MXLND_NDAEMONS
434                                                   + *kmxlnd_tunables.kmx_n_waitd;
435
436         LASSERT (ni == kmxlnd_data.kmx_ni);
437         LASSERT (ni->ni_data == &kmxlnd_data);
438         CDEBUG(D_NET, "in shutdown()\n");
439
440         CDEBUG(D_MALLOC, "before MXLND cleanup: libcfs_kmemory %d "
441                          "kmx_mem_used %ld\n", cfs_atomic_read(&libcfs_kmemory),
442                          kmxlnd_data.kmx_mem_used);
443
444
445         CDEBUG(D_NET, "setting shutdown = 1\n");
446         cfs_atomic_set(&kmxlnd_data.kmx_shutdown, 1);
447
448         switch (kmxlnd_data.kmx_init) {
449
450         case MXLND_INIT_ALL:
451
452                 /* calls write_[un]lock(kmx_global_lock) */
453                 mxlnd_del_peer(LNET_NID_ANY);
454
455                 /* wakeup request_waitds */
456                 mx_wakeup(kmxlnd_data.kmx_endpt);
457                 up(&kmxlnd_data.kmx_tx_queue_sem);
458                 up(&kmxlnd_data.kmx_conn_sem);
459                 mxlnd_sleep(2 * HZ);
460
461                 /* fall through */
462
463         case MXLND_INIT_THREADS:
464
465                 CDEBUG(D_NET, "waiting on threads\n");
466                 /* wait for threads to complete */
467                 for (i = 0; i < nthreads; i++) {
468                         wait_for_completion(&kmxlnd_data.kmx_completions[i]);
469                 }
470                 LASSERT(cfs_atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
471
472                 CDEBUG(D_NET, "freeing completions\n");
473                 MXLND_FREE(kmxlnd_data.kmx_completions,
474                             nthreads * sizeof(struct completion));
475
476                 /* fall through */
477
478         case MXLND_INIT_MX:
479
480                 CDEBUG(D_NET, "stopping mx\n");
481
482                 /* no peers left, close the endpoint */
483                 mx_close_endpoint(kmxlnd_data.kmx_endpt);
484                 mx_finalize();
485
486                 /* fall through */
487
488         case MXLND_INIT_TXS:
489
490                 CDEBUG(D_NET, "freeing txs\n");
491
492                 /* free all txs and associated pages */
493                 mxlnd_free_txs();
494
495                 /* fall through */
496
497         case MXLND_INIT_DATA:
498
499                 CDEBUG(D_NET, "freeing peers\n");
500
501                 /* peers should be gone, but check again */
502                 mxlnd_free_peers();
503
504                 /* conn zombies should be gone, but check again */
505                 mxlnd_free_conn_zombies();
506
507                 /* fall through */
508
509         case MXLND_INIT_NOTHING:
510                 break;
511         }
512         CDEBUG(D_NET, "shutdown complete\n");
513
514         CDEBUG(D_MALLOC, "after MXLND cleanup: libcfs_kmemory %d "
515                          "kmx_mem_used %ld\n", cfs_atomic_read(&libcfs_kmemory),
516                          kmxlnd_data.kmx_mem_used);
517
518         kmxlnd_data.kmx_init = MXLND_INIT_NOTHING;
519         module_put(THIS_MODULE);
520         return;
521 }
522
523 /**
524  * mxlnd_startup - initialize state, open an endpoint, start IO
525  * @ni - LNET interface handle
526  *
527  * Initialize state, open an endpoint, start monitoring threads.
528  * Should only be called once.
529  */
530 int
531 mxlnd_startup (lnet_ni_t *ni)
532 {
533         int             i               = 0;
534         int             ret             = 0;
535         int             nthreads        = MXLND_NDAEMONS /* tx_queued, timeoutd, connd */
536                                           + *kmxlnd_tunables.kmx_n_waitd;
537         struct timeval  tv;
538
539         LASSERT (ni->ni_lnd == &the_kmxlnd);
540
541         if (kmxlnd_data.kmx_init != MXLND_INIT_NOTHING) {
542                 CERROR("Only 1 instance supported\n");
543                 return -EPERM;
544         }
545         CDEBUG(D_MALLOC, "before MXLND startup: libcfs_kmemory %d "
546                          "kmx_mem_used %ld\n", cfs_atomic_read(&libcfs_kmemory),
547                          kmxlnd_data.kmx_mem_used);
548
549         ni->ni_maxtxcredits = MXLND_TX_MSGS();
550         ni->ni_peertxcredits = *kmxlnd_tunables.kmx_peercredits;
551         if (ni->ni_maxtxcredits < ni->ni_peertxcredits)
552                 ni->ni_maxtxcredits = ni->ni_peertxcredits;
553
554         try_module_get(THIS_MODULE);
555         memset (&kmxlnd_data, 0, sizeof (kmxlnd_data));
556
557         kmxlnd_data.kmx_ni = ni;
558         ni->ni_data = &kmxlnd_data;
559
560         do_gettimeofday(&tv);
561         kmxlnd_data.kmx_incarnation = (((__u64)tv.tv_sec) * 1000000) + tv.tv_usec;
562         CDEBUG(D_NET, "my incarnation is %llu\n", kmxlnd_data.kmx_incarnation);
563
564         rwlock_init (&kmxlnd_data.kmx_global_lock);
565         spin_lock_init (&kmxlnd_data.kmx_mem_lock);
566
567         CFS_INIT_LIST_HEAD (&kmxlnd_data.kmx_conn_reqs);
568         CFS_INIT_LIST_HEAD (&kmxlnd_data.kmx_conn_zombies);
569         CFS_INIT_LIST_HEAD (&kmxlnd_data.kmx_orphan_msgs);
570         spin_lock_init (&kmxlnd_data.kmx_conn_lock);
571         sema_init(&kmxlnd_data.kmx_conn_sem, 0);
572
573         for (i = 0; i < MXLND_HASH_SIZE; i++) {
574                 CFS_INIT_LIST_HEAD (&kmxlnd_data.kmx_peers[i]);
575         }
576
577         CFS_INIT_LIST_HEAD (&kmxlnd_data.kmx_tx_idle);
578         spin_lock_init (&kmxlnd_data.kmx_tx_idle_lock);
579         kmxlnd_data.kmx_tx_next_cookie = 1;
580         CFS_INIT_LIST_HEAD (&kmxlnd_data.kmx_tx_queue);
581         spin_lock_init (&kmxlnd_data.kmx_tx_queue_lock);
582         sema_init(&kmxlnd_data.kmx_tx_queue_sem, 0);
583
584         kmxlnd_data.kmx_init = MXLND_INIT_DATA;
585         /*****************************************************/
586
587         ret = mxlnd_init_txs();
588         if (ret != 0) {
589                 CERROR("Can't alloc tx descs: %d\n", ret);
590                 goto failed;
591         }
592         kmxlnd_data.kmx_init = MXLND_INIT_TXS;
593         /*****************************************************/
594
595         ret = mxlnd_init_mx(ni);
596         if (ret != 0) {
597                 CERROR("Can't init mx\n");
598                 goto failed;
599         }
600
601         kmxlnd_data.kmx_init = MXLND_INIT_MX;
602         /*****************************************************/
603
604         /* start threads */
605
606         MXLND_ALLOC(kmxlnd_data.kmx_completions,
607                      nthreads * sizeof(struct completion));
608         if (kmxlnd_data.kmx_completions == NULL) {
609                 CERROR("failed to alloc kmxlnd_data.kmx_completions\n");
610                 goto failed;
611         }
612         memset(kmxlnd_data.kmx_completions, 0,
613                nthreads * sizeof(struct completion));
614
615         CDEBUG(D_NET, "using %d %s in mx_wait_any()\n",
616                 *kmxlnd_tunables.kmx_n_waitd,
617                 *kmxlnd_tunables.kmx_n_waitd == 1 ? "thread" : "threads");
618
619         for (i = 0; i < *kmxlnd_tunables.kmx_n_waitd; i++) {
620                 char                    name[24];
621                 memset(name, 0, sizeof(name));
622                 snprintf(name, sizeof(name), "mxlnd_request_waitd_%02ld", i);
623                 ret = mxlnd_thread_start(mxlnd_request_waitd, (void*)((long)i));
624                 if (ret < 0) {
625                         CERROR("Starting mxlnd_request_waitd[%d] "
626                                 "failed with %d\n", i, ret);
627                         cfs_atomic_set(&kmxlnd_data.kmx_shutdown, 1);
628                         mx_wakeup(kmxlnd_data.kmx_endpt);
629                         for (--i; i >= 0; i--) {
630                                 wait_for_completion(&kmxlnd_data.kmx_completions[i]);
631                         }
632                         LASSERT(cfs_atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
633                         MXLND_FREE(kmxlnd_data.kmx_completions,
634                                 nthreads * sizeof(struct completion));
635
636                         goto failed;
637                 }
638         }
639         ret = mxlnd_thread_start(mxlnd_tx_queued, (void *)((long)i++),
640                                  "mxlnd_tx_queued");
641         if (ret < 0) {
642                 CERROR("Starting mxlnd_tx_queued failed with %d\n", ret);
643                 cfs_atomic_set(&kmxlnd_data.kmx_shutdown, 1);
644                 mx_wakeup(kmxlnd_data.kmx_endpt);
645                 for (--i; i >= 0; i--) {
646                         wait_for_completion(&kmxlnd_data.kmx_completions[i]);
647                 }
648                 LASSERT(cfs_atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
649                 MXLND_FREE(kmxlnd_data.kmx_completions,
650                         nthreads * sizeof(struct completion));
651                 goto failed;
652         }
653         ret = mxlnd_thread_start(mxlnd_timeoutd, (void *)((long)i++),
654                                  "mxlnd_timeoutd");
655         if (ret < 0) {
656                 CERROR("Starting mxlnd_timeoutd failed with %d\n", ret);
657                 cfs_atomic_set(&kmxlnd_data.kmx_shutdown, 1);
658                 mx_wakeup(kmxlnd_data.kmx_endpt);
659                 up(&kmxlnd_data.kmx_tx_queue_sem);
660                 for (--i; i >= 0; i--) {
661                         wait_for_completion(&kmxlnd_data.kmx_completions[i]);
662                 }
663                 LASSERT(cfs_atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
664                 MXLND_FREE(kmxlnd_data.kmx_completions,
665                         nthreads * sizeof(struct completion));
666                 goto failed;
667         }
668         ret = mxlnd_thread_start(mxlnd_connd, (void *)((long)i++),
669                                  "mxlnd_connd");
670         if (ret < 0) {
671                 CERROR("Starting mxlnd_connd failed with %d\n", ret);
672                 cfs_atomic_set(&kmxlnd_data.kmx_shutdown, 1);
673                 mx_wakeup(kmxlnd_data.kmx_endpt);
674                 up(&kmxlnd_data.kmx_tx_queue_sem);
675                 for (--i; i >= 0; i--) {
676                         wait_for_completion(&kmxlnd_data.kmx_completions[i]);
677                 }
678                 LASSERT(cfs_atomic_read(&kmxlnd_data.kmx_nthreads) == 0);
679                 MXLND_FREE(kmxlnd_data.kmx_completions,
680                         nthreads * sizeof(struct completion));
681                 goto failed;
682         }
683
684         kmxlnd_data.kmx_init = MXLND_INIT_THREADS;
685         /*****************************************************/
686
687         kmxlnd_data.kmx_init = MXLND_INIT_ALL;
688         CDEBUG(D_MALLOC, "startup complete (kmx_mem_used %ld)\n", kmxlnd_data.kmx_mem_used);
689
690         return 0;
691 failed:
692         CERROR("mxlnd_startup failed\n");
693         mxlnd_shutdown(ni);
694         return (-ENETDOWN);
695 }
696
697 static int mxlnd_init(void)
698 {
699         lnet_register_lnd(&the_kmxlnd);
700         return 0;
701 }
702
703 static void mxlnd_exit(void)
704 {
705         lnet_unregister_lnd(&the_kmxlnd);
706         return;
707 }
708
709 module_init(mxlnd_init);
710 module_exit(mxlnd_exit);
711
712 MODULE_LICENSE("GPL");
713 MODULE_AUTHOR("Myricom, Inc. - help@myri.com");
714 MODULE_DESCRIPTION("Kernel MyrinetExpress LND");
715 MODULE_VERSION("0.6.0");