Whamcloud - gitweb
b=2776
[fs/lustre-release.git] / lustre / portals / knals / gmnal / gmnal_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /*
23  *      This file contains all gmnal send and receive functions
24  */
25
26 #include "gmnal.h"
27
/*
 *      The caretaker thread
 *      This is the main thread of execution for the NAL side.
 *      It sleeps in gm_blocking_receive_no_spin() and is woken when the
 *      myrinet adaptor raises an interrupt.  Receive events are handed
 *      off to the receive thread(s) via the rxtwe work-entry queue;
 *      everything else is fed back to gm_unknown() so GM can run its
 *      callbacks inline.
 *      Returns -1 if no nal_data was supplied, GMNAL_STATUS_OK on a
 *      clean shutdown.
 */
int
gmnal_ct_thread(void *arg)
{
        gmnal_data_t            *nal_data;
        gm_recv_event_t         *rxevent = NULL;
        gm_recv_t               *recv = NULL;

        if (!arg) {
                CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
                return(-1);
        }

        nal_data = (gmnal_data_t*)arg;
        CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);

        /* detach this kernel thread from the spawning process context */
        daemonize();

        nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;

        /*
         *      NOTE(review): the GM lock is held across the blocking
         *      receive and dropped only around the queueing/gm_unknown
         *      calls -- presumably GMNAL_GM_LOCK serialises all access
         *      to the gm_port; confirm against the macro's definition.
         */
        GMNAL_GM_LOCK(nal_data);
        while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
                CDEBUG(D_NET, "waiting\n");
                rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
                /* shutdown may have been requested while we slept */
                if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
                        CDEBUG(D_INFO, "time to exit\n");
                        break;
                }
                CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
                switch (GM_RECV_EVENT_TYPE(rxevent)) {

                        case(GM_RECV_EVENT):
                                CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
                                recv = (gm_recv_t*)&rxevent->recv;
                                /* hand the event to a receive thread */
                                GMNAL_GM_UNLOCK(nal_data);
                                gmnal_add_rxtwe(nal_data, recv);
                                GMNAL_GM_LOCK(nal_data);
                                CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
                        break;
                        case(_GM_SLEEP_EVENT):
                                /*
                                 *      Blocking receive above just returns
                                 *      immediately with _GM_SLEEP_EVENT;
                                 *      gm_unknown() does the real sleeping.
                                 */
                                CDEBUG(D_NET, "Sleeping in gm_unknown\n");
                                GMNAL_GM_UNLOCK(nal_data);
                                gm_unknown(nal_data->gm_port, rxevent);
                                GMNAL_GM_LOCK(nal_data);
                                CDEBUG(D_INFO, "Awake from gm_unknown\n");
                                break;
                                
                        default:
                                /*
                                 *      Don't know what this is;
                                 *      gm_unknown will make sense of it
                                 *      (and run any pending GM callbacks).
                                 *      Should be able to do something with
                                 *      FAST_RECV_EVENTS here.
                                 */
                                CDEBUG(D_NET, "Passing event to gm_unknown\n");
                                GMNAL_GM_UNLOCK(nal_data);
                                gm_unknown(nal_data->gm_port, rxevent);
                                GMNAL_GM_LOCK(nal_data);
                                CDEBUG(D_INFO, "Processed unknown event\n");
                }
        }
        GMNAL_GM_UNLOCK(nal_data);
        /* tell the module shutdown path that the caretaker is gone */
        nal_data->ctthread_flag = GMNAL_THREAD_RESET;
        CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
        return(GMNAL_STATUS_OK);
}
106
107
/*
 *      A receive thread.
 *      Pulls receive work entries queued by the caretaker thread and
 *      dispatches them by gmnal message type.  Several of these threads
 *      may run concurrently.
 */
