1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /*
23  *      This file contains all gmnal send and receive functions
24  */
25
26 #include "gmnal.h"
27
/*
 *      The caretaker thread
 *      This is the main thread of execution for the NAL side.
 *      It waits in gm_blocking_receive and gets woken up when the
 *      Myrinet adaptor raises an interrupt.
 *      Receive operations are handed off to the receive threads;
 *      gm_callbacks etc. are handled inline.
 */
36 int
37 gmnal_ct_thread(void *arg)
38 {
39         gmnal_data_t            *nal_data;
40         gm_recv_event_t         *rxevent = NULL;
41         gm_recv_t               *recv = NULL;
42
43         if (!arg) {
44                 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
45                 return(-1);
46         }
47
48         nal_data = (gmnal_data_t*)arg;
49         CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
50
51         daemonize();
52
53         nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
54
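        /*
         *      take the GM lock for gm_blocking_receive_no_spin; it is
         *      dropped and retaken around the calls made for each event
         *      below
         */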
55         GMNAL_GM_LOCK(nal_data);
56         while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
57                 CDEBUG(D_NET, "waiting\n");
58                 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
59                 if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
60                         CDEBUG(D_INFO, "time to exit\n");
61                         break;
62                 }
63                 CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
64                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
65
66                         case(GM_RECV_EVENT):
67                                 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
68                                 recv = (gm_recv_t*)&rxevent->recv;
69                                 GMNAL_GM_UNLOCK(nal_data);
70                                 gmnal_add_rxtwe(nal_data, recv);
71                                 GMNAL_GM_LOCK(nal_data);
72                                 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
73                         break;
74                         case(_GM_SLEEP_EVENT):
75                                 /*
76                                  *      Blocking receive above just returns
                                 *      immediately with _GM_SLEEP_EVENT
78                                  *      Don't know what this is
79                                  */
80                                 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
81                                 GMNAL_GM_UNLOCK(nal_data);
82                                 gm_unknown(nal_data->gm_port, rxevent);
83                                 GMNAL_GM_LOCK(nal_data);
84                                 CDEBUG(D_INFO, "Awake from gm_unknown\n");
85                                 break;
86                                 
87                         default:
88                                 /*
89                                  *      Don't know what this is
90                                  *      gm_unknown will make sense of it
91                                  *      Should be able to do something with
92                                  *      FAST_RECV_EVENTS here.
93                                  */
94                                 CDEBUG(D_NET, "Passing event to gm_unknown\n");
95                                 GMNAL_GM_UNLOCK(nal_data);
96                                 gm_unknown(nal_data->gm_port, rxevent);
97                                 GMNAL_GM_LOCK(nal_data);
98                                 CDEBUG(D_INFO, "Processed unknown event\n");
99                 }
100         }
101         GMNAL_GM_UNLOCK(nal_data);
102         nal_data->ctthread_flag = GMNAL_THREAD_RESET;
103         CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
104         return(GMNAL_STATUS_OK);
105 }
106
107
108 /*
109  *      process a receive event
110  */
111 int gmnal_rx_thread(void *arg)
112 {
113         gmnal_data_t            *nal_data;
114         void                    *buffer;
115         gmnal_rxtwe_t           *we = NULL;
116
117         if (!arg) {
118                 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
119                 return(-1);
120         }
121
122         nal_data = (gmnal_data_t*)arg;
123         CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
124
125         daemonize();
126         /*
127          *      set 1 bit for each thread started
128          *      doesn't matter which bit
129          */
130         spin_lock(&nal_data->rxthread_flag_lock);
131         if (nal_data->rxthread_flag)
132                 nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
133         else
134                 nal_data->rxthread_flag = 1;
135         CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
136         spin_unlock(&nal_data->rxthread_flag_lock);
137
138         while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
139                 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
140                 we = gmnal_get_rxtwe(nal_data);
141                 if (!we) {
142                         CDEBUG(D_INFO, "Receive thread time to exit\n");
143                         break;
144                 }
145
146                 buffer = we->buffer;
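                /*
                 *      the receive buffer starts with a gmnal_msghdr_t;
                 *      its type field selects the handler below
                 */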
147                 switch(((gmnal_msghdr_t*)buffer)->type) {
148                 case(GMNAL_SMALL_MESSAGE):
149                         gmnal_pre_receive(nal_data, we, 
150                                            GMNAL_SMALL_MESSAGE);
151                 break;  
152                 case(GMNAL_LARGE_MESSAGE_INIT):
153                         gmnal_pre_receive(nal_data, we, 
154                                            GMNAL_LARGE_MESSAGE_INIT);
155                 break;  
156                 case(GMNAL_LARGE_MESSAGE_ACK):
157                         gmnal_pre_receive(nal_data, we, 
158                                            GMNAL_LARGE_MESSAGE_ACK);
159                 break;  
160                 default:
161                         CDEBUG(D_ERROR, "Unsupported message type\n");
162                         gmnal_rx_bad(nal_data, we, NULL);
163                 }
164                 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
165         }
166
167         spin_lock(&nal_data->rxthread_flag_lock);
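        /* clearing one bit records that this receive thread has exited */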
168         nal_data->rxthread_flag/=2;
169         CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
170         spin_unlock(&nal_data->rxthread_flag_lock);
171         CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
172         return(GMNAL_STATUS_OK);
173 }
174
175
176
/*
 *      Start processing a small message receive.
 *      Get here from gmnal_rx_thread.
 *      Hand off to lib_parse, which calls cb_recv,
 *      which hands back to gmnal_small_rx.
 *      Deal with all endian stuff here.
 */
184 int
185 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
186 {
187         gmnal_srxd_t    *srxd = NULL;
188         void            *buffer = NULL;
189         unsigned int snode, sport, type, length;
190         gmnal_msghdr_t  *gmnal_msghdr;
191         ptl_hdr_t       *portals_hdr;
192         int              rc;
193
194         CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", 
195                nal_data, we, gmnal_type);
196
        buffer = we->buffer;
        snode = we->snode;
        sport = we->sport;
        type = we->type;
        length = we->length;
203
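        /*
         *      every message starts with a gmnal_msghdr_t followed
         *      immediately by the portals header, then any payload
         */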
204         gmnal_msghdr = (gmnal_msghdr_t*)buffer;
205         portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
206
207         CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
208                "type [%d], length [%d], buffer [%p]\n",
209                snode, sport, type, length, buffer);
210         CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
211                "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id, 
212                gmnal_msghdr->magic, gmnal_msghdr->type);
213         CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
214                "dest_node ["LPD64"]\n", portals_hdr->src_nid, 
215                portals_hdr->dest_nid);
216
217         
218         /*
219          *      Get a receive descriptor for this message
220          */
221         srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
222         CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
223         if (!srxd) {
224                 CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
225                 /* I think passing a NULL srxd to lib_parse will crash
226                  * gmnal_recv() */
227                 LBUG();
228                 lib_parse(nal_data->libnal, portals_hdr, srxd);
229                 return(GMNAL_STATUS_FAIL);
230         }
231
232         /*
233          *      no need to bother portals library with this
234          */
235         if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
236                 gmnal_large_tx_ack_received(nal_data, srxd);
237                 return(GMNAL_STATUS_OK);
238         }
239
240         srxd->nal_data = nal_data;
241         srxd->type = gmnal_type;
242         srxd->nsiov = gmnal_msghdr->niov;
243         srxd->gm_source_node = gmnal_msghdr->sender_node_id;
244         
245         CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n", 
246                buffer+GMNAL_MSGHDR_SIZE);
247         /*
248          *      control passes to lib, which calls cb_recv 
249          *      cb_recv is responsible for returning the buffer 
250          *      for future receive
251          */
252         rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
253
254         if (rc != PTL_OK) {
255                 /* I just received garbage; take appropriate action... */
256                 LBUG();
257         }
258
259         return(GMNAL_STATUS_OK);
260 }
261
262
263
/*
 *      After a receive has been processed,
 *      re-post the receive buffer.
 *      This implicitly returns a receive token.
 */
269 int
270 gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
271 {
272         CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
273
274         CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
275
276         GMNAL_GM_LOCK(nal_data);
277         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
278                                         srxd->gmsize, GM_LOW_PRIORITY, 0 );
279         GMNAL_GM_UNLOCK(nal_data);
280
281         return(GMNAL_STATUS_OK);
282 }
283
284
285 /*
286  *      Handle a bad message
287  *      A bad message is one we don't expect or can't interpret
288  */
289 int
290 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
291 {
292         CDEBUG(D_TRACE, "Can't handle message\n");
293
294         if (!srxd)
295                 srxd = gmnal_rxbuffer_to_srxd(nal_data, 
296                                                we->buffer);
297         if (srxd) {
298                 gmnal_rx_requeue_buffer(nal_data, srxd);
299         } else {
300                 CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n");
301                 /*
302                  *      get rid of it ?
303                  */
304                 return(GMNAL_STATUS_FAIL);
305         }
306
307         return(GMNAL_STATUS_OK);
308 }
309
310
311
/*
 *      Process a small message receive.
 *      Get here via gmnal_rx_thread, gmnal_pre_receive,
 *      lib_parse and cb_recv.
 *      Put data from the prewired receive buffer into the user's buffer(s).
 *      Re-post the receive buffer for another receive.
 *      Call lib_finalize.
 */
320 int
321 gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
322                 unsigned int niov, struct iovec *iov, size_t offset, size_t mlen, size_t rlen)
323 {
324         gmnal_srxd_t    *srxd = NULL;
325         void    *buffer = NULL;
326         gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
327
328
329         CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
330
331         if (!private) {
332                 CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
333                 lib_finalize(libnal, private, cookie, PTL_FAIL);
334                 return(PTL_FAIL);
335         }
336
337         srxd = (gmnal_srxd_t*)private;
338         buffer = srxd->buffer;
339         buffer += sizeof(gmnal_msghdr_t);
340         buffer += sizeof(ptl_hdr_t);
341
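        /*
         *      scatter the contiguous payload into the user's iovec,
         *      skipping the first 'offset' bytes of the iovec
         */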
342         while(niov--) {
343                 if (offset >= iov->iov_len) {
344                         offset -= iov->iov_len;
345                 } else if (offset > 0) {
346                         CDEBUG(D_INFO, "processing [%p] base [%p] len %d, "
347                                "offset %d, len ["LPSZ"]\n", iov,
348                                iov->iov_base + offset, iov->iov_len, offset,
349                                iov->iov_len - offset);
350                         gm_bcopy(buffer, iov->iov_base + offset,
351                                  iov->iov_len - offset);
                        buffer += iov->iov_len - offset;
                        offset = 0;
354                 } else {
355                         CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov,
356                                iov->iov_len);
357                         gm_bcopy(buffer, iov->iov_base, iov->iov_len);
358                         buffer += iov->iov_len;
359                 }
360                 iov++;
361         }
362
363
364         /*
365          *      let portals library know receive is complete
366          */
367         CDEBUG(D_PORTALS, "calling lib_finalize\n");
368         lib_finalize(libnal, private, cookie, PTL_OK);
369         /*
370          *      return buffer so it can be used again
371          */
372         CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
373         GMNAL_GM_LOCK(nal_data);
374         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
375                                            srxd->gmsize, GM_LOW_PRIORITY, 0);   
376         GMNAL_GM_UNLOCK(nal_data);
377
378         return(PTL_OK);
379 }
380
381
382 /*
383  *      Start a small transmit. 
384  *      Get a send token (and wired transmit buffer).
 *      Copy data from the sender's buffer to the wired buffer and
386  *      initiate gm_send from the wired buffer.
387  *      The callback function informs when the send is complete.
388  */
389 int
390 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
391                 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
392                 unsigned int niov, struct iovec *iov, size_t offset, int size)
393 {
394         gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
395         gmnal_stxd_t    *stxd = NULL;
396         void            *buffer = NULL;
397         gmnal_msghdr_t  *msghdr = NULL;
398         int             tot_size = 0;
399         unsigned int    local_nid;
400         gm_status_t     gm_status = GM_SUCCESS;
401
402         CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
403                "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] niov [%d] "
404                "iov [%p] size [%d]\n", libnal, private, cookie, hdr, type, 
405                global_nid, pid, niov, iov, size);
406
407         CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
408                hdr->dest_nid, hdr->src_nid);
409
410         if (!nal_data) {
411                 CDEBUG(D_ERROR, "no nal_data\n");
412                 return(GMNAL_STATUS_FAIL);
413         } else {
414                 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
415         }
416
417         GMNAL_GM_LOCK(nal_data);
418         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
419                                             &local_nid);
420         GMNAL_GM_UNLOCK(nal_data);
421         if (gm_status != GM_SUCCESS) {
422                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
423                 return(GMNAL_STATUS_FAIL);
424         }
425         CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
426
427         stxd = gmnal_get_stxd(nal_data, 1);
428         CDEBUG(D_INFO, "stxd [%p]\n", stxd);
429
430         stxd->type = GMNAL_SMALL_MESSAGE;
431         stxd->cookie = cookie;
432
433         /*
434          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
435          *      Then copy the data in
436          */
437         buffer = stxd->buffer;
438         msghdr = (gmnal_msghdr_t*)buffer;
439
440         msghdr->magic = GMNAL_MAGIC;
441         msghdr->type = GMNAL_SMALL_MESSAGE;
442         msghdr->sender_node_id = nal_data->gm_global_nid;
443         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
444
445         buffer += sizeof(gmnal_msghdr_t);
446
447         CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
448         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
449
450         buffer += sizeof(ptl_hdr_t);
451
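        /*
         *      gather the user's iovec into the wired transmit buffer,
         *      honouring the initial offset
         */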
452         while(niov--) {
453                 if (offset >= iov->iov_len) {
454                         offset -= iov->iov_len;
455                 } else if (offset > 0) {
456                         CDEBUG(D_INFO, "processing iov [%p] base [%p] len ["LPSZ"] to [%p]\n", 
457                                 iov, iov->iov_base + offset, iov->iov_len - offset, buffer);
458                         gm_bcopy(iov->iov_base + offset, buffer, iov->iov_len - offset);
459                         buffer+= iov->iov_len - offset;
460                         offset = 0;
461                 } else {
462                         CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n", 
463                                 iov, iov->iov_len, buffer);
464                         gm_bcopy(iov->iov_base, buffer, iov->iov_len);
465                         buffer+= iov->iov_len;
466                 } 
467                 iov++;
468         }
469
470         CDEBUG(D_INFO, "sending\n");
471         tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
472         stxd->msg_size = tot_size;
473
474
475         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
476                "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
477                "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
478                stxd->msg_size, global_nid, local_nid, stxd);
479
480         GMNAL_GM_LOCK(nal_data);
481         stxd->gm_priority = GM_LOW_PRIORITY;
482         stxd->gm_target_node = local_nid;
483         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
484                                       stxd->gm_size, stxd->msg_size, 
485                                       GM_LOW_PRIORITY, local_nid, 
486                                       gmnal_small_tx_callback, (void*)stxd);
487         GMNAL_GM_UNLOCK(nal_data);
488         CDEBUG(D_INFO, "done\n");
489                 
490         return(PTL_OK);
491 }
492
493
/*
 *      A callback to indicate the small transmit operation is complete.
 *      Check for errors and try to deal with them.
 *      Call lib_finalize to inform the client application that the send
 *      is complete and the memory can be reused.
 *      Return the stxd when finished with it (returns a send token)
 */
501 void 
502 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
503 {
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
        lib_msg_t       *cookie;
        gmnal_data_t    *nal_data;
        lib_nal_t       *libnal;

        if (!stxd) {
                CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
                return;
        }
        /* only dereference stxd once we know it is valid */
        cookie = stxd->cookie;
        nal_data = (gmnal_data_t*)stxd->nal_data;
        libnal = nal_data->libnal;
513         if (status != GM_SUCCESS) {
514                 CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s]\n", 
515                        stxd, gmnal_gm_error(status));
516         }
517
518         switch(status) {
519                 case(GM_SUCCESS):
520                 break;
521
522
523
524                 case(GM_SEND_DROPPED):
525                 /*
526                  *      do a resend on the dropped ones
527                  */
528                         CDEBUG(D_ERROR, "send stxd [%p] was dropped "
529                                "resending\n", context);
530                         GMNAL_GM_LOCK(nal_data);
531                         gm_send_to_peer_with_callback(nal_data->gm_port, 
532                                                       stxd->buffer, 
533                                                       stxd->gm_size, 
534                                                       stxd->msg_size, 
535                                                       stxd->gm_priority, 
536                                                       stxd->gm_target_node, 
537                                                       gmnal_small_tx_callback,
538                                                       context);
539                         GMNAL_GM_UNLOCK(nal_data);
540                 
541                 return;
542                 case(GM_TIMED_OUT):
543                 case(GM_SEND_TIMED_OUT):
544                 /*
545                  *      drop these ones
546                  */
547                         CDEBUG(D_INFO, "calling gm_drop_sends\n");
548                         GMNAL_GM_LOCK(nal_data);
549                         gm_drop_sends(nal_data->gm_port, stxd->gm_priority, 
550                                       stxd->gm_target_node, GMNAL_GM_PORT, 
551                                       gmnal_drop_sends_callback, context);
552                         GMNAL_GM_UNLOCK(nal_data);
553
554                 return;
555
556
557                 /*
558                  *      abort on these ?
559                  */
560                 case(GM_TRY_AGAIN):
561                 case(GM_INTERRUPTED):
562                 case(GM_FAILURE):
563                 case(GM_INPUT_BUFFER_TOO_SMALL):
564                 case(GM_OUTPUT_BUFFER_TOO_SMALL):
565                 case(GM_BUSY):
566                 case(GM_MEMORY_FAULT):
567                 case(GM_INVALID_PARAMETER):
568                 case(GM_OUT_OF_MEMORY):
569                 case(GM_INVALID_COMMAND):
570                 case(GM_PERMISSION_DENIED):
571                 case(GM_INTERNAL_ERROR):
572                 case(GM_UNATTACHED):
573                 case(GM_UNSUPPORTED_DEVICE):
574                 case(GM_SEND_REJECTED):
575                 case(GM_SEND_TARGET_PORT_CLOSED):
576                 case(GM_SEND_TARGET_NODE_UNREACHABLE):
577                 case(GM_SEND_PORT_CLOSED):
578                 case(GM_NODE_ID_NOT_YET_SET):
579                 case(GM_STILL_SHUTTING_DOWN):
580                 case(GM_CLONE_BUSY):
581                 case(GM_NO_SUCH_DEVICE):
582                 case(GM_ABORTED):
583                 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
584                 case(GM_UNTRANSLATED_SYSTEM_ERROR):
585                 case(GM_ACCESS_DENIED):
586                 case(GM_NO_DRIVER_SUPPORT):
587                 case(GM_PTE_REF_CNT_OVERFLOW):
588                 case(GM_NOT_SUPPORTED_IN_KERNEL):
589                 case(GM_NOT_SUPPORTED_ON_ARCH):
590                 case(GM_NO_MATCH):
591                 case(GM_USER_ERROR):
592                 case(GM_DATA_CORRUPTED):
593                 case(GM_HARDWARE_FAULT):
594                 case(GM_SEND_ORPHANED):
595                 case(GM_MINOR_OVERFLOW):
596                 case(GM_PAGE_TABLE_FULL):
597                 case(GM_UC_ERROR):
598                 case(GM_INVALID_PORT_NUMBER):
599                 case(GM_DEV_NOT_FOUND):
600                 case(GM_FIRMWARE_NOT_RUNNING):
601                 case(GM_YP_NO_MATCH):
602                 default:
603                         CDEBUG(D_ERROR, "Unknown send error\n");
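                        /*
                         *      resume the port so that future sends can
                         *      complete (see gmnal_resume_sending_callback)
                         */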
604                 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
605                                       stxd->gm_target_node, GMNAL_GM_PORT,
606                                       gmnal_resume_sending_callback, context);
607                 return;
608
609         }
610
611         /*
612          *      TO DO
613          *      If this is a large message init,
614          *      we're not finished with the data yet,
615          *      so can't call lib_finalise.
616          *      However, we're also holding on to a 
617          *      stxd here (to keep track of the source
618          *      iovec only). Should use another structure
619          *      to keep track of iovec and return stxd to 
620          *      free list earlier.
621          */
622         if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
623                 CDEBUG(D_INFO, "large transmit done\n");
624                 return;
625         }
626         gmnal_return_stxd(nal_data, stxd);
627         lib_finalize(libnal, stxd, cookie, PTL_OK);
628         return;
629 }
630
631 /*
632  *      After an error on the port
633  *      call this to allow future sends to complete
634  */
635 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
636                                  gm_status_t status)
637 {
639         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
640         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
641         gmnal_return_stxd(stxd->nal_data, stxd);
642         return;
643 }
644
645
646 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
647                                 gm_status_t status)
648 {
649         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
650         gmnal_data_t    *nal_data = stxd->nal_data;
651
652         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
653         if (status == GM_SUCCESS) {
654                 GMNAL_GM_LOCK(nal_data);
655                 gm_send_to_peer_with_callback(gm_port, stxd->buffer, 
656                                               stxd->gm_size, stxd->msg_size, 
657                                               stxd->gm_priority, 
658                                               stxd->gm_target_node, 
659                                               gmnal_small_tx_callback, 
660                                               context);
                GMNAL_GM_UNLOCK(nal_data);
662         } else {
663                 CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
664                        "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
665         }
666
667
668         return;
669 }
670
671
672 /*
 *      Begin a large transmit.
 *      Do a gm_register of the memory pointed to by the iovec
 *      and send the details to the receiver. The receiver does a gm_get
 *      to pull the data and sends an ack when finished. Upon receipt of
 *      this ack, deregister the memory. Only 1 send token is required here.
678  */
679 int
680 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
681                 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
682                 unsigned int niov, struct iovec *iov, size_t offset, int size)
683 {
684
685         gmnal_data_t    *nal_data;
686         gmnal_stxd_t    *stxd = NULL;
687         void            *buffer = NULL;
688         gmnal_msghdr_t  *msghdr = NULL;
689         unsigned int    local_nid;
690         int             mlen = 0;       /* the size of the init message data */
691         struct iovec    *iov_dup = NULL;
692         gm_status_t     gm_status;
693         int             niov_dup;
694
695
696         CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
697                "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
698                "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, 
699                global_nid, pid, niov, iov, size);
700
701         if (libnal)
702                 nal_data = (gmnal_data_t*)libnal->libnal_data;
703         else  {
704                 CDEBUG(D_ERROR, "no libnal.\n");
705                 return(GMNAL_STATUS_FAIL);
706         }
707         
708
        /*
         *      Get stxd and buffer. Put the local address of the data in
         *      the buffer, send the local addresses to the target and
         *      wait for the target node to pull the data over.
         *      The stxd is used to keep hold of the iovecs until the
         *      target's ack arrives.
         */
715         stxd = gmnal_get_stxd(nal_data, 1);
716         CDEBUG(D_INFO, "stxd [%p]\n", stxd);
717
718         stxd->type = GMNAL_LARGE_MESSAGE_INIT;
719         stxd->cookie = cookie;
720
721         /*
722          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
723          *      Then copy the iov in
724          */
725         buffer = stxd->buffer;
726         msghdr = (gmnal_msghdr_t*)buffer;
727
728         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
729
730         msghdr->magic = GMNAL_MAGIC;
731         msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
732         msghdr->sender_node_id = nal_data->gm_global_nid;
733         msghdr->stxd = stxd;
734         msghdr->niov = niov ;
735         buffer += sizeof(gmnal_msghdr_t);
736         mlen = sizeof(gmnal_msghdr_t);
737         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
738
739
740         CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
741
742         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
743         buffer += sizeof(ptl_hdr_t);
744         mlen += sizeof(ptl_hdr_t); 
745         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
746
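        /* skip over whole iovec entries consumed by the offset */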
747         while (offset >= iov->iov_len) {
748                 offset -= iov->iov_len;
749                 niov--;
750                 iov++;
751         } 
752
753         LASSERT(offset >= 0);
        /*
         *      Store the iovs in the stxd so we can get
         *      them later if we need them
         */
758         stxd->iov[0].iov_base = iov->iov_base + offset; 
759         stxd->iov[0].iov_len = iov->iov_len - offset; 
760         CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
761         if (niov > 1)
762                 gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
763         stxd->niov = niov;
764
765         /*
766          *      copy the iov to the buffer so target knows 
767          *      where to get the data from
768          */
769         CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
770         gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
771         mlen += stxd->niov*(sizeof(struct iovec));
772         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
773         
774         /*
775          *      register the memory so the NIC can get hold of the data
776          *      This is a slow process. it'd be good to overlap it 
777          *      with something else.
778          */
779         iov = stxd->iov;
780         iov_dup = iov;
781         niov_dup = niov;
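        /*
         *      iov_dup remembers the start of the list so the error path
         *      below can deregister whatever was already registered
         */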
782         while(niov--) {
783                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
784                        iov->iov_base, iov->iov_len);
785                 GMNAL_GM_LOCK(nal_data);
786                 gm_status = gm_register_memory(nal_data->gm_port, 
787                                                iov->iov_base, iov->iov_len);
788                 if (gm_status != GM_SUCCESS) {
789                         GMNAL_GM_UNLOCK(nal_data);
790                         CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
791                                "for memory [%p] len ["LPSZ"]\n", 
792                                gm_status, gmnal_gm_error(gm_status), 
793                                iov->iov_base, iov->iov_len);
794                         GMNAL_GM_LOCK(nal_data);
795                         while (iov_dup != iov) {
796                                 gm_deregister_memory(nal_data->gm_port, 
797                                                      iov_dup->iov_base, 
798                                                      iov_dup->iov_len);
799                                 iov_dup++;
800                         }
801                         GMNAL_GM_UNLOCK(nal_data);
802                         gmnal_return_stxd(nal_data, stxd);
803                         return(PTL_FAIL);
804                 }
805
806                 GMNAL_GM_UNLOCK(nal_data);
807                 iov++;
808         }
809
810         /*
811          *      Send the init message to the target
812          */
813         CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
814         GMNAL_GM_LOCK(nal_data);
815         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
816                                             &local_nid);
817         if (gm_status != GM_SUCCESS) {
818                 GMNAL_GM_UNLOCK(nal_data);
819                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
820                 gmnal_return_stxd(nal_data, stxd);
821                 /* TO DO deregister memory on failure */
822                 return(GMNAL_STATUS_FAIL);
823         }
824         CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
825         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
826                                       stxd->gm_size, mlen, GM_LOW_PRIORITY, 
827                                       local_nid, gmnal_large_tx_callback, 
828                                       (void*)stxd);
829         GMNAL_GM_UNLOCK(nal_data);
830         
831         CDEBUG(D_INFO, "done\n");
832                 
833         return(PTL_OK);
834 }
835
836 /*
837  *      Callback function indicates that send of buffer with 
838  *      large message iovec has completed (or failed).
839  */
840 void 
841 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
842 {
843         gmnal_small_tx_callback(gm_port, context, status);
844
845 }
846
847
848
/*
 *      Have received a buffer that contains the iovec of the sender.
 *      Do a gm_register_memory of the receiver's buffers and then
 *      get the data from the sender.
 */
854 int
855 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
856                 unsigned int nriov, struct iovec *riov, size_t offset, 
857                 size_t mlen, size_t rlen)
858 {
859         gmnal_data_t    *nal_data = libnal->libnal_data;
860         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
861         void            *buffer = NULL;
862         struct  iovec   *riov_dup;
863         int             nriov_dup;
864         gmnal_msghdr_t  *msghdr = NULL;
865         gm_status_t     gm_status;
866
867         CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
868                "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
869                 libnal, private, cookie, nriov, riov, mlen, rlen);
870
871         if (!srxd) {
872                 CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
873                 lib_finalize(libnal, private, cookie, PTL_FAIL);
874                 return(PTL_FAIL);
875         }
876
877         buffer = srxd->buffer;
878         msghdr = (gmnal_msghdr_t*)buffer;
879         buffer += sizeof(gmnal_msghdr_t);
880         buffer += sizeof(ptl_hdr_t);
881
882         /*
883          *      Store the senders stxd address in the srxd for this message
884          *      The gmnal_large_message_ack needs it to notify the sender
885          *      the pull of data is complete
886          */
887         srxd->source_stxd = msghdr->stxd;
888
        /*
         *      Register the receiver's memory,
         *      get the data,
         *      let the local client know the data has arrived
         *      and send an ack to the sender.
         *      TO DO
         *      If the iovecs match, could interleave
         *      gm_registers and gm_gets for each element
         */
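        /* advance past whole iovec entries consumed by the offset */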
898         while (offset >= riov->iov_len) {
899                 offset -= riov->iov_len;
900                 riov++;
901                 nriov--;
902         } 
903         LASSERT (nriov >= 0);
904         LASSERT (offset >= 0);
905         /*
906          *      do this so the final gm_get callback can deregister the memory
907          */
908         PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
909
910         srxd->riov[0].iov_base = riov->iov_base + offset;
911         srxd->riov[0].iov_len = riov->iov_len - offset;
912         if (nriov > 1)
913                 gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
914         srxd->nriov = nriov;
915         
916         riov = srxd->riov;
917         nriov_dup = nriov;
918         riov_dup = riov;
919         while(nriov--) {
920                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
921                        riov->iov_base, riov->iov_len);
922                 GMNAL_GM_LOCK(nal_data);
923                 gm_status = gm_register_memory(nal_data->gm_port, 
924                                                riov->iov_base, riov->iov_len);
925                 if (gm_status != GM_SUCCESS) {
926                         GMNAL_GM_UNLOCK(nal_data);
927                         CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
928                                "for memory [%p] len ["LPSZ"]\n", 
929                                gm_status, gmnal_gm_error(gm_status), 
930                                riov->iov_base, riov->iov_len);
931                         GMNAL_GM_LOCK(nal_data);
932                         while (riov_dup != riov) {
933                                 gm_deregister_memory(nal_data->gm_port, 
934                                                      riov_dup->iov_base, 
935                                                      riov_dup->iov_len);
936                                 riov_dup++;
937                         }
                        GMNAL_GM_UNLOCK(nal_data);
939                         /*
940                          *      give back srxd and buffer. Send NACK to sender
941                          */
942                         PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
943                         return(PTL_FAIL);
944                 }
945                 GMNAL_GM_UNLOCK(nal_data);
946                 riov++;
947         }
948
949         /*
950          *      now do gm_get to get the data
951          */
952         srxd->cookie = cookie;
953         if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer, 
954                               nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
                CDEBUG(D_ERROR, "can't get the data\n");
956         }
957
        CDEBUG(D_INFO, "gmnal_large_rx done\n");
959
960         return(PTL_OK);
961 }
962
963
964 /*
965  *      Perform a number of remote gets as part of receiving 
966  *      a large message.
967  *      The final one to complete (i.e. the last callback to get called)
968  *      tidies up.
969  *      gm_get requires a send token.
970  */
971 int
972 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, 
973                   int nriov, struct iovec *riov)
974 {
975
976         int     ncalls = 0;
977
978         CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
979                "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
980
981
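        /*
         *      the first pass (do_copy == 0) only counts the gm_gets that
         *      will be needed; the second pass below actually issues them
         */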
982         ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
983         if (ncalls < 0) {
984                 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
985                 return(GMNAL_STATUS_FAIL);
986         }
987         CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
988         spin_lock_init(&srxd->callback_lock);
989         srxd->ncallbacks = ncalls;
990         srxd->callback_status = 0;
991
992         ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
993         if (ncalls < 0) {
994                 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
995                 return(GMNAL_STATUS_FAIL);
996         }
997
998         return(GMNAL_STATUS_OK);
999
1000 }
1001
1002
/*
 *      Pull data from the source node (source iovec) to a local iovec.
 *      The iovecs may not match, which adds the complications below.
 *      Count the number of gm_gets that will be required so the callbacks
 *      can determine which one is the last.
 */
1009 int
1010 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, 
1011                struct iovec *siov, int nriov, struct iovec *riov)
1012 {
1013
1014         int     ncalls = 0;
1015         int     slen = siov->iov_len, rlen = riov->iov_len;
1016         char    *sbuf = siov->iov_base, *rbuf = riov->iov_base; 
1017         unsigned long   sbuf_long;
1018         gm_remote_ptr_t remote_ptr = 0;
1019         unsigned int    source_node;
1020         gmnal_ltxd_t    *ltxd = NULL;
1021         gmnal_data_t    *nal_data = srxd->nal_data;
1022
1023         CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
1024         if (do_copy) {
1025                 if (!nal_data) {
1026                         CDEBUG(D_ERROR, "Bad args No nal_data\n");
1027                         return(GMNAL_STATUS_FAIL);
1028                 }
1029                 GMNAL_GM_LOCK(nal_data);
1030                 if (gm_global_id_to_node_id(nal_data->gm_port, 
1031                                             srxd->gm_source_node, 
1032                                             &source_node) != GM_SUCCESS) {
1033
1034                         CDEBUG(D_ERROR, "cannot resolve global_id [%u] "
1035                                "to local node_id\n", srxd->gm_source_node);
1036                         GMNAL_GM_UNLOCK(nal_data);
1037                         return(GMNAL_STATUS_FAIL);
1038                 }
1039                 GMNAL_GM_UNLOCK(nal_data);
1040                 /*
1041                  *      We need a send token to use gm_get
1042                  *      getting an stxd gets us a send token.
1043                  *      the stxd is used as the context to the
1044                  *      callback function (so stxd can be returned).
1045                  *      Set pointer in stxd to srxd so callback count in srxd
1046                  *      can be decremented to find last callback to complete
1047                  */
1048                 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n", 
1049                        srxd->gm_source_node, source_node);
1050         }
1051
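        /*
         *      walk the source and receive iovecs in step; each overlapping
         *      span becomes one gm_get of min(slen, rlen) bytes
         */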
1052         do {
1053                 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
1054                                 sbuf, slen, rbuf, rlen);
1055                 if (slen > rlen) {
1056                         ncalls++;
1057                         if (do_copy) {
1058                                 CDEBUG(D_INFO, "slen>rlen\n");
1059                                 ltxd = gmnal_get_ltxd(nal_data);
1060                                 ltxd->srxd = srxd;
1061                                 GMNAL_GM_LOCK(nal_data);
1062                                 /* 
1063                                  *      funny business to get rid 
1064                                  *      of compiler warning 
1065                                  */
1066                                 sbuf_long = (unsigned long) sbuf;
1067                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1068                                 gm_get(nal_data->gm_port, remote_ptr, rbuf, 
1069                                        rlen, GM_LOW_PRIORITY, source_node, 
1070                                        GMNAL_GM_PORT, 
1071                                        gmnal_remote_get_callback, ltxd);
1072                                 GMNAL_GM_UNLOCK(nal_data);
1073                         }
1074                         /*
1075                          *      at the end of 1 iov element
1076                          */
1077                         sbuf+=rlen;
1078                         slen-=rlen;
1079                         riov++;
1080                         nriov--;
1081                         rbuf = riov->iov_base;
1082                         rlen = riov->iov_len;
1083                 } else if (rlen > slen) {
1084                         ncalls++;
1085                         if (do_copy) {
1086                                 CDEBUG(D_INFO, "slen<rlen\n");
1087                                 ltxd = gmnal_get_ltxd(nal_data);
1088                                 ltxd->srxd = srxd;
1089                                 GMNAL_GM_LOCK(nal_data);
1090                                 sbuf_long = (unsigned long) sbuf;
1091                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1092                                 gm_get(nal_data->gm_port, remote_ptr, rbuf, 
1093                                        slen, GM_LOW_PRIORITY, source_node, 
1094                                        GMNAL_GM_PORT, 
1095                                        gmnal_remote_get_callback, ltxd);
1096                                 GMNAL_GM_UNLOCK(nal_data);
1097                         }
1098                         /*
1099                          *      at end of siov element
1100                          */
1101                         rbuf+=slen;
1102                         rlen-=slen;
1103                         siov++;
1104                         sbuf = siov->iov_base;
1105                         slen = siov->iov_len;
1106                 } else {
1107                         ncalls++;
1108                         if (do_copy) {
1109                                 CDEBUG(D_INFO, "rlen=slen\n");
1110                                 ltxd = gmnal_get_ltxd(nal_data);
1111                                 ltxd->srxd = srxd;
1112                                 GMNAL_GM_LOCK(nal_data);
1113                                 sbuf_long = (unsigned long) sbuf;
1114                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1115                                 gm_get(nal_data->gm_port, remote_ptr, rbuf, 
1116                                        rlen, GM_LOW_PRIORITY, source_node, 
1117                                        GMNAL_GM_PORT, 
1118                                        gmnal_remote_get_callback, ltxd);
1119                                 GMNAL_GM_UNLOCK(nal_data);
1120                         }
1121                         /*
1122                          *      at end of siov and riov element
1123                          */
1124                         siov++;
1125                         sbuf = siov->iov_base;
1126                         slen = siov->iov_len;
1127                         riov++;
1128                         nriov--;
1129                         rbuf = riov->iov_base;
1130                         rlen = riov->iov_len;
1131                 }
1132
1133         } while (nriov);
1134         return(ncalls);
1135 }
1136
1137
1138 /*
1139  *      The callback function that is invoked after each gm_get call completes.
1140  *      Multiple callbacks may be invoked for 1 transaction, only the final
1141  *      callback has work to do.
1142  */
1143 void
1144 gmnal_remote_get_callback(gm_port_t *gm_port, void *context, 
1145                            gm_status_t status)
1146 {
1147
1148         gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
1149         gmnal_srxd_t    *srxd = ltxd->srxd;
1150         lib_nal_t       *libnal = srxd->nal_data->libnal;
1151         int             lastone;
1152         struct  iovec   *riov;
1153         int             nriov;
1154         gmnal_data_t    *nal_data;
1155
1156         CDEBUG(D_TRACE, "called for context [%p]\n", context);
1157
1158         if (status != GM_SUCCESS) {
1159                 CDEBUG(D_ERROR, "reports error [%d][%s]\n", status, 
1160                        gmnal_gm_error(status));
1161         }
1162
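        /*
         *      the last gm_get callback to complete (ncallbacks reaches
         *      zero) finalizes the receive and tidies up below
         */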
1163         spin_lock(&srxd->callback_lock);
1164         srxd->ncallbacks--;
1165         srxd->callback_status |= status;
1166         lastone = srxd->ncallbacks?0:1;
1167         spin_unlock(&srxd->callback_lock);
1168         nal_data = srxd->nal_data;
1169
1170         /*
1171          *      everyone returns a send token
1172          */
1173         gmnal_return_ltxd(nal_data, ltxd);
1174
1175         if (!lastone) {
1176                 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1177                 return;
1178         }
1179         
1180         /*
1181          *      Let our client application proceed
1182          */     
1183         CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
1184         lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
1185
1186         /*
1187          *      send an ack to the sender to let him know we got the data
1188          */
1189         gmnal_large_tx_ack(nal_data, srxd);
1190
1191         /*
1192          *      Unregister the memory that was used
 *      This is a very slow business (slower than register)
1194          */
1195         nriov = srxd->nriov;
1196         riov = srxd->riov;
1197         GMNAL_GM_LOCK(nal_data);
1198         while (nriov--) {
1199                 CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base);
1200                 if (gm_deregister_memory(srxd->nal_data->gm_port, 
1201                                          riov->iov_base, riov->iov_len)) {
1202                         CDEBUG(D_ERROR, "failed to deregister memory [%p]\n", 
1203                                riov->iov_base);
1204                 }
1205                 riov++;
1206         }
1207         GMNAL_GM_UNLOCK(nal_data);
        PORTAL_FREE(srxd->riov, sizeof(struct iovec)*srxd->nriov);
1209
1210         /*
1211          *      repost the receive buffer (return receive token)
1212          */
1213         GMNAL_GM_LOCK(nal_data);
1214         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
1215                                            srxd->gmsize, GM_LOW_PRIORITY, 0);   
1216         GMNAL_GM_UNLOCK(nal_data);
1217         
1218         return;
1219 }
1220
1221
1222 /*
1223  *      Called on target node.
1224  *      After pulling data from a source node
1225  *      send an ack message to indicate the large transmit is complete.
1226  */
1227 void 
1228 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1229 {
1230
1231         gmnal_stxd_t    *stxd;
1232         gmnal_msghdr_t *msghdr;
1233         void            *buffer = NULL;
1234         unsigned int    local_nid;
1235         gm_status_t     gm_status = GM_SUCCESS;
1236
1237         CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd, 
1238                srxd->gm_source_node);
1239
1240         GMNAL_GM_LOCK(nal_data);
1241         gm_status = gm_global_id_to_node_id(nal_data->gm_port, 
1242                                             srxd->gm_source_node, &local_nid);
1243         GMNAL_GM_UNLOCK(nal_data);
1244         if (gm_status != GM_SUCCESS) {
1245                 CDEBUG(D_ERROR, "Failed to obtain local id\n");
1246                 return;
1247         }
1248         CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1249
1250         stxd = gmnal_get_stxd(nal_data, 1);
1251         CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1252
1253         stxd->nal_data = nal_data;
1254         stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1255
1256         /*
1257          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
1258          *      Then copy the data in
1259          */
1260         buffer = stxd->buffer;
1261         msghdr = (gmnal_msghdr_t*)buffer;
1262
1263         /*
1264          *      Add in the address of the original stxd from the sender node
1265          *      so it knows which thread to notify.
1266          */
1267         msghdr->magic = GMNAL_MAGIC;
1268         msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1269         msghdr->sender_node_id = nal_data->gm_global_nid;
1270         msghdr->stxd = srxd->source_stxd;
1271         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1272
1273         CDEBUG(D_INFO, "sending\n");
1274         stxd->msg_size= sizeof(gmnal_msghdr_t);
1275
1276
1277         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1278                "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1279                "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, 
1280                stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1281         GMNAL_GM_LOCK(nal_data);
1282         stxd->gm_priority = GM_LOW_PRIORITY;
1283         stxd->gm_target_node = local_nid;
1284         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
1285                                       stxd->gm_size, stxd->msg_size, 
1286                                       GM_LOW_PRIORITY, local_nid, 
1287                                       gmnal_large_tx_ack_callback, 
1288                                       (void*)stxd);
1289         
1290         GMNAL_GM_UNLOCK(nal_data);
1291         CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1292                 
1293         return;
1294 }
1295
1296
/*
 *      A callback to indicate that the send of the ack message is complete.
 *      Return the stxd when finished with it (returns a send token).
 */
1304 void 
1305 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context, 
1306                              gm_status_t status)
1307 {
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;

        if (!stxd) {
                CDEBUG(D_ERROR, "send completion event for unknown stxd\n");
                return;
        }
1315         CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1316                stxd, status);
1317         gmnal_return_stxd(stxd->nal_data, stxd);
1318
1320         return;
1321 }
1322
/*
 *      Indicates the large transmit operation is complete.
 *      Called on the transmit side (means the data has been pulled by the
 *      receiver or the transfer failed).
 *      Call lib_finalize to inform the client application that the send
 *      is complete, deregister the memory and return the stxd.
 *      Finally, repost the rx buffer that the ack message was delivered in.
 */
1331 void 
1332 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1333 {
1334         lib_nal_t       *libnal = nal_data->libnal;
1335         gmnal_stxd_t    *stxd = NULL;
1336         gmnal_msghdr_t  *msghdr = NULL;
1337         void            *buffer = NULL;
1338         struct  iovec   *iov;
1339
1340
        CDEBUG(D_TRACE, "gmnal_large_tx_ack_received srxd [%p]\n", srxd);
1342
1343         buffer = srxd->buffer;
1344         msghdr = (gmnal_msghdr_t*)buffer;
1345         stxd = msghdr->stxd;
1346
1347         CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1348
1349         lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
1350
1351         /*
1352          *      extract the iovec from the stxd, deregister the memory.
1353          *      free the space used to store the iovec
1354          */
1355         iov = stxd->iov;
1356         while(stxd->niov--) {
1357                 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1358                        iov->iov_base, iov->iov_len);
1359                 GMNAL_GM_LOCK(nal_data);
1360                 gm_deregister_memory(nal_data->gm_port, iov->iov_base, 
1361                                      iov->iov_len);
1362                 GMNAL_GM_UNLOCK(nal_data);
1363                 iov++;
1364         }
1365
1366         /*
1367          *      return the send token
         *      TO DO: is it bad to hold onto the send token for so long?
1369          */
1370         gmnal_return_stxd(nal_data, stxd);
1371
1372
1373         /*
1374          *      requeue the receive buffer 
1375          */
1376         gmnal_rx_requeue_buffer(nal_data, srxd);
1377         
1378
1379         return;
1380 }