/*
 * Copyright (C) 2002-2004 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
#include "qswlnd.h"

lnd_t the_kqswlnd =
{
	.lnd_type     = QSWLND,
	.lnd_startup  = kqswnal_startup,
	.lnd_shutdown = kqswnal_shutdown,
	.lnd_ctl      = kqswnal_ctl,
	.lnd_send     = kqswnal_send,
	.lnd_recv     = kqswnal_recv,
};

kqswnal_data_t kqswnal_data;
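
/* Debug helper for the IOC_LIBCFS_GET_TXDESC ioctl: copy a snapshot of
 * the index'th active transmit descriptor into 'data'.  ioc_flags packs
 * "on the scheduler queue" into bit 0 and ktx_state into bits 2 and up. */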
int
kqswnal_get_tx_desc (struct libcfs_ioctl_data *data)
{
	unsigned long      flags;
	struct list_head  *tmp;
	kqswnal_tx_t      *ktx;
	lnet_hdr_t        *hdr;
	int                index = data->ioc_count;
	int                rc = -ENOENT;

	spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

	list_for_each (tmp, &kqswnal_data.kqn_activetxds) {
		if (index-- != 0)
			continue;

		ktx = list_entry (tmp, kqswnal_tx_t, ktx_list);
		hdr = (lnet_hdr_t *)ktx->ktx_buffer;

		data->ioc_count  = le32_to_cpu(hdr->payload_length);
		data->ioc_nid    = le64_to_cpu(hdr->dest_nid);
		data->ioc_u64[0] = ktx->ktx_nid;
		data->ioc_u32[0] = le32_to_cpu(hdr->type);
		data->ioc_u32[1] = ktx->ktx_launcher;
		data->ioc_flags  = (list_empty (&ktx->ktx_schedlist) ? 0 : 1) |
				   (ktx->ktx_state << 2);
		rc = 0;
		break;
	}

	spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
	return (rc);
}
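
/* lnd_ctl entry point: dispatch libcfs ioctls aimed at this NI. */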
int
kqswnal_ctl (lnet_ni_t *ni, unsigned int cmd, void *arg)
{
	struct libcfs_ioctl_data *data = arg;

	LASSERT (ni == kqswnal_data.kqn_ni);

	switch (cmd) {
	case IOC_LIBCFS_GET_TXDESC:
		return (kqswnal_get_tx_desc (data));

	case IOC_LIBCFS_REGISTER_MYNID:
		if (data->ioc_nid == ni->ni_nid)
			return 0;

		LASSERT (LNET_NIDNET(data->ioc_nid) == LNET_NIDNET(ni->ni_nid));

		CERROR("obsolete IOC_LIBCFS_REGISTER_MYNID for %s(%s)\n",
		       libcfs_nid2str(data->ioc_nid),
		       libcfs_nid2str(ni->ni_nid));
		return 0;

	default:
		return (-EINVAL);
	}
}
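
/* Tear down in the reverse order of startup: stop new sends, drain
 * pending ones, free the EKC receivers and transmitter (which waits out
 * all comms callbacks), kill the scheduler threads, then unmap and free
 * every descriptor and buffer. */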
void
kqswnal_shutdown(lnet_ni_t *ni)
{
	unsigned long  flags;
	kqswnal_tx_t  *ktx;
	kqswnal_rx_t  *krx;

	CDEBUG (D_NET, "shutdown\n");
	LASSERT (ni->ni_data == &kqswnal_data);
	LASSERT (ni == kqswnal_data.kqn_ni);

	switch (kqswnal_data.kqn_init)
	{
	default:
		LASSERT (0);

	case KQN_INIT_ALL:
	case KQN_INIT_DATA:
		break;
	}

	/**********************************************************************/
	/* Signal the start of shutdown... */
	spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);
	kqswnal_data.kqn_shuttingdown = 1;
	spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
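
	/* NB kqn_shuttingdown is two-phase: 1 (set here, under the idle-tx
	 * lock) makes any send path that tries to allocate a tx descriptor
	 * give up, while the scheduler threads keep running to drain work;
	 * it only advances to 2 further down, once comms are closed, to
	 * tell those threads to exit. */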

	/**********************************************************************/
	/* wait for sends that have allocated a tx desc to launch or give up */
	while (atomic_read (&kqswnal_data.kqn_pending_txs) != 0) {
		CDEBUG(D_NET, "waiting for %d pending sends\n",
		       atomic_read (&kqswnal_data.kqn_pending_txs));
		cfs_pause(cfs_time_seconds(1));
	}

	/**********************************************************************/
	/* close elan comms */
	/* Shut down receivers first; rx callbacks might try sending... */
	if (kqswnal_data.kqn_eprx_small != NULL)
		ep_free_rcvr (kqswnal_data.kqn_eprx_small);

	if (kqswnal_data.kqn_eprx_large != NULL)
		ep_free_rcvr (kqswnal_data.kqn_eprx_large);

	/* NB ep_free_rcvr() returns only after we've freed off all receive
	 * buffers (see shutdown handling in kqswnal_requeue_rx()).  This
	 * means we must have completed any messages we passed to
	 * lnet_parse() */

	if (kqswnal_data.kqn_eptx != NULL)
		ep_free_xmtr (kqswnal_data.kqn_eptx);

	/* NB ep_free_xmtr() returns only after all outstanding transmits
	 * have called their callback... */
	LASSERT(list_empty(&kqswnal_data.kqn_activetxds));

	/**********************************************************************/
	/* flag threads to terminate, wake them and wait for them to die */
	kqswnal_data.kqn_shuttingdown = 2;
	wake_up_all (&kqswnal_data.kqn_sched_waitq);

	while (atomic_read (&kqswnal_data.kqn_nthreads) != 0) {
		CDEBUG(D_NET, "waiting for %d threads to terminate\n",
		       atomic_read (&kqswnal_data.kqn_nthreads));
		cfs_pause(cfs_time_seconds(1));
	}

	/**********************************************************************/
	/* No more threads.  No more portals, router or comms callbacks!
	 * I control the horizontals and the verticals... */

	LASSERT (list_empty (&kqswnal_data.kqn_readyrxds));
	LASSERT (list_empty (&kqswnal_data.kqn_donetxds));
	LASSERT (list_empty (&kqswnal_data.kqn_delayedtxds));

	/**********************************************************************/
	/* Unmap message buffers and free all descriptors and buffers */

	/* FTTB, we need to unmap any remaining mapped memory.  When
	 * ep_dvma_release() gets fixed (and releases any mappings in the
	 * region), we can delete all the code from here --------> */

	for (ktx = kqswnal_data.kqn_txds; ktx != NULL; ktx = ktx->ktx_alloclist) {
		/* If ktx has a buffer, it got mapped; unmap now.  NB only
		 * the pre-mapped stuff is still mapped, since all tx descs
		 * must be idle */

		if (ktx->ktx_buffer != NULL)
			ep_dvma_unload(kqswnal_data.kqn_ep,
				       kqswnal_data.kqn_ep_tx_nmh,
				       &ktx->ktx_ebuffer);
	}

	for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
		/* If krx_kiov[0].kiov_page got allocated, it got mapped.
		 * NB subsequent pages get merged */

		if (krx->krx_kiov[0].kiov_page != NULL)
			ep_dvma_unload(kqswnal_data.kqn_ep,
				       kqswnal_data.kqn_ep_rx_nmh,
				       &krx->krx_elanbuffer);
	}
	/* <----------- to here */

	if (kqswnal_data.kqn_ep_rx_nmh != NULL)
		ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_rx_nmh);

	if (kqswnal_data.kqn_ep_tx_nmh != NULL)
		ep_dvma_release(kqswnal_data.kqn_ep, kqswnal_data.kqn_ep_tx_nmh);

	while (kqswnal_data.kqn_txds != NULL) {
		ktx = kqswnal_data.kqn_txds;

		if (ktx->ktx_buffer != NULL)
			LIBCFS_FREE(ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);

		kqswnal_data.kqn_txds = ktx->ktx_alloclist;
		LIBCFS_FREE(ktx, sizeof(*ktx));
	}

	while (kqswnal_data.kqn_rxds != NULL) {
		int    i;

		krx = kqswnal_data.kqn_rxds;
		for (i = 0; i < krx->krx_npages; i++)
			if (krx->krx_kiov[i].kiov_page != NULL)
				__free_page (krx->krx_kiov[i].kiov_page);

		kqswnal_data.kqn_rxds = krx->krx_alloclist;
		LIBCFS_FREE(krx, sizeof (*krx));
	}

	/* resets flags, pointers to NULL etc */
	memset(&kqswnal_data, 0, sizeof (kqswnal_data));

	CDEBUG (D_MALLOC, "done kmem %d\n", atomic_read(&libcfs_kmemory));
}
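
/* Bring the NI up in dependency order: attach to the EKC kernel comms
 * system, allocate the transmitter and the small/large message receivers,
 * reserve Elan dVMA address space, build the tx/rx descriptor pools, post
 * the receives, and finally spawn the scheduler threads.  Any failure
 * just calls kqswnal_shutdown(), which copes with partial initialisation. */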
int
kqswnal_startup (lnet_ni_t *ni)
{
	EP_RAILMASK       all_rails = EP_RAILMASK_ALL;
	int               rc;
	int               i;
	kqswnal_rx_t     *krx;
	kqswnal_tx_t     *ktx;
	int               elan_page_idx;

	LASSERT (ni->ni_lnd == &the_kqswlnd);

#if KQSW_CKSUM
	if (the_lnet.ln_ptlcompat != 0) {
		CERROR("Checksumming version not portals compatible\n");
		return -ENODEV;
	}
#endif
	/* Only 1 instance supported */
	if (kqswnal_data.kqn_init != KQN_INIT_NOTHING) {
		CERROR ("Only 1 instance supported\n");
		return -EPERM;
	}

	if (ni->ni_interfaces[0] != NULL) {
		CERROR("Explicit interface config not supported\n");
		return -EPERM;
	}

	if (*kqswnal_tunables.kqn_credits >=
	    *kqswnal_tunables.kqn_ntxmsgs) {
		LCONSOLE_ERROR("Configuration error: please set "
			       "ntxmsgs(%d) > credits(%d)\n",
			       *kqswnal_tunables.kqn_ntxmsgs,
			       *kqswnal_tunables.kqn_credits);
	}

	CDEBUG (D_MALLOC, "start kmem %d\n", atomic_read(&libcfs_kmemory));

	/* ensure all pointers NULL etc */
	memset (&kqswnal_data, 0, sizeof (kqswnal_data));

	kqswnal_data.kqn_ni = ni;
	ni->ni_data = &kqswnal_data;
	ni->ni_peertxcredits = *kqswnal_tunables.kqn_peercredits;
	ni->ni_maxtxcredits = *kqswnal_tunables.kqn_credits;

	INIT_LIST_HEAD (&kqswnal_data.kqn_idletxds);
	INIT_LIST_HEAD (&kqswnal_data.kqn_activetxds);
	spin_lock_init (&kqswnal_data.kqn_idletxd_lock);

	INIT_LIST_HEAD (&kqswnal_data.kqn_delayedtxds);
	INIT_LIST_HEAD (&kqswnal_data.kqn_donetxds);
	INIT_LIST_HEAD (&kqswnal_data.kqn_readyrxds);

	spin_lock_init (&kqswnal_data.kqn_sched_lock);
	init_waitqueue_head (&kqswnal_data.kqn_sched_waitq);

	/* pointers/lists/locks initialised */
	kqswnal_data.kqn_init = KQN_INIT_DATA;
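
	/* From here on kqn_init records how far we got (KQN_INIT_NOTHING ->
	 * KQN_INIT_DATA -> KQN_INIT_ALL), so kqswnal_shutdown() can be
	 * called on any error path below and unwind just the parts that
	 * exist. */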

	kqswnal_data.kqn_ep = ep_system();
	if (kqswnal_data.kqn_ep == NULL) {
		CERROR("Can't initialise EKC\n");
		kqswnal_shutdown(ni);
		return (-ENODEV);
	}

	if (ep_waitfor_nodeid(kqswnal_data.kqn_ep) == ELAN_INVALID_NODE) {
		CERROR("Can't get elan ID\n");
		kqswnal_shutdown(ni);
		return (-ENODEV);
	}

	kqswnal_data.kqn_nnodes = ep_numnodes (kqswnal_data.kqn_ep);
	kqswnal_data.kqn_elanid = ep_nodeid (kqswnal_data.kqn_ep);

	ni->ni_nid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), kqswnal_data.kqn_elanid);
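
	/* NB the address part of this NI's NID is just our Elan node id, so
	 * NIDs on this network map 1:1 onto Elan ids. */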

	/**********************************************************************/
	/* Get the transmitter */

	kqswnal_data.kqn_eptx = ep_alloc_xmtr (kqswnal_data.kqn_ep);
	if (kqswnal_data.kqn_eptx == NULL)
	{
		CERROR ("Can't allocate transmitter\n");
		kqswnal_shutdown (ni);
		return (-ENOMEM);
	}

	/**********************************************************************/
	/* Get the receivers */

	kqswnal_data.kqn_eprx_small =
		ep_alloc_rcvr (kqswnal_data.kqn_ep,
			       EP_MSG_SVC_PORTALS_SMALL,
			       *kqswnal_tunables.kqn_ep_envelopes_small);
	if (kqswnal_data.kqn_eprx_small == NULL)
	{
		CERROR ("Can't install small msg receiver\n");
		kqswnal_shutdown (ni);
		return (-ENOMEM);
	}

	kqswnal_data.kqn_eprx_large =
		ep_alloc_rcvr (kqswnal_data.kqn_ep,
			       EP_MSG_SVC_PORTALS_LARGE,
			       *kqswnal_tunables.kqn_ep_envelopes_large);
	if (kqswnal_data.kqn_eprx_large == NULL)
	{
		CERROR ("Can't install large msg receiver\n");
		kqswnal_shutdown (ni);
		return (-ENOMEM);
	}

	/**********************************************************************/
	/* Reserve Elan address space for transmit descriptors.  NB we may
	 * either send the contents of associated buffers immediately, or
	 * map them for the peer to suck/blow... */
	kqswnal_data.kqn_ep_tx_nmh =
		ep_dvma_reserve(kqswnal_data.kqn_ep,
				KQSW_NTXMSGPAGES*(*kqswnal_tunables.kqn_ntxmsgs),
				EP_PERM_WRITE);
	if (kqswnal_data.kqn_ep_tx_nmh == NULL) {
		CERROR("Can't reserve tx dma space\n");
		kqswnal_shutdown(ni);
		return (-ENOMEM);
	}

	/**********************************************************************/
	/* Reserve Elan address space for receive buffers */
	kqswnal_data.kqn_ep_rx_nmh =
		ep_dvma_reserve(kqswnal_data.kqn_ep,
				KQSW_NRXMSGPAGES_SMALL *
				(*kqswnal_tunables.kqn_nrxmsgs_small) +
				KQSW_NRXMSGPAGES_LARGE *
				(*kqswnal_tunables.kqn_nrxmsgs_large),
				EP_PERM_WRITE);
	if (kqswnal_data.kqn_ep_rx_nmh == NULL) {
		CERROR("Can't reserve rx dma space\n");
		kqswnal_shutdown(ni);
		return (-ENOMEM);
	}
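
	/* The two ep_dvma_reserve() calls above carve out fixed-size regions
	 * of Elan-mapped address space.  Each descriptor below gets a fixed
	 * page offset into its region (basepage / elan_page_idx), and
	 * ep_dvma_load() installs pages at that offset, so every mapping
	 * lands at a predictable place inside the reservation. */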

	/**********************************************************************/
	/* Allocate/Initialise transmit descriptors */

	kqswnal_data.kqn_txds = NULL;
	for (i = 0; i < (*kqswnal_tunables.kqn_ntxmsgs); i++)
	{
		int           premapped_pages;
		int           basepage = i * KQSW_NTXMSGPAGES;

		LIBCFS_ALLOC (ktx, sizeof(*ktx));
		if (ktx == NULL) {
			kqswnal_shutdown (ni);
			return (-ENOMEM);
		}

		memset(ktx, 0, sizeof(*ktx));	/* NULL pointers; zero flags */
		ktx->ktx_alloclist = kqswnal_data.kqn_txds;
		kqswnal_data.kqn_txds = ktx;

		LIBCFS_ALLOC (ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE);
		if (ktx->ktx_buffer == NULL)
		{
			kqswnal_shutdown (ni);
			return (-ENOMEM);
		}

		/* Map pre-allocated buffer NOW, to save latency on transmit */
		premapped_pages = kqswnal_pages_spanned(ktx->ktx_buffer,
							KQSW_TX_BUFFER_SIZE);
		ep_dvma_load(kqswnal_data.kqn_ep, NULL,
			     ktx->ktx_buffer, KQSW_TX_BUFFER_SIZE,
			     kqswnal_data.kqn_ep_tx_nmh, basepage,
			     &all_rails, &ktx->ktx_ebuffer);

		ktx->ktx_basepage = basepage + premapped_pages; /* message mapping starts here */
		ktx->ktx_npages = KQSW_NTXMSGPAGES - premapped_pages; /* for this many pages */

		INIT_LIST_HEAD (&ktx->ktx_schedlist);

		ktx->ktx_state = KTX_IDLE;
		ktx->ktx_rail = -1;		/* unset rail */

		list_add_tail (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
	}

	/**********************************************************************/
	/* Allocate/Initialise receive descriptors */
	kqswnal_data.kqn_rxds = NULL;
	elan_page_idx = 0;
	for (i = 0; i < *kqswnal_tunables.kqn_nrxmsgs_small + *kqswnal_tunables.kqn_nrxmsgs_large; i++)
	{
		EP_NMD        elanbuffer;
		int           j;

		LIBCFS_ALLOC(krx, sizeof(*krx));
		if (krx == NULL) {
			kqswnal_shutdown(ni);
			return (-ENOMEM);
		}

		memset(krx, 0, sizeof(*krx)); /* clear flags, null pointers etc */
		krx->krx_alloclist = kqswnal_data.kqn_rxds;
		kqswnal_data.kqn_rxds = krx;

		if (i < *kqswnal_tunables.kqn_nrxmsgs_small)
		{
			krx->krx_npages = KQSW_NRXMSGPAGES_SMALL;
			krx->krx_eprx   = kqswnal_data.kqn_eprx_small;
		}
		else
		{
			krx->krx_npages = KQSW_NRXMSGPAGES_LARGE;
			krx->krx_eprx   = kqswnal_data.kqn_eprx_large;
		}

		LASSERT (krx->krx_npages > 0);
		for (j = 0; j < krx->krx_npages; j++)
		{
			struct page *page = alloc_page(GFP_KERNEL);

			if (page == NULL) {
				kqswnal_shutdown (ni);
				return (-ENOMEM);
			}

			krx->krx_kiov[j] = (lnet_kiov_t) {.kiov_page   = page,
							  .kiov_offset = 0,
							  .kiov_len    = PAGE_SIZE};
			LASSERT(page_address(page) != NULL);

			ep_dvma_load(kqswnal_data.kqn_ep, NULL,
				     page_address(page),
				     PAGE_SIZE, kqswnal_data.kqn_ep_rx_nmh,
				     elan_page_idx, &all_rails, &elanbuffer);

			if (j == 0) {
				krx->krx_elanbuffer = elanbuffer;
			} else {
				rc = ep_nmd_merge(&krx->krx_elanbuffer,
						  &krx->krx_elanbuffer,
						  &elanbuffer);
				/* NB contiguous mapping */
				LASSERT (rc);
			}
			elan_page_idx++;
		}
	}
	LASSERT (elan_page_idx ==
		 (*kqswnal_tunables.kqn_nrxmsgs_small * KQSW_NRXMSGPAGES_SMALL) +
		 (*kqswnal_tunables.kqn_nrxmsgs_large * KQSW_NRXMSGPAGES_LARGE));

	/**********************************************************************/
	/* Queue receives, now that it's OK to run their completion callbacks */

	for (krx = kqswnal_data.kqn_rxds; krx != NULL; krx = krx->krx_alloclist) {
		/* NB this enqueue can allocate/sleep (attr == 0) */
		krx->krx_state = KRX_POSTED;
		rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
				      &krx->krx_elanbuffer, 0);
		if (rc != EP_SUCCESS) {
			CERROR ("failed ep_queue_receive %d\n", rc);
			kqswnal_shutdown (ni);
			return (-EIO);
		}
	}

	/**********************************************************************/
	/* Spawn scheduling threads */
	for (i = 0; i < num_online_cpus(); i++) {
		rc = kqswnal_thread_start (kqswnal_scheduler, NULL);
		if (rc != 0)
		{
			CERROR ("failed to spawn scheduling thread: %d\n", rc);
			kqswnal_shutdown (ni);
			return (rc);
		}
	}

	kqswnal_data.kqn_init = KQN_INIT_ALL;
	return (0);
}

void __exit
kqswnal_finalise (void)
{
	lnet_unregister_lnd(&the_kqswlnd);
	kqswnal_tunables_fini();
}

static int __init
kqswnal_initialise (void)
{
	int rc = kqswnal_tunables_init();

	if (rc != 0)
		return rc;

	lnet_register_lnd(&the_kqswlnd);
	return (0);
}

MODULE_AUTHOR("Cluster File Systems, Inc. <info@clusterfs.com>");
MODULE_DESCRIPTION("Kernel Quadrics/Elan LND v1.01");
MODULE_LICENSE("GPL");

module_init (kqswnal_initialise);
module_exit (kqswnal_finalise);