int gmnal_rx_thread(void *arg)
{
        gmnal_data_t            *nal_data;
        void                    *buffer;
        gmnal_rxtwe_t           *we = NULL;

        if (!arg) {
                CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
                return(-1);
        }

        nal_data = (gmnal_data_t*)arg;
        CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);

        daemonize();
        /*
         *      set 1 bit for each thread started
         *      doesn't matter which bit
         *      (flag*2+1 shifts the existing bits left and sets the new
         *      low bit; the /=2 on exit below drops one bit again, so
         *      rxthread_flag counts the live receive threads in unary)
         */
        spin_lock(&nal_data->rxthread_flag_lock);
        if (nal_data->rxthread_flag)
                nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
        else
                nal_data->rxthread_flag = 1;
        CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
        spin_unlock(&nal_data->rxthread_flag_lock);

        while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
                CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
                /* blocks until the caretaker queues an entry;
                 * NULL means it is time to stop */
                we = gmnal_get_rxtwe(nal_data);
                if (!we) {
                        CDEBUG(D_INFO, "Receive thread time to exit\n");
                        break;
                }

                /* the gmnal message header sits at the front of the
                 * wired receive buffer */
                buffer = we->buffer;
                switch(((gmnal_msghdr_t*)buffer)->type) {
                case(GMNAL_SMALL_MESSAGE):
                        gmnal_pre_receive(nal_data, we, 
                                           GMNAL_SMALL_MESSAGE);
                break;  
                case(GMNAL_LARGE_MESSAGE_INIT):
                        gmnal_pre_receive(nal_data, we, 
                                           GMNAL_LARGE_MESSAGE_INIT);
                break;  
                case(GMNAL_LARGE_MESSAGE_ACK):
                        gmnal_pre_receive(nal_data, we, 
                                           GMNAL_LARGE_MESSAGE_ACK);
                break;  
                default:
                        CDEBUG(D_ERROR, "Unsupported message type\n");
                        gmnal_rx_bad(nal_data, we, NULL);
                }
                /* the work entry is ours to free; the wired buffer is
                 * re-posted by the receive path itself */
                PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
        }

        /* clear this thread's bit (see comment at the top) */
        spin_lock(&nal_data->rxthread_flag_lock);
        nal_data->rxthread_flag/=2;
        CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
        spin_unlock(&nal_data->rxthread_flag_lock);
        CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
        return(GMNAL_STATUS_OK);
}
174
175
176
177 /*
178  *      Start processing a small message receive
179  *      Get here from gmnal_receive_thread
180  *      Hand off to lib_parse, which calls cb_recv
181  *      which hands back to gmnal_small_receive
182  *      Deal with all endian stuff here.
183  */
184 int
185 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
186 {
187         gmnal_srxd_t    *srxd = NULL;
188         void            *buffer = NULL;
189         unsigned int snode, sport, type, length;
190         gmnal_msghdr_t  *gmnal_msghdr;
191         ptl_hdr_t       *portals_hdr;
192
193         CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", 
194                nal_data, we, gmnal_type);
195
196         buffer = we->buffer;
197         snode = we->snode;
198         sport = we->sport;
199         type = we->type;
200         buffer = we->buffer;
201         length = we->length;
202
203         gmnal_msghdr = (gmnal_msghdr_t*)buffer;
204         portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
205
206         CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
207                "type [%d], length [%d], buffer [%p]\n",
208                snode, sport, type, length, buffer);
209         CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
210                "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id, 
211                gmnal_msghdr->magic, gmnal_msghdr->type);
212         CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
213                "dest_node ["LPD64"]\n", portals_hdr->src_nid, 
214                portals_hdr->dest_nid);
215
216         
217         /*
218          *      Get a receive descriptor for this message
219          */
220         srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
221         CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
222         srxd->nal_data = nal_data;
223         if (!srxd) {
224                 CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
225                 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
226                 return(GMNAL_STATUS_FAIL);
227         }
228
229         /*
230          *      no need to bother portals library with this
231          */
232         if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
233                 gmnal_large_tx_ack_received(nal_data, srxd);
234                 return(GMNAL_STATUS_OK);
235         }
236
237         srxd->type = gmnal_type;
238         srxd->nsiov = gmnal_msghdr->niov;
239         srxd->gm_source_node = gmnal_msghdr->sender_node_id;
240         
241         CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n", 
242                buffer+GMNAL_MSGHDR_SIZE);
243         /*
244          *      control passes to lib, which calls cb_recv 
245          *      cb_recv is responsible for returning the buffer 
246          *      for future receive
247          */
248         lib_parse(nal_data->nal_cb, portals_hdr, srxd);
249
250         return(GMNAL_STATUS_OK);
251 }
252
253
254
/*
 *      After a receive has been processed, 
 *      hang out the receive buffer again.
 *      This implicitly returns a receive token to GM.
 *      Always returns GMNAL_STATUS_OK.
 */
int
gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
{
        CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");

        CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);

        /* re-post the wired buffer with the same size/priority it was
         * originally provided with */
        GMNAL_GM_LOCK(nal_data);
        gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
                                        srxd->gmsize, GM_LOW_PRIORITY, 0 );
        GMNAL_GM_UNLOCK(nal_data);

        return(GMNAL_STATUS_OK);
}
274
275
276 /*
277  *      Handle a bad message
278  *      A bad message is one we don't expect or can't interpret
279  */
280 int
281 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
282 {
283         CDEBUG(D_TRACE, "Can't handle message\n");
284
285         if (!srxd)
286                 srxd = gmnal_rxbuffer_to_srxd(nal_data, 
287                                                we->buffer);
288         if (srxd) {
289                 gmnal_rx_requeue_buffer(nal_data, srxd);
290         } else {
291                 CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n");
292                 /*
293                  *      get rid of it ?
294                  */
295                 return(GMNAL_STATUS_FAIL);
296         }
297
298         return(GMNAL_STATUS_OK);
299 }
300
301
302
303 /*
304  *      Process a small message receive.
305  *      Get here from gmnal_receive_thread, gmnal_pre_receive
306  *      lib_parse, cb_recv
307  *      Put data from prewired receive buffer into users buffer(s)
308  *      Hang out the receive buffer again for another receive
309  *      Call lib_finalize
310  */
311 int
312 gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
313                 unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
314 {
315         gmnal_srxd_t    *srxd = NULL;
316         void    *buffer = NULL;
317         gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
318
319
320         CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
321
322         if (!private) {
323                 CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
324                 lib_finalize(nal_cb, private, cookie, PTL_FAIL);
325                 return(PTL_FAIL);
326         }
327
328         srxd = (gmnal_srxd_t*)private;
329         buffer = srxd->buffer;
330         buffer += sizeof(gmnal_msghdr_t);
331         buffer += sizeof(ptl_hdr_t);
332
333         while(niov--) {
334                 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov, 
335                        iov->iov_len);
336                 gm_bcopy(buffer, iov->iov_base, iov->iov_len);                  
337                 buffer += iov->iov_len;
338                 iov++;
339         }
340
341
342         /*
343          *      let portals library know receive is complete
344          */
345         CDEBUG(D_PORTALS, "calling lib_finalize\n");
346         lib_finalize(nal_cb, private, cookie, PTL_OK);
347         /*
348          *      return buffer so it can be used again
349          */
350         CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
351         GMNAL_GM_LOCK(nal_data);
352         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
353                                            srxd->gmsize, GM_LOW_PRIORITY, 0);   
354         GMNAL_GM_UNLOCK(nal_data);
355
356         return(PTL_OK);
357 }
358
359
/*
 *      Start a small transmit. 
 *      Get a send token (and wired transmit buffer).
 *      Copy data from senders buffer to wired buffer and
 *      initiate gm_send from the wired buffer.
 *      The callback function (gmnal_small_tx_callback) informs when
 *      the send is complete and returns the token.
 *      NOTE(review): success returns PTL_OK but failures return
 *      GMNAL_STATUS_FAIL -- confirm callers treat these consistently.
 */
