Whamcloud - gitweb
Land b_release_1_4_3 onto HEAD (20050619_0305)
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_utils.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21 /*
22  *      All utilities required by lgmanl
23  */
24
25 #include "gmnal.h"
26
27 /*
28  *      Am I one of the gmnal rxthreads ?
29  */
30 int
31 gmnal_is_rxthread(gmnal_data_t *nal_data)
32 {
33         int i;
34         for (i=0; i<num_rx_threads; i++) {
35                 if (nal_data->rxthread_pid[i] == current->pid)
36                         return(1);
37         }
38         return(0);
39 }
40
41
42 /*
43  *      Allocate tx descriptors/tokens (large and small)
44  *      allocate a number of small tx buffers and register with GM
45  *      so they are wired and set up for DMA. This is a costly operation.
46  *      Also allocate a corresponding descriptor to keep track of 
47  *      the buffer.
48  *      Put all small descriptors on singly linked list to be available to send 
49  *      function.
50  *      Allocate the rest of the available tx tokens for large messages. These will be
51  *      used to do gm_gets in gmnal_copyiov     
52  */
53 int
54 gmnal_alloc_txd(gmnal_data_t *nal_data)
55 {
56         int ntx= 0, nstx= 0, nrxt_stx= 0,
57             nltx= 0, i = 0;
58         gmnal_stxd_t    *txd = NULL;
59         gmnal_ltxd_t    *ltxd = NULL;
60         void    *txbuffer = NULL;
61
62         CDEBUG(D_TRACE, "gmnal_alloc_small tx\n");
63
64         GMNAL_GM_LOCK(nal_data);
65         /*
66          *      total number of transmit tokens
67          */
68         ntx = gm_num_send_tokens(nal_data->gm_port);
69         GMNAL_GM_UNLOCK(nal_data);
70         CDEBUG(D_INFO, "total number of send tokens available is [%d]\n", ntx);
71
72         /*
73          *      allocate a number for small sends
74          *      num_stxds from gmnal_module.c
75          */
76         nstx = num_stxds;
77         /*
78          *      give that number plus 1 to the receive threads
79          */
80         nrxt_stx = nstx + 1;
81
82         /*
83          *      give the rest for gm_gets
84          */
85         nltx = ntx - (nrxt_stx + nstx);
86         if (nltx < 1) {
87                 CERROR("No tokens available for large messages\n");
88                 return(GMNAL_STATUS_FAIL);
89         }
90
91
92         /*
93          * A semaphore is initialised with the
94          * number of transmit tokens available.
95          * To get a stxd, acquire the token semaphore.
96          * this decrements the available token count
97          * (if no tokens you block here, someone returning a
98          * stxd will release the semaphore and wake you)
99          * When token is obtained acquire the spinlock
100          * to manipulate the list
101          */
102         GMNAL_TXD_TOKEN_INIT(nal_data, nstx);
103         GMNAL_TXD_LOCK_INIT(nal_data);
104         GMNAL_RXT_TXD_TOKEN_INIT(nal_data, nrxt_stx);
105         GMNAL_RXT_TXD_LOCK_INIT(nal_data);
106         GMNAL_LTXD_TOKEN_INIT(nal_data, nltx);
107         GMNAL_LTXD_LOCK_INIT(nal_data);
108
109         for (i=0; i<=nstx; i++) {
110                 PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
111                 if (!txd) {
112                         CERROR("Failed to malloc txd [%d]\n", i);
113                         return(GMNAL_STATUS_NOMEM);
114                 }
115                 GMNAL_GM_LOCK(nal_data);
116                 txbuffer = gm_dma_malloc(nal_data->gm_port,
117                                          GMNAL_SMALL_MSG_SIZE(nal_data));
118                 GMNAL_GM_UNLOCK(nal_data);
119                 if (!txbuffer) {
120                         CERROR("Failed to gm_dma_malloc txbuffer [%d], "
121                                "size [%d]\n", i,GMNAL_SMALL_MSG_SIZE(nal_data));
122                         PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
123                         return(GMNAL_STATUS_FAIL);
124                 }
125                 txd->buffer = txbuffer;
126                 txd->buffer_size = GMNAL_SMALL_MSG_SIZE(nal_data);
127                 txd->gm_size = gm_min_size_for_length(txd->buffer_size);
128                 txd->nal_data = (struct _gmnal_data_t*)nal_data;
129                 txd->rxt = 0;
130
131                 txd->next = nal_data->stxd;
132                 nal_data->stxd = txd;
133                 CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
134                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
135         }
136
137         for (i=0; i<=nrxt_stx; i++) {
138                 PORTAL_ALLOC(txd, sizeof(gmnal_stxd_t));
139                 if (!txd) {
140                         CERROR("Failed to malloc txd [%d]\n", i);
141                         return(GMNAL_STATUS_NOMEM);
142                 }
143                 GMNAL_GM_LOCK(nal_data);
144                 txbuffer = gm_dma_malloc(nal_data->gm_port, 
145                                          GMNAL_SMALL_MSG_SIZE(nal_data));
146                 GMNAL_GM_UNLOCK(nal_data);
147                 if (!txbuffer) {
148                         CERROR("Failed to gm_dma_malloc txbuffer [%d],"
149                                " size [%d]\n",i,GMNAL_SMALL_MSG_SIZE(nal_data));
150                         PORTAL_FREE(txd, sizeof(gmnal_stxd_t));
151                         return(GMNAL_STATUS_FAIL);
152                 }
153                 txd->buffer = txbuffer;
154                 txd->buffer_size = GMNAL_SMALL_MSG_SIZE(nal_data);
155                 txd->gm_size = gm_min_size_for_length(txd->buffer_size);
156                 txd->nal_data = (struct _gmnal_data_t*)nal_data;
157                 txd->rxt = 1;
158
159                 txd->next = nal_data->rxt_stxd;
160                 nal_data->rxt_stxd = txd;
161                 CDEBUG(D_INFO, "Registered txd [%p] with buffer [%p], "
162                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
163         }
164
165         /*
166          *      string together large tokens
167          */
168         for (i=0; i<=nltx ; i++) {
169                 PORTAL_ALLOC(ltxd, sizeof(gmnal_ltxd_t));
170                 ltxd->next = nal_data->ltxd;
171                 nal_data->ltxd = ltxd;
172         }
173         return(GMNAL_STATUS_OK);
174 }
175
176 /*      Free the list of wired and gm_registered small tx buffers and 
177  *      the tx descriptors that go along with them.
178  */
179 void
180 gmnal_free_txd(gmnal_data_t *nal_data)
181 {
182         gmnal_stxd_t *txd = nal_data->stxd, *_txd = NULL;
183         gmnal_ltxd_t *ltxd = NULL, *_ltxd = NULL;
184
185         CDEBUG(D_TRACE, "gmnal_free_small tx\n");
186
187         while(txd) {
188                 CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
189                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
190                 _txd = txd;
191                 txd = txd->next;
192                 GMNAL_GM_LOCK(nal_data);
193                 gm_dma_free(nal_data->gm_port, _txd->buffer);
194                 GMNAL_GM_UNLOCK(nal_data);
195                 PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
196         }
197         txd = nal_data->rxt_stxd;
198         while(txd) {
199                 CDEBUG(D_INFO, "Freeing txd [%p] with buffer [%p], "
200                        "size [%d]\n", txd, txd->buffer, txd->buffer_size);
201                 _txd = txd;
202                 txd = txd->next;
203                 GMNAL_GM_LOCK(nal_data);
204                 gm_dma_free(nal_data->gm_port, _txd->buffer);
205                 GMNAL_GM_UNLOCK(nal_data);
206                 PORTAL_FREE(_txd, sizeof(gmnal_stxd_t));
207         }
208         ltxd = nal_data->ltxd;
209         while(txd) {
210                 _ltxd = ltxd;
211                 ltxd = ltxd->next;
212                 PORTAL_FREE(_ltxd, sizeof(gmnal_ltxd_t));
213         }
214         
215         return;
216 }
217
218
219 /*
220  *      Get a txd from the list
221  *      This get us a wired and gm_registered small tx buffer.
222  *      This implicitly gets us a send token also.
223  */
224 gmnal_stxd_t *
225 gmnal_get_stxd(gmnal_data_t *nal_data, int block)
226 {
227
228         gmnal_stxd_t    *txd = NULL;
229         pid_t           pid = current->pid;
230
231
232         CDEBUG(D_TRACE, "gmnal_get_stxd nal_data [%p] block[%d] pid [%d]\n", 
233                nal_data, block, pid);
234
235         if (gmnal_is_rxthread(nal_data)) {
236                 CDEBUG(D_INFO, "RXTHREAD Attempting to get token\n");
237                 GMNAL_RXT_TXD_GETTOKEN(nal_data);
238                 GMNAL_RXT_TXD_LOCK(nal_data);
239                 txd = nal_data->rxt_stxd;
240                 nal_data->rxt_stxd = txd->next;
241                 GMNAL_RXT_TXD_UNLOCK(nal_data);
242                 CDEBUG(D_INFO, "RXTHREAD got [%p], head is [%p]\n", 
243                        txd, nal_data->rxt_stxd);
244                 txd->kniov = 0;
245                 txd->rxt = 1;
246         } else {
247                 if (block) {
248                         CDEBUG(D_INFO, "Attempting to get token\n");
249                         GMNAL_TXD_GETTOKEN(nal_data);
250                         CDEBUG(D_PORTALS, "Got token\n");
251                 } else {
252                         if (GMNAL_TXD_TRYGETTOKEN(nal_data)) {
253                                 CERROR("can't get token\n");
254                                 return(NULL);
255                         }
256                 }
257                 GMNAL_TXD_LOCK(nal_data);
258                 txd = nal_data->stxd;
259                 nal_data->stxd = txd->next;
260                 GMNAL_TXD_UNLOCK(nal_data);
261                 CDEBUG(D_INFO, "got [%p], head is [%p]\n", txd,
262                        nal_data->stxd);
263                 txd->kniov = 0;
264         }       /* general txd get */
265         return(txd);
266 }
267
268 /*
269  *      Return a txd to the list
270  */
271 void
272 gmnal_return_stxd(gmnal_data_t *nal_data, gmnal_stxd_t *txd)
273 {
274         CDEBUG(D_TRACE, "nal_data [%p], txd[%p] rxt[%d]\n", nal_data,
275                txd, txd->rxt);
276
277         /*
278          *      this transmit descriptor is 
279          *      for the rxthread
280          */
281         if (txd->rxt) {
282                 GMNAL_RXT_TXD_LOCK(nal_data);
283                 txd->next = nal_data->rxt_stxd;
284                 nal_data->rxt_stxd = txd;
285                 GMNAL_RXT_TXD_UNLOCK(nal_data);
286                 GMNAL_RXT_TXD_RETURNTOKEN(nal_data);
287                 CDEBUG(D_INFO, "Returned stxd to rxthread list\n");
288         } else {
289                 GMNAL_TXD_LOCK(nal_data);
290                 txd->next = nal_data->stxd;
291                 nal_data->stxd = txd;
292                 GMNAL_TXD_UNLOCK(nal_data);
293                 GMNAL_TXD_RETURNTOKEN(nal_data);
294                 CDEBUG(D_INFO, "Returned stxd to general list\n");
295         }
296         return;
297 }
298
299
300 /*
301  *      Get a large transmit descriptor from the free list
302  *      This implicitly gets us a transmit  token .
303  *      always wait for one.
304  */
305 gmnal_ltxd_t *
306 gmnal_get_ltxd(gmnal_data_t *nal_data)
307 {
308
309         gmnal_ltxd_t    *ltxd = NULL;
310
311         CDEBUG(D_TRACE, "nal_data [%p]\n", nal_data);
312
313         GMNAL_LTXD_GETTOKEN(nal_data);
314         GMNAL_LTXD_LOCK(nal_data);
315         ltxd = nal_data->ltxd;
316         nal_data->ltxd = ltxd->next;
317         GMNAL_LTXD_UNLOCK(nal_data);
318         CDEBUG(D_INFO, "got [%p], head is [%p]\n", ltxd, nal_data->ltxd);
319         return(ltxd);
320 }
321
322 /*
323  *      Return an ltxd to the list
324  */
325 void
326 gmnal_return_ltxd(gmnal_data_t *nal_data, gmnal_ltxd_t *ltxd)
327 {
328         CDEBUG(D_TRACE, "nal_data [%p], ltxd[%p]\n", nal_data, ltxd);
329
330         GMNAL_LTXD_LOCK(nal_data);
331         ltxd->next = nal_data->ltxd;
332         nal_data->ltxd = ltxd;
333         GMNAL_LTXD_UNLOCK(nal_data);
334         GMNAL_LTXD_RETURNTOKEN(nal_data);
335         return;
336 }
337 /*
338  *      allocate a number of small rx buffers and register with GM
339  *      so they are wired and set up for DMA. This is a costly operation.
340  *      Also allocate a corresponding descriptor to keep track of 
341  *      the buffer.
342  *      Put all descriptors on singly linked list to be available to 
343  *      receive thread.
344  */
345 int
346 gmnal_alloc_srxd(gmnal_data_t *nal_data)
347 {
348         int nrx = 0, nsrx = 0, i = 0;
349         gmnal_srxd_t    *rxd = NULL;
350         void    *rxbuffer = NULL;
351
352         CDEBUG(D_TRACE, "gmnal_alloc_small rx\n");
353
354         GMNAL_GM_LOCK(nal_data);
355         nrx = gm_num_receive_tokens(nal_data->gm_port);
356         GMNAL_GM_UNLOCK(nal_data);
357         CDEBUG(D_INFO, "total number of receive tokens available is [%d]\n",
358                nrx);
359
360         nsrx = nrx/2;
361         nsrx = 12;
362         /*
363          *      make the number of rxds twice our total
364          *      number of stxds plus 1
365          */
366         nsrx = num_stxds*2 + 2;
367
368         CDEBUG(D_INFO, "Allocated [%d] receive tokens to small messages\n",
369                nsrx);
370
371
372         GMNAL_GM_LOCK(nal_data);
373         nal_data->srxd_hash = gm_create_hash(gm_hash_compare_ptrs, 
374                                              gm_hash_hash_ptr, 0, 0, nsrx, 0);
375         GMNAL_GM_UNLOCK(nal_data);
376         if (!nal_data->srxd_hash) {
377                         CERROR("Failed to create hash table\n");
378                         return(GMNAL_STATUS_NOMEM);
379         }
380
381         GMNAL_RXD_TOKEN_INIT(nal_data, nsrx);
382         GMNAL_RXD_LOCK_INIT(nal_data);
383
384         for (i=0; i<=nsrx; i++) {
385                 PORTAL_ALLOC(rxd, sizeof(gmnal_srxd_t));
386                 if (!rxd) {
387                         CERROR("Failed to malloc rxd [%d]\n", i);
388                         return(GMNAL_STATUS_NOMEM);
389                 }
390 #if 0
391                 PORTAL_ALLOC(rxbuffer, GMNAL_SMALL_MSG_SIZE(nal_data));
392                 if (!rxbuffer) {
393                         CERROR("Failed to malloc rxbuffer [%d], "
394                                "size [%d]\n", i,GMNAL_SMALL_MSG_SIZE(nal_data));
395                         PORTAL_FREE(rxd, sizeof(gmnal_srxd_t));
396                         return(GMNAL_STATUS_FAIL);
397                 }
398                 CDEBUG(D_NET, "Calling gm_register_memory with port [%p] "
399                        "rxbuffer [%p], size [%d]\n", nal_data->gm_port,
400                        rxbuffer, GMNAL_SMALL_MSG_SIZE(nal_data));
401                 GMNAL_GM_LOCK(nal_data);
402                 gm_status = gm_register_memory(nal_data->gm_port, rxbuffer,
403                                                GMNAL_SMALL_MSG_SIZE(nal_data));
404                 GMNAL_GM_UNLOCK(nal_data);
405                 if (gm_status != GM_SUCCESS) {
406                         CERROR("gm_register_memory failed buffer [%p],"
407                                " index [%d]\n", rxbuffer, i);
408                         switch(gm_status) {
409                                 case(GM_FAILURE):
410                                         CERROR("GM_FAILURE\n");
411                                 break;
412                                 case(GM_PERMISSION_DENIED):
413                                         CERROR("PERMISSION_DENIED\n");
414                                 break;
415                                 case(GM_INVALID_PARAMETER):
416                                         CERROR("INVALID_PARAMETER\n");
417                                 break;
418                                 default:
419                                         CERROR("Unknown error[%d]\n",gm_status);
420                                 break;
421                         }
422                         return(GMNAL_STATUS_FAIL);
423                 }
424 #else
425                 GMNAL_GM_LOCK(nal_data);
426                 rxbuffer = gm_dma_malloc(nal_data->gm_port, 
427                                          GMNAL_SMALL_MSG_SIZE(nal_data));
428                 GMNAL_GM_UNLOCK(nal_data);
429                 if (!rxbuffer) {
430                         CERROR("Failed to gm_dma_malloc rxbuffer [%d], "
431                                "size [%d]\n",i ,GMNAL_SMALL_MSG_SIZE(nal_data));
432                         PORTAL_FREE(rxd, sizeof(gmnal_srxd_t));
433                         return(GMNAL_STATUS_FAIL);
434                 }
435 #endif
436
437                 rxd->buffer = rxbuffer;
438                 rxd->size = GMNAL_SMALL_MSG_SIZE(nal_data);
439                 rxd->gmsize = gm_min_size_for_length(rxd->size);
440
441                 if (gm_hash_insert(nal_data->srxd_hash,
442                                    (void*)rxbuffer, (void*)rxd)) {
443
444                         CERROR("failed to create hash entry rxd[%p] "
445                                "for rxbuffer[%p]\n", rxd, rxbuffer);
446                         return(GMNAL_STATUS_FAIL);
447                 }
448
449                 rxd->next = nal_data->srxd;
450                 nal_data->srxd = rxd;
451                 CDEBUG(D_INFO, "Registered rxd [%p] with buffer [%p], "
452                        "size [%d]\n", rxd, rxd->buffer, rxd->size);
453         }
454
455         return(GMNAL_STATUS_OK);
456 }
457
458
459
460 /*      Free the list of wired and gm_registered small rx buffers and the 
461  *      rx descriptors that go along with them.
462  */
463 void
464 gmnal_free_srxd(gmnal_data_t *nal_data)
465 {
466         gmnal_srxd_t *rxd = nal_data->srxd, *_rxd = NULL;
467
468         CDEBUG(D_TRACE, "gmnal_free_small rx\n");
469
470         while(rxd) {
471                 CDEBUG(D_INFO, "Freeing rxd [%p] buffer [%p], size [%d]\n",
472                        rxd, rxd->buffer, rxd->size);
473                 _rxd = rxd;
474                 rxd = rxd->next;
475
476 #if 0
477                 GMNAL_GM_LOCK(nal_data);
478                 gm_deregister_memory(nal_data->gm_port, _rxd->buffer, 
479                                      _rxd->size);
480                 GMNAL_GM_UNLOCK(nal_data);
481                 PORTAL_FREE(_rxd->buffer, GMNAL_SMALL_RXBUFFER_SIZE);
482 #else
483                 GMNAL_GM_LOCK(nal_data);
484                 gm_dma_free(nal_data->gm_port, _rxd->buffer);
485                 GMNAL_GM_UNLOCK(nal_data);
486 #endif
487                 PORTAL_FREE(_rxd, sizeof(gmnal_srxd_t));
488         }
489         return;
490 }
491
492
493 /*
494  *      Get a rxd from the free list
495  *      This get us a wired and gm_registered small rx buffer.
496  *      This implicitly gets us a receive token also.
497  */
498 gmnal_srxd_t *
499 gmnal_get_srxd(gmnal_data_t *nal_data, int block)
500 {
501
502         gmnal_srxd_t    *rxd = NULL;
503         CDEBUG(D_TRACE, "nal_data [%p] block [%d]\n", nal_data, block);
504
505         if (block) {
506                 GMNAL_RXD_GETTOKEN(nal_data);
507         } else {
508                 if (GMNAL_RXD_TRYGETTOKEN(nal_data)) {
509                         CDEBUG(D_INFO, "gmnal_get_srxd Can't get token\n");
510                         return(NULL);
511                 }
512         }
513         GMNAL_RXD_LOCK(nal_data);
514         rxd = nal_data->srxd;
515         if (rxd)
516                 nal_data->srxd = rxd->next;
517         GMNAL_RXD_UNLOCK(nal_data);
518         CDEBUG(D_INFO, "got [%p], head is [%p]\n", rxd, nal_data->srxd);
519         return(rxd);
520 }
521
522 /*
523  *      Return an rxd to the list
524  */
525 void
526 gmnal_return_srxd(gmnal_data_t *nal_data, gmnal_srxd_t *rxd)
527 {
528         CDEBUG(D_TRACE, "nal_data [%p], rxd[%p]\n", nal_data, rxd);
529
530         GMNAL_RXD_LOCK(nal_data);
531         rxd->next = nal_data->srxd;
532         nal_data->srxd = rxd;
533         GMNAL_RXD_UNLOCK(nal_data);
534         GMNAL_RXD_RETURNTOKEN(nal_data);
535         return;
536 }
537
538 /*
539  *      Given a pointer to a srxd find 
540  *      the relevant descriptor for it
541  *      This is done by searching a hash
542  *      list that is created when the srxd's 
543  *      are created
544  */
545 gmnal_srxd_t *
546 gmnal_rxbuffer_to_srxd(gmnal_data_t *nal_data, void *rxbuffer)
547 {
548         gmnal_srxd_t    *srxd = NULL;
549         CDEBUG(D_TRACE, "nal_data [%p], rxbuffer [%p]\n", nal_data, rxbuffer);
550         srxd = gm_hash_find(nal_data->srxd_hash, rxbuffer);
551         CDEBUG(D_INFO, "srxd is [%p]\n", srxd);
552         return(srxd);
553 }
554
555
/*
 *      Ask the receive threads to stop and wait (bounded) for them to
 *      acknowledge.  Sets the stop flag, drains the work-entry queue,
 *      and repeatedly kicks the rxtwe_wait semaphore so a sleeping
 *      thread wakes up, sees the flag and exits.  Polls up to ~30
 *      iterations; logs an error if the thread never resets its flag.
 */
void
gmnal_stop_rxthread(gmnal_data_t *nal_data)
{
        int     delay = 30;     /* max polling iterations before giving up */



        CDEBUG(D_TRACE, "Attempting to stop rxthread nal_data [%p]\n", 
                nal_data);
        
        nal_data->rxthread_stop_flag = GMNAL_THREAD_STOP;

        /* discard any queued receive work entries */
        gmnal_remove_rxtwe(nal_data);
        /*
         *      kick the thread 
         */
        up(&nal_data->rxtwe_wait);

        /* thread sets rxthread_flag to GMNAL_THREAD_RESET when it exits;
         * keep kicking the semaphore each iteration in case it raced
         * back to sleep before seeing the stop flag */
        while(nal_data->rxthread_flag != GMNAL_THREAD_RESET && delay--) {
                CDEBUG(D_INFO, "gmnal_stop_rxthread sleeping\n");
                gmnal_yield(1);
                up(&nal_data->rxtwe_wait);
        }

        if (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
                CERROR("I don't know how to wake the thread\n");
        } else {
                CDEBUG(D_INFO, "rx thread seems to have stopped\n");
        }
}
586
/*
 *      Ask the caretaker thread to stop and wait (bounded) for it to
 *      acknowledge.  Sets ctthread_flag to GMNAL_THREAD_STOP, arms a GM
 *      alarm to wake the thread out of gm_blocking_receive, then polls
 *      up to ~15 iterations for the thread to change the flag.
 */
void
gmnal_stop_ctthread(gmnal_data_t *nal_data)
{
        int     delay = 15;     /* max polling iterations before giving up */



        CDEBUG(D_TRACE, "Attempting to stop ctthread nal_data [%p]\n", 
               nal_data);
        
        nal_data->ctthread_flag = GMNAL_THREAD_STOP;
        /* the alarm fires inside GM and wakes the blocked receive so the
         * caretaker thread gets a chance to notice the stop flag */
        GMNAL_GM_LOCK(nal_data);
        gm_set_alarm(nal_data->gm_port, &nal_data->ctthread_alarm, 10, 
                     NULL, NULL);
        GMNAL_GM_UNLOCK(nal_data);

        /* thread changes ctthread_flag away from STOP once it has exited */
        while(nal_data->ctthread_flag == GMNAL_THREAD_STOP && delay--) {
                CDEBUG(D_INFO, "gmnal_stop_ctthread sleeping\n");
                gmnal_yield(1);
        }

        if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
                CERROR("I DON'T KNOW HOW TO WAKE THE THREAD\n");
        } else {
                CDEBUG(D_INFO, "CT THREAD SEEMS TO HAVE STOPPED\n");
        }
}
614
615
616
617 char * 
618 gmnal_gm_error(gm_status_t status)
619 {
620         return(gm_strerror(status));
621
622         switch(status) {
623                 case(GM_SUCCESS):
624                         return("SUCCESS");
625                 case(GM_FAILURE):
626                         return("FAILURE");
627                 case(GM_INPUT_BUFFER_TOO_SMALL):
628                         return("INPUT_BUFFER_TOO_SMALL");
629                 case(GM_OUTPUT_BUFFER_TOO_SMALL):
630                         return("OUTPUT_BUFFER_TOO_SMALL");
631                 case(GM_TRY_AGAIN ):
632                         return("TRY_AGAIN");
633                 case(GM_BUSY):
634                         return("BUSY");
635                 case(GM_MEMORY_FAULT):
636                         return("MEMORY_FAULT");
637                 case(GM_INTERRUPTED):
638                         return("INTERRUPTED");
639                 case(GM_INVALID_PARAMETER):
640                         return("INVALID_PARAMETER");
641                 case(GM_OUT_OF_MEMORY):
642                         return("OUT_OF_MEMORY");
643                 case(GM_INVALID_COMMAND):
644                         return("INVALID_COMMAND");
645                 case(GM_PERMISSION_DENIED):
646                         return("PERMISSION_DENIED");
647                 case(GM_INTERNAL_ERROR):
648                         return("INTERNAL_ERROR");
649                 case(GM_UNATTACHED):
650                         return("UNATTACHED");
651                 case(GM_UNSUPPORTED_DEVICE):
652                         return("UNSUPPORTED_DEVICE");
653                 case(GM_SEND_TIMED_OUT):
654                         return("GM_SEND_TIMEDOUT");
655                 case(GM_SEND_REJECTED):
656                         return("GM_SEND_REJECTED");
657                 case(GM_SEND_TARGET_PORT_CLOSED):
658                         return("GM_SEND_TARGET_PORT_CLOSED");
659                 case(GM_SEND_TARGET_NODE_UNREACHABLE):
660                         return("GM_SEND_TARGET_NODE_UNREACHABLE");
661                 case(GM_SEND_DROPPED):
662                         return("GM_SEND_DROPPED");
663                 case(GM_SEND_PORT_CLOSED):
664                         return("GM_SEND_PORT_CLOSED");
665                 case(GM_NODE_ID_NOT_YET_SET):
666                         return("GM_NODE_ID_NOT_YET_SET");
667                 case(GM_STILL_SHUTTING_DOWN):
668                         return("GM_STILL_SHUTTING_DOWN");
669                 case(GM_CLONE_BUSY):
670                         return("GM_CLONE_BUSY");
671                 case(GM_NO_SUCH_DEVICE):
672                         return("GM_NO_SUCH_DEVICE");
673                 case(GM_ABORTED):
674                         return("GM_ABORTED");
675                 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
676                         return("GM_INCOMPATIBLE_LIB_AND_DRIVER");
677                 case(GM_UNTRANSLATED_SYSTEM_ERROR):
678                         return("GM_UNTRANSLATED_SYSTEM_ERROR");
679                 case(GM_ACCESS_DENIED):
680                         return("GM_ACCESS_DENIED");
681
682
683 /*
684  *      These ones are in the docs but aren't in the header file 
685                 case(GM_DEV_NOT_FOUND):
686                         return("GM_DEV_NOT_FOUND");
687                 case(GM_INVALID_PORT_NUMBER):
688                         return("GM_INVALID_PORT_NUMBER");
689                 case(GM_UC_ERROR):
690                         return("GM_US_ERROR");
691                 case(GM_PAGE_TABLE_FULL):
692                         return("GM_PAGE_TABLE_FULL");
693                 case(GM_MINOR_OVERFLOW):
694                         return("GM_MINOR_OVERFLOW");
695                 case(GM_SEND_ORPHANED):
696                         return("GM_SEND_ORPHANED");
697                 case(GM_HARDWARE_FAULT):
698                         return("GM_HARDWARE_FAULT");
699                 case(GM_DATA_CORRUPTED):
700                         return("GM_DATA_CORRUPTED");
701                 case(GM_TIMED_OUT):
702                         return("GM_TIMED_OUT");
703                 case(GM_USER_ERROR):
704                         return("GM_USER_ERROR");
705                 case(GM_NO_MATCH):
706                         return("GM_NOMATCH");
707                 case(GM_NOT_SUPPORTED_IN_KERNEL):
708                         return("GM_NOT_SUPPORTED_IN_KERNEL");
709                 case(GM_NOT_SUPPORTED_ON_ARCH):
710                         return("GM_NOT_SUPPORTED_ON_ARCH");
711                 case(GM_PTE_REF_CNT_OVERFLOW):
712                         return("GM_PTR_REF_CNT_OVERFLOW");
713                 case(GM_NO_DRIVER_SUPPORT):
714                         return("GM_NO_DRIVER_SUPPORT");
715                 case(GM_FIRMWARE_NOT_RUNNING):
716                         return("GM_FIRMWARE_NOT_RUNNING");
717
718  *      These ones are in the docs but aren't in the header file 
719  */
720                 default:
721                         return("UNKNOWN GM ERROR CODE");
722         }
723 }
724
725
726 char *
727 gmnal_rxevent(gm_recv_event_t   *ev)
728 {
729         short   event;
730         event = GM_RECV_EVENT_TYPE(ev);
731         switch(event) {
732                 case(GM_NO_RECV_EVENT):
733                         return("GM_NO_RECV_EVENT");
734                 case(GM_SENDS_FAILED_EVENT):
735                         return("GM_SEND_FAILED_EVENT");
736                 case(GM_ALARM_EVENT):
737                         return("GM_ALARM_EVENT");
738                 case(GM_SENT_EVENT):
739                         return("GM_SENT_EVENT");
740                 case(_GM_SLEEP_EVENT):
741                         return("_GM_SLEEP_EVENT");
742                 case(GM_RAW_RECV_EVENT):
743                         return("GM_RAW_RECV_EVENT");
744                 case(GM_BAD_SEND_DETECTED_EVENT):
745                         return("GM_BAD_SEND_DETECTED_EVENT");
746                 case(GM_SEND_TOKEN_VIOLATION_EVENT):
747                         return("GM_SEND_TOKEN_VIOLATION_EVENT");
748                 case(GM_RECV_TOKEN_VIOLATION_EVENT):
749                         return("GM_RECV_TOKEN_VIOLATION_EVENT");
750                 case(GM_BAD_RECV_TOKEN_EVENT):
751                         return("GM_BAD_RECV_TOKEN_EVENT");
752                 case(GM_ALARM_VIOLATION_EVENT):
753                         return("GM_ALARM_VIOLATION_EVENT");
754                 case(GM_RECV_EVENT):
755                         return("GM_RECV_EVENT");
756                 case(GM_HIGH_RECV_EVENT):
757                         return("GM_HIGH_RECV_EVENT");
758                 case(GM_PEER_RECV_EVENT):
759                         return("GM_PEER_RECV_EVENT");
760                 case(GM_HIGH_PEER_RECV_EVENT):
761                         return("GM_HIGH_PEER_RECV_EVENT");
762                 case(GM_FAST_RECV_EVENT):
763                         return("GM_FAST_RECV_EVENT");
764                 case(GM_FAST_HIGH_RECV_EVENT):
765                         return("GM_FAST_HIGH_RECV_EVENT");
766                 case(GM_FAST_PEER_RECV_EVENT):
767                         return("GM_FAST_PEER_RECV_EVENT");
768                 case(GM_FAST_HIGH_PEER_RECV_EVENT):
769                         return("GM_FAST_HIGH_PEER_RECV_EVENT");
770                 case(GM_REJECTED_SEND_EVENT):
771                         return("GM_REJECTED_SEND_EVENT");
772                 case(GM_ORPHANED_SEND_EVENT):
773                         return("GM_ORPHANED_SEND_EVENT");
774                 case(GM_BAD_RESEND_DETECTED_EVENT):
775                         return("GM_BAD_RESEND_DETETED_EVENT");
776                 case(GM_DROPPED_SEND_EVENT):
777                         return("GM_DROPPED_SEND_EVENT");
778                 case(GM_BAD_SEND_VMA_EVENT):
779                         return("GM_BAD_SEND_VMA_EVENT");
780                 case(GM_BAD_RECV_VMA_EVENT):
781                         return("GM_BAD_RECV_VMA_EVENT");
782                 case(_GM_FLUSHED_ALARM_EVENT):
783                         return("GM_FLUSHED_ALARM_EVENT");
784                 case(GM_SENT_TOKENS_EVENT):
785                         return("GM_SENT_TOKENS_EVENTS");
786                 case(GM_IGNORE_RECV_EVENT):
787                         return("GM_IGNORE_RECV_EVENT");
788                 case(GM_ETHERNET_RECV_EVENT):
789                         return("GM_ETHERNET_RECV_EVENT");
790                 case(GM_NEW_NO_RECV_EVENT):
791                         return("GM_NEW_NO_RECV_EVENT");
792                 case(GM_NEW_SENDS_FAILED_EVENT):
793                         return("GM_NEW_SENDS_FAILED_EVENT");
794                 case(GM_NEW_ALARM_EVENT):
795                         return("GM_NEW_ALARM_EVENT");
796                 case(GM_NEW_SENT_EVENT):
797                         return("GM_NEW_SENT_EVENT");
798                 case(_GM_NEW_SLEEP_EVENT):
799                         return("GM_NEW_SLEEP_EVENT");
800                 case(GM_NEW_RAW_RECV_EVENT):
801                         return("GM_NEW_RAW_RECV_EVENT");
802                 case(GM_NEW_BAD_SEND_DETECTED_EVENT):
803                         return("GM_NEW_BAD_SEND_DETECTED_EVENT");
804                 case(GM_NEW_SEND_TOKEN_VIOLATION_EVENT):
805                         return("GM_NEW_SEND_TOKEN_VIOLATION_EVENT");
806                 case(GM_NEW_RECV_TOKEN_VIOLATION_EVENT):
807                         return("GM_NEW_RECV_TOKEN_VIOLATION_EVENT");
808                 case(GM_NEW_BAD_RECV_TOKEN_EVENT):
809                         return("GM_NEW_BAD_RECV_TOKEN_EVENT");
810                 case(GM_NEW_ALARM_VIOLATION_EVENT):
811                         return("GM_NEW_ALARM_VIOLATION_EVENT");
812                 case(GM_NEW_RECV_EVENT):
813                         return("GM_NEW_RECV_EVENT");
814                 case(GM_NEW_HIGH_RECV_EVENT):
815                         return("GM_NEW_HIGH_RECV_EVENT");
816                 case(GM_NEW_PEER_RECV_EVENT):
817                         return("GM_NEW_PEER_RECV_EVENT");
818                 case(GM_NEW_HIGH_PEER_RECV_EVENT):
819                         return("GM_NEW_HIGH_PEER_RECV_EVENT");
820                 case(GM_NEW_FAST_RECV_EVENT):
821                         return("GM_NEW_FAST_RECV_EVENT");
822                 case(GM_NEW_FAST_HIGH_RECV_EVENT):
823                         return("GM_NEW_FAST_HIGH_RECV_EVENT");
824                 case(GM_NEW_FAST_PEER_RECV_EVENT):
825                         return("GM_NEW_FAST_PEER_RECV_EVENT");
826                 case(GM_NEW_FAST_HIGH_PEER_RECV_EVENT):
827                         return("GM_NEW_FAST_HIGH_PEER_RECV_EVENT");
828                 case(GM_NEW_REJECTED_SEND_EVENT):
829                         return("GM_NEW_REJECTED_SEND_EVENT");
830                 case(GM_NEW_ORPHANED_SEND_EVENT):
831                         return("GM_NEW_ORPHANED_SEND_EVENT");
832                 case(_GM_NEW_PUT_NOTIFICATION_EVENT):
833                         return("_GM_NEW_PUT_NOTIFICATION_EVENT");
834                 case(GM_NEW_FREE_SEND_TOKEN_EVENT):
835                         return("GM_NEW_FREE_SEND_TOKEN_EVENT");
836                 case(GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT):
837                         return("GM_NEW_FREE_HIGH_SEND_TOKEN_EVENT");
838                 case(GM_NEW_BAD_RESEND_DETECTED_EVENT):
839                         return("GM_NEW_BAD_RESEND_DETECTED_EVENT");
840                 case(GM_NEW_DROPPED_SEND_EVENT):
841                         return("GM_NEW_DROPPED_SEND_EVENT");
842                 case(GM_NEW_BAD_SEND_VMA_EVENT):
843                         return("GM_NEW_BAD_SEND_VMA_EVENT");
844                 case(GM_NEW_BAD_RECV_VMA_EVENT):
845                         return("GM_NEW_BAD_RECV_VMA_EVENT");
846                 case(_GM_NEW_FLUSHED_ALARM_EVENT):
847                         return("GM_NEW_FLUSHED_ALARM_EVENT");
848                 case(GM_NEW_SENT_TOKENS_EVENT):
849                         return("GM_NEW_SENT_TOKENS_EVENT");
850                 case(GM_NEW_IGNORE_RECV_EVENT):
851                         return("GM_NEW_IGNORE_RECV_EVENT");
852                 case(GM_NEW_ETHERNET_RECV_EVENT):
853                         return("GM_NEW_ETHERNET_RECV_EVENT");
854                 default:
855                         return("Unknown Recv event");
856 #if 0
857                 case(/* _GM_PUT_NOTIFICATION_EVENT */
858                 case(/* GM_FREE_SEND_TOKEN_EVENT */
859                 case(/* GM_FREE_HIGH_SEND_TOKEN_EVENT */
860 #endif
861         }
862 }
863
864
865 void
866 gmnal_yield(int delay)
867 {
868         set_current_state(TASK_INTERRUPTIBLE);
869         schedule_timeout(delay);
870 }
871
872 int
873 gmnal_is_small_msg(gmnal_data_t *nal_data, int niov, struct iovec *iov, 
874                     int len)
875 {
876
877         CDEBUG(D_TRACE, "len [%d] limit[%d]\n", len, 
878                GMNAL_SMALL_MSG_SIZE(nal_data));
879
880         if ((len + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t)) 
881                      < GMNAL_SMALL_MSG_SIZE(nal_data)) {
882
883                 CDEBUG(D_INFO, "Yep, small message\n");
884                 return(1);
885         } else {
886                 CERROR("No, not small message\n");
887                 /*
888                  *      could be made up of lots of little ones !
889                  */
890                 return(0);
891         }
892
893 }
894
895 /* 
896  *      extract info from the receive event.
897  *      Have to do this before the next call to gm_receive
898  *      Deal with all endian stuff here.
899  *      Then stick work entry on list where rxthreads
900  *      can get it to complete the receive
901  */
902 int
903 gmnal_add_rxtwe(gmnal_data_t *nal_data, gm_recv_t *recv)
904 {
905         gmnal_rxtwe_t   *we = NULL;
906
907         CDEBUG(D_NET, "adding entry to list\n");
908
909         PORTAL_ALLOC(we, sizeof(gmnal_rxtwe_t));
910         if (!we) {
911                 CERROR("failed to malloc\n");
912                 return(GMNAL_STATUS_FAIL);
913         }
914         we->buffer = gm_ntohp(recv->buffer);
915         we->snode = (int)gm_ntoh_u16(recv->sender_node_id);
916         we->sport = (int)gm_ntoh_u8(recv->sender_port_id);
917         we->type = (int)gm_ntoh_u8(recv->type);
918         we->length = (int)gm_ntohl(recv->length);
919
920         spin_lock(&nal_data->rxtwe_lock);
921         if (nal_data->rxtwe_tail) {
922                 nal_data->rxtwe_tail->next = we;
923         } else {
924                 nal_data->rxtwe_head = we;
925                 nal_data->rxtwe_tail = we;
926         }
927         nal_data->rxtwe_tail = we;
928         spin_unlock(&nal_data->rxtwe_lock);
929
930         up(&nal_data->rxtwe_wait);
931         return(GMNAL_STATUS_OK);
932 }
933
934 void
935 gmnal_remove_rxtwe(gmnal_data_t *nal_data)
936 {
937         gmnal_rxtwe_t   *_we, *we = nal_data->rxtwe_head;
938
939         CDEBUG(D_NET, "removing all work list entries\n");
940
941         spin_lock(&nal_data->rxtwe_lock);
942         CDEBUG(D_NET, "Got lock\n");
943         while (we) {
944                 _we = we;
945                 we = we->next;
946                 PORTAL_FREE(_we, sizeof(gmnal_rxtwe_t));
947         }
948         spin_unlock(&nal_data->rxtwe_lock);
949         nal_data->rxtwe_head = NULL;
950         nal_data->rxtwe_tail = NULL;
951 }
952
953 gmnal_rxtwe_t *
954 gmnal_get_rxtwe(gmnal_data_t *nal_data)
955 {
956         gmnal_rxtwe_t   *we = NULL;
957
958         CDEBUG(D_NET, "Getting entry to list\n");
959
960         do  {
961                 while(down_interruptible(&nal_data->rxtwe_wait) != 0)
962                         /* do nothing */;
963                 if (nal_data->rxthread_stop_flag == GMNAL_THREAD_STOP) {
964                         /*
965                          *      time to stop
966                          *      TO DO some one free the work entries
967                          */
968                         return(NULL);
969                 }
970                 spin_lock(&nal_data->rxtwe_lock);
971                 if (nal_data->rxtwe_head) {
972                         CDEBUG(D_INFO, "Got a work entry\n");
973                         we = nal_data->rxtwe_head;
974                         nal_data->rxtwe_head = we->next;
975                         if (!nal_data->rxtwe_head)
976                                 nal_data->rxtwe_tail = NULL;
977                 } else {
978                         CWARN("woken but no work\n");
979                 }
980                 spin_unlock(&nal_data->rxtwe_lock);
981         } while (!we);
982
983         CDEBUG(D_INFO, "Returning we[%p]\n", we);
984         return(we);
985 }
986
987
988 /*
989  *      Start the caretaker thread and a number of receiver threads
990  *      The caretaker thread gets events from the gm library.
991  *      It passes receive events to the receiver threads via a work list.
992  *      It processes other events itself in gm_unknown. These will be
993  *      callback events or sleeps.
994  */
995 int
996 gmnal_start_kernel_threads(gmnal_data_t *nal_data)
997 {
998
999         int     threads = 0;
1000         /*
1001          *      the alarm is used to wake the caretaker thread from 
1002          *      gm_unknown call (sleeping) to exit it.
1003          */
1004         CDEBUG(D_NET, "Initializing caretaker thread alarm and flag\n");
1005         gm_initialize_alarm(&nal_data->ctthread_alarm);
1006         nal_data->ctthread_flag = GMNAL_THREAD_RESET;
1007
1008
1009         CDEBUG(D_INFO, "Starting caretaker thread\n");
1010         nal_data->ctthread_pid = 
1011                  kernel_thread(gmnal_ct_thread, (void*)nal_data, 0);
1012         if (nal_data->ctthread_pid <= 0) {
1013                 CERROR("Caretaker thread failed to start\n");
1014                 return(GMNAL_STATUS_FAIL);
1015         }
1016
1017         while (nal_data->rxthread_flag != GMNAL_THREAD_RESET) {
1018                 gmnal_yield(1);
1019                 CDEBUG(D_INFO, "Waiting for caretaker thread signs of life\n");
1020         }
1021
1022         CDEBUG(D_INFO, "caretaker thread has started\n");
1023
1024
1025         /*
1026          *      Now start a number of receiver threads
1027          *      these treads get work to do from the caretaker (ct) thread
1028          */
1029         nal_data->rxthread_flag = GMNAL_THREAD_RESET;
1030         nal_data->rxthread_stop_flag = GMNAL_THREAD_RESET;
1031
1032         for (threads=0; threads<NRXTHREADS; threads++)
1033                 nal_data->rxthread_pid[threads] = -1;
1034         spin_lock_init(&nal_data->rxtwe_lock);
1035         spin_lock_init(&nal_data->rxthread_flag_lock);
1036         sema_init(&nal_data->rxtwe_wait, 0);
1037         nal_data->rxtwe_head = NULL;
1038         nal_data->rxtwe_tail = NULL;
1039         /*
1040          *      If the default number of receive threades isn't
1041          *      modified at load time, then start one thread per cpu
1042          */
1043         if (num_rx_threads == -1)
1044                 num_rx_threads = smp_num_cpus;
1045         CDEBUG(D_INFO, "Starting [%d] receive threads\n", num_rx_threads);
1046         for (threads=0; threads<num_rx_threads; threads++) {
1047                 nal_data->rxthread_pid[threads] = 
1048                        kernel_thread(gmnal_rx_thread, (void*)nal_data, 0);
1049                 if (nal_data->rxthread_pid[threads] <= 0) {
1050                         CERROR("Receive thread failed to start\n");
1051                         gmnal_stop_rxthread(nal_data);
1052                         gmnal_stop_ctthread(nal_data);
1053                         return(GMNAL_STATUS_FAIL);
1054                 }
1055         }
1056
1057         for (;;) {
1058                 spin_lock(&nal_data->rxthread_flag_lock);
1059                 if (nal_data->rxthread_flag == GMNAL_RXTHREADS_STARTED) {
1060                         spin_unlock(&nal_data->rxthread_flag_lock);
1061                         break;
1062                 }
1063                 spin_unlock(&nal_data->rxthread_flag_lock);
1064                 gmnal_yield(1);
1065         }
1066
1067         CDEBUG(D_INFO, "receive threads seem to have started\n");
1068
1069         return(GMNAL_STATUS_OK);
1070 }