/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 * Copyright (C) 2002-2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 *
 */

#include "qswlnd.h"

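/* LND operations table for the Quadrics/Elan network type.  LNet calls
 * through these hooks once the LND is registered (see kqswnal_initialise()
 * below): startup/shutdown bracket the life of the NI, ctl handles
 * LND-specific ioctls, and send/recv move messages over the Elan fabric. */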
lnd_t the_kqswlnd =
{
        .lnd_type       = QSWLND,
        .lnd_startup    = kqswnal_startup,
        .lnd_shutdown   = kqswnal_shutdown,
        .lnd_ctl        = kqswnal_ctl,
        .lnd_send       = kqswnal_send,
        .lnd_recv       = kqswnal_recv,
};

kqswnal_data_t          kqswnal_data;

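/* Fill *data with details of the data->ioc_count'th active transmit
 * descriptor: payload length, destination NID, message type, launcher and
 * state.  Returns -ENOENT if the index is beyond the active list.  Used by
 * the IOC_LIBCFS_GET_TXDESC ioctl in kqswnal_ctl() below. */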
int
kqswnal_get_tx_desc (struct libcfs_ioctl_data *data)
{
        unsigned long      flags;
        struct list_head  *tmp;
        kqswnal_tx_t      *ktx;
        lnet_hdr_t        *hdr;
        int                index = data->ioc_count;
        int                rc = -ENOENT;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
                if (index-- != 0)
                        continue;

                ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
                hdr = (lnet_hdr_t *)ktx->ktx_buffer;

                data->ioc_count  = le32_to_cpu(hdr->payload_length);
                data->ioc_nid    = le64_to_cpu(hdr->dest_nid);
                data->ioc_u64[0] = ktx->ktx_nid;
                data->ioc_u32[0] = le32_to_cpu(hdr->type);
                data->ioc_u32[1] = ktx->ktx_launcher;
                data->ioc_flags  = (list_empty (&ktx->ktx_schedlist) ? 0 : 1) |
                                   (ktx->ktx_state << 2);
                rc = 0;
                break;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        return (rc);
}

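/* LND-specific ioctl handler.  Only IOC_LIBCFS_GET_TXDESC and the obsolete
 * IOC_LIBCFS_REGISTER_MYNID are recognised; anything else is -EINVAL. */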
int
kqswnal_ctl (lnet_ni_t *ni, unsigned int cmd, void *arg)
{
        struct libcfs_ioctl_data *data = arg;

        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (cmd) {
        case IOC_LIBCFS_GET_TXDESC:
                return (kqswnal_get_tx_desc (data));

        case IOC_LIBCFS_REGISTER_MYNID:
                if (data->ioc_nid == ni->ni_nid)
                        return 0;

                LASSERT (LNET_NIDNET(data->ioc_nid) == LNET_NIDNET(ni->ni_nid));

                CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID for %s(%s)\n",
                       libcfs_nid2str(data->ioc_nid),
                       libcfs_nid2str(ni->ni_nid));
                return 0;

        default:
                return (-EINVAL);
        }
}

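/* Tear down the NI.  The ordering matters: flag shutdown and wait for
 * in-flight sends to launch or give up, free the Elan receivers (so no new
 * rx callbacks) and then the transmitter, stop the scheduler threads, and
 * only then unmap and free all descriptors and buffers. */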
void
kqswnal_shutdown(lnet_ni_t *ni)
{
        unsigned long flags;
        kqswnal_tx_t *ktx;
        kqswnal_rx_t *krx;

        CDEBUG (D_NET, "shutdown\n");
        LASSERT (ni->ni_data == &kqswnal_data);
        LASSERT (ni == kqswnal_data.kqn_ni);

        switch (kqswnal_data.kqn_init)
        {
        default:
                LASSERT (0);

        case KQN_INIT_ALL:
        case KQN_INIT_DATA:
                break;
        }

        /**********************************************************************/
        /* Signal the start of shutdown... */
        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
        kqswnal_data.kqn_shuttingdown = 1;
        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /**********************************************************************/
        /* wait for sends that have allocated a tx desc to launch or give up */
        while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
                CDEBUG(D_NET, "waiting for %d pending sends\n",
                       atomic_read (&kqswnal_data.kqn_pending_txs));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* close elan comms */
        /* Shut down receivers first; rx callbacks might try sending... */
        if (kqswnal_data.kqn_eprx_small != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_small);

        if (kqswnal_data.kqn_eprx_large != NULL)
                ep_free_rcvr (kqswnal_data.kqn_eprx_large);

        /* NB ep_free_rcvr() returns only after we've freed off all receive
         * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
         * means we must have completed any messages we passed to
         * lnet_parse() */

        if (kqswnal_data.kqn_eptx != NULL)
                ep_free_xmtr (kqswnal_data.kqn_eptx);

        /* NB ep_free_xmtr() returns only after all outstanding transmits
         * have called their callback... */
        LASSERT(list_empty(&kqswnal_data.kqn_activetxds));

        /**********************************************************************/
        /* flag threads to terminate, wake them and wait for them to die */
        kqswnal_data.kqn_shuttingdown = 2;
        wake_up_all (&kqswnal_data.kqn_sched_waitq);

        while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
                CDEBUG(D_NET, "waiting for %d threads to terminate\n",
                       atomic_read (&kqswnal_data.kqn_nthreads));
                cfs_pause(cfs_time_seconds(1));
        }

        /**********************************************************************/
        /* No more threads.  No more portals, router or comms callbacks!
         * I control the horizontals and the verticals...
         */

        LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
        LASSERT (list_empty (&kqswnal_data.kqn_donetxds));
        LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));

        /**********************************************************************/
        /* Unmap message buffers and free all descriptors and buffers
         */

        /* FTTB, we need to unmap any remaining mapped memory.  When
         * ep_dvma_release() gets fixed (and releases any mappings in the
         * region), we can delete all the code from here -------->  */

        for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
                /* If ktx has a buffer, it got mapped; unmap now.  NB only
                 * the pre-mapped stuff is still mapped since all tx descs
                 * must be idle */

                if (ktx->ktx_buffer != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_tx_nmh,
                                       &ktx->ktx_ebuffer);
        }

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* If krx_kiov[0].kiov_page got allocated, it got mapped.
                 * NB subsequent pages get merged */

                if (krx->krx_kiov[0].kiov_page != NULL)
                        ep_dvma_unload(kqswnal_data.kqn_ep,
                                       kqswnal_data.kqn_ep_rx_nmh,
                                       &krx->krx_elanbuffer);
        }
        /* <----------- to here */

        if (kqswnal_data.kqn_ep_rx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

        if (kqswnal_data.kqn_ep_tx_nmh != NULL)
                ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);

        while (kqswnal_data.kqn_txds != NULL) {
                ktx = kqswnal_data.kqn_txds;

                if (ktx->ktx_buffer != NULL)
                        LIBCFS_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

                kqswnal_data.kqn_txds = ktx->ktx_alloclist;
                LIBCFS_FREE(ktx, sizeof(*ktx));
        }

        while (kqswnal_data.kqn_rxds != NULL) {
                int           i;

                krx = kqswnal_data.kqn_rxds;
                for (i = 0; i < krx->krx_npages; i++)
                        if (krx->krx_kiov[i].kiov_page != NULL)
                                __free_page (krx->krx_kiov[i].kiov_page);

                kqswnal_data.kqn_rxds = krx->krx_alloclist;
                LIBCFS_FREE(krx, sizeof (*krx));
        }

        /* resets flags, pointers to NULL etc */
        memset(&kqswnal_data, 0, sizeof (kqswnal_data));

        CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&libcfs_kmemory));

        PORTAL_MODULE_UNUSE;
}

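/* Bring up the NI: initialise the shared kqswnal_data state, attach to the
 * Elan kernel comms (EKC), allocate the transmitter and the small/large
 * message receivers, reserve and pre-map DVMA space, allocate the tx/rx
 * descriptor pools, post the receives and start the scheduler threads.  Any
 * failure unwinds through kqswnal_shutdown(). */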
int
kqswnal_startup (lnet_ni_t *ni)
{
        EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
        int               rc;
        int               i;
        kqswnal_rx_t     *krx;
        kqswnal_tx_t     *ktx;
        int               elan_page_idx;

        LASSERT (ni->ni_lnd == &the_kqswlnd);

#if KQSW_CKSUM
        if (the_lnet.ln_ptlcompat != 0) {
                CERROR("Checksumming version not portals compatible\n");
                return -ENODEV;
        }
#endif
        /* Only 1 instance supported */
        if (kqswnal_data.kqn_init != KQN_INIT_NOTHING) {
                CERROR ("Only 1 instance supported\n");
                return -EPERM;
        }

        if (ni->ni_interfaces[0] != NULL) {
                CERROR("Explicit interface config not supported\n");
                return -EPERM;
        }

        if (*kqswnal_tunables.kqn_credits >=
            *kqswnal_tunables.kqn_ntxmsgs) {
                LCONSOLE_ERROR_MSG(0x12e, "Configuration error: please set "
                                   "ntxmsgs(%d) > credits(%d)\n",
                                   *kqswnal_tunables.kqn_ntxmsgs,
                                   *kqswnal_tunables.kqn_credits);
        }

        CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&libcfs_kmemory));

        /* ensure all pointers NULL etc */
        memset (&kqswnal_data, 0, sizeof (kqswnal_data));

        kqswnal_data.kqn_ni = ni;
        ni->ni_data = &kqswnal_data;
        ni->ni_peertxcredits = *kqswnal_tunables.kqn_peercredits;
        ni->ni_maxtxcredits = *kqswnal_tunables.kqn_credits;

        INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
        spin_lock_init (&kqswnal_data.kqn_idletxd_lock);

        INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_donetxds);
        INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

        spin_lock_init (&kqswnal_data.kqn_sched_lock);
        init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

        /* pointers/lists/locks initialised */
        kqswnal_data.kqn_init = KQN_INIT_DATA;
        PORTAL_MODULE_USE;

        kqswnal_data.kqn_ep = ep_system();
        if (kqswnal_data.kqn_ep == NULL) {
                CERROR("Can't initialise EKC\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
                CERROR("Can't get elan ID\n");
                kqswnal_shutdown(ni);
                return (-ENODEV);
        }

        kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
        kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);

        ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), kqswnal_data.kqn_elanid);

        /**********************************************************************/
        /* Get the transmitter */

        kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
        if (kqswnal_data.kqn_eptx == NULL)
        {
                CERROR ("Can't allocate transmitter\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Get the receivers */

        kqswnal_data.kqn_eprx_small =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_SMALL,
                               *kqswnal_tunables.kqn_ep_envelopes_small);
        if (kqswnal_data.kqn_eprx_small == NULL)
        {
                CERROR ("Can't install small msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        kqswnal_data.kqn_eprx_large =
                ep_alloc_rcvr (kqswnal_data.kqn_ep,
                               EP_MSG_SVC_PORTALS_LARGE,
                               *kqswnal_tunables.kqn_ep_envelopes_large);
        if (kqswnal_data.kqn_eprx_large == NULL)
        {
                CERROR ("Can't install large msg receiver\n");
                kqswnal_shutdown (ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for transmit descriptors NB we may
         * either send the contents of associated buffers immediately, or
         * map them for the peer to suck/blow... */
        kqswnal_data.kqn_ep_tx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NTXMSGPAGES*(*kqswnal_tunables.kqn_ntxmsgs),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
                CERROR("Can't reserve tx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

        /**********************************************************************/
        /* Reserve Elan address space for receive buffers */
        kqswnal_data.kqn_ep_rx_nmh =
                ep_dvma_reserve(kqswnal_data.kqn_ep,
                                KQSW_NRXMSGPAGES_SMALL *
                                (*kqswnal_tunables.kqn_nrxmsgs_small) +
                                KQSW_NRXMSGPAGES_LARGE *
                                (*kqswnal_tunables.kqn_nrxmsgs_large),
                                EP_PERM_WRITE);
        if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
                CERROR("Can't reserve rx dma space\n");
                kqswnal_shutdown(ni);
                return (-ENOMEM);
        }

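/* Each tx descriptor owns KQSW_NTXMSGPAGES pages of the DVMA region reserved
 * above: the pre-allocated KQSW_TX_BUFFER_SIZE buffer is mapped permanently
 * at the front of its slot, and the remaining pages of the slot stay free
 * for mapping message payload at send time. */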
        /**********************************************************************/
        /* Allocate/Initialise transmit descriptors */

        kqswnal_data.kqn_txds = NULL;
        for (i = 0; i < (*kqswnal_tunables.kqn_ntxmsgs); i++)
        {
                int           premapped_pages;
                int           basepage = i * KQSW_NTXMSGPAGES;

                LIBCFS_ALLOC (ktx, sizeof(*ktx));
                if (ktx == NULL) {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                memset(ktx, 0, sizeof(*ktx));   /* NULL pointers; zero flags */
                ktx->ktx_alloclist = kqswnal_data.kqn_txds;
                kqswnal_data.kqn_txds = ktx;

                LIBCFS_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
                if (ktx->ktx_buffer == NULL)
                {
                        kqswnal_shutdown (ni);
                        return (-ENOMEM);
                }

                /* Map pre-allocated buffer NOW, to save latency on transmit */
                premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
                                                        KQSW_TX_BUFFER_SIZE);
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &all_rails, &ktx->ktx_ebuffer);

                ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
                ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

                INIT_LIST_HEAD (&ktx->ktx_schedlist);

                ktx->ktx_state = KTX_IDLE;
                ktx->ktx_rail = -1;             /* unset rail */

                list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
        }

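/* Receive descriptors form two pools: the first kqn_nrxmsgs_small go to the
 * small-message receiver and the rest to the large one.  Each descriptor's
 * pages are DVMA-mapped back-to-back and merged with ep_nmd_merge() into a
 * single contiguous Elan buffer. */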
        /**********************************************************************/
        /* Allocate/Initialise receive descriptors */
        kqswnal_data.kqn_rxds = NULL;
        elan_page_idx = 0;
        for (i = 0; i < *kqswnal_tunables.kqn_nrxmsgs_small + *kqswnal_tunables.kqn_nrxmsgs_large; i++)
        {
                EP_NMD        elanbuffer;
                int           j;

                LIBCFS_ALLOC(krx, sizeof(*krx));
                if (krx == NULL) {
                        kqswnal_shutdown(ni);
                        return (-ENOMEM);
                }

                memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
                krx->krx_alloclist = kqswnal_data.kqn_rxds;
                kqswnal_data.kqn_rxds = krx;

                if (i < *kqswnal_tunables.kqn_nrxmsgs_small)
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
                }
                else
                {
                        krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
                        krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
                }

                LASSERT (krx->krx_npages > 0);
                for (j = 0; j < krx->krx_npages; j++)
                {
                        struct page *page = alloc_page(GFP_KERNEL);

                        if (page == NULL) {
                                kqswnal_shutdown (ni);
                                return (-ENOMEM);
                        }

                        krx->krx_kiov[j] = (lnet_kiov_t) {.kiov_page = page,
                                                          .kiov_offset = 0,
                                                          .kiov_len = PAGE_SIZE};
                        LASSERT(page_address(page) != NULL);

                        ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                                     page_address(page),
                                     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
                                     elan_page_idx, &all_rails, &elanbuffer);

                        if (j == 0) {
                                krx->krx_elanbuffer = elanbuffer;
                        } else {
                                rc = ep_nmd_merge(&krx->krx_elanbuffer,
                                                  &krx->krx_elanbuffer,
                                                  &elanbuffer);
                                /* NB contiguous mapping */
                                LASSERT(rc);
                        }
                        elan_page_idx++;
                }
        }
        LASSERT (elan_page_idx ==
                 (*kqswnal_tunables.kqn_nrxmsgs_small * KQSW_NRXMSGPAGES_SMALL) +
                 (*kqswnal_tunables.kqn_nrxmsgs_large * KQSW_NRXMSGPAGES_LARGE));

        /**********************************************************************/
        /* Queue receives, now that it's OK to run their completion callbacks */

        for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
                /* NB this enqueue can allocate/sleep (attr == 0) */
                krx->krx_state = KRX_POSTED;
                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      &krx->krx_elanbuffer, 0);
                if (rc != EP_SUCCESS) {
                        CERROR ("failed ep_queue_receive %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-EIO);
                }
        }

        /**********************************************************************/
        /* Spawn scheduling threads */
        for (i = 0; i < num_online_cpus(); i++) {
                rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
                if (rc != 0)
                {
                        CERROR ("failed to spawn scheduling thread: %d\n", rc);
                        kqswnal_shutdown (ni);
                        return (-ESRCH);
                }
        }

        kqswnal_data.kqn_init = KQN_INIT_ALL;
        return (0);
}

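/* Module entry points: kqswnal_initialise() sets up the tunables and
 * registers the LND with LNet; kqswnal_finalise() reverses both steps when
 * the module is unloaded. */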
void __exit
kqswnal_finalise (void)
{
        lnet_unregister_lnd(&the_kqswlnd);
        kqswnal_tunables_fini();
}

static int __init
kqswnal_initialise (void)
{
        int   rc = kqswnal_tunables_init();

        if (rc != 0)
                return rc;

        lnet_register_lnd(&the_kqswlnd);
        return (0);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan LND v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);