int
gmnal_small_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
                ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
                unsigned int niov, struct iovec *iov, int size)
{
        gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
        gmnal_stxd_t    *stxd = NULL;
        void            *buffer = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
        int             tot_size = 0;
        unsigned int    local_nid;
        gm_status_t     gm_status = GM_SUCCESS;

        CDEBUG(D_TRACE, "gmnal_small_tx nal_cb [%p] private [%p] cookie [%p] "
               "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] niov [%d] "
               "iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type, 
               global_nid, pid, niov, iov, size);

        CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
               hdr->dest_nid, hdr->src_nid);

        if (!nal_data) {
                CDEBUG(D_ERROR, "no nal_data\n");
                return(GMNAL_STATUS_FAIL);
        } else {
                CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
        }

        /* translate the portals global nid to the GM local node id */
        GMNAL_GM_LOCK(nal_data);
        gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
                                            &local_nid);
        GMNAL_GM_UNLOCK(nal_data);
        if (gm_status != GM_SUCCESS) {
                CDEBUG(D_ERROR, "Failed to obtain local id\n");
                return(GMNAL_STATUS_FAIL);
        }
        CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);

        /* NOTE(review): stxd is used unchecked -- presumably
         * gmnal_get_stxd(…, 1) blocks until a token is available and
         * cannot return NULL; confirm against its definition */
        stxd = gmnal_get_stxd(nal_data, 1);
        CDEBUG(D_INFO, "stxd [%p]\n", stxd);

        stxd->type = GMNAL_SMALL_MESSAGE;
        stxd->cookie = cookie;

        /*
         *      Copy gmnal_msg_hdr and portals header to the transmit buffer
         *      Then copy the data in
         */
        buffer = stxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;

        msghdr->magic = GMNAL_MAGIC;
        msghdr->type = GMNAL_SMALL_MESSAGE;
        msghdr->sender_node_id = nal_data->gm_global_nid;
        CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);

        buffer += sizeof(gmnal_msghdr_t);

        CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
        gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));

        buffer += sizeof(ptl_hdr_t);

        /* gather the user's iovec into the wired buffer */
        while(niov--) {
                CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n", 
                       iov, iov->iov_len, buffer);
                gm_bcopy(iov->iov_base, buffer, iov->iov_len);
                buffer+= iov->iov_len;
                iov++;
        }

        CDEBUG(D_INFO, "sending\n");
        /* total on-wire size: payload plus both headers */
        tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
        stxd->msg_size = tot_size;


        CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
               "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
               "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
               stxd->msg_size, global_nid, local_nid, stxd);

        /* priority/target stashed in the stxd so the callback can
         * resend or drop on error */
        GMNAL_GM_LOCK(nal_data);
        stxd->gm_priority = GM_LOW_PRIORITY;
        stxd->gm_target_node = local_nid;
        gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
                                      stxd->gm_size, stxd->msg_size, 
                                      GM_LOW_PRIORITY, local_nid, 
                                      gmnal_small_tx_callback, (void*)stxd);
        GMNAL_GM_UNLOCK(nal_data);
        CDEBUG(D_INFO, "done\n");
                
        return(PTL_OK);
}
460
461
462 /*
463  *      A callback to indicate the small transmit operation is compete
464  *      Check for erros and try to deal with them.
465  *      Call lib_finalise to inform the client application that the send 
466  *      is complete and the memory can be reused.
467  *      Return the stxd when finished with it (returns a send token)
468  */
469 void 
470 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
471 {
472         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
473         lib_msg_t       *cookie = stxd->cookie;
474         gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
475         nal_cb_t        *nal_cb = nal_data->nal_cb;
476
477         if (!stxd) {
478                 CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
479                 return;
480         }
481         if (status != GM_SUCCESS) {
482                 CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s]\n", 
483                        stxd, gmnal_gm_error(status));
484         }
485
486         switch(status) {
487                 case(GM_SUCCESS):
488                 break;
489
490
491
492                 case(GM_SEND_DROPPED):
493                 /*
494                  *      do a resend on the dropped ones
495                  */
496                         CDEBUG(D_ERROR, "send stxd [%p] was dropped "
497                                "resending\n", context);
498                         GMNAL_GM_LOCK(nal_data);
499                         gm_send_to_peer_with_callback(nal_data->gm_port, 
500                                                       stxd->buffer, 
501                                                       stxd->gm_size, 
502                                                       stxd->msg_size, 
503                                                       stxd->gm_priority, 
504                                                       stxd->gm_target_node, 
505                                                       gmnal_small_tx_callback,
506                                                       context);
507                         GMNAL_GM_UNLOCK(nal_data);
508                 
509                 return;
510                 case(GM_TIMED_OUT):
511                 case(GM_SEND_TIMED_OUT):
512                 /*
513                  *      drop these ones
514                  */
515                         CDEBUG(D_INFO, "calling gm_drop_sends\n");
516                         GMNAL_GM_LOCK(nal_data);
517                         gm_drop_sends(nal_data->gm_port, stxd->gm_priority, 
518                                       stxd->gm_target_node, GMNAL_GM_PORT, 
519                                       gmnal_drop_sends_callback, context);
520                         GMNAL_GM_UNLOCK(nal_data);
521
522                 return;
523
524
525                 /*
526                  *      abort on these ?
527                  */
528                 case(GM_TRY_AGAIN):
529                 case(GM_INTERRUPTED):
530                 case(GM_FAILURE):
531                 case(GM_INPUT_BUFFER_TOO_SMALL):
532                 case(GM_OUTPUT_BUFFER_TOO_SMALL):
533                 case(GM_BUSY):
534                 case(GM_MEMORY_FAULT):
535                 case(GM_INVALID_PARAMETER):
536                 case(GM_OUT_OF_MEMORY):
537                 case(GM_INVALID_COMMAND):
538                 case(GM_PERMISSION_DENIED):
539                 case(GM_INTERNAL_ERROR):
540                 case(GM_UNATTACHED):
541                 case(GM_UNSUPPORTED_DEVICE):
542                 case(GM_SEND_REJECTED):
543                 case(GM_SEND_TARGET_PORT_CLOSED):
544                 case(GM_SEND_TARGET_NODE_UNREACHABLE):
545                 case(GM_SEND_PORT_CLOSED):
546                 case(GM_NODE_ID_NOT_YET_SET):
547                 case(GM_STILL_SHUTTING_DOWN):
548                 case(GM_CLONE_BUSY):
549                 case(GM_NO_SUCH_DEVICE):
550                 case(GM_ABORTED):
551                 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
552                 case(GM_UNTRANSLATED_SYSTEM_ERROR):
553                 case(GM_ACCESS_DENIED):
554                 case(GM_NO_DRIVER_SUPPORT):
555                 case(GM_PTE_REF_CNT_OVERFLOW):
556                 case(GM_NOT_SUPPORTED_IN_KERNEL):
557                 case(GM_NOT_SUPPORTED_ON_ARCH):
558                 case(GM_NO_MATCH):
559                 case(GM_USER_ERROR):
560                 case(GM_DATA_CORRUPTED):
561                 case(GM_HARDWARE_FAULT):
562                 case(GM_SEND_ORPHANED):
563                 case(GM_MINOR_OVERFLOW):
564                 case(GM_PAGE_TABLE_FULL):
565                 case(GM_UC_ERROR):
566                 case(GM_INVALID_PORT_NUMBER):
567                 case(GM_DEV_NOT_FOUND):
568                 case(GM_FIRMWARE_NOT_RUNNING):
569                 case(GM_YP_NO_MATCH):
570                 default:
571                         CDEBUG(D_ERROR, "Unknown send error\n");
572                 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
573                                       stxd->gm_target_node, GMNAL_GM_PORT,
574                                       gmnal_resume_sending_callback, context);
575                 return;
576
577         }
578
579         /*
580          *      TO DO
581          *      If this is a large message init,
582          *      we're not finished with the data yet,
583          *      so can't call lib_finalise.
584          *      However, we're also holding on to a 
585          *      stxd here (to keep track of the source
586          *      iovec only). Should use another structure
587          *      to keep track of iovec and return stxd to 
588          *      free list earlier.
589          */
590         if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
591                 CDEBUG(D_INFO, "large transmit done\n");
592                 return;
593         }
594         gmnal_return_stxd(nal_data, stxd);
595         lib_finalize(nal_cb, stxd, cookie, PTL_OK);
596         return;
597 }
598
599 /*
600  *      After an error on the port
601  *      call this to allow future sends to complete
602  */
603 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
604                                  gm_status_t status)
605 {
606         gmnal_data_t    *nal_data;
607         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
608         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
609         gmnal_return_stxd(stxd->nal_data, stxd);
610         return;
611 }
612
613
614 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
615                                 gm_status_t status)
616 {
617         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
618         gmnal_data_t    *nal_data = stxd->nal_data;
619
620         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
621         if (status == GM_SUCCESS) {
622                 GMNAL_GM_LOCK(nal_data);
623                 gm_send_to_peer_with_callback(gm_port, stxd->buffer, 
624                                               stxd->gm_size, stxd->msg_size, 
625                                               stxd->gm_priority, 
626                                               stxd->gm_target_node, 
627                                               gmnal_small_tx_callback, 
628                                               context);
629                 GMNAL_GM_LOCK(nal_data);
630         } else {
631                 CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
632                        "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
633         }
634
635
636         return;
637 }
638
639
640 /*
641  *      Begine a large transmit.
642  *      Do a gm_register of the memory pointed to by the iovec 
643  *      and send details to the receiver. The receiver does a gm_get
644  *      to pull the data and sends and ack when finished. Upon receipt of
645  *      this ack, deregister the memory. Only 1 send token is required here.
646  */
647 int
648 gmnal_large_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
649                 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
650                 unsigned int niov, struct iovec *iov, int size)
651 {
652
653         gmnal_data_t    *nal_data;
654         gmnal_stxd_t    *stxd = NULL;
655         void            *buffer = NULL;
656         gmnal_msghdr_t  *msghdr = NULL;
657         unsigned int    local_nid;
658         int             mlen = 0;       /* the size of the init message data */
659         struct iovec    *iov_dup = NULL;
660         gm_status_t     gm_status;
661         int             niov_dup;
662
663
664         CDEBUG(D_TRACE, "gmnal_large_tx nal_cb [%p] private [%p], cookie [%p] "
665                "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
666                "iov [%p], size [%d]\n", nal_cb, private, cookie, hdr, type, 
667                global_nid, pid, niov, iov, size);
668
669         if (nal_cb)
670                 nal_data = (gmnal_data_t*)nal_cb->nal_data;
671         else  {
672                 CDEBUG(D_ERROR, "no nal_cb.\n");
673                 return(GMNAL_STATUS_FAIL);
674         }
675         
676
677         /*
678          *      Get stxd and buffer. Put local address of data in buffer, 
679          *      send local addresses to target, 
680          *      wait for the target node to suck the data over.
681          *      The stxd is used to ren
682          */
683         stxd = gmnal_get_stxd(nal_data, 1);
684         CDEBUG(D_INFO, "stxd [%p]\n", stxd);
685
686         stxd->type = GMNAL_LARGE_MESSAGE_INIT;
687         stxd->cookie = cookie;
688
689         /*
690          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
691          *      Then copy the iov in
692          */
693         buffer = stxd->buffer;
694         msghdr = (gmnal_msghdr_t*)buffer;
695
696         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
697
698         msghdr->magic = GMNAL_MAGIC;
699         msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
700         msghdr->sender_node_id = nal_data->gm_global_nid;
701         msghdr->stxd = stxd;
702         msghdr->niov = niov ;
703         buffer += sizeof(gmnal_msghdr_t);
704         mlen = sizeof(gmnal_msghdr_t);
705         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
706
707
708         CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
709
710         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
711         buffer += sizeof(ptl_hdr_t);
712         mlen += sizeof(ptl_hdr_t); 
713         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
714
715         /*
716          *      copy the iov to the buffer so target knows 
717          *      where to get the data from
718          */
719         CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
720         gm_bcopy(iov, buffer, niov*sizeof(struct iovec));
721         mlen += niov*(sizeof(struct iovec));
722         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
723
724
725         /*
726          *      Store the iovs in the stxd for we can get 
727          *      them later if we need them
728          */
729         CDEBUG(D_NET, "Copying iov [%p] to [%p]\n", iov, stxd->iov);
730         gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
731         stxd->niov = niov;
732         
733
734         /*
735          *      register the memory so the NIC can get hold of the data
736          *      This is a slow process. it'd be good to overlap it 
737          *      with something else.
738          */
739         iov_dup = iov;
740         niov_dup = niov;
741         while(niov--) {
742                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
743                        iov->iov_base, iov->iov_len);
744                 GMNAL_GM_LOCK(nal_data);
745                 gm_status = gm_register_memory(nal_data->gm_port, 
746                                                iov->iov_base, iov->iov_len);
747                 if (gm_status != GM_SUCCESS) {
748                         GMNAL_GM_UNLOCK(nal_data);
749                         CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
750                                "for memory [%p] len ["LPSZ"]\n", 
751                                gm_status, gmnal_gm_error(gm_status), 
752                                iov->iov_base, iov->iov_len);
753                         GMNAL_GM_LOCK(nal_data);
754                         while (iov_dup != iov) {
755                                 gm_deregister_memory(nal_data->gm_port, 
756                                                      iov_dup->iov_base, 
757                                                      iov_dup->iov_len);
758                                 iov_dup++;
759                         }
760                         GMNAL_GM_UNLOCK(nal_data);
761                         gmnal_return_stxd(nal_data, stxd);
762                         return(PTL_FAIL);
763                 }
764
765                 GMNAL_GM_UNLOCK(nal_data);
766                 iov++;
767         }
768
769         /*
770          *      Send the init message to the target
771          */
772         CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
773         GMNAL_GM_LOCK(nal_data);
774         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
775                                             &local_nid);
776         if (gm_status != GM_SUCCESS) {
777                 GMNAL_GM_UNLOCK(nal_data);
778                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
779                 gmnal_return_stxd(nal_data, stxd);
780                 /* TO DO deregister memory on failure */
781                 return(GMNAL_STATUS_FAIL);
782         }
783         CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
784         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
785                                       stxd->gm_size, mlen, GM_LOW_PRIORITY, 
786                                       local_nid, gmnal_large_tx_callback, 
787                                       (void*)stxd);
788         GMNAL_GM_UNLOCK(nal_data);
789         
790         CDEBUG(D_INFO, "done\n");
791                 
792         return(PTL_OK);
793 }
794
795 /*
796  *      Callback function indicates that send of buffer with 
797  *      large message iovec has completed (or failed).
798  */
799 void 
800 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
801 {
802         gmnal_small_tx_callback(gm_port, context, status);
803
804 }
805
806
807
808 /*
809  *      Have received a buffer that contains an iovec of the sender. 
810  *      Do a gm_register_memory of the receivers buffer and then do a get
811  *      data from the sender.
812  */
813 int
814 gmnal_large_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
815                 unsigned int nriov, struct iovec *riov, size_t mlen, 
816                 size_t rlen)
817 {
818         gmnal_data_t    *nal_data = nal_cb->nal_data;
819         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
820         void            *buffer = NULL;
821         struct  iovec   *riov_dup;
822         int             nriov_dup;
823         gmnal_msghdr_t  *msghdr = NULL;
824         gm_status_t     gm_status;
825
826         CDEBUG(D_TRACE, "gmnal_large_rx :: nal_cb[%p], private[%p], "
827                "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
828                 nal_cb, private, cookie, nriov, riov, mlen, rlen);
829
830         if (!srxd) {
831                 CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
832                 lib_finalize(nal_cb, private, cookie, PTL_FAIL);
833                 return(PTL_FAIL);
834         }
835
836         buffer = srxd->buffer;
837         msghdr = (gmnal_msghdr_t*)buffer;
838         buffer += sizeof(gmnal_msghdr_t);
839         buffer += sizeof(ptl_hdr_t);
840
841         /*
842          *      Store the senders stxd address in the srxd for this message
843          *      The gmnal_large_message_ack needs it to notify the sender
844          *      the pull of data is complete
845          */
846         srxd->source_stxd = msghdr->stxd;
847
848         /*
849          *      Register the receivers memory
850          *      get the data,
851          *      tell the sender that we got the data
852          *      then tell the receiver we got the data
853          *      TO DO
854          *      If the iovecs match, could interleave 
855          *      gm_registers and gm_gets for each element
856          */
857         nriov_dup = nriov;
858         riov_dup = riov;
859         while(nriov--) {
860                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
861                        riov->iov_base, riov->iov_len);
862                 GMNAL_GM_LOCK(nal_data);
863                 gm_status = gm_register_memory(nal_data->gm_port, 
864                                                riov->iov_base, riov->iov_len);
865                 if (gm_status != GM_SUCCESS) {
866                         GMNAL_GM_UNLOCK(nal_data);
867                         CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
868                                "for memory [%p] len ["LPSZ"]\n", 
869                                gm_status, gmnal_gm_error(gm_status), 
870                                riov->iov_base, riov->iov_len);
871                         GMNAL_GM_LOCK(nal_data);
872                         while (riov_dup != riov) {
873                                 gm_deregister_memory(nal_data->gm_port, 
874                                                      riov_dup->iov_base, 
875                                                      riov_dup->iov_len);
876                                 riov_dup++;
877                         }
878                         GMNAL_GM_LOCK(nal_data);
879                         /*
880                          *      give back srxd and buffer. Send NACK to sender
881                          */
882                         return(PTL_FAIL);
883                 }
884                 GMNAL_GM_UNLOCK(nal_data);
885                 riov++;
886         }
887         /*
888          *      do this so the final gm_get callback can deregister the memory
889          */
890         PORTAL_ALLOC(srxd->riov, nriov_dup*(sizeof(struct iovec)));
891         gm_bcopy(riov_dup, srxd->riov, nriov_dup*(sizeof(struct iovec)));
892         srxd->nriov = nriov_dup;
893
894         /*
895          *      now do gm_get to get the data
896          */
897         srxd->cookie = cookie;
898         if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer, 
899                               nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
900                 CDEBUG(D_ERROR, "can't get the data");
901         }
902
903         CDEBUG(D_INFO, "lgmanl_large_rx done\n");
904
905         return(PTL_OK);
906 }
907
908
909 /*
910  *      Perform a number of remote gets as part of receiving 
911  *      a large message.
912  *      The final one to complete (i.e. the last callback to get called)
913  *      tidies up.
914  *      gm_get requires a send token.
915  */
916 int
917 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, 
918                   int nriov, struct iovec *riov)
919 {
920
921         int     ncalls = 0;
922
923         CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
924                "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
925
926
927         ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
928         if (ncalls < 0) {
929                 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
930                 return(GMNAL_STATUS_FAIL);
931         }
932         CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
933         spin_lock_init(&srxd->callback_lock);
934         srxd->ncallbacks = ncalls;
935         srxd->callback_status = 0;
936
937         ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
938         if (ncalls < 0) {
939                 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
940                 return(GMNAL_STATUS_FAIL);
941         }
942
943         return(GMNAL_STATUS_OK);
944
945 }
946
947
948 /*
949  *      pull data from source node (source iovec) to a local iovec.
950  *      The iovecs may not match which adds the complications below.
951  *      Count the number of gm_gets that will be required to the callbacks
952  *      can determine who is the last one.
953  */     
954 int
955 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, 
956                struct iovec *siov, int nriov, struct iovec *riov)
957 {
958
959         int     ncalls = 0;
960         int     slen = siov->iov_len, rlen = riov->iov_len;
961         char    *sbuf = siov->iov_base, *rbuf = riov->iov_base; 
962         unsigned long   sbuf_long;
963         gm_remote_ptr_t remote_ptr = 0;
964         unsigned int    source_node;
965         gmnal_ltxd_t    *ltxd = NULL;
966         gmnal_data_t    *nal_data = srxd->nal_data;
967
968         CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
969         if (do_copy) {
970                 if (!nal_data) {
971                         CDEBUG(D_ERROR, "Bad args No nal_data\n");
972                         return(GMNAL_STATUS_FAIL);
973                 }
974                 GMNAL_GM_LOCK(nal_data);
975                 if (gm_global_id_to_node_id(nal_data->gm_port, 
976                                             srxd->gm_source_node, 
977                                             &source_node) != GM_SUCCESS) {
978
979                         CDEBUG(D_ERROR, "cannot resolve global_id [%u] "
980                                "to local node_id\n", srxd->gm_source_node);
981                         GMNAL_GM_UNLOCK(nal_data);
982                         return(GMNAL_STATUS_FAIL);
983                 }
984                 GMNAL_GM_UNLOCK(nal_data);
985                 /*
986                  *      We need a send token to use gm_get
987                  *      getting an stxd gets us a send token.
988                  *      the stxd is used as the context to the
989                  *      callback function (so stxd can be returned).
990                  *      Set pointer in stxd to srxd so callback count in srxd
991                  *      can be decremented to find last callback to complete
992                  */
993                 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n", 
994                        srxd->gm_source_node, source_node);
995         }
996
997         do {
998                 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
999                                 sbuf, slen, rbuf, rlen);
1000                 if (slen > rlen) {
1001                         ncalls++;
1002                         if (do_copy) {
1003                                 CDEBUG(D_INFO, "slen>rlen\n");
1004                                 ltxd = gmnal_get_ltxd(nal_data);
1005                                 ltxd->srxd = srxd;
1006                                 GMNAL_GM_LOCK(nal_data);
1007                                 /* 
1008                                  *      funny business to get rid 
1009                                  *      of compiler warning 
1010                                  */
1011                                 sbuf_long = (unsigned long) sbuf;
1012                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1013                                 gm_get(nal_data->gm_port, remote_ptr, rbuf, 
1014                                        rlen, GM_LOW_PRIORITY, source_node, 
1015                                        GMNAL_GM_PORT, 
1016                                        gmnal_remote_get_callback, ltxd);
1017                                 GMNAL_GM_UNLOCK(nal_data);
1018                         }
1019                         /*
1020                          *      at the end of 1 iov element
1021                          */
1022                         sbuf+=rlen;
1023                         slen-=rlen;
1024                         riov++;
1025                         nriov--;
1026                         rbuf = riov->iov_base;
1027                         rlen = riov->iov_len;
1028                 } else if (rlen > slen) {
1029                         ncalls++;
1030                         if (do_copy) {
1031                                 CDEBUG(D_INFO, "slen<rlen\n");
1032                                 ltxd = gmnal_get_ltxd(nal_data);
1033                                 ltxd->srxd = srxd;
1034                                 GMNAL_GM_LOCK(nal_data);
1035                                 sbuf_long = (unsigned long) sbuf;
1036                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1037                                 gm_get(nal_data->gm_port, remote_ptr, rbuf, 
1038                                        slen, GM_LOW_PRIORITY, source_node, 
1039                                        GMNAL_GM_PORT, 
1040                                        gmnal_remote_get_callback, ltxd);
1041                                 GMNAL_GM_UNLOCK(nal_data);
1042                         }
1043                         /*
1044                          *      at end of siov element
1045                          */
1046                         rbuf+=slen;
1047                         rlen-=slen;
1048                         siov++;
1049                         sbuf = siov->iov_base;
1050                         slen = siov->iov_len;
1051                 } else {
1052                         ncalls++;
1053                         if (do_copy) {
1054                                 CDEBUG(D_INFO, "rlen=slen\n");
1055                                 ltxd = gmnal_get_ltxd(nal_data);
1056                                 ltxd->srxd = srxd;
1057                                 GMNAL_GM_LOCK(nal_data);
1058                                 sbuf_long = (unsigned long) sbuf;
1059                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1060                                 gm_get(nal_data->gm_port, remote_ptr, rbuf, 
1061                                        rlen, GM_LOW_PRIORITY, source_node, 
1062                                        GMNAL_GM_PORT, 
1063                                        gmnal_remote_get_callback, ltxd);
1064                                 GMNAL_GM_UNLOCK(nal_data);
1065                         }
1066                         /*
1067                          *      at end of siov and riov element
1068                          */
1069                         siov++;
1070                         sbuf = siov->iov_base;
1071                         slen = siov->iov_len;
1072                         riov++;
1073                         nriov--;
1074                         rbuf = riov->iov_base;
1075                         rlen = riov->iov_len;
1076                 }
1077
1078         } while (nriov);
1079         return(ncalls);
1080 }
1081
1082
1083 /*
1084  *      The callback function that is invoked after each gm_get call completes.
1085  *      Multiple callbacks may be invoked for 1 transaction, only the final
1086  *      callback has work to do.
1087  */
1088 void
1089 gmnal_remote_get_callback(gm_port_t *gm_port, void *context, 
1090                            gm_status_t status)
1091 {
1092
1093         gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
1094         gmnal_srxd_t    *srxd = ltxd->srxd;
1095         nal_cb_t        *nal_cb = srxd->nal_data->nal_cb;
1096         int             lastone;
1097         struct  iovec   *riov;
1098         int             nriov;
1099         gmnal_data_t    *nal_data;
1100
1101         CDEBUG(D_TRACE, "called for context [%p]\n", context);
1102
1103         if (status != GM_SUCCESS) {
1104                 CDEBUG(D_ERROR, "reports error [%d][%s]\n", status, 
1105                        gmnal_gm_error(status));
1106         }
1107
1108         spin_lock(&srxd->callback_lock);
1109         srxd->ncallbacks--;
1110         srxd->callback_status |= status;
1111         lastone = srxd->ncallbacks?0:1;
1112         spin_unlock(&srxd->callback_lock);
1113         nal_data = srxd->nal_data;
1114
1115         /*
1116          *      everyone returns a send token
1117          */
1118         gmnal_return_ltxd(nal_data, ltxd);
1119
1120         if (!lastone) {
1121                 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1122                 return;
1123         }
1124         
1125         /*
1126          *      Let our client application proceed
1127          */     
1128         CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
1129         lib_finalize(nal_cb, srxd, srxd->cookie, PTL_OK);
1130
1131         /*
1132          *      send an ack to the sender to let him know we got the data
1133          */
1134         gmnal_large_tx_ack(nal_data, srxd);
1135
1136         /*
1137          *      Unregister the memory that was used
1138          *      This is a very slow business (slower then register)
1139          */
1140         nriov = srxd->nriov;
1141         riov = srxd->riov;
1142         GMNAL_GM_LOCK(nal_data);
1143         while (nriov--) {
1144                 CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base);
1145                 if (gm_deregister_memory(srxd->nal_data->gm_port, 
1146                                          riov->iov_base, riov->iov_len)) {
1147                         CDEBUG(D_ERROR, "failed to deregister memory [%p]\n", 
1148                                riov->iov_base);
1149                 }
1150                 riov++;
1151         }
1152         GMNAL_GM_UNLOCK(nal_data);
1153         PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
1154
1155         /*
1156          *      repost the receive buffer (return receive token)
1157          */
1158         GMNAL_GM_LOCK(nal_data);
1159         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
1160                                            srxd->gmsize, GM_LOW_PRIORITY, 0);   
1161         GMNAL_GM_UNLOCK(nal_data);
1162         
1163         return;
1164 }
1165
1166
1167 /*
1168  *      Called on target node.
1169  *      After pulling data from a source node
1170  *      send an ack message to indicate the large transmit is complete.
1171  */
1172 void 
1173 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1174 {
1175
1176         gmnal_stxd_t    *stxd;
1177         gmnal_msghdr_t *msghdr;
1178         void            *buffer = NULL;
1179         unsigned int    local_nid;
1180         gm_status_t     gm_status = GM_SUCCESS;
1181
1182         CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd, 
1183                srxd->gm_source_node);
1184
1185         GMNAL_GM_LOCK(nal_data);
1186         gm_status = gm_global_id_to_node_id(nal_data->gm_port, 
1187                                             srxd->gm_source_node, &local_nid);
1188         GMNAL_GM_UNLOCK(nal_data);
1189         if (gm_status != GM_SUCCESS) {
1190                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
1191                 return;
1192         }
1193         CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1194
1195         stxd = gmnal_get_stxd(nal_data, 1);
1196         CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1197
1198         stxd->nal_data = nal_data;
1199         stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1200
1201         /*
1202          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
1203          *      Then copy the data in
1204          */
1205         buffer = stxd->buffer;
1206         msghdr = (gmnal_msghdr_t*)buffer;
1207
1208         /*
1209          *      Add in the address of the original stxd from the sender node
1210          *      so it knows which thread to notify.
1211          */
1212         msghdr->magic = GMNAL_MAGIC;
1213         msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1214         msghdr->sender_node_id = nal_data->gm_global_nid;
1215         msghdr->stxd = srxd->source_stxd;
1216         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1217
1218         CDEBUG(D_INFO, "sending\n");
1219         stxd->msg_size= sizeof(gmnal_msghdr_t);
1220
1221
1222         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1223                "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1224                "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
1225                stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1226         GMNAL_GM_LOCK(nal_data);
1227         stxd->gm_priority = GM_LOW_PRIORITY;
1228         stxd->gm_target_node = local_nid;
1229         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
1230                                       stxd->gm_size, stxd->msg_size, 
1231                                       GM_LOW_PRIORITY, local_nid, 
1232                                       gmnal_large_tx_ack_callback, 
1233                                       (void*)stxd);
1234         
1235         GMNAL_GM_UNLOCK(nal_data);
1236         CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1237                 
1238         return;
1239 }
1240
1241
1242 /*
1243  *      A callback to indicate the small transmit operation is compete
1244  *      Check for errors and try to deal with them.
1245  *      Call lib_finalise to inform the client application that the 
1246  *      send is complete and the memory can be reused.
1247  *      Return the stxd when finished with it (returns a send token)
1248  */
1249 void 
1250 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context, 
1251                              gm_status_t status)
1252 {
1253         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
1254         gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
1255
1256         if (!stxd) {
1257                 CDEBUG(D_ERROR, "send completion event for unknown stxd\n");
1258                 return;
1259         }
1260         CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1261                stxd, status);
1262         gmnal_return_stxd(stxd->nal_data, stxd);
1263
1264         GMNAL_GM_UNLOCK(nal_data);
1265         return;
1266 }
1267
1268 /*
1269  *      Indicates the large transmit operation is compete.
1270  *      Called on transmit side (means data has been pulled  by receiver 
1271  *      or failed).
1272  *      Call lib_finalise to inform the client application that the send 
1273  *      is complete, deregister the memory and return the stxd. 
1274  *      Finally, report the rx buffer that the ack message was delivered in.
1275  */
1276 void 
1277 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1278 {
1279         nal_cb_t        *nal_cb = nal_data->nal_cb;
1280         gmnal_stxd_t    *stxd = NULL;
1281         gmnal_msghdr_t  *msghdr = NULL;
1282         void            *buffer = NULL;
1283         struct  iovec   *iov;
1284
1285
1286         CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
1287
1288         buffer = srxd->buffer;
1289         msghdr = (gmnal_msghdr_t*)buffer;
1290         stxd = msghdr->stxd;
1291
1292         CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1293
1294         lib_finalize(nal_cb, stxd, stxd->cookie, PTL_OK);
1295
1296         /*
1297          *      extract the iovec from the stxd, deregister the memory.
1298          *      free the space used to store the iovec
1299          */
1300         iov = stxd->iov;
1301         while(stxd->niov--) {
1302                 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1303                        iov->iov_base, iov->iov_len);
1304                 GMNAL_GM_LOCK(nal_data);
1305                 gm_deregister_memory(nal_data->gm_port, iov->iov_base, 
1306                                      iov->iov_len);
1307                 GMNAL_GM_UNLOCK(nal_data);
1308                 iov++;
1309         }
1310
1311         /*
1312          *      return the send token
1313          *      TO DO It is bad to hold onto the send token so long?
1314          */
1315         gmnal_return_stxd(nal_data, stxd);
1316
1317
1318         /*
1319          *      requeue the receive buffer 
1320          */
1321         gmnal_rx_requeue_buffer(nal_data, srxd);
1322         
1323
1324         return;
1325 }