lnet/klnds/qswlnd/qswlnd.c
/*
 * Copyright (C) 2002-2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "qswlnd.h"

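/* LND entry points for the Quadrics/Elan network: this table is
 * registered with LNet in kqswnal_initialise() and dispatches LNet's
 * generic startup/shutdown, control, send and receive calls to the
 * qswlnd implementations below. */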
lnd_t the_kqswlnd =
{
        .lnd_type       = QSWLND,
        .lnd_startup    = kqswnal_startup,
        .lnd_shutdown   = kqswnal_shutdown,
        .lnd_ctl        = kqswnal_ctl,
        .lnd_send       = kqswnal_send,
        .lnd_recv       = kqswnal_recv,
};

kqswnal_data_t          kqswnal_data;

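/* Debug helper for IOC_LIBCFS_GET_TXDESC: walk the active transmit
 * descriptors under kqn_idletxd_lock and copy the index'th one into the
 * ioctl buffer.  ioc_flags packs "still on the scheduling list" into
 * bit 0 and the descriptor state into bits 2 and above. */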
int
kqswnal_get_tx_desc (struct libcfs_ioctl_data *data)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        lnet_hdr_t        *hdr;
        int                index = data->ioc_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
                hdr = (lnet_hdr_t *)ktx->ktx_buffer;

                data->ioc_count  = le32_to_cpu(hdr->payload_length);
                data->ioc_nid    = le64_to_cpu(hdr->dest_nid);
                data->ioc_u64[0] = ktx->ktx_nid;
                data->ioc_u32[0] = le32_to_cpu(hdr->type);
                data->ioc_u32[1] = ktx->ktx_launcher;
                data->ioc_flags  = (list_empty (&ktx->ktx_schedlist) ? 0 : 1) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

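/* LNet control hook: only the tx-descriptor dump above and the
 * (obsolete) REGISTER_MYNID ioctl are recognised; anything else is
 * rejected with -EINVAL. */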
int
kqswnal_ctl (lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;

        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (cmd) {
        case IOC_LIBCFS_GET_TXDESC:
                return (kqswnal_get_tx_desc (data));

        case IOC_LIBCFS_REGISTER_MYNID:
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                LASSERT (LNET_NIDNET(data->ioc_nid) == LNET_NIDNET(ni->ni_nid));

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID for %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return 0;

        default:
                return (-EINVAL);
        }
}

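/* Shutdown runs in phases: kqn_shuttingdown = 1 stops new transmits
 * from being queued, then the EKC receivers and transmitter are torn
 * down (their free routines only return once all outstanding traffic
 * has completed), kqn_shuttingdown = 2 tells the scheduler threads to
 * exit, and finally all mappings, descriptors and buffers are freed. */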
void
kqswnal_shutdown(lnet_ni_t *ni)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (ni->ni_data == &kqswnal_data);
        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
        case KQN_INIT_DATA:
                break;
        }

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* close elan comms */
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lnet_parse() */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_donetxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);

        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        LIBCFS_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                LIBCFS_FREE(ktx, sizeof(*ktx));
        }

        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                LIBCFS_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&libcfs_kmemory));

        PORTAL_MODULE_UNUSE;
}

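/* Bring the interface up: record the NI, initialise the shared state,
 * attach to the Elan Kernel Comms (EKC) system, allocate the
 * transmitter, the small and large message receivers and their DVMA
 * reservations, pre-allocate and map all tx/rx descriptors, post the
 * receives and finally spawn one scheduler thread per online CPU.
 * Any failure backs everything out via kqswnal_shutdown(). */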
int
kqswnal_startup (lnet_ni_t *ni)
{
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
        int               rc;
        int               i;
        kqswnal_rx_t     *krx;
        kqswnal_tx_t     *ktx;
        int               elan_page_idx;

        LASSERT (ni->ni_lnd == &the_kqswlnd);

#if KQSW_CKSUM
        if (the_lnet.ln_ptlcompat != 0) {
                CERROR("Checksumming version not portals compatible\n");
                return -ENODEV;
        }
#endif
        /* Only 1 instance supported */
        if (kqswnal_data.kqn_init != KQN_INIT_NOTHING) {
                CERROR ("Only 1 instance supported\n");
                return -EPERM;
        }

        if (ni->ni_interfaces[0] != NULL) {
                CERROR("Explicit interface config not supported\n");
                return -EPERM;
        }

        if (*kqswnal_tunables.kqn_credits >=
            *kqswnal_tunables.kqn_ntxmsgs) {
                LCONSOLE_ERROR(0x12e, "Configuration error: please set "
                               "ntxmsgs(%d) > credits(%d)\n",
                               *kqswnal_tunables.kqn_ntxmsgs,
                               *kqswnal_tunables.kqn_credits);
        }

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&libcfs_kmemory));

        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_ni = ni;
        ni->ni_data = &kqswnal_data;
        ni->ni_peertxcredits = *kqswnal_tunables.kqn_peercredits;
        ni->ni_maxtxcredits = *kqswnal_tunables.kqn_credits;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_donetxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
        PORTAL_MODULE_USE;

        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);

        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), kqswnal_data.kqn_elanid);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_SMALL,
                               *kqswnal_tunables.kqn_ep_envelopes_small);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_LARGE,
                               *kqswnal_tunables.kqn_ep_envelopes_large);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(*kqswnal_tunables.kqn_ntxmsgs),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL *
                                (*kqswnal_tunables.kqn_nrxmsgs_small) +
                                KQSW_NRXMSGPAGES_LARGE *
                                (*kqswnal_tunables.kqn_nrxmsgs_large),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        kqswnal_data.kqn_txds = NULL;
        for (i = 0; i < (*kqswnal_tunables.kqn_ntxmsgs); i++)
        {
                int           premapped_pages;
                int           basepage = i * KQSW_NTXMSGPAGES;

                LIBCFS_ALLOC (ktx, sizeof(*ktx));
                if (ktx == NULL) {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
                ktx->ktx_alloclist = kqswnal_data.kqn_txds;
                kqswnal_data.kqn_txds = ktx;

                LIBCFS_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);

                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_schedlist);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_rail = -1;             /* unset rail */

                list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
        }

        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */
        kqswnal_data.kqn_rxds = NULL;
        elan_page_idx = 0;
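        /* elan_page_idx is the running page offset into the rx DVMA
         * reservation above; each rx buffer page is mapped at the next
         * free slot and the pages of one descriptor are merged into a
         * single contiguous NMD. */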
        for (i = 0; i < *kqswnal_tunables.kqn_nrxmsgs_small + *kqswnal_tunables.kqn_nrxmsgs_large; i++)
        {
                EP_NMD        elanbuffer;
                int           j;

                LIBCFS_ALLOC(krx, sizeof(*krx));
                if (krx == NULL) {
                        kqswnal_shutdown(ni);
                        return (-ENOMEM);
                }

                memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
                krx->krx_alloclist = kqswnal_data.kqn_rxds;
                kqswnal_data.kqn_rxds = krx;

                if (i < *kqswnal_tunables.kqn_nrxmsgs_small)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_shutdown (ni);
                                return (-ENOMEM);
                        }

                        krx->krx_kiov[j] = (lnet_kiov_t) {.kiov_page = page,
                                                          .kiov_offset = 0,
                                                          .kiov_len = PAGE_SIZE};
                        LASSERT(page_address(page) != NULL);

                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
                        elan_page_idx++;

                }
        }
        LASSERT (elan_page_idx ==
                 (*kqswnal_tunables.kqn_nrxmsgs_small * KQSW_NRXMSGPAGES_SMALL) +
                 (*kqswnal_tunables.kqn_nrxmsgs_large * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* NB this enqueue can allocate/sleep (attr == 0) */
                krx->krx_state = KRX_POSTED;
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
                if (rc != EP_SUCCESS) {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-EIO);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < num_online_cpus(); i++) {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-ESRCH);
                }
        }

        kqswnal_data.kqn_init = KQN_INIT_ALL;
        return (0);
}

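/* Module entry/exit: kqswnal_initialise() sets up the module tunables
 * and registers the LND with LNet (the interface itself is only started
 * when LNet brings up a qsw NI); kqswnal_finalise() unregisters the LND
 * and cleans the tunables up again. */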
void __exit
kqswnal_finalise (void)
{
        lnet_unregister_lnd(&the_kqswlnd);
        kqswnal_tunables_fini();
}

static int __init
kqswnal_initialise (void)
{
        int   rc = kqswnal_tunables_init();

        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_kqswlnd);
        return (0);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan LND v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);