Whamcloud - gitweb
206d86b75bbd4c9026da11049d65adbd73cdc9c8
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /*
23  *      This file contains all gmnal send and receive functions
24  */
25
26 #include "gmnal.h"
27
28 /*
29  *      The caretaker thread
30  *      This is main thread of execution for the NAL side
31  *      This guy waits in gm_blocking_recvive and gets
32  *      woken up when the myrinet adaptor gets an interrupt.
33  *      Hands off receive operations to the receive thread 
34  *      This thread Looks after gm_callbacks etc inline.
35  */
36 int
37 gmnal_ct_thread(void *arg)
38 {
39         gmnal_data_t            *nal_data;
40         gm_recv_event_t         *rxevent = NULL;
41         gm_recv_t               *recv = NULL;
42
43         if (!arg) {
44                 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
45                 return(-1);
46         }
47
48         nal_data = (gmnal_data_t*)arg;
49         CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
50
51         sprintf(current->comm, "gmnal_ct");
52
53         daemonize();
54
55         nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
56
57         GMNAL_GM_LOCK(nal_data);
58         while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
59                 CDEBUG(D_NET, "waiting\n");
60                 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
61                 if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
62                         CDEBUG(D_INFO, "time to exit\n");
63                         break;
64                 }
65                 CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
66                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
67
68                         case(GM_RECV_EVENT):
69                                 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
70                                 recv = (gm_recv_t*)&rxevent->recv;
71                                 GMNAL_GM_UNLOCK(nal_data);
72                                 gmnal_add_rxtwe(nal_data, recv);
73                                 GMNAL_GM_LOCK(nal_data);
74                                 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
75                         break;
76                         case(_GM_SLEEP_EVENT):
77                                 /*
78                                  *      Blocking receive above just returns
79                                  *      immediatly with _GM_SLEEP_EVENT
80                                  *      Don't know what this is
81                                  */
82                                 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
83                                 GMNAL_GM_UNLOCK(nal_data);
84                                 gm_unknown(nal_data->gm_port, rxevent);
85                                 GMNAL_GM_LOCK(nal_data);
86                                 CDEBUG(D_INFO, "Awake from gm_unknown\n");
87                                 break;
88                                 
89                         default:
90                                 /*
91                                  *      Don't know what this is
92                                  *      gm_unknown will make sense of it
93                                  *      Should be able to do something with
94                                  *      FAST_RECV_EVENTS here.
95                                  */
96                                 CDEBUG(D_NET, "Passing event to gm_unknown\n");
97                                 GMNAL_GM_UNLOCK(nal_data);
98                                 gm_unknown(nal_data->gm_port, rxevent);
99                                 GMNAL_GM_LOCK(nal_data);
100                                 CDEBUG(D_INFO, "Processed unknown event\n");
101                 }
102         }
103         GMNAL_GM_UNLOCK(nal_data);
104         nal_data->ctthread_flag = GMNAL_THREAD_RESET;
105         CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
106         return(GMNAL_STATUS_OK);
107 }
108
109
110 /*
111  *      process a receive event
112  */
113 int gmnal_rx_thread(void *arg)
114 {
115         gmnal_data_t            *nal_data;
116         void                    *buffer;
117         gmnal_rxtwe_t           *we = NULL;
118         int                     rank;
119
120         if (!arg) {
121                 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
122                 return(-1);
123         }
124
125         nal_data = (gmnal_data_t*)arg;
126         CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
127
128         for (rank=0; rank<num_rx_threads; rank++)
129                 if (nal_data->rxthread_pid[rank] == current->pid)
130                         break;
131
132         sprintf(current->comm, "gmnal_rx_%d", rank);
133
134         daemonize();
135         /*
136          *      set 1 bit for each thread started
137          *      doesn't matter which bit
138          */
139         spin_lock(&nal_data->rxthread_flag_lock);
140         if (nal_data->rxthread_flag)
141                 nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
142         else
143                 nal_data->rxthread_flag = 1;
144         CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
145         spin_unlock(&nal_data->rxthread_flag_lock);
146
147         while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
148                 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
149                 we = gmnal_get_rxtwe(nal_data);
150                 if (!we) {
151                         CDEBUG(D_INFO, "Receive thread time to exit\n");
152                         break;
153                 }
154
155                 buffer = we->buffer;
156                 switch(((gmnal_msghdr_t*)buffer)->type) {
157                 case(GMNAL_SMALL_MESSAGE):
158                         gmnal_pre_receive(nal_data, we, 
159                                            GMNAL_SMALL_MESSAGE);
160                 break;  
161                 case(GMNAL_LARGE_MESSAGE_INIT):
162                         gmnal_pre_receive(nal_data, we, 
163                                            GMNAL_LARGE_MESSAGE_INIT);
164                 break;  
165                 case(GMNAL_LARGE_MESSAGE_ACK):
166                         gmnal_pre_receive(nal_data, we, 
167                                            GMNAL_LARGE_MESSAGE_ACK);
168                 break;  
169                 default:
170                         CDEBUG(D_ERROR, "Unsupported message type\n");
171                         gmnal_rx_bad(nal_data, we, NULL);
172                 }
173                 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
174         }
175
176         spin_lock(&nal_data->rxthread_flag_lock);
177         nal_data->rxthread_flag/=2;
178         CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
179         spin_unlock(&nal_data->rxthread_flag_lock);
180         CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
181         return(GMNAL_STATUS_OK);
182 }
183
184
185
186 /*
187  *      Start processing a small message receive
188  *      Get here from gmnal_receive_thread
189  *      Hand off to lib_parse, which calls cb_recv
190  *      which hands back to gmnal_small_receive
191  *      Deal with all endian stuff here.
192  */
193 int
194 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
195 {
196         gmnal_srxd_t    *srxd = NULL;
197         void            *buffer = NULL;
198         unsigned int snode, sport, type, length;
199         gmnal_msghdr_t  *gmnal_msghdr;
200         ptl_hdr_t       *portals_hdr;
201         int              rc;
202
203         CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", 
204                nal_data, we, gmnal_type);
205
206         buffer = we->buffer;
207         snode = we->snode;
208         sport = we->sport;
209         type = we->type;
210         buffer = we->buffer;
211         length = we->length;
212
213         gmnal_msghdr = (gmnal_msghdr_t*)buffer;
214         portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
215
216         CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
217                "type [%d], length [%d], buffer [%p]\n",
218                snode, sport, type, length, buffer);
219         CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
220                "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id, 
221                gmnal_msghdr->magic, gmnal_msghdr->type);
222         CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
223                "dest_node ["LPD64"]\n", portals_hdr->src_nid, 
224                portals_hdr->dest_nid);
225
226         
227         /*
228          *      Get a receive descriptor for this message
229          */
230         srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
231         CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
232         if (!srxd) {
233                 CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
234                 /* I think passing a NULL srxd to lib_parse will crash
235                  * gmnal_recv() */
236                 LBUG();
237                 lib_parse(nal_data->libnal, portals_hdr, srxd);
238                 return(GMNAL_STATUS_FAIL);
239         }
240
241         /*
242          *      no need to bother portals library with this
243          */
244         if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
245                 gmnal_large_tx_ack_received(nal_data, srxd);
246                 return(GMNAL_STATUS_OK);
247         }
248
249         srxd->nal_data = nal_data;
250         srxd->type = gmnal_type;
251         srxd->nsiov = gmnal_msghdr->niov;
252         srxd->gm_source_node = gmnal_msghdr->sender_node_id;
253         
254         CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n", 
255                buffer+GMNAL_MSGHDR_SIZE);
256         /*
257          *      control passes to lib, which calls cb_recv 
258          *      cb_recv is responsible for returning the buffer 
259          *      for future receive
260          */
261         rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
262
263         if (rc != PTL_OK) {
264                 /* I just received garbage; take appropriate action... */
265                 LBUG();
266         }
267
268         return(GMNAL_STATUS_OK);
269 }
270
271
272
273 /*
274  *      After a receive has been processed, 
275  *      hang out the receive buffer again.
276  *      This implicitly returns a receive token.
277  */
278 int
279 gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
280 {
281         CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
282
283         CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
284
285         GMNAL_GM_LOCK(nal_data);
286         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
287                                         srxd->gmsize, GM_LOW_PRIORITY, 0 );
288         GMNAL_GM_UNLOCK(nal_data);
289
290         return(GMNAL_STATUS_OK);
291 }
292
293
294 /*
295  *      Handle a bad message
296  *      A bad message is one we don't expect or can't interpret
297  */
298 int
299 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
300 {
301         CDEBUG(D_TRACE, "Can't handle message\n");
302
303         if (!srxd)
304                 srxd = gmnal_rxbuffer_to_srxd(nal_data, 
305                                                we->buffer);
306         if (srxd) {
307                 gmnal_rx_requeue_buffer(nal_data, srxd);
308         } else {
309                 CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n");
310                 /*
311                  *      get rid of it ?
312                  */
313                 return(GMNAL_STATUS_FAIL);
314         }
315
316         return(GMNAL_STATUS_OK);
317 }
318
319
320
321 /*
322  *      Process a small message receive.
323  *      Get here from gmnal_receive_thread, gmnal_pre_receive
324  *      lib_parse, cb_recv
325  *      Put data from prewired receive buffer into users buffer(s)
326  *      Hang out the receive buffer again for another receive
327  *      Call lib_finalize
328  */
329 ptl_err_t
330 gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
331 {
332         gmnal_srxd_t    *srxd = NULL;
333         gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
334
335
336         if (!private) {
337                 CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
338                 lib_finalize(libnal, private, cookie, PTL_FAIL);
339                 return(PTL_FAIL);
340         }
341
342         srxd = (gmnal_srxd_t*)private;
343
344         /*
345          *      let portals library know receive is complete
346          */
347         CDEBUG(D_PORTALS, "calling lib_finalize\n");
348         lib_finalize(libnal, private, cookie, PTL_OK);
349         /*
350          *      return buffer so it can be used again
351          */
352         CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
353         GMNAL_GM_LOCK(nal_data);
354         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
355                                            srxd->gmsize, GM_LOW_PRIORITY, 0);
356         GMNAL_GM_UNLOCK(nal_data);
357
358         return(PTL_OK);
359 }
360
361
362 /*
363  *      Start a small transmit. 
364  *      Use the given send token (and wired transmit buffer).
365  *      Copy headers to wired buffer and initiate gm_send from the wired buffer.
366  *      The callback function informs when the send is complete.
367  */
368 ptl_err_t
369 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
370                 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
371                 gmnal_stxd_t *stxd, int size)
372 {
373         gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
374         void            *buffer = NULL;
375         gmnal_msghdr_t  *msghdr = NULL;
376         int             tot_size = 0;
377         unsigned int    local_nid;
378         gm_status_t     gm_status = GM_SUCCESS;
379
380         CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
381                "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
382                "size [%d]\n", libnal, private, cookie, hdr, type,
383                global_nid, pid, stxd, size);
384
385         CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
386                hdr->dest_nid, hdr->src_nid);
387
388         if (!nal_data) {
389                 CDEBUG(D_ERROR, "no nal_data\n");
390                 return(PTL_FAIL);
391         } else {
392                 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
393         }
394
395         GMNAL_GM_LOCK(nal_data);
396         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
397                                             &local_nid);
398         GMNAL_GM_UNLOCK(nal_data);
399         if (gm_status != GM_SUCCESS) {
400                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
401                 return(PTL_FAIL);
402         }
403         CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
404
405         stxd->type = GMNAL_SMALL_MESSAGE;
406         stxd->cookie = cookie;
407
408         /*
409          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
410          *      Then send the message, as the data has previously been copied in
411          *      (HP SFS 1380).
412          */
413         buffer = stxd->buffer;
414         msghdr = (gmnal_msghdr_t*)buffer;
415
416         msghdr->magic = GMNAL_MAGIC;
417         msghdr->type = GMNAL_SMALL_MESSAGE;
418         msghdr->sender_node_id = nal_data->gm_global_nid;
419         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
420
421         buffer += sizeof(gmnal_msghdr_t);
422
423         CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
424         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
425
426         buffer += sizeof(ptl_hdr_t);
427
428         CDEBUG(D_INFO, "sending\n");
429         tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
430         stxd->msg_size = tot_size;
431
432
433         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
434                "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
435                "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
436                stxd->msg_size, global_nid, local_nid, stxd);
437
438         GMNAL_GM_LOCK(nal_data);
439         stxd->gm_priority = GM_LOW_PRIORITY;
440         stxd->gm_target_node = local_nid;
441         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
442                                       stxd->gm_size, stxd->msg_size, 
443                                       GM_LOW_PRIORITY, local_nid, 
444                                       gmnal_small_tx_callback, (void*)stxd);
445         GMNAL_GM_UNLOCK(nal_data);
446         CDEBUG(D_INFO, "done\n");
447                 
448         return(PTL_OK);
449 }
450
451
452 /*
453  *      A callback to indicate the small transmit operation is compete
454  *      Check for erros and try to deal with them.
455  *      Call lib_finalise to inform the client application that the send 
456  *      is complete and the memory can be reused.
457  *      Return the stxd when finished with it (returns a send token)
458  */
459 void 
460 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
461 {
462         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
463         lib_msg_t       *cookie = stxd->cookie;
464         gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
465         lib_nal_t       *libnal = nal_data->libnal;
466         unsigned         gnid = 0;
467         gm_status_t      gm_status = 0;
468
469         if (!stxd) {
470                 CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
471                 return;
472         }
473         if (status != GM_SUCCESS) {
474                 GMNAL_GM_LOCK(nal_data);
475                 gm_status = gm_node_id_to_global_id(nal_data->gm_port,
476                                                     stxd->gm_target_node,&gnid);
477                 GMNAL_GM_UNLOCK(nal_data);
478                 if (gm_status != GM_SUCCESS) {
479                         CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
480                                gm_status);
481                         gnid = 0;
482                 }
483                 CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s] to [%u]\n",
484                        stxd, gmnal_gm_error(status), gnid);
485         }
486
487         switch(status) {
488                 case(GM_SUCCESS):
489                 break;
490
491
492
493                 case(GM_SEND_DROPPED):
494                 /*
495                  *      do a resend on the dropped ones
496                  */
497                         CDEBUG(D_ERROR, "send stxd [%p] was dropped "
498                                "resending\n", context);
499                         GMNAL_GM_LOCK(nal_data);
500                         gm_send_to_peer_with_callback(nal_data->gm_port, 
501                                                       stxd->buffer, 
502                                                       stxd->gm_size, 
503                                                       stxd->msg_size, 
504                                                       stxd->gm_priority, 
505                                                       stxd->gm_target_node, 
506                                                       gmnal_small_tx_callback,
507                                                       context);
508                         GMNAL_GM_UNLOCK(nal_data);
509                 
510                 return;
511                 case(GM_TIMED_OUT):
512                 case(GM_SEND_TIMED_OUT):
513                 /*
514                  *      drop these ones
515                  */
516                         CDEBUG(D_INFO, "calling gm_drop_sends\n");
517                         GMNAL_GM_LOCK(nal_data);
518                         gm_drop_sends(nal_data->gm_port, stxd->gm_priority, 
519                                       stxd->gm_target_node, GMNAL_GM_PORT_ID, 
520                                       gmnal_drop_sends_callback, context);
521                         GMNAL_GM_UNLOCK(nal_data);
522
523                 return;
524
525
526                 /*
527                  *      abort on these ?
528                  */
529                 case(GM_TRY_AGAIN):
530                 case(GM_INTERRUPTED):
531                 case(GM_FAILURE):
532                 case(GM_INPUT_BUFFER_TOO_SMALL):
533                 case(GM_OUTPUT_BUFFER_TOO_SMALL):
534                 case(GM_BUSY):
535                 case(GM_MEMORY_FAULT):
536                 case(GM_INVALID_PARAMETER):
537                 case(GM_OUT_OF_MEMORY):
538                 case(GM_INVALID_COMMAND):
539                 case(GM_PERMISSION_DENIED):
540                 case(GM_INTERNAL_ERROR):
541                 case(GM_UNATTACHED):
542                 case(GM_UNSUPPORTED_DEVICE):
543                 case(GM_SEND_REJECTED):
544                 case(GM_SEND_TARGET_PORT_CLOSED):
545                 case(GM_SEND_TARGET_NODE_UNREACHABLE):
546                 case(GM_SEND_PORT_CLOSED):
547                 case(GM_NODE_ID_NOT_YET_SET):
548                 case(GM_STILL_SHUTTING_DOWN):
549                 case(GM_CLONE_BUSY):
550                 case(GM_NO_SUCH_DEVICE):
551                 case(GM_ABORTED):
552                 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
553                 case(GM_UNTRANSLATED_SYSTEM_ERROR):
554                 case(GM_ACCESS_DENIED):
555                 case(GM_NO_DRIVER_SUPPORT):
556                 case(GM_PTE_REF_CNT_OVERFLOW):
557                 case(GM_NOT_SUPPORTED_IN_KERNEL):
558                 case(GM_NOT_SUPPORTED_ON_ARCH):
559                 case(GM_NO_MATCH):
560                 case(GM_USER_ERROR):
561                 case(GM_DATA_CORRUPTED):
562                 case(GM_HARDWARE_FAULT):
563                 case(GM_SEND_ORPHANED):
564                 case(GM_MINOR_OVERFLOW):
565                 case(GM_PAGE_TABLE_FULL):
566                 case(GM_UC_ERROR):
567                 case(GM_INVALID_PORT_NUMBER):
568                 case(GM_DEV_NOT_FOUND):
569                 case(GM_FIRMWARE_NOT_RUNNING):
570                 case(GM_YP_NO_MATCH):
571                 default:
572                 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
573                                       stxd->gm_target_node, GMNAL_GM_PORT_ID,
574                                       gmnal_resume_sending_callback, context);
575                 return;
576
577         }
578
579         /*
580          *      TO DO
581          *      If this is a large message init,
582          *      we're not finished with the data yet,
583          *      so can't call lib_finalise.
584          *      However, we're also holding on to a 
585          *      stxd here (to keep track of the source
586          *      iovec only). Should use another structure
587          *      to keep track of iovec and return stxd to 
588          *      free list earlier.
589          */
590         if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
591                 CDEBUG(D_INFO, "large transmit done\n");
592                 return;
593         }
594         gmnal_return_stxd(nal_data, stxd);
595         lib_finalize(libnal, stxd, cookie, PTL_OK);
596         return;
597 }
598
599 /*
600  *      After an error on the port
601  *      call this to allow future sends to complete
602  */
603 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
604                                  gm_status_t status)
605 {
606         gmnal_data_t    *nal_data;
607         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
608         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
609         gmnal_return_stxd(stxd->nal_data, stxd);
610         return;
611 }
612
613
614 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
615                                 gm_status_t status)
616 {
617         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
618         gmnal_data_t    *nal_data = stxd->nal_data;
619
620         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
621         if (status == GM_SUCCESS) {
622                 GMNAL_GM_LOCK(nal_data);
623                 gm_send_to_peer_with_callback(gm_port, stxd->buffer, 
624                                               stxd->gm_size, stxd->msg_size, 
625                                               stxd->gm_priority, 
626                                               stxd->gm_target_node, 
627                                               gmnal_small_tx_callback, 
628                                               context);
629                 GMNAL_GM_UNLOCK(nal_data);
630         } else {
631                 CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
632                        "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
633         }
634
635
636         return;
637 }
638
639
640 /*
641  *      Begine a large transmit.
642  *      Do a gm_register of the memory pointed to by the iovec 
643  *      and send details to the receiver. The receiver does a gm_get
644  *      to pull the data and sends and ack when finished. Upon receipt of
645  *      this ack, deregister the memory. Only 1 send token is required here.
646  */
647 int
648 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
649                 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
650                 unsigned int niov, struct iovec *iov, size_t offset, int size)
651 {
652
653         gmnal_data_t    *nal_data;
654         gmnal_stxd_t    *stxd = NULL;
655         void            *buffer = NULL;
656         gmnal_msghdr_t  *msghdr = NULL;
657         unsigned int    local_nid;
658         int             mlen = 0;       /* the size of the init message data */
659         struct iovec    *iov_dup = NULL;
660         gm_status_t     gm_status;
661         int             niov_dup;
662
663
664         CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
665                "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
666                "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, 
667                global_nid, pid, niov, iov, size);
668
669         if (libnal)
670                 nal_data = (gmnal_data_t*)libnal->libnal_data;
671         else  {
672                 CDEBUG(D_ERROR, "no libnal.\n");
673                 return(GMNAL_STATUS_FAIL);
674         }
675         
676
677         /*
678          *      Get stxd and buffer. Put local address of data in buffer, 
679          *      send local addresses to target, 
680          *      wait for the target node to suck the data over.
681          *      The stxd is used to ren
682          */
683         stxd = gmnal_get_stxd(nal_data, 1);
684         CDEBUG(D_INFO, "stxd [%p]\n", stxd);
685
686         stxd->type = GMNAL_LARGE_MESSAGE_INIT;
687         stxd->cookie = cookie;
688
689         /*
690          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
691          *      Then copy the iov in
692          */
693         buffer = stxd->buffer;
694         msghdr = (gmnal_msghdr_t*)buffer;
695
696         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
697
698         msghdr->magic = GMNAL_MAGIC;
699         msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
700         msghdr->sender_node_id = nal_data->gm_global_nid;
701         msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
702         msghdr->niov = niov ;
703         buffer += sizeof(gmnal_msghdr_t);
704         mlen = sizeof(gmnal_msghdr_t);
705         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
706
707
708         CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
709
710         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
711         buffer += sizeof(ptl_hdr_t);
712         mlen += sizeof(ptl_hdr_t); 
713         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
714
715         while (offset >= iov->iov_len) {
716                 offset -= iov->iov_len;
717                 niov--;
718                 iov++;
719         } 
720
721         LASSERT(offset >= 0);
722         /*
723          *      Store the iovs in the stxd for we can get 
724          *      them later if we need them
725          */
726         stxd->iov[0].iov_base = iov->iov_base + offset; 
727         stxd->iov[0].iov_len = iov->iov_len - offset; 
728         CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
729         if (niov > 1)
730                 gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
731         stxd->niov = niov;
732
733         /*
734          *      copy the iov to the buffer so target knows 
735          *      where to get the data from
736          */
737         CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
738         gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
739         mlen += stxd->niov*(sizeof(struct iovec));
740         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
741         
742         /*
743          *      register the memory so the NIC can get hold of the data
744          *      This is a slow process. it'd be good to overlap it 
745          *      with something else.
746          */
747         iov = stxd->iov;
748         iov_dup = iov;
749         niov_dup = niov;
750         while(niov--) {
751                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
752                        iov->iov_base, iov->iov_len);
753                 GMNAL_GM_LOCK(nal_data);
754                 gm_status = gm_register_memory(nal_data->gm_port, 
755                                                iov->iov_base, iov->iov_len);
756                 if (gm_status != GM_SUCCESS) {
757                         GMNAL_GM_UNLOCK(nal_data);
758                         CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
759                                "for memory [%p] len ["LPSZ"]\n", 
760                                gm_status, gmnal_gm_error(gm_status), 
761                                iov->iov_base, iov->iov_len);
762                         GMNAL_GM_LOCK(nal_data);
763                         while (iov_dup != iov) {
764                                 gm_deregister_memory(nal_data->gm_port, 
765                                                      iov_dup->iov_base, 
766                                                      iov_dup->iov_len);
767                                 iov_dup++;
768                         }
769                         GMNAL_GM_UNLOCK(nal_data);
770                         gmnal_return_stxd(nal_data, stxd);
771                         return(PTL_FAIL);
772                 }
773
774                 GMNAL_GM_UNLOCK(nal_data);
775                 iov++;
776         }
777
778         /*
779          *      Send the init message to the target
780          */
781         CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
782         GMNAL_GM_LOCK(nal_data);
783         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
784                                             &local_nid);
785         if (gm_status != GM_SUCCESS) {
786                 GMNAL_GM_UNLOCK(nal_data);
787                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
788                 gmnal_return_stxd(nal_data, stxd);
789                 /* TO DO deregister memory on failure */
790                 return(GMNAL_STATUS_FAIL);
791         }
792         CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
793         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
794                                       stxd->gm_size, mlen, GM_LOW_PRIORITY, 
795                                       local_nid, gmnal_large_tx_callback, 
796                                       (void*)stxd);
797         GMNAL_GM_UNLOCK(nal_data);
798         
799         CDEBUG(D_INFO, "done\n");
800                 
801         return(PTL_OK);
802 }
803
804 /*
805  *      Callback function indicates that send of buffer with 
806  *      large message iovec has completed (or failed).
807  */
808 void 
809 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
810 {
811         gmnal_small_tx_callback(gm_port, context, status);
812
813 }
814
815
816
817 /*
818  *      Have received a buffer that contains an iovec of the sender. 
819  *      Do a gm_register_memory of the receivers buffer and then do a get
820  *      data from the sender.
821  */
822 int
823 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
824                 unsigned int nriov, struct iovec *riov, size_t offset, 
825                 size_t mlen, size_t rlen)
826 {
827         gmnal_data_t    *nal_data = libnal->libnal_data;
828         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
829         void            *buffer = NULL;
830         struct  iovec   *riov_dup;
831         int             nriov_dup;
832         gmnal_msghdr_t  *msghdr = NULL;
833         gm_status_t     gm_status;
834
835         CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
836                "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
837                 libnal, private, cookie, nriov, riov, mlen, rlen);
838
839         if (!srxd) {
840                 CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
841                 lib_finalize(libnal, private, cookie, PTL_FAIL);
842                 return(PTL_FAIL);
843         }
844
845         buffer = srxd->buffer;
846         msghdr = (gmnal_msghdr_t*)buffer;
847         buffer += sizeof(gmnal_msghdr_t);
848         buffer += sizeof(ptl_hdr_t);
849
850         /*
851          *      Store the senders stxd address in the srxd for this message
852          *      The gmnal_large_message_ack needs it to notify the sender
853          *      the pull of data is complete
854          */
855         srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
856
857         /*
858          *      Register the receivers memory
859          *      get the data,
860          *      tell the sender that we got the data
861          *      then tell the receiver we got the data
862          *      TO DO
863          *      If the iovecs match, could interleave 
864          *      gm_registers and gm_gets for each element
865          */
866         while (offset >= riov->iov_len) {
867                 offset -= riov->iov_len;
868                 riov++;
869                 nriov--;
870         } 
871         LASSERT (nriov >= 0);
872         LASSERT (offset >= 0);
873         /*
874          *      do this so the final gm_get callback can deregister the memory
875          */
876         PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
877
878         srxd->riov[0].iov_base = riov->iov_base + offset;
879         srxd->riov[0].iov_len = riov->iov_len - offset;
880         if (nriov > 1)
881                 gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
882         srxd->nriov = nriov;
883         
884         riov = srxd->riov;
885         nriov_dup = nriov;
886         riov_dup = riov;
887         while(nriov--) {
888                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
889                        riov->iov_base, riov->iov_len);
890                 GMNAL_GM_LOCK(nal_data);
891                 gm_status = gm_register_memory(nal_data->gm_port, 
892                                                riov->iov_base, riov->iov_len);
893                 if (gm_status != GM_SUCCESS) {
894                         GMNAL_GM_UNLOCK(nal_data);
895                         CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
896                                "for memory [%p] len ["LPSZ"]\n", 
897                                gm_status, gmnal_gm_error(gm_status), 
898                                riov->iov_base, riov->iov_len);
899                         GMNAL_GM_LOCK(nal_data);
900                         while (riov_dup != riov) {
901                                 gm_deregister_memory(nal_data->gm_port, 
902                                                      riov_dup->iov_base, 
903                                                      riov_dup->iov_len);
904                                 riov_dup++;
905                         }
906                         GMNAL_GM_LOCK(nal_data);
907                         /*
908                          *      give back srxd and buffer. Send NACK to sender
909                          */
910                         PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
911                         return(PTL_FAIL);
912                 }
913                 GMNAL_GM_UNLOCK(nal_data);
914                 riov++;
915         }
916
917         /*
918          *      now do gm_get to get the data
919          */
920         srxd->cookie = cookie;
921         if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer, 
922                               nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
923                 CDEBUG(D_ERROR, "can't get the data");
924         }
925
926         CDEBUG(D_INFO, "lgmanl_large_rx done\n");
927
928         return(PTL_OK);
929 }
930
931
932 /*
933  *      Perform a number of remote gets as part of receiving 
934  *      a large message.
935  *      The final one to complete (i.e. the last callback to get called)
936  *      tidies up.
937  *      gm_get requires a send token.
938  */
939 int
940 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, 
941                   int nriov, struct iovec *riov)
942 {
943
944         int     ncalls = 0;
945
946         CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
947                "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
948
949
950         ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
951         if (ncalls < 0) {
952                 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
953                 return(GMNAL_STATUS_FAIL);
954         }
955         CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
956         spin_lock_init(&srxd->callback_lock);
957         srxd->ncallbacks = ncalls;
958         srxd->callback_status = 0;
959
960         ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
961         if (ncalls < 0) {
962                 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
963                 return(GMNAL_STATUS_FAIL);
964         }
965
966         return(GMNAL_STATUS_OK);
967
968 }
969
970
971 /*
972  *      pull data from source node (source iovec) to a local iovec.
973  *      The iovecs may not match which adds the complications below.
974  *      Count the number of gm_gets that will be required so the callbacks
975  *      can determine who is the last one.
976  */     
977 int
978 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, 
979                struct iovec *siov, int nriov, struct iovec *riov)
980 {
981
982         int     ncalls = 0;
983         int     slen = siov->iov_len, rlen = riov->iov_len;
984         char    *sbuf = siov->iov_base, *rbuf = riov->iov_base; 
985         unsigned long   sbuf_long;
986         gm_remote_ptr_t remote_ptr = 0;
987         unsigned int    source_node;
988         gmnal_ltxd_t    *ltxd = NULL;
989         gmnal_data_t    *nal_data = srxd->nal_data;
990
991         CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
992         if (do_copy) {
993                 if (!nal_data) {
994                         CDEBUG(D_ERROR, "Bad args No nal_data\n");
995                         return(GMNAL_STATUS_FAIL);
996                 }
997                 GMNAL_GM_LOCK(nal_data);
998                 if (gm_global_id_to_node_id(nal_data->gm_port, 
999                                             srxd->gm_source_node, 
1000                                             &source_node) != GM_SUCCESS) {
1001
1002                         CDEBUG(D_ERROR, "cannot resolve global_id [%u] "
1003                                "to local node_id\n", srxd->gm_source_node);
1004                         GMNAL_GM_UNLOCK(nal_data);
1005                         return(GMNAL_STATUS_FAIL);
1006                 }
1007                 GMNAL_GM_UNLOCK(nal_data);
1008                 /*
1009                  *      We need a send token to use gm_get
1010                  *      getting an stxd gets us a send token.
1011                  *      the stxd is used as the context to the
1012                  *      callback function (so stxd can be returned).
1013                  *      Set pointer in stxd to srxd so callback count in srxd
1014                  *      can be decremented to find last callback to complete
1015                  */
1016                 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n", 
1017                        srxd->gm_source_node, source_node);
1018         }
1019
1020         do {
1021                 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
1022                                 sbuf, slen, rbuf, rlen);
1023                 if (slen > rlen) {
1024                         ncalls++;
1025                         if (do_copy) {
1026                                 CDEBUG(D_INFO, "slen>rlen\n");
1027                                 ltxd = gmnal_get_ltxd(nal_data);
1028                                 ltxd->srxd = srxd;
1029                                 GMNAL_GM_LOCK(nal_data);
1030                                 /* 
1031                                  *      funny business to get rid 
1032                                  *      of compiler warning 
1033                                  */
1034                                 sbuf_long = (unsigned long) sbuf;
1035                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1036                                 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1037                                        rlen, GM_LOW_PRIORITY, source_node,
1038                                        GMNAL_GM_PORT_ID,
1039                                        gmnal_remote_get_callback, ltxd);
1040                                 GMNAL_GM_UNLOCK(nal_data);
1041                         }
1042                         /*
1043                          *      at the end of 1 iov element
1044                          */
1045                         sbuf+=rlen;
1046                         slen-=rlen;
1047                         riov++;
1048                         nriov--;
1049                         rbuf = riov->iov_base;
1050                         rlen = riov->iov_len;
1051                 } else if (rlen > slen) {
1052                         ncalls++;
1053                         if (do_copy) {
1054                                 CDEBUG(D_INFO, "slen<rlen\n");
1055                                 ltxd = gmnal_get_ltxd(nal_data);
1056                                 ltxd->srxd = srxd;
1057                                 GMNAL_GM_LOCK(nal_data);
1058                                 sbuf_long = (unsigned long) sbuf;
1059                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1060                                 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1061                                        slen, GM_LOW_PRIORITY, source_node,
1062                                        GMNAL_GM_PORT_ID,
1063                                        gmnal_remote_get_callback, ltxd);
1064                                 GMNAL_GM_UNLOCK(nal_data);
1065                         }
1066                         /*
1067                          *      at end of siov element
1068                          */
1069                         rbuf+=slen;
1070                         rlen-=slen;
1071                         siov++;
1072                         sbuf = siov->iov_base;
1073                         slen = siov->iov_len;
1074                 } else {
1075                         ncalls++;
1076                         if (do_copy) {
1077                                 CDEBUG(D_INFO, "rlen=slen\n");
1078                                 ltxd = gmnal_get_ltxd(nal_data);
1079                                 ltxd->srxd = srxd;
1080                                 GMNAL_GM_LOCK(nal_data);
1081                                 sbuf_long = (unsigned long) sbuf;
1082                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1083                                 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1084                                        rlen, GM_LOW_PRIORITY, source_node,
1085                                        GMNAL_GM_PORT_ID,
1086                                        gmnal_remote_get_callback, ltxd);
1087                                 GMNAL_GM_UNLOCK(nal_data);
1088                         }
1089                         /*
1090                          *      at end of siov and riov element
1091                          */
1092                         siov++;
1093                         sbuf = siov->iov_base;
1094                         slen = siov->iov_len;
1095                         riov++;
1096                         nriov--;
1097                         rbuf = riov->iov_base;
1098                         rlen = riov->iov_len;
1099                 }
1100
1101         } while (nriov);
1102         return(ncalls);
1103 }
1104
1105
1106 /*
1107  *      The callback function that is invoked after each gm_get call completes.
1108  *      Multiple callbacks may be invoked for 1 transaction, only the final
1109  *      callback has work to do.
1110  */
1111 void
1112 gmnal_remote_get_callback(gm_port_t *gm_port, void *context, 
1113                            gm_status_t status)
1114 {
1115
1116         gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
1117         gmnal_srxd_t    *srxd = ltxd->srxd;
1118         lib_nal_t       *libnal = srxd->nal_data->libnal;
1119         int             lastone;
1120         struct  iovec   *riov;
1121         int             nriov;
1122         gmnal_data_t    *nal_data;
1123
1124         CDEBUG(D_TRACE, "called for context [%p]\n", context);
1125
1126         if (status != GM_SUCCESS) {
1127                 CDEBUG(D_ERROR, "reports error [%d][%s]\n", status, 
1128                        gmnal_gm_error(status));
1129         }
1130
1131         spin_lock(&srxd->callback_lock);
1132         srxd->ncallbacks--;
1133         srxd->callback_status |= status;
1134         lastone = srxd->ncallbacks?0:1;
1135         spin_unlock(&srxd->callback_lock);
1136         nal_data = srxd->nal_data;
1137
1138         /*
1139          *      everyone returns a send token
1140          */
1141         gmnal_return_ltxd(nal_data, ltxd);
1142
1143         if (!lastone) {
1144                 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1145                 return;
1146         }
1147         
1148         /*
1149          *      Let our client application proceed
1150          */     
1151         CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
1152         lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
1153
1154         /*
1155          *      send an ack to the sender to let him know we got the data
1156          */
1157         gmnal_large_tx_ack(nal_data, srxd);
1158
1159         /*
1160          *      Unregister the memory that was used
1161          *      This is a very slow business (slower then register)
1162          */
1163         nriov = srxd->nriov;
1164         riov = srxd->riov;
1165         GMNAL_GM_LOCK(nal_data);
1166         while (nriov--) {
1167                 CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base);
1168                 if (gm_deregister_memory(srxd->nal_data->gm_port, 
1169                                          riov->iov_base, riov->iov_len)) {
1170                         CDEBUG(D_ERROR, "failed to deregister memory [%p]\n", 
1171                                riov->iov_base);
1172                 }
1173                 riov++;
1174         }
1175         GMNAL_GM_UNLOCK(nal_data);
1176         PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
1177
1178         /*
1179          *      repost the receive buffer (return receive token)
1180          */
1181         GMNAL_GM_LOCK(nal_data);
1182         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
1183                                            srxd->gmsize, GM_LOW_PRIORITY, 0);   
1184         GMNAL_GM_UNLOCK(nal_data);
1185         
1186         return;
1187 }
1188
1189
1190 /*
1191  *      Called on target node.
1192  *      After pulling data from a source node
1193  *      send an ack message to indicate the large transmit is complete.
1194  */
1195 void 
1196 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1197 {
1198
1199         gmnal_stxd_t    *stxd;
1200         gmnal_msghdr_t *msghdr;
1201         void            *buffer = NULL;
1202         unsigned int    local_nid;
1203         gm_status_t     gm_status = GM_SUCCESS;
1204
1205         CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd, 
1206                srxd->gm_source_node);
1207
1208         GMNAL_GM_LOCK(nal_data);
1209         gm_status = gm_global_id_to_node_id(nal_data->gm_port, 
1210                                             srxd->gm_source_node, &local_nid);
1211         GMNAL_GM_UNLOCK(nal_data);
1212         if (gm_status != GM_SUCCESS) {
1213                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
1214                 return;
1215         }
1216         CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1217
1218         stxd = gmnal_get_stxd(nal_data, 1);
1219         CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1220
1221         stxd->nal_data = nal_data;
1222         stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1223
1224         /*
1225          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
1226          *      Then copy the data in
1227          */
1228         buffer = stxd->buffer;
1229         msghdr = (gmnal_msghdr_t*)buffer;
1230
1231         /*
1232          *      Add in the address of the original stxd from the sender node
1233          *      so it knows which thread to notify.
1234          */
1235         msghdr->magic = GMNAL_MAGIC;
1236         msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1237         msghdr->sender_node_id = nal_data->gm_global_nid;
1238         msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
1239         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1240
1241         CDEBUG(D_INFO, "sending\n");
1242         stxd->msg_size= sizeof(gmnal_msghdr_t);
1243
1244
1245         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1246                "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1247                "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
1248                stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1249         GMNAL_GM_LOCK(nal_data);
1250         stxd->gm_priority = GM_LOW_PRIORITY;
1251         stxd->gm_target_node = local_nid;
1252         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
1253                                       stxd->gm_size, stxd->msg_size, 
1254                                       GM_LOW_PRIORITY, local_nid, 
1255                                       gmnal_large_tx_ack_callback, 
1256                                       (void*)stxd);
1257         
1258         GMNAL_GM_UNLOCK(nal_data);
1259         CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1260                 
1261         return;
1262 }
1263
1264
1265 /*
1266  *      A callback to indicate the small transmit operation is compete
1267  *      Check for errors and try to deal with them.
1268  *      Call lib_finalise to inform the client application that the 
1269  *      send is complete and the memory can be reused.
1270  *      Return the stxd when finished with it (returns a send token)
1271  */
1272 void 
1273 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context, 
1274                              gm_status_t status)
1275 {
1276         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
1277         gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
1278
1279         if (!stxd) {
1280                 CDEBUG(D_ERROR, "send completion event for unknown stxd\n");
1281                 return;
1282         }
1283         CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1284                stxd, status);
1285         gmnal_return_stxd(stxd->nal_data, stxd);
1286
1287         GMNAL_GM_UNLOCK(nal_data);
1288         return;
1289 }
1290
1291 /*
1292  *      Indicates the large transmit operation is compete.
1293  *      Called on transmit side (means data has been pulled  by receiver 
1294  *      or failed).
1295  *      Call lib_finalise to inform the client application that the send 
1296  *      is complete, deregister the memory and return the stxd. 
1297  *      Finally, report the rx buffer that the ack message was delivered in.
1298  */
1299 void 
1300 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1301 {
1302         lib_nal_t       *libnal = nal_data->libnal;
1303         gmnal_stxd_t    *stxd = NULL;
1304         gmnal_msghdr_t  *msghdr = NULL;
1305         void            *buffer = NULL;
1306         struct  iovec   *iov;
1307
1308
1309         CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
1310
1311         buffer = srxd->buffer;
1312         msghdr = (gmnal_msghdr_t*)buffer;
1313         stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
1314
1315         CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1316
1317         lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
1318
1319         /*
1320          *      extract the iovec from the stxd, deregister the memory.
1321          *      free the space used to store the iovec
1322          */
1323         iov = stxd->iov;
1324         while(stxd->niov--) {
1325                 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1326                        iov->iov_base, iov->iov_len);
1327                 GMNAL_GM_LOCK(nal_data);
1328                 gm_deregister_memory(nal_data->gm_port, iov->iov_base, 
1329                                      iov->iov_len);
1330                 GMNAL_GM_UNLOCK(nal_data);
1331                 iov++;
1332         }
1333
1334         /*
1335          *      return the send token
1336          *      TO DO It is bad to hold onto the send token so long?
1337          */
1338         gmnal_return_stxd(nal_data, stxd);
1339
1340
1341         /*
1342          *      requeue the receive buffer 
1343          */
1344         gmnal_rx_requeue_buffer(nal_data, srxd);
1345         
1346
1347         return;
1348 }