Whamcloud - gitweb
508a48cbc7f2e37f3b2962270a91515e62d55e88
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_utils.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
/*
 *      All utilities required by gmnal
 */
24
25 #include "gmnal.h"
26
27 /*
28  *      Am I one of the gmnal rxthreads ?
29  */
30 int
31 gmnal_is_rxthread(gmnal_data_t *nal_data)
32 {
33         int i;
34         for (i=0; i<num_rx_threads; i++) {
35                 if (nal_data->rxthread_pid[i] == current->pid)
36                         return(1);
37         }
38         return(0);
39 }
40
41
42 /*
43  *      Allocate tx descriptors/tokens (large and small)
44  *      allocate a number of small tx buffers and register with GM
45  *      so they are wired and set up for DMA. This is a costly operation.
46  *      Also allocate a corrosponding descriptor to keep track of 
47  *      the buffer.
48  *      Put all small descriptors on singly linked list to be available to send 
49  *      function.
50  *      Allocate the rest of the available tx tokens for large messages. These will be
51  *      used to do gm_gets in gmnal_copyiov     
52  */
53 int
54 gmnal_alloc_txd(gmnal_data_t *nal_data)
55 {
56         int ntx= 0, nstx= 0, nrxt_stx= 0,
57             nltx= 0, i = 0;
58         gmnal_stxd_t    *txd = NULL;
59         gmnal_ltxd_t    *ltxd = NULL;
60         void    *txbuffer = NULL;
61
62         CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
63
64         GMNAL_GM_LOCK(nal_data);
65         /*
66          *      total number of transmit tokens
67          */
68         ntx = gm_num_send_tokens(nal_data->gm_port);
69         GMNAL_GM_UNLOCK(nal_data);
70         CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx);
71         
72         /*
73          *      allocate a number for small sends
74          *      num_stxds from gmnal_module.c
75          */
76         nstx = num_stxds;
77         /*
78          *      give that number plus 1 to the receive threads
79          */
80         nrxt_stx = nstx + 1;
81
82         /*
83          *      give the rest for gm_gets
84          */
85         nltx = ntx - (nrxt_stx + nstx);
86         if (nltx < 1) {
87                 CDEBUG(D_ERROR, "No tokens available for large messages\n");
88                 return(GMNAL_STATUS_FAIL);
89         }
90
91
92         /*
93          * A semaphore is initialised with the 
94          * number of transmit tokens available.
95          * To get a stxd, acquire the token semaphore.
96          * this decrements the available token count
97          * (if no tokens you block here, someone returning a 
98          * stxd will release the semaphore and wake you)
99          * When token is obtained acquire the spinlock 
100          * to manipulate the list
101          */
102         GMNAL_TXD_TOKEN_INIT(nal_data, nstx);
103         GMNAL_TXD_LOCK_INIT(nal_data);
104         GMNAL_RXT_TXD_TOKEN_INIT(nal_data, nrxt_stx);
105         GMNAL_RXT_TXD_LOCK_INIT(nal_data);
106         GMNAL_LTXD_TOKEN_INIT(nal_data, nltx);
107         GMNAL_LTXD_LOCK_INIT(nal_data);
108         
109         for (i=0; i<=nstx; i++) {
110                 PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
111                 if (!txd) {
112                         CDEBUG(D_ERROR, "Failed to malloc txd [%d]\n", i);
113                         return(GMNAL_STATUS_NOMEM);
114                 }
115                 GMNAL_GM_LOCK(nal_data);
116                 txbuffer = gm_dma_malloc(nal_data->gm_port, 
117                                          GMNAL_SMALL_MSG_SIZE(nal_data));
118                 GMNAL_GM_UNLOCK(nal_data);
119                 if (!txbuffer) {
120                         CDEBUG(D_ERROR, "Failed to gm_dma_malloc txbuffer [%d],"
121                                " size [%d]\n", i, 
122                                GMNAL_SMALL_MSG_SIZE(nal_data));
123                         PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
124                         return(GMNAL_STATUS_FAIL);
125                 }
126                 txd->buffer = txbuffer;
127                 txd->buffer_size = GMNAL_SMALL_MSG_SIZE(nal_data);
128                 txd->gm_size = gm_min_size_for_length(txd->buffer_size);
129                 txd->nal_data = (struct _gmnal_data_t*)nal_data;
130                 txd->rxt = 0;
131
132                 txd->next = nal_data->stxd;
133                 nal_data->stxd = txd;
134                 CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
135                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
136         }
137
138         for (i=0; i<=nrxt_stx; i++) {
139                 PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
140                 if (!txd) {
141                         CDEBUG(D_ERROR, "Failed to malloc txd [%d]\n", i);
142                         return(GMNAL_STATUS_NOMEM);
143                 }
144                 GMNAL_GM_LOCK(nal_data);
145                 txbuffer = gm_dma_malloc(nal_data->gm_port, 
146                                          GMNAL_SMALL_MSG_SIZE(nal_data));
147                 GMNAL_GM_UNLOCK(nal_data);
148                 if (!txbuffer) {
149                         CDEBUG(D_ERROR, "Failed to gm_dma_malloc txbuffer [%d],"
150                                " size [%d]\n", i, 
151                                GMNAL_SMALL_MSG_SIZE(nal_data));
152                         PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
153                         return(GMNAL_STATUS_FAIL);
154                 }
155                 txd->buffer = txbuffer;
156                 txd->buffer_size = GMNAL_SMALL_MSG_SIZE(nal_data);
157                 txd->gm_size = gm_min_size_for_length(txd->buffer_size);
158                 txd->nal_data = (struct _gmnal_data_t*)nal_data;
159                 txd->rxt = 1;
160
161                 txd->next = nal_data->rxt_stxd;
162                 nal_data->rxt_stxd = txd;
163                 CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
164                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
165         }
166
167         /*
168          *      string together large tokens
169          */
170         for (i=0; i<=nltx ; i++) {
171                 PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t));
172                 ltxd->next = nal_data->ltxd;
173                 nal_data->ltxd = ltxd;
174         }
175         return(GMNAL_STATUS_OK);
176 }
177
178 /*      Free the list of wired and gm_registered small tx buffers and 
179  *      the tx descriptors that go along with them.
180  */
181 void
182 gmnal_free_txd(gmnal_data_t *nal_data)
183 {
184         gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL;
185         gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL;
186
187         CDEBUG(D_TRACE, "gmnal_free_small tx\n");
188
189         while(txd) {
190                 CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
191                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
192                 _txd = txd;
193                 txd = txd->next;
194                 GMNAL_GM_LOCK(nal_data);
195                 gm_dma_free(nal_data->gm_port, _txd->buffer);
196                 GMNAL_GM_UNLOCK(nal_data);
197                 PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
198         }
199         txd = nal_data->rxt_stxd;
200         while(txd) {
201                 CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
202                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
203                 _txd = txd;
204                 txd = txd->next;
205                 GMNAL_GM_LOCK(nal_data);
206                 gm_dma_free(nal_data->gm_port, _txd->buffer);
207                 GMNAL_GM_UNLOCK(nal_data);
208                 PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
209         }
210         ltxd = nal_data->ltxd;
211         while(txd) {
212                 _ltxd = ltxd;
213                 ltxd = ltxd->next;
214                 PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t));
215         }
216         
217         return;
218 }
219
220
221 /*
222  *      Get a txd from the list
223  *      This get us a wired and gm_registered small tx buffer.
224  *      This implicitly gets us a send token also.
225  */
226 gmnal_stxd_t *
227 gmnal_get_stxd(gmnal_data_t *nal_data, int block)
228 {
229
230         gmnal_stxd_t    *txd = NULL;
231         pid_t           pid = current->pid;
232
233
234         CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", 
235                nal_data, block, pid);
236
237         if (gmnal_is_rxthread(nal_data)) {
238                 CDEBUG(D_INFO, "RXTHREAD Attempting to get token\n");
239                 GMNAL_RXT_TXD_GETTOKEN(nal_data);
240                 GMNAL_RXT_TXD_LOCK(nal_data);
241                 txd = nal_data->rxt_stxd;
242                 nal_data->rxt_stxd = txd->next;
243                 GMNAL_RXT_TXD_UNLOCK(nal_data);
244                 CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n", 
245                        txd, nal_data->rxt_stxd);
246                 txd->kniov = 0;
247                 txd->rxt = 1;
248         } else {
249                 if (block) {
250                         CDEBUG(D_INFO, "Attempting to get token\n");
251                         GMNAL_TXD_GETTOKEN(nal_data);
252                         CDEBUG(D_PORTALS, "Got token\n");
253                 } else {
254                         if (GMNAL_TXD_TRYGETTOKEN(nal_data)) {
255                                 CDEBUG(D_ERROR, "can't get token\n");
256                                 return(NULL);
257                         }
258                 }
259                 GMNAL_TXD_LOCK(nal_data);
260                 txd = nal_data->stxd;
261                 nal_data->stxd = txd->next;
262                 GMNAL_TXD_UNLOCK(nal_data);
263                 CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd, 
264                        nal_data->stxd);
265                 txd->kniov = 0;
266         }       /* general txd get */
267         return(txd);
268 }
269
270 /*
271  *      Return a txd to the list
272  */
273 void
274 gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd)
275 {
276         CDEBUG(D_TRACE, "nal_data [%p], txd[%p] rxt[%d]\n", nal_data, 
277                txd, txd->rxt);
278
279         /*
280          *      this transmit descriptor is 
281          *      for the rxthread
282          */
283         if (txd->rxt) {
284                 GMNAL_RXT_TXD_LOCK(nal_data);
285                 txd->next = nal_data->rxt_stxd;
286                 nal_data->rxt_stxd = txd;
287                 GMNAL_RXT_TXD_UNLOCK(nal_data);
288                 GMNAL_RXT_TXD_RETURNTOKEN(nal_data);
289                 CDEBUG(D_INFO, "Returned stxd to rxthread list\n");
290         } else {
291                 GMNAL_TXD_LOCK(nal_data);
292                 txd->next = nal_data->stxd;
293                 nal_data->stxd = txd;
294                 GMNAL_TXD_UNLOCK(nal_data);
295                 GMNAL_TXD_RETURNTOKEN(nal_data);
296                 CDEBUG(D_INFO, "Returned stxd to general list\n");
297         }
298         return;
299 }
300
301
302 /*
303  *      Get a large transmit descriptor from the free list
304  *      This implicitly gets us a transmit  token .
305  *      always wait for one.
306  */
307 gmnal_ltxd_t *
308 gmnal_get_ltxd(gmnal_data_t *nal_data)
309 {
310
311         gmnal_ltxd_t    *ltxd = NULL;
312
313         CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data);
314
315         GMNAL_LTXD_GETTOKEN(nal_data);
316         GMNAL_LTXD_LOCK(nal_data);
317         ltxd = nal_data->ltxd;
318         nal_data->ltxd = ltxd->next;
319         GMNAL_LTXD_UNLOCK(nal_data);
320         CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd);
321         return(ltxd);
322 }
323
324 /*
325  *      Return an ltxd to the list
326  */
327 void
328 gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd)
329 {
330         CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd);
331
332         GMNAL_LTXD_LOCK(nal_data);
333         ltxd->next = nal_data->ltxd;
334         nal_data->ltxd = ltxd;
335         GMNAL_LTXD_UNLOCK(nal_data);
336         GMNAL_LTXD_RETURNTOKEN(nal_data);
337         return;
338 }
339 /*
340  *      allocate a number of small rx buffers and register with GM
341  *      so they are wired and set up for DMA. This is a costly operation.
342  *      Also allocate a corrosponding descriptor to keep track of 
343  *      the buffer.
344  *      Put all descriptors on singly linked list to be available to 
345  *      receive thread.
346  */
347 int
348 gmnal_alloc_srxd(gmnal_data_t *nal_data)
349 {
350         int nrx = 0, nsrx = 0, i = 0;
351         gmnal_srxd_t    *rxd = NULL;
352         void    *rxbuffer = NULL;
353
354         CDEBUG(D_TRACE, "gmnal_alloc_small rx\n");
355
356         GMNAL_GM_LOCK(nal_data);
357         nrx = gm_num_receive_tokens(nal_data->gm_port);
358         GMNAL_GM_UNLOCK(nal_data);
359         CDEBUG(D_INFO, "total number of receive tokens available is [%d]\n", 
360                nrx);
361         
362         nsrx = nrx/2;
363         nsrx = 12;
364         /*
365          *      make the number of rxds twice our total
366          *      number of stxds plus 1
367          */
368         nsrx = num_stxds*2 + 2;
369
370         CDEBUG(D_INFO, "Allocated [%d] receive tokens to small messages\n", 
371                nsrx);
372
373
374         GMNAL_GM_LOCK(nal_data);
375         nal_data->srxd_hash = gm_create_hash(gm_hash_compare_ptrs, 
376                                              gm_hash_hash_ptr, 0, 0, nsrx, 0);
377         GMNAL_GM_UNLOCK(nal_data);
378         if (!nal_data->srxd_hash) {
379                         CDEBUG(D_ERROR, "Failed to create hash table\n");
380                         return(GMNAL_STATUS_NOMEM);
381         }
382
383         GMNAL_RXD_TOKEN_INIT(nal_data, nsrx);
384         GMNAL_RXD_LOCK_INIT(nal_data);
385
386         for (i=0; i<=nsrx; i++) {
387                 PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t));
388                 if (!rxd) {
389                         CDEBUG(D_ERROR, "Failed to malloc rxd [%d]\n", i);
390                         return(GMNAL_STATUS_NOMEM);
391                 }
392 #if 0
393                 PORTAL_ALLOC(rxbuffer, GMNAL_SMALL_MSG_SIZE(nal_data));
394                 if (!rxbuffer) {
395                         CDEBUG(D_ERROR, "Failed to malloc rxbuffer [%d], "
396                                "size [%d]\n", i, 
397                                GMNAL_SMALL_MSG_SIZE(nal_data));
398                         PORTAL_FREE(rxd, sizeof(gmnal_srxd_t));
399                         return(GMNAL_STATUS_FAIL);
400                 }
401                 CDEBUG(D_NET, "Calling gm_register_memory with port [%p] "
402                        "rxbuffer [%p], size [%d]\n", nal_data->gm_port, 
403                        rxbuffer, GMNAL_SMALL_MSG_SIZE(nal_data));
404                 GMNAL_GM_LOCK(nal_data);
405                 gm_status = gm_register_memory(nal_data->gm_port, rxbuffer, 
406                                                GMNAL_SMALL_MSG_SIZE(nal_data));
407                 GMNAL_GM_UNLOCK(nal_data);
408                 if (gm_status != GM_SUCCESS) {
409                         CDEBUG(D_ERROR, "gm_register_memory failed buffer [%p],"
410                                " index [%d]\n", rxbuffer, i);
411                         switch(gm_status) {
412                                 case(GM_FAILURE):
413                                         CDEBUG(D_ERROR, "GM_FAILURE\n");
414                                 break;
415                                 case(GM_PERMISSION_DENIED):
416                                         CDEBUG(D_ERROR, "PERMISSION_DENIED\n");
417                                 break;
418                                 case(GM_INVALID_PARAMETER):
419                                         CDEBUG(D_ERROR, "INVALID_PARAMETER\n");
420                                 break;
421                                 default:
422                                         CDEBUG(D_ERROR, "Unknown error[%d]\n", 
423                                                gm_status);
424                                 break;
425                                 
426                         }
427                         return(GMNAL_STATUS_FAIL);
428                 }
429 #else
430                 GMNAL_GM_LOCK(nal_data);
431                 rxbuffer = gm_dma_malloc(nal_data->gm_port, 
432                                          GMNAL_SMALL_MSG_SIZE(nal_data));
433                 GMNAL_GM_UNLOCK(nal_data);
434                 if (!rxbuffer) {
435                         CDEBUG(D_ERROR, "Failed to gm_dma_malloc rxbuffer [%d],"
436                                " size [%d]\n", i, 
437                                GMNAL_SMALL_MSG_SIZE(nal_data));
438                         PORTAL_FREE(rxd, sizeof(gmnal_srxd_t));
439                         return(GMNAL_STATUS_FAIL);
440                 }
441 #endif
442                 
443                 rxd->buffer = rxbuffer;
444                 rxd->size = GMNAL_SMALL_MSG_SIZE(nal_data);
445                 rxd->gmsize = gm_min_size_for_length(rxd->size);
446
447                 if (gm_hash_insert(nal_data->srxd_hash, 
448                                    (void*)rxbuffer, (void*)rxd)) {
449
450                         CDEBUG(D_ERROR, "failed to create hash entry rxd[%p] "
451                                "for rxbuffer[%p]\n", rxd, rxbuffer);
452                         return(GMNAL_STATUS_FAIL);
453                 }
454
455                 rxd->next = nal_data->srxd;
456                 nal_data->srxd = rxd;
457                 CDEBUG(D_INFO, "Registered rxd [%p] with buffer [%p], "
458                        "size [%d]\n", rxd, rxd->buffer, rxd->size);
459         }
460
461         return(GMNAL_STATUS_OK);
462 }
463
464
465
466 /*      Free the list of wired and gm_registered small rx buffers and the 
467  *      rx descriptors that go along with them.
468  */
469 void
470 gmnal_free_srxd(gmnal_data_t *nal_data)
471 {
472         gmnal_srxd_t *rxd = nal_data->srxd, *_rxd = NULL;
473
474         CDEBUG(D_TRACE, "gmnal_free_small rx\n");
475
476         while(rxd) {
477                 CDEBUG(D_INFO, "Freeing rxd [%p] buffer [%p], size [%d]\n",
478                        rxd, rxd->buffer, rxd->size);
479                 _rxd = rxd;
480                 rxd = rxd->next;
481
482 #if 0
483                 GMNAL_GM_LOCK(nal_data);
484                 gm_deregister_memory(nal_data->gm_port, _rxd->buffer, 
485                                      _rxd->size);
486                 GMNAL_GM_UNLOCK(nal_data);
487                 PORTAL_FREE(_rxd->buffer, GMNAL_SMALL_RXBUFFER_SIZE);
488 #else
489                 GMNAL_GM_LOCK(nal_data);
490                 gm_dma_free(nal_data->gm_port, _rxd->buffer);
491                 GMNAL_GM_UNLOCK(nal_data);
492 #endif
493                 PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t));
494         }
495         return;
496 }
497
498
499 /*
500  *      Get a rxd from the free list
501  *      This get us a wired and gm_registered small rx buffer.
502  *      This implicitly gets us a receive token also.
503  */
504 gmnal_srxd_t *
505 gmnal_get_srxd(gmnal_data_t *nal_data, int block)
506 {
507
508         gmnal_srxd_t    *rxd = NULL;
509         CDEBUG(D_TRACE, "nal_data [%p] block [%d]\n", nal_data, block);
510
511         if (block) {
512                 GMNAL_RXD_GETTOKEN(nal_data);
513         } else {
514                 if (GMNAL_RXD_TRYGETTOKEN(nal_data)) {
515                         CDEBUG(D_INFO, "gmnal_get_srxd Can't get token\n");
516                         return(NULL);
517                 }
518         }
519         GMNAL_RXD_LOCK(nal_data);
520         rxd = nal_data->srxd;
521         if (rxd)
522                 nal_data->srxd = rxd->next;
523         GMNAL_RXD_UNLOCK(nal_data);
524         CDEBUG(D_INFO, "got [%p], head is [%p]\n", rxd, nal_data->srxd);
525         return(rxd);
526 }
527
528 /*
529  *      Return an rxd to the list
530  */
531 void
532 gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd)
533 {
534         CDEBUG(D_TRACE, "nal_data [%p], rxd[%p]\n", nal_data, rxd);
535
536         GMNAL_RXD_LOCK(nal_data);
537         rxd->next = nal_data->srxd;
538         nal_data->srxd = rxd;
539         GMNAL_RXD_UNLOCK(nal_data);
540         GMNAL_RXD_RETURNTOKEN(nal_data);
541         return;
542 }
543
544 /*
545  *      Given a pointer to a srxd find 
546  *      the relevant descriptor for it
547  *      This is done by searching a hash
548  *      list that is created when the srxd's 
549  *      are created
550  */
551 gmnal_srxd_t *
552 gmnal_rxbuffer_to_srxd(gmnal_data_t *nal_data, void *rxbuffer)
553 {
554         gmnal_srxd_t    *srxd = NULL;
555         CDEBUG(D_TRACE, "nal_data [%p], rxbuffer [%p]\n", nal_data, rxbuffer);
556         srxd = gm_hash_find(nal_data->srxd_hash, rxbuffer);
557         CDEBUG(D_INFO, "srxd is [%p]\n", srxd);
558         return(srxd);
559 }
560
561
562 void
563 gmnal_stop_rxthread(gmnal_data_t *nal_data)
564 {
565         int     delay = 30;
566
567
568
569         CDEBUG(D_TRACE, "Attempting to stop rxthread nal_data [%p]\n", 
570                 nal_data);
571         
572         nal_data->rxthread_stop_flag = GMNAL_THREAD_STOP;
573
574         gmnal_remove_rxtwe(nal_data);
575         /*
576          *      kick the thread 
577          */
578         up(&nal_data->rxtwe_wait);
579
580         while(nal_data->rxthread_flag != GMNAL_THREAD_RESET && delay--) {
581                 CDEBUG(D_INFO, "gmnal_stop_rxthread sleeping\n");
582                 gmnal_yield(1);
583                 up(&nal_data->rxtwe_wait);
584         }
585
586         if (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
587                 CDEBUG(D_ERROR, "I don't know how to wake the thread\n");
588         } else {
589                 CDEBUG(D_INFO, "rx thread seems to have stopped\n");
590         }
591 }
592
593 void
594 gmnal_stop_ctthread(gmnal_data_t *nal_data)
595 {
596         int     delay = 15;
597
598
599
600         CDEBUG(D_TRACE, "Attempting to stop ctthread nal_data [%p]\n", 
601                nal_data);
602         
603         nal_data->ctthread_flag = GMNAL_THREAD_STOP;
604         GMNAL_GM_LOCK(nal_data);
605         gm_set_alarm(nal_data->gm_port, &nal_data->ctthread_alarm, 10, 
606                      NULL, NULL);
607         GMNAL_GM_UNLOCK(nal_data);
608
609         while(nal_data->ctthread_flag == GMNAL_THREAD_STOP && delay--) {
610                 CDEBUG(D_INFO, "gmnal_stop_ctthread sleeping\n");
611                 gmnal_yield(1);
612         }
613
614         if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
615                 CDEBUG(D_ERROR, "I DON'T KNOW HOW TO WAKE THE THREAD\n");
616         } else {
617                 CDEBUG(D_INFO, "CT THREAD SEEMS TO HAVE STOPPED\n");
618         }
619 }
620
621
622
623 char * 
624 gmnal_gm_error(gm_status_t status)
625 {
626         return(gm_strerror(status));
627
628         switch(status) {
629                 case(GM_SUCCESS):
630                         return("SUCCESS");
631                 case(GM_FAILURE):
632                         return("FAILURE");
633                 case(GM_INPUT_BUFFER_TOO_SMALL):
634                         return("INPUT_BUFFER_TOO_SMALL");
635                 case(GM_OUTPUT_BUFFER_TOO_SMALL):
636                         return("OUTPUT_BUFFER_TOO_SMALL");
637                 case(GM_TRY_AGAIN ):
638                         return("TRY_AGAIN");
639                 case(GM_BUSY):
640                         return("BUSY");
641                 case(GM_MEMORY_FAULT):
642                         return("MEMORY_FAULT");
643                 case(GM_INTERRUPTED):
644                         return("INTERRUPTED");
645                 case(GM_INVALID_PARAMETER):
646                         return("INVALID_PARAMETER");
647                 case(GM_OUT_OF_MEMORY):
648                         return("OUT_OF_MEMORY");
649                 case(GM_INVALID_COMMAND):
650                         return("INVALID_COMMAND");
651                 case(GM_PERMISSION_DENIED):
652                         return("PERMISSION_DENIED");
653                 case(GM_INTERNAL_ERROR):
654                         return("INTERNAL_ERROR");
655                 case(GM_UNATTACHED):
656                         return("UNATTACHED");
657                 case(GM_UNSUPPORTED_DEVICE):
658                         return("UNSUPPORTED_DEVICE");
659                 case(GM_SEND_TIMED_OUT):
660                         return("GM_SEND_TIMEDOUT");
661                 case(GM_SEND_REJECTED):
662                         return("GM_SEND_REJECTED");
663                 case(GM_SEND_TARGET_PORT_CLOSED):
664                         return("GM_SEND_TARGET_PORT_CLOSED");
665                 case(GM_SEND_TARGET_NODE_UNREACHABLE):
666                         return("GM_SEND_TARGET_NODE_UNREACHABLE");
667                 case(GM_SEND_DROPPED):
668                         return("GM_SEND_DROPPED");
669                 case(GM_SEND_PORT_CLOSED):
670                         return("GM_SEND_PORT_CLOSED");
671                 case(GM_NODE_ID_NOT_YET_SET):
672                         return("GM_NODE_ID_NOT_YET_SET");
673                 case(GM_STILL_SHUTTING_DOWN):
674                         return("GM_STILL_SHUTTING_DOWN");
675                 case(GM_CLONE_BUSY):
676                         return("GM_CLONE_BUSY");
677                 case(GM_NO_SUCH_DEVICE):
678                         return("GM_NO_SUCH_DEVICE");
679                 case(GM_ABORTED):
680                         return("GM_ABORTED");
681                 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
682                         return("GM_INCOMPATIBLE_LIB_AND_DRIVER");
683                 case(GM_UNTRANSLATED_SYSTEM_ERROR):
684                         return("GM_UNTRANSLATED_SYSTEM_ERROR");
685                 case(GM_ACCESS_DENIED):
686                         return("GM_ACCESS_DENIED");
687
688
689 /*
690  *      These ones are in the docs but aren't in the header file 
691                 case(GM_DEV_NOT_FOUND):
692                         return("GM_DEV_NOT_FOUND");
693                 case(GM_INVALID_PORT_NUMBER):
694                         return("GM_INVALID_PORT_NUMBER");
695                 case(GM_UC_ERROR):
696                         return("GM_US_ERROR");
697                 case(GM_PAGE_TABLE_FULL):
698                         return("GM_PAGE_TABLE_FULL");
699                 case(GM_MINOR_OVERFLOW):
700                         return("GM_MINOR_OVERFLOW");
701                 case(GM_SEND_ORPHANED):
702                         return("GM_SEND_ORPHANED");
703                 case(GM_HARDWARE_FAULT):
704                         return("GM_HARDWARE_FAULT");
705                 case(GM_DATA_CORRUPTED):
706                         return("GM_DATA_CORRUPTED");
707                 case(GM_TIMED_OUT):
708                         return("GM_TIMED_OUT");
709                 case(GM_USER_ERROR):
710                         return("GM_USER_ERROR");
711                 case(GM_NO_MATCH):
712                         return("GM_NOMATCH");
713                 case(GM_NOT_SUPPORTED_IN_KERNEL):
714                         return("GM_NOT_SUPPORTED_IN_KERNEL");
715                 case(GM_NOT_SUPPORTED_ON_ARCH):
716                         return("GM_NOT_SUPPORTED_ON_ARCH");
717                 case(GM_PTE_REF_CNT_OVERFLOW):
718                         return("GM_PTR_REF_CNT_OVERFLOW");
719                 case(GM_NO_DRIVER_SUPPORT):
720                         return("GM_NO_DRIVER_SUPPORT");
721                 case(GM_FIRMWARE_NOT_RUNNING):
722                         return("GM_FIRMWARE_NOT_RUNNING");
723
724  *      These ones are in the docs but aren't in the header file 
725  */
726                 default:
727                         return("UNKNOWN GM ERROR CODE");
728         }
729 }
730
731
732 char *
733 gmnal_rxevent(gm_recv_event_t   *ev)
734 {
735         short   event;
736         event = GM_RECV_EVENT_TYPE(ev);
737         switch(event) {
738                 case(GM_NO_RECV_EVENT):
739                         return("GM_NO_RECV_EVENT");
740                 case(GM_SENDS_FAILED_EVENT):
741                         return("GM_SEND_FAILED_EVENT");
742                 case(GM_ALARM_EVENT):
743                         return("GM_ALARM_EVENT");
744                 case(GM_SENT_EVENT):
745                         return("GM_SENT_EVENT");
746                 case(_GM_SLEEP_EVENT):
747                         return("_GM_SLEEP_EVENT");
748                 case(GM_RAW_RECV_EVENT):
749                         return("GM_RAW_RECV_EVENT");
750                 case(GM_BAD_SEND_DETECTED_EVENT):
751                         return("GM_BAD_SEND_DETECTED_EVENT");
752                 case(GM_SEND_TOKEN_VIOLATION_EVENT):
753                         return("GM_SEND_TOKEN_VIOLATION_EVENT");
754                 case(GM_RECV_TOKEN_VIOLATION_EVENT):
755                         return("GM_RECV_TOKEN_VIOLATION_EVENT");
756                 case(GM_BAD_RECV_TOKEN_EVENT):
757                         return("GM_BAD_RECV_TOKEN_EVENT");
758                 case(GM_ALARM_VIOLATION_EVENT):
759                         return("GM_ALARM_VIOLATION_EVENT");
760                 case(GM_RECV_EVENT):
761                         return("GM_RECV_EVENT");
762                 case(GM_HIGH_RECV_EVENT):
763                         return("GM_HIGH_RECV_EVENT");
764                 case(GM_PEER_RECV_EVENT):
765                         return("GM_PEER_RECV_EVENT");
766                 case(GM_HIGH_PEER_RECV_EVENT):
767                         return("GM_HIGH_PEER_RECV_EVENT");
768                 case(GM_FAST_RECV_EVENT):
769                         return("GM_FAST_RECV_EVENT");
770                 case(GM_FAST_HIGH_RECV_EVENT):
771                         return("GM_FAST_HIGH_RECV_EVENT");
772                 case(GM_FAST_PEER_RECV_EVENT):
773                         return("GM_FAST_PEER_RECV_EVENT");
774                 case(GM_FAST_HIGH_PEER_RECV_EVENT):
775                         return("GM_FAST_HIGH_PEER_RECV_EVENT");
776                 case(GM_REJECTED_SEND_EVENT):
777                         return("GM_REJECTED_SEND_EVENT");
778                 case(GM_ORPHANED_SEND_EVENT):
779                         return("GM_ORPHANED_SEND_EVENT");
780                 case(GM_BAD_RESEND_DETECTED_EVENT):
781                         return("GM_BAD_RESEND_DETETED_EVENT");
782                 case(GM_DROPPED_SEND_EVENT):
783                         return("GM_DROPPED_SEND_EVENT");
784                 case(GM_BAD_SEND_VMA_EVENT):
785                         return("GM_BAD_SEND_VMA_EVENT");
786                 case(GM_BAD_RECV_VMA_EVENT):
787                         return("GM_BAD_RECV_VMA_EVENT");
788                 case(_GM_FLUSHED_ALARM_EVENT):
789                         return("GM_FLUSHED_ALARM_EVENT");
790                 case(GM_SENT_TOKENS_EVENT):
791                         return("GM_SENT_TOKENS_EVENTS");
792                 case(GM_IGNORE_RECV_EVENT):
793                         return("GM_IGNORE_RECV_EVENT");
794                 case(GM_ETHERNET_RECV_EVENT):
795                         return("GM_ETHERNET_RECV_EVENT");
796                 case(GM_NEW_NO_RECV_EVENT):
797                         return("GM_NEW_NO_RECV_EVENT");
798                 case(GM_NEW_SENDS_FAILED_EVENT):
799                         return("GM_NEW_SENDS_FAILED_EVENT");
800                 case(GM_NEW_ALARM_EVENT):
801                         return("GM_NEW_ALARM_EVENT");
802                 case(GM_NEW_SENT_EVENT):
803                         return("GM_NEW_SENT_EVENT");
804                 case(_GM_NEW_SLEEP_EVENT):
805                         return("GM_NEW_SLEEP_EVENT");
806                 case(GM_NEW_RAW_RECV_EVENT):
807                         return("GM_NEW_RAW_RECV_EVENT");
808                 case(GM_NEW_BAD_SEND_DETECTED_EVENT):
809                         return("GM_NEW_BAD_SEND_DETECTED_EVENT");
810                 case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT):
811                         return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT");
812                 case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT):
813                         return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT");
814                 case(GM_NEW_BAD_RECV_TOKEN_EVENT):
815                         return("GM_NEW_BAD_RECV_TOKEN_EVENT");
816                 case(GM_NEW_ALARM_VIOLATION_EVENT):
817                         return("GM_NEW_ALARM_VIOLATION_EVENT");
818                 case(GM_NEW_RECV_EVENT):
819                         return("GM_NEW_RECV_EVENT");
820                 case(GM_NEW_HIGH_RECV_EVENT):
821                         return("GM_NEW_HIGH_RECV_EVENT");
822                 case(GM_NEW_PEER_RECV_EVENT):
823                         return("GM_NEW_PEER_RECV_EVENT");
824                 case(GM_NEW_HIGH_PEER_RECV_EVENT):
825                         return("GM_NEW_HIGH_PEER_RECV_EVENT");
826                 case(GM_NEW_FAST_RECV_EVENT):
827                         return("GM_NEW_FAST_RECV_EVENT");
828                 case(GM_NEW_FAST_HIGH_RECV_EVENT):
829                         return("GM_NEW_FAST_HIGH_RECV_EVENT");
830                 case(GM_NEW_FAST_PEER_RECV_EVENT):
831                         return("GM_NEW_FAST_PEER_RECV_EVENT");
832                 case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT):
833                         return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT");
834                 case(GM_NEW_REJECTED_SEND_EVENT):
835                         return("GM_NEW_REJECTED_SEND_EVENT");
836                 case(GM_NEW_ORPHANED_SEND_EVENT):
837                         return("GM_NEW_ORPHANED_SEND_EVENT");
838                 case(_GM_NEW_PUT_NOTIFICATION_EVENT):
839                         return("_GM_NEW_PUT_NOTIFICATION_EVENT");
840                 case(GM_NEW_FREE_SEND_TOKEN_EVENT):
841                         return("GM_NEW_FREE_SEND_TOKEN_EVENT");
842                 case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT):
843                         return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT");
844                 case(GM_NEW_BAD_RESEND_DETECTED_EVENT):
845                         return("GM_NEW_BAD_RESEND_DETECTED_EVENT");
846                 case(GM_NEW_DROPPED_SEND_EVENT):
847                         return("GM_NEW_DROPPED_SEND_EVENT");
848                 case(GM_NEW_BAD_SEND_VMA_EVENT):
849                         return("GM_NEW_BAD_SEND_VMA_EVENT");
850                 case(GM_NEW_BAD_RECV_VMA_EVENT):
851                         return("GM_NEW_BAD_RECV_VMA_EVENT");
852                 case(_GM_NEW_FLUSHED_ALARM_EVENT):
853                         return("GM_NEW_FLUSHED_ALARM_EVENT");
854                 case(GM_NEW_SENT_TOKENS_EVENT):
855                         return("GM_NEW_SENT_TOKENS_EVENT");
856                 case(GM_NEW_IGNORE_RECV_EVENT):
857                         return("GM_NEW_IGNORE_RECV_EVENT");
858                 case(GM_NEW_ETHERNET_RECV_EVENT):
859                         return("GM_NEW_ETHERNET_RECV_EVENT");
860                 default:
861                         return("Unknown Recv event");
862 #if 0
863                 case(/* _GM_PUT_NOTIFICATION_EVENT */
864                 case(/* GM_FREE_SEND_TOKEN_EVENT */
865                 case(/* GM_FREE_HIGH_SEND_TOKEN_EVENT */
866 #endif
867         }
868 }
869
870
871 void
872 gmnal_yield(int delay)
873 {
874         set_current_state(TASK_INTERRUPTIBLE);
875         schedule_timeout(delay);
876 }
877
878 int
879 gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, 
880                     int len)
881 {
882
883         CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len, 
884                GMNAL_SMALL_MSG_SIZE(nal_data));
885
886         if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t)) 
887                      < GMNAL_SMALL_MSG_SIZE(nal_data)) {
888
889                 CDEBUG(D_INFO, "Yep, small message\n");
890                 return(1);
891         } else {
892                 CDEBUG(D_ERROR, "No, not small message\n");
893                 /*
894                  *      could be made up of lots of little ones !
895                  */
896                 return(0);
897         }
898
899 }
900
901 /* 
902  *      extract info from the receive event.
903  *      Have to do this before the next call to gm_receive
904  *      Deal with all endian stuff here.
905  *      Then stick work entry on list where rxthreads
906  *      can get it to complete the receive
907  */
908 int
909 gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
910 {
911         gmnal_rxtwe_t   *we = NULL;
912
913         CDEBUG(D_NET, "adding entry to list\n");
914
915         PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t));
916         if (!we) {
917                 CDEBUG(D_ERROR, "failed to malloc\n");
918                 return(GMNAL_STATUS_FAIL);
919         }
920         we->buffer = gm_ntohp(recv->buffer);
921         we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
922         we->sport = (int)gm_ntoh_u8(recv->sender_port_id);
923         we->type = (int)gm_ntoh_u8(recv->type);
924         we->length = (int)gm_ntohl(recv->length);
925
926         spin_lock(&nal_data->rxtwe_lock);
927         if (nal_data->rxtwe_tail) {
928                 nal_data->rxtwe_tail->next = we;
929         } else {
930                 nal_data->rxtwe_head = we;
931                 nal_data->rxtwe_tail = we;
932         }
933         nal_data->rxtwe_tail = we;
934         spin_unlock(&nal_data->rxtwe_lock);
935
936         up(&nal_data->rxtwe_wait);
937         return(GMNAL_STATUS_OK);
938 }
939
940 void
941 gmnal_remove_rxtwe(gmnal_data_t *nal_data)
942 {
943         gmnal_rxtwe_t   *_we, *we = nal_data->rxtwe_head;
944
945         CDEBUG(D_NET, "removing all work list entries\n");
946
947         spin_lock(&nal_data->rxtwe_lock);
948         CDEBUG(D_NET, "Got lock\n");
949         while (we) {
950                 _we = we;
951                 we = we->next;
952                 PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t));
953         }
954         spin_unlock(&nal_data->rxtwe_lock);
955         nal_data->rxtwe_head = NULL;
956         nal_data->rxtwe_tail = NULL;
957 }
958
959 gmnal_rxtwe_t *
960 gmnal_get_rxtwe(gmnal_data_t *nal_data)
961 {
962         gmnal_rxtwe_t   *we = NULL;
963
964         CDEBUG(D_NET, "Getting entry to list\n");
965
966         do  {
967                 while(down_interruptible(&nal_data->rxtwe_wait) != 0)
968                         /* do nothing */;
969                 if (nal_data->rxthread_stop_flag == GMNAL_THREAD_STOP) {
970                         /*
971                          *      time to stop
972                          *      TO DO some one free the work entries
973                          */
974                         return(NULL);
975                 }
976                 spin_lock(&nal_data->rxtwe_lock);
977                 if (nal_data->rxtwe_head) {
978                         CDEBUG(D_INFO, "Got a work entry\n");
979                         we = nal_data->rxtwe_head;
980                         nal_data->rxtwe_head = we->next;
981                         if (!nal_data->rxtwe_head)
982                                 nal_data->rxtwe_tail = NULL;
983                 } else {
984                         CDEBUG(D_WARNING, "woken but no work\n");
985                 }
986                 spin_unlock(&nal_data->rxtwe_lock);
987         } while (!we);
988
989         CDEBUG(D_INFO, "Returning we[%p]\n", we);
990         return(we);
991 }
992
993
994 /*
995  *      Start the caretaker thread and a number of receiver threads
996  *      The caretaker thread gets events from the gm library.
997  *      It passes receive events to the receiver threads via a work list.
998  *      It processes other events itself in gm_unknown. These will be
999  *      callback events or sleeps.
1000  */
1001 int
1002 gmnal_start_kernel_threads(gmnal_data_t *nal_data)
1003 {
1004
1005         int     threads = 0;
1006         /*
1007          *      the alarm is used to wake the caretaker thread from 
1008          *      gm_unknown call (sleeping) to exit it.
1009          */
1010         CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
1011         gm_initialize_alarm(&nal_data->ctthread_alarm);
1012         nal_data->ctthread_flag = GMNAL_THREAD_RESET;
1013
1014
1015         CDEBUG(D_INFO, "Starting caretaker thread\n");
1016         nal_data->ctthread_pid = 
1017                  kernel_thread(gmnal_ct_thread, (void*)nal_data, 0);
1018         if (nal_data->ctthread_pid <= 0) {
1019                 CDEBUG(D_ERROR, "Caretaker thread failed to start\n");
1020                 return(GMNAL_STATUS_FAIL);
1021         }
1022
1023         while (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
1024                 gmnal_yield(1);
1025                 CDEBUG(D_INFO, "Waiting for caretaker thread signs of life\n");
1026         }
1027
1028         CDEBUG(D_INFO, "caretaker thread has started\n");
1029
1030
1031         /*
1032          *      Now start a number of receiver threads
1033          *      these treads get work to do from the caretaker (ct) thread
1034          */
1035         nal_data->rxthread_flag = GMNAL_THREAD_RESET;
1036         nal_data->rxthread_stop_flag = GMNAL_THREAD_RESET;
1037
1038         for (threads=0; threads<NRXTHREADS; threads++)
1039                 nal_data->rxthread_pid[threads] = -1;
1040         spin_lock_init(&nal_data->rxtwe_lock);
1041         spin_lock_init(&nal_data->rxthread_flag_lock);
1042         sema_init(&nal_data->rxtwe_wait, 0);
1043         nal_data->rxtwe_head = NULL;
1044         nal_data->rxtwe_tail = NULL;
1045         /*
1046          *      If the default number of receive threades isn't
1047          *      modified at load time, then start one thread per cpu
1048          */
1049         if (num_rx_threads == -1)
1050                 num_rx_threads = smp_num_cpus;
1051         CDEBUG(D_INFO, "Starting [%d] receive threads\n", num_rx_threads);
1052         for (threads=0; threads<num_rx_threads; threads++) {
1053                 nal_data->rxthread_pid[threads] = 
1054                        kernel_thread(gmnal_rx_thread, (void*)nal_data, 0);
1055                 if (nal_data->rxthread_pid[threads] <= 0) {
1056                         CDEBUG(D_ERROR, "Receive thread failed to start\n");
1057                         gmnal_stop_rxthread(nal_data);
1058                         gmnal_stop_ctthread(nal_data);
1059                         return(GMNAL_STATUS_FAIL);
1060                 }
1061         }
1062
1063         for (;;) {
1064                 spin_lock(&nal_data->rxthread_flag_lock);
1065                 if (nal_data->rxthread_flag == GMNAL_RXTHREADS_STARTED) {
1066                         spin_unlock(&nal_data->rxthread_flag_lock);
1067                         break;
1068                 }
1069                 spin_unlock(&nal_data->rxthread_flag_lock);
1070                 gmnal_yield(1);
1071         }
1072
1073         CDEBUG(D_INFO, "receive threads seem to have started\n");
1074
1075         return(GMNAL_STATUS_OK);
1076 }