Whamcloud - gitweb
LU-1346 libcfs: replace libcfs wrappers with kernel API
[fs/lustre-release.git] / lnet / klnds / qswlnd / qswlnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  */
30 /*
31  * This file is part of Lustre, http://www.lustre.org/
32  * Lustre is a trademark of Sun Microsystems, Inc.
33  *
34  * lnet/klnds/qswlnd/qswlnd.c
35  *
36  * Author: Eric Barton <eric@bartonsoftware.com>
37  */
38
39 #include "qswlnd.h"
40
41
/* LND method table for the Quadrics/Elan LND.  It is handed to LNet by
 * lnet_register_lnd() in kqswnal_initialise() and withdrawn again by
 * lnet_unregister_lnd() in kqswnal_finalise(). */
lnd_t the_kqswlnd =
{
        .lnd_type       = QSWLND,
        .lnd_startup    = kqswnal_startup,
        .lnd_shutdown   = kqswnal_shutdown,
        .lnd_ctl        = kqswnal_ctl,
        .lnd_send       = kqswnal_send,
        .lnd_recv       = kqswnal_recv,
};

/* State of the one-and-only LND instance.  kqswnal_startup() and
 * kqswnal_shutdown() both memset() this to zero, so "all zero" is the
 * not-initialised state tested via kqn_init. */
kqswnal_data_t          kqswnal_data;
53
54 int
55 kqswnal_get_tx_desc (struct libcfs_ioctl_data *data)
56 {
57         unsigned long      flags;
58         cfs_list_t        *tmp;
59         kqswnal_tx_t      *ktx;
60         lnet_hdr_t        *hdr;
61         int                index = data->ioc_count;
62         int                rc = -ENOENT;
63
64         spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
65
66         cfs_list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
67                 if (index-- != 0)
68                         continue;
69
70                 ktx = cfs_list_entry (tmp, kqswnal_tx_t, ktx_list);
71                 hdr = (lnet_hdr_t *)ktx->ktx_buffer;
72
73                 data->ioc_count  = le32_to_cpu(hdr->payload_length);
74                 data->ioc_nid    = le64_to_cpu(hdr->dest_nid);
75                 data->ioc_u64[0] = ktx->ktx_nid;
76                 data->ioc_u32[0] = le32_to_cpu(hdr->type);
77                 data->ioc_u32[1] = ktx->ktx_launcher;
78                 data->ioc_flags  =
79                         (cfs_list_empty (&ktx->ktx_schedlist) ? 0 : 1) |
80                                          (ktx->ktx_state << 2);
81                 rc = 0;
82                 break;
83         }
84
85         spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
86         return (rc);
87 }
88
89 int
90 kqswnal_ctl (lnet_ni_t *ni, unsigned int cmd, void *arg)
91 {
92         struct libcfs_ioctl_data *data = arg;
93
94         LASSERT (ni == kqswnal_data.kqn_ni);
95
96         switch (cmd) {
97         case IOC_LIBCFS_GET_TXDESC:
98                 return (kqswnal_get_tx_desc (data));
99
100         case IOC_LIBCFS_REGISTER_MYNID:
101                 if (data->ioc_nid == ni->ni_nid)
102                         return 0;
103
104                 LASSERT (LNET_NIDNET(data->ioc_nid) == LNET_NIDNET(ni->ni_nid));
105
106                 CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID for %s(%s)\n",
107                        libcfs_nid2str(data->ioc_nid),
108                        libcfs_nid2str(ni->ni_nid));
109                 return 0;
110
111         default:
112                 return (-EINVAL);
113         }
114 }
115
/*
 * Tear down the LND instance attached to 'ni' (the .lnd_shutdown method).
 *
 * Also used by kqswnal_startup() to unwind a partially completed startup,
 * so every step tolerates resources that were never set up (NULL handles,
 * empty descriptor lists).
 *
 * Stages, in order:
 *   1. flag shutdown (kqn_shuttingdown = 1) and wait for pending sends;
 *   2. free the Elan receivers, then the transmitter;
 *   3. flag thread termination (kqn_shuttingdown = 2), wake the scheduler
 *      wait queue and wait until kqn_nthreads drops to zero;
 *   4. unmap and release the DVMA regions, free all tx/rx descriptors,
 *      tx buffers and rx pages;
 *   5. zero kqswnal_data and drop the module reference.
 */
