/*
 * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see [sun.com URL with a
 * copy of GPLv2].
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/qswlnd/qswlnd.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "qswlnd.h"

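/* LND operations table for the Quadrics/Elan network: LNet calls these entry
 * points for startup/shutdown, module-specific ioctls, and message send and
 * receive.  It is registered with LNet in kqswnal_initialise() below. */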
lnd_t the_kqswlnd =
{
        .lnd_type       = QSWLND,
        .lnd_startup    = kqswnal_startup,
        .lnd_shutdown   = kqswnal_shutdown,
        .lnd_ctl        = kqswnal_ctl,
        .lnd_send       = kqswnal_send,
        .lnd_recv       = kqswnal_recv,
};

kqswnal_data_t          kqswnal_data;

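/* Look up the data->ioc_count'th tx descriptor on the active list and report
 * its LNet header fields and state back through the ioctl data.  Returns
 * -ENOENT if there are fewer active txs than the requested index. */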
int
kqswnal_get_tx_desc (struct libcfs_ioctl_data *data)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        lnet_hdr_t        *hdr;
        int                index = data->ioc_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
                hdr = (lnet_hdr_t *)ktx->ktx_buffer;

                data->ioc_count  = le32_to_cpu(hdr->payload_length);
                data->ioc_nid    = le64_to_cpu(hdr->dest_nid);
                data->ioc_u64[0] = ktx->ktx_nid;
                data->ioc_u32[0] = le32_to_cpu(hdr->type);
                data->ioc_u32[1] = ktx->ktx_launcher;
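                /* ioc_flags encoding: bit 0 is set if this tx is still queued
                 * on a scheduler list; the remaining bits carry ktx_state << 2 */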
                data->ioc_flags  = (list_empty (&ktx->ktx_schedlist) ? 0 : 1) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

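/* ioctl dispatcher for this LND: IOC_LIBCFS_GET_TXDESC exposes active tx
 * descriptors for debugging; IOC_LIBCFS_REGISTER_MYNID is obsolete and is
 * accepted only as a no-op for a NID on this NI's network. */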
int
kqswnal_ctl (lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;

        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (cmd) {
        case IOC_LIBCFS_GET_TXDESC:
                return (kqswnal_get_tx_desc (data));

        case IOC_LIBCFS_REGISTER_MYNID:
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                LASSERT (LNET_NIDNET(data->ioc_nid) == LNET_NIDNET(ni->ni_nid));

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID for %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return 0;

        default:
                return (-EINVAL);
        }
}

void
kqswnal_shutdown(lnet_ni_t *ni)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (ni->ni_data == &kqswnal_data);
        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
        case KQN_INIT_DATA:
                break;
        }

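        /* Shutdown runs in two phases: kqn_shuttingdown is set to 1 here to
         * tell the send path to stop launching new transmits, and to 2 below,
         * once comms are closed, to tell the scheduler threads to exit. */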
        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* close elan comms */
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lnet_parse() */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_donetxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);

        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        LIBCFS_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                LIBCFS_FREE(ktx, sizeof(*ktx));
        }

        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                LIBCFS_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&libcfs_kmemory));

        PORTAL_MODULE_UNUSE;
}

int
kqswnal_startup (lnet_ni_t *ni)
{
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
        int               rc;
        int               i;
        kqswnal_rx_t     *krx;
        kqswnal_tx_t     *ktx;
        int               elan_page_idx;

        LASSERT (ni->ni_lnd == &the_kqswlnd);

#if KQSW_CKSUM
        if (the_lnet.ln_ptlcompat != 0) {
                CERROR("Checksumming version not portals compatible\n");
                return -ENODEV;
        }
#endif
        /* Only 1 instance supported */
        if (kqswnal_data.kqn_init != KQN_INIT_NOTHING) {
                CERROR ("Only 1 instance supported\n");
                return -EPERM;
        }

        if (ni->ni_interfaces[0] != NULL) {
                CERROR("Explicit interface config not supported\n");
                return -EPERM;
        }

        if (*kqswnal_tunables.kqn_credits >=
            *kqswnal_tunables.kqn_ntxmsgs) {
                LCONSOLE_ERROR_MSG(0x12e, "Configuration error: please set "
                                   "ntxmsgs(%d) > credits(%d)\n",
                                   *kqswnal_tunables.kqn_ntxmsgs,
                                   *kqswnal_tunables.kqn_credits);
        }

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&libcfs_kmemory));

        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_ni = ni;
        ni->ni_data = &kqswnal_data;
        ni->ni_peertxcredits = *kqswnal_tunables.kqn_peercredits;
        ni->ni_maxtxcredits = *kqswnal_tunables.kqn_credits;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_donetxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
        PORTAL_MODULE_USE;

        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);

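        /* Build this NI's NID from the configured network number and our Elan
         * node ID, so the NID's address part is simply the Elan ID. */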
        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), kqswnal_data.kqn_elanid);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_SMALL,
                               *kqswnal_tunables.kqn_ep_envelopes_small);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_LARGE,
                               *kqswnal_tunables.kqn_ep_envelopes_large);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(*kqswnal_tunables.kqn_ntxmsgs),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL *
                                (*kqswnal_tunables.kqn_nrxmsgs_small) +
                                KQSW_NRXMSGPAGES_LARGE *
                                (*kqswnal_tunables.kqn_nrxmsgs_large),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

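        /* Each tx descriptor owns a contiguous KQSW_NTXMSGPAGES-page slot in
         * the reserved tx DVMA region: the pre-mapped ktx_buffer pages come
         * first (mapped once here), and the remaining pages of the slot are
         * left for mapping payload fragments at send time. */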
        kqswnal_data.kqn_txds = NULL;
        for (i = 0; i < (*kqswnal_tunables.kqn_ntxmsgs); i++)
        {
                int           premapped_pages;
                int           basepage = i * KQSW_NTXMSGPAGES;

                LIBCFS_ALLOC (ktx, sizeof(*ktx));
                if (ktx == NULL) {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
                ktx->ktx_alloclist = kqswnal_data.kqn_txds;
                kqswnal_data.kqn_txds = ktx;

                LIBCFS_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);

                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_schedlist);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_rail = -1;             /* unset rail */

                list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */
        kqswnal_data.kqn_rxds = NULL;
        elan_page_idx = 0;
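        /* The first kqn_nrxmsgs_small descriptors attach to the small-message
         * receiver, the rest to the large-message receiver.  Each descriptor's
         * pages are DVMA-mapped back to back and merged into one contiguous
         * Elan buffer (krx_elanbuffer). */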
        for (i = 0; i < *kqswnal_tunables.kqn_nrxmsgs_small + *kqswnal_tunables.kqn_nrxmsgs_large; i++)
        {
                EP_NMD        elanbuffer;
                int           j;

                LIBCFS_ALLOC(krx, sizeof(*krx));
                if (krx == NULL) {
                        kqswnal_shutdown(ni);
                        return (-ENOMEM);
                }

                memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
                krx->krx_alloclist = kqswnal_data.kqn_rxds;
                kqswnal_data.kqn_rxds = krx;

                if (i < *kqswnal_tunables.kqn_nrxmsgs_small)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_shutdown (ni);
                                return (-ENOMEM);
                        }

                        krx->krx_kiov[j] = (lnet_kiov_t) {.kiov_page = page,
                                                          .kiov_offset = 0,
                                                          .kiov_len = PAGE_SIZE};
                        LASSERT(page_address(page) != NULL);

                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
                        elan_page_idx++;

                }
        }
        LASSERT (elan_page_idx ==
                 (*kqswnal_tunables.kqn_nrxmsgs_small * KQSW_NRXMSGPAGES_SMALL) +
                 (*kqswnal_tunables.kqn_nrxmsgs_large * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* NB this enqueue can allocate/sleep (attr == 0) */
                krx->krx_state = KRX_POSTED;
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
                if (rc != EP_SUCCESS) {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-EIO);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < num_online_cpus(); i++) {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-ESRCH);
                }
        }

        kqswnal_data.kqn_init = KQN_INIT_ALL;
        return (0);
}

void __exit
kqswnal_finalise (void)
{
        lnet_unregister_lnd(&the_kqswlnd);
        kqswnal_tunables_fini();
}

static int __init
kqswnal_initialise (void)
{
        int   rc = kqswnal_tunables_init();

        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_kqswlnd);
        return (0);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan LND v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);