/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/qswlnd/qswlnd.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "qswlnd.h"

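/* Method table this module registers with LNet; LNet calls back through
 * these entry points for NIs of type QSWLND. */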
lnd_t the_kqswlnd =
{
        .lnd_type       = QSWLND,
        .lnd_startup    = kqswnal_startup,
        .lnd_shutdown   = kqswnal_shutdown,
        .lnd_ctl        = kqswnal_ctl,
        .lnd_send       = kqswnal_send,
        .lnd_recv       = kqswnal_recv,
};

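/* Global state for the single NI instance this LND supports */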
kqswnal_data_t          kqswnal_data;

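/* IOC_LIBCFS_GET_TXDESC: copy details of the data->ioc_count'th active tx
 * descriptor into the ioctl reply */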
int
kqswnal_get_tx_desc (struct libcfs_ioctl_data *data)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        lnet_hdr_t        *hdr;
        int                index = data->ioc_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
                hdr = (lnet_hdr_t *)ktx->ktx_buffer;

                data->ioc_count  = le32_to_cpu(hdr->payload_length);
                data->ioc_nid    = le64_to_cpu(hdr->dest_nid);
                data->ioc_u64[0] = ktx->ktx_nid;
                data->ioc_u32[0] = le32_to_cpu(hdr->type);
                data->ioc_u32[1] = ktx->ktx_launcher;
                data->ioc_flags  = (list_empty (&ktx->ktx_schedlist) ? 0 : 1) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

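/* Handle ioctls LNet directs at this NI */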
int
kqswnal_ctl (lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;

        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (cmd) {
        case IOC_LIBCFS_GET_TXDESC:
                return (kqswnal_get_tx_desc (data));

        case IOC_LIBCFS_REGISTER_MYNID:
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                LASSERT (LNET_NIDNET(data->ioc_nid) == LNET_NIDNET(ni->ni_nid));

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID for %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return 0;

        default:
                return (-EINVAL);
        }
}

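/* Tear the NI down.  Also called to clean up a partially completed
 * kqswnal_startup(), so every resource is checked before it is freed */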
void
kqswnal_shutdown(lnet_ni_t *ni)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (ni->ni_data == &kqswnal_data);
        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
        case KQN_INIT_DATA:
                break;
        }

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* close elan comms */
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lnet_parse() */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_donetxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here --------> */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);

        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        LIBCFS_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                LIBCFS_FREE(ktx, sizeof(*ktx));
        }

        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                LIBCFS_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&libcfs_kmemory));

        PORTAL_MODULE_UNUSE;
}

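/* Bring the NI up: initialise local state, attach to the Elan kernel
 * comms, allocate the transmitter and receivers, reserve and map DVMA
 * space, build the tx/rx descriptor pools, post receives and spawn the
 * scheduler threads.  Failures clean up via kqswnal_shutdown() */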
int
kqswnal_startup (lnet_ni_t *ni)
{
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
        int               rc;
        int               i;
        kqswnal_rx_t     *krx;
        kqswnal_tx_t     *ktx;
        int               elan_page_idx;

        LASSERT (ni->ni_lnd == &the_kqswlnd);

        /* Only 1 instance supported */
        if (kqswnal_data.kqn_init != KQN_INIT_NOTHING) {
                CERROR ("Only 1 instance supported\n");
                return -EPERM;
        }

        if (ni->ni_interfaces[0] != NULL) {
                CERROR("Explicit interface config not supported\n");
                return -EPERM;
        }

        if (*kqswnal_tunables.kqn_credits >=
            *kqswnal_tunables.kqn_ntxmsgs) {
                LCONSOLE_ERROR_MSG(0x12e, "Configuration error: please set "
                                   "ntxmsgs(%d) > credits(%d)\n",
                                   *kqswnal_tunables.kqn_ntxmsgs,
                                   *kqswnal_tunables.kqn_credits);
        }

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&libcfs_kmemory));

        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_ni = ni;
        ni->ni_data = &kqswnal_data;
        ni->ni_peertxcredits = *kqswnal_tunables.kqn_peercredits;
        ni->ni_maxtxcredits = *kqswnal_tunables.kqn_credits;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_donetxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
        PORTAL_MODULE_USE;

        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);

        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), kqswnal_data.kqn_elanid);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_SMALL,
                               *kqswnal_tunables.kqn_ep_envelopes_small);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_LARGE,
                               *kqswnal_tunables.kqn_ep_envelopes_large);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(*kqswnal_tunables.kqn_ntxmsgs),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL *
                                (*kqswnal_tunables.kqn_nrxmsgs_small) +
                                KQSW_NRXMSGPAGES_LARGE *
                                (*kqswnal_tunables.kqn_nrxmsgs_large),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        kqswnal_data.kqn_txds = NULL;
        for (i = 0; i < (*kqswnal_tunables.kqn_ntxmsgs); i++)
        {
                int           premapped_pages;
                int           basepage = i * KQSW_NTXMSGPAGES;

                LIBCFS_ALLOC (ktx, sizeof(*ktx));
                if (ktx == NULL) {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
                ktx->ktx_alloclist = kqswnal_data.kqn_txds;
                kqswnal_data.kqn_txds = ktx;

                LIBCFS_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);

                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_schedlist);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_rail = -1;             /* unset rail */

                list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */
        kqswnal_data.kqn_rxds = NULL;
        elan_page_idx = 0;
        for (i = 0; i < *kqswnal_tunables.kqn_nrxmsgs_small + *kqswnal_tunables.kqn_nrxmsgs_large; i++)
        {
                EP_NMD        elanbuffer;
                int           j;

                LIBCFS_ALLOC(krx, sizeof(*krx));
                if (krx == NULL) {
                        kqswnal_shutdown(ni);
                        return (-ENOMEM);
                }

                memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
                krx->krx_alloclist = kqswnal_data.kqn_rxds;
                kqswnal_data.kqn_rxds = krx;

                if (i < *kqswnal_tunables.kqn_nrxmsgs_small)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_shutdown (ni);
                                return (-ENOMEM);
                        }

                        krx->krx_kiov[j] = (lnet_kiov_t) {.kiov_page = page,
                                                          .kiov_offset = 0,
                                                          .kiov_len = PAGE_SIZE};
                        LASSERT(page_address(page) != NULL);

                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
                        elan_page_idx++;
                }
        }
        LASSERT (elan_page_idx ==
                 (*kqswnal_tunables.kqn_nrxmsgs_small * KQSW_NRXMSGPAGES_SMALL) +
                 (*kqswnal_tunables.kqn_nrxmsgs_large * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* NB this enqueue can allocate/sleep (attr == 0) */
                krx->krx_state = KRX_POSTED;
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
                if (rc != EP_SUCCESS) {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-EIO);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < num_online_cpus(); i++) {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-ESRCH);
                }
        }

        kqswnal_data.kqn_init = KQN_INIT_ALL;
        return (0);
}

void __exit
kqswnal_finalise (void)
{
        lnet_unregister_lnd(&the_kqswlnd);
        kqswnal_tunables_fini();
}

static int __init
kqswnal_initialise (void)
{
        int   rc = kqswnal_tunables_init();

        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_kqswlnd);
        return (0);
}

MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan LND v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);