void
kqswnal_shutdown(lnet_ni_t *ni)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;
        
        CDEBUG (D_NET, "shutdown\n");
        LASSERT (ni->ni_data == &kqswnal_data);
        LASSERT (ni == kqswnal_data.kqn_ni);

        /* Only legal once startup has at least initialised the basic data
         * (KQN_INIT_DATA) or completed (KQN_INIT_ALL); any other state
         * trips the assertion. */
        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
        case KQN_INIT_DATA:
                break;
        }

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        /* (polled once a second) */
        while (cfs_atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       cfs_atomic_read (&kqswnal_data.kqn_pending_txs));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* close elan comms */
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lnet_parse() */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(cfs_list_empty(&kqswnal_data.kqn_activetxds));

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        cfs_waitq_broadcast (&kqswnal_data.kqn_sched_waitq);

        while (cfs_atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       cfs_atomic_read (&kqswnal_data.kqn_nthreads));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        LASSERT (cfs_list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (cfs_list_empty (&kqswnal_data.kqn_donetxds));
        LASSERT (cfs_list_empty (&kqswnal_data.kqn_delayedtxds));

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() get fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.  
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        /* Give back the reserved Elan address space (if it was reserved) */
        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);

        /* Free every tx descriptor on the allocation list, together with
         * its pre-allocated buffer when one was allocated */
        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        LIBCFS_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                /* unlink before freeing: ktx is read for the next pointer */
                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                LIBCFS_FREE(ktx, sizeof(*ktx));
        }

        /* Free every rx descriptor and whichever of its pages got
         * allocated (a failed startup can leave trailing NULL pages) */
        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                LIBCFS_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", cfs_atomic_read(&libcfs_kmemory));

        /* pairs with PORTAL_MODULE_USE in kqswnal_startup() */
        PORTAL_MODULE_UNUSE;
}
257
258 int
259 kqswnal_startup (lnet_ni_t *ni)
260 {
261         EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
262         int               rc;
263         int               i;
264         kqswnal_rx_t     *krx;
265         kqswnal_tx_t     *ktx;
266         int               elan_page_idx;
267
268         LASSERT (ni->ni_lnd == &the_kqswlnd);
269
270         /* Only 1 instance supported */
271         if (kqswnal_data.kqn_init != KQN_INIT_NOTHING) {
272                 CERROR ("Only 1 instance supported\n");
273                 return -EPERM;
274         }
275
276         if (ni->ni_interfaces[0] != NULL) {
277                 CERROR("Explicit interface config not supported\n");
278                 return -EPERM;
279         }
280
281         if (*kqswnal_tunables.kqn_credits >=
282             *kqswnal_tunables.kqn_ntxmsgs) {
283                 LCONSOLE_ERROR_MSG(0x12e, "Configuration error: please set "
284                                    "ntxmsgs(%d) > credits(%d)\n",
285                                    *kqswnal_tunables.kqn_ntxmsgs,
286                                    *kqswnal_tunables.kqn_credits);
287         }
288         
289         CDEBUG (D_MALLOC, "start kmem %d\n", cfs_atomic_read(&libcfs_kmemory));
290         
291         /* ensure all pointers NULL etc */
292         memset (&kqswnal_data, 0, sizeof (kqswnal_data));
293
294         kqswnal_data.kqn_ni = ni;
295         ni->ni_data = &kqswnal_data;
296         ni->ni_peertxcredits = *kqswnal_tunables.kqn_peercredits;
297         ni->ni_maxtxcredits = *kqswnal_tunables.kqn_credits;
298
299         CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
300         CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
301         spin_lock_init(&kqswnal_data.kqn_idletxd_lock);
302
303         CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
304         CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_donetxds);
305         CFS_INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);
306
307         spin_lock_init(&kqswnal_data.kqn_sched_lock);
308         cfs_waitq_init (&kqswnal_data.kqn_sched_waitq);
309
310         /* pointers/lists/locks initialised */
311         kqswnal_data.kqn_init = KQN_INIT_DATA;
312         PORTAL_MODULE_USE;
313         
314         kqswnal_data.kqn_ep = ep_system();
315         if (kqswnal_data.kqn_ep == NULL) {
316                 CERROR("Can't initialise EKC\n");
317                 kqswnal_shutdown(ni);
318                 return (-ENODEV);
319         }
320
321         if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
322                 CERROR("Can't get elan ID\n");
323                 kqswnal_shutdown(ni);
324                 return (-ENODEV);
325         }
326
327         kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
328         kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);
329
330         ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), kqswnal_data.kqn_elanid);
331         
332         /**********************************************************************/
333         /* Get the transmitter */
334
335         kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
336         if (kqswnal_data.kqn_eptx == NULL)
337         {
338                 CERROR ("Can't allocate transmitter\n");
339                 kqswnal_shutdown (ni);
340                 return (-ENOMEM);
341         }
342
343         /**********************************************************************/
344         /* Get the receivers */
345
346         kqswnal_data.kqn_eprx_small = 
347                 ep_alloc_rcvr (kqswnal_data.kqn_ep,
348                                EP_MSG_SVC_PORTALS_SMALL,
349                                *kqswnal_tunables.kqn_ep_envelopes_small);
350         if (kqswnal_data.kqn_eprx_small == NULL)
351         {
352                 CERROR ("Can't install small msg receiver\n");
353                 kqswnal_shutdown (ni);
354                 return (-ENOMEM);
355         }
356
357         kqswnal_data.kqn_eprx_large = 
358                 ep_alloc_rcvr (kqswnal_data.kqn_ep,
359                                EP_MSG_SVC_PORTALS_LARGE,
360                                *kqswnal_tunables.kqn_ep_envelopes_large);
361         if (kqswnal_data.kqn_eprx_large == NULL)
362         {
363                 CERROR ("Can't install large msg receiver\n");
364                 kqswnal_shutdown (ni);
365                 return (-ENOMEM);
366         }
367
368         /**********************************************************************/
369         /* Reserve Elan address space for transmit descriptors NB we may
370          * either send the contents of associated buffers immediately, or
371          * map them for the peer to suck/blow... */
372         kqswnal_data.kqn_ep_tx_nmh = 
373                 ep_dvma_reserve(kqswnal_data.kqn_ep,
374                                 KQSW_NTXMSGPAGES*(*kqswnal_tunables.kqn_ntxmsgs),
375                                 EP_PERM_WRITE);
376         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
377                 CERROR("Can't reserve tx dma space\n");
378                 kqswnal_shutdown(ni);
379                 return (-ENOMEM);
380         }
381
382         /**********************************************************************/
383         /* Reserve Elan address space for receive buffers */
384         kqswnal_data.kqn_ep_rx_nmh =
385                 ep_dvma_reserve(kqswnal_data.kqn_ep,
386                                 KQSW_NRXMSGPAGES_SMALL * 
387                                 (*kqswnal_tunables.kqn_nrxmsgs_small) +
388                                 KQSW_NRXMSGPAGES_LARGE * 
389                                 (*kqswnal_tunables.kqn_nrxmsgs_large),
390                                 EP_PERM_WRITE);
391         if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
392                 CERROR("Can't reserve rx dma space\n");
393                 kqswnal_shutdown(ni);
394                 return (-ENOMEM);
395         }
396
397         /**********************************************************************/
398         /* Allocate/Initialise transmit descriptors */
399
400         kqswnal_data.kqn_txds = NULL;
401         for (i = 0; i < (*kqswnal_tunables.kqn_ntxmsgs); i++)
402         {
403                 int           premapped_pages;
404                 int           basepage = i * KQSW_NTXMSGPAGES;
405
406                 LIBCFS_ALLOC (ktx, sizeof(*ktx));
407                 if (ktx == NULL) {
408                         kqswnal_shutdown (ni);
409                         return (-ENOMEM);
410                 }
411
412                 memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
413                 ktx->ktx_alloclist = kqswnal_data.kqn_txds;
414                 kqswnal_data.kqn_txds = ktx;
415
416                 LIBCFS_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
417                 if (ktx->ktx_buffer == NULL)
418                 {
419                         kqswnal_shutdown (ni);
420                         return (-ENOMEM);
421                 }
422
423                 /* Map pre-allocated buffer NOW, to save latency on transmit */
424                 premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
425                                                         KQSW_TX_BUFFER_SIZE);
426                 ep_dvma_load(kqswnal_data.kqn_ep, NULL, 
427                              ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE, 
428                              kqswnal_data.kqn_ep_tx_nmh, basepage,
429                              &all_rails, &ktx->ktx_ebuffer);
430
431                 ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
432                 ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */
433
434                 CFS_INIT_LIST_HEAD (&ktx->ktx_schedlist);
435
436                 ktx->ktx_state = KTX_IDLE;
437                 ktx->ktx_rail = -1;             /* unset rail */
438
439                 cfs_list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
440         }
441
442         /**********************************************************************/
443         /* Allocate/Initialise receive descriptors */
444         kqswnal_data.kqn_rxds = NULL;
445         elan_page_idx = 0;
446         for (i = 0; i < *kqswnal_tunables.kqn_nrxmsgs_small + *kqswnal_tunables.kqn_nrxmsgs_large; i++)
447         {
448                 EP_NMD        elanbuffer;
449                 int           j;
450
451                 LIBCFS_ALLOC(krx, sizeof(*krx));
452                 if (krx == NULL) {
453                         kqswnal_shutdown(ni);
454                         return (-ENOMEM);
455                 }
456
457                 memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
458                 krx->krx_alloclist = kqswnal_data.kqn_rxds;
459                 kqswnal_data.kqn_rxds = krx;
460
461                 if (i < *kqswnal_tunables.kqn_nrxmsgs_small)
462                 {
463                         krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
464                         krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
465                 }
466                 else
467                 {
468                         krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
469                         krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
470                 }
471
472                 LASSERT (krx->krx_npages > 0);
473                 for (j = 0; j < krx->krx_npages; j++)
474                 {
475                         struct page *page = alloc_page(GFP_KERNEL);
476                         
477                         if (page == NULL) {
478                                 kqswnal_shutdown (ni);
479                                 return (-ENOMEM);
480                         }
481
482                         krx->krx_kiov[j] = (lnet_kiov_t) {.kiov_page = page,
483                                                           .kiov_offset = 0,
484                                                           .kiov_len = PAGE_SIZE};
485                         LASSERT(page_address(page) != NULL);
486
487                         ep_dvma_load(kqswnal_data.kqn_ep, NULL,
488                                      page_address(page),
489                                      PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
490                                      elan_page_idx, &all_rails, &elanbuffer);
491                         
492                         if (j == 0) {
493                                 krx->krx_elanbuffer = elanbuffer;
494                         } else {
495                                 rc = ep_nmd_merge(&krx->krx_elanbuffer,
496                                                   &krx->krx_elanbuffer, 
497                                                   &elanbuffer);
498                                 /* NB contiguous mapping */
499                                 LASSERT(rc);
500                         }
501                         elan_page_idx++;
502
503                 }
504         }
505         LASSERT (elan_page_idx ==
506                  (*kqswnal_tunables.kqn_nrxmsgs_small * KQSW_NRXMSGPAGES_SMALL) +
507                  (*kqswnal_tunables.kqn_nrxmsgs_large * KQSW_NRXMSGPAGES_LARGE));
508
509         /**********************************************************************/
510         /* Queue receives, now that it's OK to run their completion callbacks */
511
512         for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
513                 /* NB this enqueue can allocate/sleep (attr == 0) */
514                 krx->krx_state = KRX_POSTED;
515                 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
516                                       &krx->krx_elanbuffer, 0);
517                 if (rc != EP_SUCCESS) {
518                         CERROR ("failed ep_queue_receive %d\n", rc);
519                         kqswnal_shutdown (ni);
520                         return (-EIO);
521                 }
522         }
523
524         /**********************************************************************/
525         /* Spawn scheduling threads */
526         for (i = 0; i < cfs_num_online_cpus(); i++) {
527                 rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
528                 if (rc != 0)
529                 {
530                         CERROR ("failed to spawn scheduling thread: %d\n", rc);
531                         kqswnal_shutdown (ni);
532                         return (-ESRCH);
533                 }
534         }
535
536         kqswnal_data.kqn_init = KQN_INIT_ALL;
537         return (0);
538 }
539
540 void __exit
541 kqswnal_finalise (void)
542 {
543         lnet_unregister_lnd(&the_kqswlnd);
544         kqswnal_tunables_fini();
545 }
546
547 static int __init
548 kqswnal_initialise (void)
549 {
550         int   rc = kqswnal_tunables_init();
551         
552         if (rc != 0)
553                 return rc;
554
555         lnet_register_lnd(&the_kqswlnd);
556         return (0);
557 }
558
/* Module metadata */
MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan LND v1.01");
MODULE_LICENSE("GPL");

/* Load/unload entry points: kqswnal_initialise() registers the LND,
 * kqswnal_finalise() unregisters it. */
module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);