Whamcloud - gitweb
Land b_release_1_4_3 onto HEAD (20050619_0305)
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /*
23  *      This file contains all gmnal send and receive functions
24  */
25
26 #include "gmnal.h"
27
28 /*
29  *      The caretaker thread
30  *      This is main thread of execution for the NAL side
31  *      This guy waits in gm_blocking_recvive and gets
32  *      woken up when the myrinet adaptor gets an interrupt.
33  *      Hands off receive operations to the receive thread 
34  *      This thread Looks after gm_callbacks etc inline.
35  */
36 int
37 gmnal_ct_thread(void *arg)
38 {
39         gmnal_data_t            *nal_data;
40         gm_recv_event_t         *rxevent = NULL;
41         gm_recv_t               *recv = NULL;
42
43         if (!arg) {
44                 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
45                 return(-1);
46         }
47
48         nal_data = (gmnal_data_t*)arg;
49         CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
50
51         sprintf(current->comm, "gmnal_ct");
52
53         daemonize();
54
55         nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
56
57         GMNAL_GM_LOCK(nal_data);
58         while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
59                 CDEBUG(D_NET, "waiting\n");
60                 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
61                 if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
62                         CDEBUG(D_INFO, "time to exit\n");
63                         break;
64                 }
65                 CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
66                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
67
68                         case(GM_RECV_EVENT):
69                                 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
70                                 recv = (gm_recv_t*)&rxevent->recv;
71                                 GMNAL_GM_UNLOCK(nal_data);
72                                 gmnal_add_rxtwe(nal_data, recv);
73                                 GMNAL_GM_LOCK(nal_data);
74                                 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
75                         break;
76                         case(_GM_SLEEP_EVENT):
77                                 /*
78                                  *      Blocking receive above just returns
79                                  *      immediatly with _GM_SLEEP_EVENT
80                                  *      Don't know what this is
81                                  */
82                                 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
83                                 GMNAL_GM_UNLOCK(nal_data);
84                                 gm_unknown(nal_data->gm_port, rxevent);
85                                 GMNAL_GM_LOCK(nal_data);
86                                 CDEBUG(D_INFO, "Awake from gm_unknown\n");
87                                 break;
88                                 
89                         default:
90                                 /*
91                                  *      Don't know what this is
92                                  *      gm_unknown will make sense of it
93                                  *      Should be able to do something with
94                                  *      FAST_RECV_EVENTS here.
95                                  */
96                                 CDEBUG(D_NET, "Passing event to gm_unknown\n");
97                                 GMNAL_GM_UNLOCK(nal_data);
98                                 gm_unknown(nal_data->gm_port, rxevent);
99                                 GMNAL_GM_LOCK(nal_data);
100                                 CDEBUG(D_INFO, "Processed unknown event\n");
101                 }
102         }
103         GMNAL_GM_UNLOCK(nal_data);
104         nal_data->ctthread_flag = GMNAL_THREAD_RESET;
105         CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
106         return(GMNAL_STATUS_OK);
107 }
108
109
110 /*
111  *      process a receive event
112  */
113 int gmnal_rx_thread(void *arg)
114 {
115         gmnal_data_t            *nal_data;
116         void                    *buffer;
117         gmnal_rxtwe_t           *we = NULL;
118         int                     rank;
119
120         if (!arg) {
121                 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
122                 return(-1);
123         }
124
125         nal_data = (gmnal_data_t*)arg;
126         CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
127
128         for (rank=0; rank<num_rx_threads; rank++)
129                 if (nal_data->rxthread_pid[rank] == current->pid)
130                         break;
131
132         sprintf(current->comm, "gmnal_rx_%d", rank);
133
134         daemonize();
135         /*
136          *      set 1 bit for each thread started
137          *      doesn't matter which bit
138          */
139         spin_lock(&nal_data->rxthread_flag_lock);
140         if (nal_data->rxthread_flag)
141                 nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
142         else
143                 nal_data->rxthread_flag = 1;
144         CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
145         spin_unlock(&nal_data->rxthread_flag_lock);
146
147         while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
148                 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
149                 we = gmnal_get_rxtwe(nal_data);
150                 if (!we) {
151                         CDEBUG(D_INFO, "Receive thread time to exit\n");
152                         break;
153                 }
154
155                 buffer = we->buffer;
156                 switch(((gmnal_msghdr_t*)buffer)->type) {
157                 case(GMNAL_SMALL_MESSAGE):
158                         gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE);
159                 break;
160                 case(GMNAL_LARGE_MESSAGE_INIT):
161                         gmnal_pre_receive(nal_data,we,GMNAL_LARGE_MESSAGE_INIT);
162                 break;
163                 case(GMNAL_LARGE_MESSAGE_ACK):
164                         gmnal_pre_receive(nal_data, we,GMNAL_LARGE_MESSAGE_ACK);
165                 break;
166                 default:
167                         CERROR("Unsupported message type\n");
168                         gmnal_rx_bad(nal_data, we, NULL);
169                 }
170                 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
171         }
172
173         spin_lock(&nal_data->rxthread_flag_lock);
174         nal_data->rxthread_flag/=2;
175         CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
176         spin_unlock(&nal_data->rxthread_flag_lock);
177         CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
178         return(GMNAL_STATUS_OK);
179 }
180
181
182
183 /*
184  *      Start processing a small message receive
185  *      Get here from gmnal_receive_thread
186  *      Hand off to lib_parse, which calls cb_recv
187  *      which hands back to gmnal_small_receive
188  *      Deal with all endian stuff here.
189  */
190 int
191 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
192 {
193         gmnal_srxd_t    *srxd = NULL;
194         void            *buffer = NULL;
195         unsigned int snode, sport, type, length;
196         gmnal_msghdr_t  *gmnal_msghdr;
197         ptl_hdr_t       *portals_hdr;
198         int              rc;
199
200         CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
201                nal_data, we, gmnal_type);
202
203         buffer = we->buffer;
204         snode = we->snode;
205         sport = we->sport;
206         type = we->type;
207         buffer = we->buffer;
208         length = we->length;
209
210         gmnal_msghdr = (gmnal_msghdr_t*)buffer;
211         portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
212
213         CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
214                "type [%d], length [%d], buffer [%p]\n",
215                snode, sport, type, length, buffer);
216         CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
217                "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
218                gmnal_msghdr->magic, gmnal_msghdr->type);
219         CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
220                "dest_node ["LPD64"]\n", portals_hdr->src_nid,
221                portals_hdr->dest_nid);
222
223         /*
224          *      Get a receive descriptor for this message
225          */
226         srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
227         CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
228         if (!srxd) {
229                 CERROR("Failed to get receive descriptor\n");
230                 /* I think passing a NULL srxd to lib_parse will crash
231                  * gmnal_recv() */
232                 LBUG();
233                 lib_parse(nal_data->libnal, portals_hdr, srxd);
234                 return(GMNAL_STATUS_FAIL);
235         }
236
237         /*
238          *      no need to bother portals library with this
239          */
240         if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
241                 gmnal_large_tx_ack_received(nal_data, srxd);
242                 return(GMNAL_STATUS_OK);
243         }
244
245         srxd->nal_data = nal_data;
246         srxd->type = gmnal_type;
247         srxd->nsiov = gmnal_msghdr->niov;
248         srxd->gm_source_node = gmnal_msghdr->sender_node_id;
249
250         CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
251                buffer+GMNAL_MSGHDR_SIZE);
252         /*
253          *      control passes to lib, which calls cb_recv 
254          *      cb_recv is responsible for returning the buffer 
255          *      for future receive
256          */
257         rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
258
259         if (rc != PTL_OK) {
260                 /* I just received garbage; take appropriate action... */
261                 LBUG();
262         }
263
264         return(GMNAL_STATUS_OK);
265 }
266
267
268
269 /*
270  *      After a receive has been processed, 
271  *      hang out the receive buffer again.
272  *      This implicitly returns a receive token.
273  */
274 int
275 gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
276 {
277         CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
278
279         CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
280
281         GMNAL_GM_LOCK(nal_data);
282         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
283                                         srxd->gmsize, GM_LOW_PRIORITY, 0 );
284         GMNAL_GM_UNLOCK(nal_data);
285
286         return(GMNAL_STATUS_OK);
287 }
288
289
290 /*
291  *      Handle a bad message
292  *      A bad message is one we don't expect or can't interpret
293  */
294 int
295 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
296 {
297         CDEBUG(D_TRACE, "Can't handle message\n");
298
299         if (!srxd)
300                 srxd = gmnal_rxbuffer_to_srxd(nal_data, 
301                                                we->buffer);
302         if (srxd) {
303                 gmnal_rx_requeue_buffer(nal_data, srxd);
304         } else {
305                 CERROR("Can't find a descriptor for this buffer\n");
306                 /*
307                  *      get rid of it ?
308                  */
309                 return(GMNAL_STATUS_FAIL);
310         }
311
312         return(GMNAL_STATUS_OK);
313 }
314
315
316
317 /*
318  *      Process a small message receive.
319  *      Get here from gmnal_receive_thread, gmnal_pre_receive
320  *      lib_parse, cb_recv
321  *      Put data from prewired receive buffer into users buffer(s)
322  *      Hang out the receive buffer again for another receive
323  *      Call lib_finalize
324  */
325 ptl_err_t
326 gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
327 {
328         gmnal_srxd_t    *srxd = NULL;
329         gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
330
331
332         if (!private) {
333                 CERROR("gmnal_small_rx no context\n");
334                 lib_finalize(libnal, private, cookie, PTL_FAIL);
335                 return(PTL_FAIL);
336         }
337
338         srxd = (gmnal_srxd_t*)private;
339
340         /*
341          *      let portals library know receive is complete
342          */
343         CDEBUG(D_PORTALS, "calling lib_finalize\n");
344         lib_finalize(libnal, private, cookie, PTL_OK);
345         /*
346          *      return buffer so it can be used again
347          */
348         CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
349         GMNAL_GM_LOCK(nal_data);
350         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
351                                            srxd->gmsize, GM_LOW_PRIORITY, 0);
352         GMNAL_GM_UNLOCK(nal_data);
353
354         return(PTL_OK);
355 }
356
357
358 /*
359  *      Start a small transmit. 
360  *      Use the given send token (and wired transmit buffer).
361  *      Copy headers to wired buffer and initiate gm_send from the wired buffer.
362  *      The callback function informs when the send is complete.
363  */
364 ptl_err_t
365 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
366                 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
367                 gmnal_stxd_t *stxd, int size)
368 {
369         gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
370         void            *buffer = NULL;
371         gmnal_msghdr_t  *msghdr = NULL;
372         int             tot_size = 0;
373         unsigned int    local_nid;
374         gm_status_t     gm_status = GM_SUCCESS;
375
376         CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
377                "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
378                "size [%d]\n", libnal, private, cookie, hdr, type,
379                global_nid, pid, stxd, size);
380
381         CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
382                hdr->dest_nid, hdr->src_nid);
383
384         if (!nal_data) {
385                 CERROR("no nal_data\n");
386                 return(PTL_FAIL);
387         } else {
388                 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
389         }
390
391         GMNAL_GM_LOCK(nal_data);
392         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
393                                             &local_nid);
394         GMNAL_GM_UNLOCK(nal_data);
395         if (gm_status != GM_SUCCESS) {
396                 CERROR("Failed to obtain local id\n");
397                 return(PTL_FAIL);
398         }
399         CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
400
401         stxd->type = GMNAL_SMALL_MESSAGE;
402         stxd->cookie = cookie;
403
404         /*
405          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
406          *      Then send the message, as the data has previously been copied in
407          *      (HP SFS 1380).
408          */
409         buffer = stxd->buffer;
410         msghdr = (gmnal_msghdr_t*)buffer;
411
412         msghdr->magic = GMNAL_MAGIC;
413         msghdr->type = GMNAL_SMALL_MESSAGE;
414         msghdr->sender_node_id = nal_data->gm_global_nid;
415         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
416
417         buffer += sizeof(gmnal_msghdr_t);
418
419         CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
420         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
421
422         buffer += sizeof(ptl_hdr_t);
423
424         CDEBUG(D_INFO, "sending\n");
425         tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
426         stxd->msg_size = tot_size;
427
428
429         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
430                "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
431                "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
432                stxd->msg_size, global_nid, local_nid, stxd);
433
434         GMNAL_GM_LOCK(nal_data);
435         stxd->gm_priority = GM_LOW_PRIORITY;
436         stxd->gm_target_node = local_nid;
437         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
438                                       stxd->gm_size, stxd->msg_size,
439                                       GM_LOW_PRIORITY, local_nid,
440                                       gmnal_small_tx_callback, (void*)stxd);
441         GMNAL_GM_UNLOCK(nal_data);
442         CDEBUG(D_INFO, "done\n");
443
444         return(PTL_OK);
445 }
446
447
448 /*
449  *      A callback to indicate the small transmit operation is compete
450  *      Check for erros and try to deal with them.
451  *      Call lib_finalise to inform the client application that the send 
452  *      is complete and the memory can be reused.
453  *      Return the stxd when finished with it (returns a send token)
454  */
455 void 
456 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
457 {
458         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
459         lib_msg_t       *cookie = stxd->cookie;
460         gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
461         lib_nal_t       *libnal = nal_data->libnal;
462         unsigned         gnid = 0;
463         gm_status_t      gm_status = 0;
464
465         if (!stxd) {
466                 CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
467                 return;
468         }
469         if (status != GM_SUCCESS) {
470                 GMNAL_GM_LOCK(nal_data);
471                 gm_status = gm_node_id_to_global_id(nal_data->gm_port,
472                                                     stxd->gm_target_node,&gnid);
473                 GMNAL_GM_UNLOCK(nal_data);
474                 if (gm_status != GM_SUCCESS) {
475                         CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
476                                gm_status);
477                         gnid = 0;
478                 }
479                 CERROR("Result of send stxd [%p] is [%s] to [%u]\n",
480                        stxd, gmnal_gm_error(status), gnid);
481         }
482
483         switch(status) {
484                 case(GM_SUCCESS):
485                 break;
486
487
488
489                 case(GM_SEND_DROPPED):
490                 /*
491                  *      do a resend on the dropped ones
492                  */
493                         CERROR("send stxd [%p] dropped, resending\n", context);
494                         GMNAL_GM_LOCK(nal_data);
495                         gm_send_to_peer_with_callback(nal_data->gm_port,
496                                                       stxd->buffer,
497                                                       stxd->gm_size,
498                                                       stxd->msg_size,
499                                                       stxd->gm_priority,
500                                                       stxd->gm_target_node,
501                                                       gmnal_small_tx_callback,
502                                                       context);
503                         GMNAL_GM_UNLOCK(nal_data);
504                 return;
505                 case(GM_TIMED_OUT):
506                 case(GM_SEND_TIMED_OUT):
507                 /*
508                  *      drop these ones
509                  */
510                         CDEBUG(D_INFO, "calling gm_drop_sends\n");
511                         GMNAL_GM_LOCK(nal_data);
512                         gm_drop_sends(nal_data->gm_port, stxd->gm_priority, 
513                                       stxd->gm_target_node, GMNAL_GM_PORT_ID, 
514                                       gmnal_drop_sends_callback, context);
515                         GMNAL_GM_UNLOCK(nal_data);
516
517                 return;
518
519
520                 /*
521                  *      abort on these ?
522                  */
523                 case(GM_TRY_AGAIN):
524                 case(GM_INTERRUPTED):
525                 case(GM_FAILURE):
526                 case(GM_INPUT_BUFFER_TOO_SMALL):
527                 case(GM_OUTPUT_BUFFER_TOO_SMALL):
528                 case(GM_BUSY):
529                 case(GM_MEMORY_FAULT):
530                 case(GM_INVALID_PARAMETER):
531                 case(GM_OUT_OF_MEMORY):
532                 case(GM_INVALID_COMMAND):
533                 case(GM_PERMISSION_DENIED):
534                 case(GM_INTERNAL_ERROR):
535                 case(GM_UNATTACHED):
536                 case(GM_UNSUPPORTED_DEVICE):
537                 case(GM_SEND_REJECTED):
538                 case(GM_SEND_TARGET_PORT_CLOSED):
539                 case(GM_SEND_TARGET_NODE_UNREACHABLE):
540                 case(GM_SEND_PORT_CLOSED):
541                 case(GM_NODE_ID_NOT_YET_SET):
542                 case(GM_STILL_SHUTTING_DOWN):
543                 case(GM_CLONE_BUSY):
544                 case(GM_NO_SUCH_DEVICE):
545                 case(GM_ABORTED):
546                 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
547                 case(GM_UNTRANSLATED_SYSTEM_ERROR):
548                 case(GM_ACCESS_DENIED):
549                 case(GM_NO_DRIVER_SUPPORT):
550                 case(GM_PTE_REF_CNT_OVERFLOW):
551                 case(GM_NOT_SUPPORTED_IN_KERNEL):
552                 case(GM_NOT_SUPPORTED_ON_ARCH):
553                 case(GM_NO_MATCH):
554                 case(GM_USER_ERROR):
555                 case(GM_DATA_CORRUPTED):
556                 case(GM_HARDWARE_FAULT):
557                 case(GM_SEND_ORPHANED):
558                 case(GM_MINOR_OVERFLOW):
559                 case(GM_PAGE_TABLE_FULL):
560                 case(GM_UC_ERROR):
561                 case(GM_INVALID_PORT_NUMBER):
562                 case(GM_DEV_NOT_FOUND):
563                 case(GM_FIRMWARE_NOT_RUNNING):
564                 case(GM_YP_NO_MATCH):
565                 default:
566                 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
567                                       stxd->gm_target_node, GMNAL_GM_PORT_ID,
568                                       gmnal_resume_sending_callback, context);
569                 return;
570
571         }
572
573         /*
574          *      TO DO
575          *      If this is a large message init,
576          *      we're not finished with the data yet,
577          *      so can't call lib_finalise.
578          *      However, we're also holding on to a 
579          *      stxd here (to keep track of the source
580          *      iovec only). Should use another structure
581          *      to keep track of iovec and return stxd to 
582          *      free list earlier.
583          */
584         if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
585                 CDEBUG(D_INFO, "large transmit done\n");
586                 return;
587         }
588         gmnal_return_stxd(nal_data, stxd);
589         lib_finalize(libnal, stxd, cookie, PTL_OK);
590         return;
591 }
592
593 /*
594  *      After an error on the port
595  *      call this to allow future sends to complete
596  */
597 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
598                                  gm_status_t status)
599 {
600         gmnal_data_t    *nal_data;
601         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
602         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
603         gmnal_return_stxd(stxd->nal_data, stxd);
604         return;
605 }
606
607
608 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
609                                 gm_status_t status)
610 {
611         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
612         gmnal_data_t    *nal_data = stxd->nal_data;
613
614         CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
615         if (status == GM_SUCCESS) {
616                 GMNAL_GM_LOCK(nal_data);
617                 gm_send_to_peer_with_callback(gm_port, stxd->buffer, 
618                                               stxd->gm_size, stxd->msg_size, 
619                                               stxd->gm_priority, 
620                                               stxd->gm_target_node, 
621                                               gmnal_small_tx_callback, 
622                                               context);
623                 GMNAL_GM_UNLOCK(nal_data);
624         } else {
625                 CERROR("send_to_peer status for stxd [%p] is "
626                        "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
627         }
628
629
630         return;
631 }
632
633
634 /*
635  *      Begine a large transmit.
636  *      Do a gm_register of the memory pointed to by the iovec 
637  *      and send details to the receiver. The receiver does a gm_get
638  *      to pull the data and sends and ack when finished. Upon receipt of
639  *      this ack, deregister the memory. Only 1 send token is required here.
640  */
641 int
642 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
643                 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
644                 unsigned int niov, struct iovec *iov, size_t offset, int size)
645 {
646
647         gmnal_data_t    *nal_data;
648         gmnal_stxd_t    *stxd = NULL;
649         void            *buffer = NULL;
650         gmnal_msghdr_t  *msghdr = NULL;
651         unsigned int    local_nid;
652         int             mlen = 0;       /* the size of the init message data */
653         struct iovec    *iov_dup = NULL;
654         gm_status_t     gm_status;
655         int             niov_dup;
656
657
658         CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
659                "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
660                "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, 
661                global_nid, pid, niov, iov, size);
662
663         if (libnal)
664                 nal_data = (gmnal_data_t*)libnal->libnal_data;
665         else  {
666                 CERROR("no libnal.\n");
667                 return(GMNAL_STATUS_FAIL);
668         }
669         
670
671         /*
672          *      Get stxd and buffer. Put local address of data in buffer, 
673          *      send local addresses to target, 
674          *      wait for the target node to suck the data over.
675          *      The stxd is used to ren
676          */
677         stxd = gmnal_get_stxd(nal_data, 1);
678         CDEBUG(D_INFO, "stxd [%p]\n", stxd);
679
680         stxd->type = GMNAL_LARGE_MESSAGE_INIT;
681         stxd->cookie = cookie;
682
683         /*
684          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
685          *      Then copy the iov in
686          */
687         buffer = stxd->buffer;
688         msghdr = (gmnal_msghdr_t*)buffer;
689
690         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
691
692         msghdr->magic = GMNAL_MAGIC;
693         msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
694         msghdr->sender_node_id = nal_data->gm_global_nid;
695         msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
696         msghdr->niov = niov ;
697         buffer += sizeof(gmnal_msghdr_t);
698         mlen = sizeof(gmnal_msghdr_t);
699         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
700
701
702         CDEBUG(D_INFO, "processing  portals hdr at [%p]\n", buffer);
703
704         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
705         buffer += sizeof(ptl_hdr_t);
706         mlen += sizeof(ptl_hdr_t); 
707         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
708
709         while (offset >= iov->iov_len) {
710                 offset -= iov->iov_len;
711                 niov--;
712                 iov++;
713         } 
714
715         LASSERT(offset >= 0);
716         /*
717          *      Store the iovs in the stxd for we can get 
718          *      them later if we need them
719          */
720         stxd->iov[0].iov_base = iov->iov_base + offset; 
721         stxd->iov[0].iov_len = iov->iov_len - offset; 
722         CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
723         if (niov > 1)
724                 gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
725         stxd->niov = niov;
726
727         /*
728          *      copy the iov to the buffer so target knows 
729          *      where to get the data from
730          */
731         CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
732         gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
733         mlen += stxd->niov*(sizeof(struct iovec));
734         CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
735         
736         /*
737          *      register the memory so the NIC can get hold of the data
738          *      This is a slow process. it'd be good to overlap it 
739          *      with something else.
740          */
741         iov = stxd->iov;
742         iov_dup = iov;
743         niov_dup = niov;
744         while(niov--) {
745                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", 
746                        iov->iov_base, iov->iov_len);
747                 GMNAL_GM_LOCK(nal_data);
748                 gm_status = gm_register_memory(nal_data->gm_port, 
749                                                iov->iov_base, iov->iov_len);
750                 if (gm_status != GM_SUCCESS) {
751                         GMNAL_GM_UNLOCK(nal_data);
752                         CERROR("gm_register_memory returns [%d][%s] "
753                                "for memory [%p] len ["LPSZ"]\n", 
754                                gm_status, gmnal_gm_error(gm_status), 
755                                iov->iov_base, iov->iov_len);
756                         GMNAL_GM_LOCK(nal_data);
757                         while (iov_dup != iov) {
758                                 gm_deregister_memory(nal_data->gm_port, 
759                                                      iov_dup->iov_base, 
760                                                      iov_dup->iov_len);
761                                 iov_dup++;
762                         }
763                         GMNAL_GM_UNLOCK(nal_data);
764                         gmnal_return_stxd(nal_data, stxd);
765                         return(PTL_FAIL);
766                 }
767
768                 GMNAL_GM_UNLOCK(nal_data);
769                 iov++;
770         }
771
772         /*
773          *      Send the init message to the target
774          */
775         CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
776         GMNAL_GM_LOCK(nal_data);
777         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, 
778                                             &local_nid);
779         if (gm_status != GM_SUCCESS) {
780                 GMNAL_GM_UNLOCK(nal_data);
781                 CERROR("Failed to obtain local id\n");
782                 gmnal_return_stxd(nal_data, stxd);
783                 /* TO DO deregister memory on failure */
784                 return(GMNAL_STATUS_FAIL);
785         }
786         CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
787         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, 
788                                       stxd->gm_size, mlen, GM_LOW_PRIORITY, 
789                                       local_nid, gmnal_large_tx_callback, 
790                                       (void*)stxd);
791         GMNAL_GM_UNLOCK(nal_data);
792
793         CDEBUG(D_INFO, "done\n");
794
795         return(PTL_OK);
796 }
797
798 /*
799  *      Callback function indicates that send of buffer with 
800  *      large message iovec has completed (or failed).
801  */
802 void 
803 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
804 {
805         gmnal_small_tx_callback(gm_port, context, status);
806
807 }
808
809
810
811 /*
812  *      Have received a buffer that contains an iovec of the sender. 
813  *      Do a gm_register_memory of the receivers buffer and then do a get
814  *      data from the sender.
815  */
816 int
817 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
818                 unsigned int nriov, struct iovec *riov, size_t offset, 
819                 size_t mlen, size_t rlen)
820 {
821         gmnal_data_t    *nal_data = libnal->libnal_data;
822         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
823         void            *buffer = NULL;
824         struct  iovec   *riov_dup;
825         int             nriov_dup;
826         gmnal_msghdr_t  *msghdr = NULL;
827         gm_status_t     gm_status;
828
829         CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
830                "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
831                 libnal, private, cookie, nriov, riov, mlen, rlen);
832
833         if (!srxd) {
834                 CERROR("gmnal_large_rx no context\n");
835                 lib_finalize(libnal, private, cookie, PTL_FAIL);
836                 return(PTL_FAIL);
837         }
838
839         buffer = srxd->buffer;
840         msghdr = (gmnal_msghdr_t*)buffer;
841         buffer += sizeof(gmnal_msghdr_t);
842         buffer += sizeof(ptl_hdr_t);
843
844         /*
845          *      Store the senders stxd address in the srxd for this message
846          *      The gmnal_large_message_ack needs it to notify the sender
847          *      the pull of data is complete
848          */
849         srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
850
851         /*
852          *      Register the receivers memory
853          *      get the data,
854          *      tell the sender that we got the data
855          *      then tell the receiver we got the data
856          *      TO DO
857          *      If the iovecs match, could interleave 
858          *      gm_registers and gm_gets for each element
859          */
860         while (offset >= riov->iov_len) {
861                 offset -= riov->iov_len;
862                 riov++;
863                 nriov--;
864         } 
865         LASSERT (nriov >= 0);
866         LASSERT (offset >= 0);
867         /*
868          *      do this so the final gm_get callback can deregister the memory
869          */
870         PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
871
872         srxd->riov[0].iov_base = riov->iov_base + offset;
873         srxd->riov[0].iov_len = riov->iov_len - offset;
874         if (nriov > 1)
875                 gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
876         srxd->nriov = nriov;
877
878         riov = srxd->riov;
879         nriov_dup = nriov;
880         riov_dup = riov;
881         while(nriov--) {
882                 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
883                        riov->iov_base, riov->iov_len);
884                 GMNAL_GM_LOCK(nal_data);
885                 gm_status = gm_register_memory(nal_data->gm_port,
886                                                riov->iov_base, riov->iov_len);
887                 if (gm_status != GM_SUCCESS) {
888                         GMNAL_GM_UNLOCK(nal_data);
889                         CERROR("gm_register_memory returns [%d][%s] "
890                                "for memory [%p] len ["LPSZ"]\n",
891                                gm_status, gmnal_gm_error(gm_status),
892                                riov->iov_base, riov->iov_len);
893                         GMNAL_GM_LOCK(nal_data);
894                         while (riov_dup != riov) {
895                                 gm_deregister_memory(nal_data->gm_port, 
896                                                      riov_dup->iov_base, 
897                                                      riov_dup->iov_len);
898                                 riov_dup++;
899                         }
900                         GMNAL_GM_LOCK(nal_data);
901                         /*
902                          *      give back srxd and buffer. Send NACK to sender
903                          */
904                         PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
905                         return(PTL_FAIL);
906                 }
907                 GMNAL_GM_UNLOCK(nal_data);
908                 riov++;
909         }
910
911         /*
912          *      now do gm_get to get the data
913          */
914         srxd->cookie = cookie;
915         if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
916                               nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
917                 CERROR("can't get the data");
918         }
919
920         CDEBUG(D_INFO, "lgmanl_large_rx done\n");
921
922         return(PTL_OK);
923 }
924
925
926 /*
927  *      Perform a number of remote gets as part of receiving 
928  *      a large message.
929  *      The final one to complete (i.e. the last callback to get called)
930  *      tidies up.
931  *      gm_get requires a send token.
932  */
933 int
934 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, 
935                   int nriov, struct iovec *riov)
936 {
937
938         int     ncalls = 0;
939
940         CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
941                "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
942
943
944         ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
945         if (ncalls < 0) {
946                 CERROR("there's something wrong with the iovecs\n");
947                 return(GMNAL_STATUS_FAIL);
948         }
949         CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
950         spin_lock_init(&srxd->callback_lock);
951         srxd->ncallbacks = ncalls;
952         srxd->callback_status = 0;
953
954         ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
955         if (ncalls < 0) {
956                 CERROR("there's something wrong with the iovecs\n");
957                 return(GMNAL_STATUS_FAIL);
958         }
959
960         return(GMNAL_STATUS_OK);
961
962 }
963
964
965 /*
966  *      pull data from source node (source iovec) to a local iovec.
967  *      The iovecs may not match which adds the complications below.
968  *      Count the number of gm_gets that will be required so the callbacks
969  *      can determine who is the last one.
970  */     
971 int
972 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, 
973                struct iovec *siov, int nriov, struct iovec *riov)
974 {
975
976         int     ncalls = 0;
977         int     slen = siov->iov_len, rlen = riov->iov_len;
978         char    *sbuf = siov->iov_base, *rbuf = riov->iov_base; 
979         unsigned long   sbuf_long;
980         gm_remote_ptr_t remote_ptr = 0;
981         unsigned int    source_node;
982         gmnal_ltxd_t    *ltxd = NULL;
983         gmnal_data_t    *nal_data = srxd->nal_data;
984
985         CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
986         if (do_copy) {
987                 if (!nal_data) {
988                         CERROR("Bad args No nal_data\n");
989                         return(GMNAL_STATUS_FAIL);
990                 }
991                 GMNAL_GM_LOCK(nal_data);
992                 if (gm_global_id_to_node_id(nal_data->gm_port,
993                                             srxd->gm_source_node,
994                                             &source_node) != GM_SUCCESS) {
995
996                         CERROR("cannot resolve global_id [%u] "
997                                "to local node_id\n", srxd->gm_source_node);
998                         GMNAL_GM_UNLOCK(nal_data);
999                         return(GMNAL_STATUS_FAIL);
1000                 }
1001                 GMNAL_GM_UNLOCK(nal_data);
1002                 /*
1003                  *      We need a send token to use gm_get
1004                  *      getting an stxd gets us a send token.
1005                  *      the stxd is used as the context to the
1006                  *      callback function (so stxd can be returned).
1007                  *      Set pointer in stxd to srxd so callback count in srxd
1008                  *      can be decremented to find last callback to complete
1009                  */
1010                 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
1011                        srxd->gm_source_node, source_node);
1012         }
1013
1014         do {
1015                 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
1016                                 sbuf, slen, rbuf, rlen);
1017                 if (slen > rlen) {
1018                         ncalls++;
1019                         if (do_copy) {
1020                                 CDEBUG(D_INFO, "slen>rlen\n");
1021                                 ltxd = gmnal_get_ltxd(nal_data);
1022                                 ltxd->srxd = srxd;
1023                                 GMNAL_GM_LOCK(nal_data);
1024                                 /* 
1025                                  *      funny business to get rid 
1026                                  *      of compiler warning 
1027                                  */
1028                                 sbuf_long = (unsigned long) sbuf;
1029                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1030                                 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1031                                        rlen, GM_LOW_PRIORITY, source_node,
1032                                        GMNAL_GM_PORT_ID,
1033                                        gmnal_remote_get_callback, ltxd);
1034                                 GMNAL_GM_UNLOCK(nal_data);
1035                         }
1036                         /*
1037                          *      at the end of 1 iov element
1038                          */
1039                         sbuf+=rlen;
1040                         slen-=rlen;
1041                         riov++;
1042                         nriov--;
1043                         rbuf = riov->iov_base;
1044                         rlen = riov->iov_len;
1045                 } else if (rlen > slen) {
1046                         ncalls++;
1047                         if (do_copy) {
1048                                 CDEBUG(D_INFO, "slen<rlen\n");
1049                                 ltxd = gmnal_get_ltxd(nal_data);
1050                                 ltxd->srxd = srxd;
1051                                 GMNAL_GM_LOCK(nal_data);
1052                                 sbuf_long = (unsigned long) sbuf;
1053                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1054                                 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1055                                        slen, GM_LOW_PRIORITY, source_node,
1056                                        GMNAL_GM_PORT_ID,
1057                                        gmnal_remote_get_callback, ltxd);
1058                                 GMNAL_GM_UNLOCK(nal_data);
1059                         }
1060                         /*
1061                          *      at end of siov element
1062                          */
1063                         rbuf+=slen;
1064                         rlen-=slen;
1065                         siov++;
1066                         sbuf = siov->iov_base;
1067                         slen = siov->iov_len;
1068                 } else {
1069                         ncalls++;
1070                         if (do_copy) {
1071                                 CDEBUG(D_INFO, "rlen=slen\n");
1072                                 ltxd = gmnal_get_ltxd(nal_data);
1073                                 ltxd->srxd = srxd;
1074                                 GMNAL_GM_LOCK(nal_data);
1075                                 sbuf_long = (unsigned long) sbuf;
1076                                 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1077                                 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1078                                        rlen, GM_LOW_PRIORITY, source_node,
1079                                        GMNAL_GM_PORT_ID,
1080                                        gmnal_remote_get_callback, ltxd);
1081                                 GMNAL_GM_UNLOCK(nal_data);
1082                         }
1083                         /*
1084                          *      at end of siov and riov element
1085                          */
1086                         siov++;
1087                         sbuf = siov->iov_base;
1088                         slen = siov->iov_len;
1089                         riov++;
1090                         nriov--;
1091                         rbuf = riov->iov_base;
1092                         rlen = riov->iov_len;
1093                 }
1094
1095         } while (nriov);
1096         return(ncalls);
1097 }
1098
1099
1100 /*
1101  *      The callback function that is invoked after each gm_get call completes.
1102  *      Multiple callbacks may be invoked for 1 transaction, only the final
1103  *      callback has work to do.
1104  */
1105 void
1106 gmnal_remote_get_callback(gm_port_t *gm_port, void *context, 
1107                            gm_status_t status)
1108 {
1109
1110         gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
1111         gmnal_srxd_t    *srxd = ltxd->srxd;
1112         lib_nal_t       *libnal = srxd->nal_data->libnal;
1113         int             lastone;
1114         struct  iovec   *riov;
1115         int             nriov;
1116         gmnal_data_t    *nal_data;
1117
1118         CDEBUG(D_TRACE, "called for context [%p]\n", context);
1119
1120         if (status != GM_SUCCESS) {
1121                 CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status));
1122         }
1123
1124         spin_lock(&srxd->callback_lock);
1125         srxd->ncallbacks--;
1126         srxd->callback_status |= status;
1127         lastone = srxd->ncallbacks?0:1;
1128         spin_unlock(&srxd->callback_lock);
1129         nal_data = srxd->nal_data;
1130
1131         /*
1132          *      everyone returns a send token
1133          */
1134         gmnal_return_ltxd(nal_data, ltxd);
1135
1136         if (!lastone) {
1137                 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1138                 return;
1139         }
1140
1141         /*
1142          *      Let our client application proceed
1143          */
1144         CERROR("final callback context[%p]\n", srxd);
1145         lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
1146
1147         /*
1148          *      send an ack to the sender to let him know we got the data
1149          */
1150         gmnal_large_tx_ack(nal_data, srxd);
1151
1152         /*
1153          *      Unregister the memory that was used
1154          *      This is a very slow business (slower then register)
1155          */
1156         nriov = srxd->nriov;
1157         riov = srxd->riov;
1158         GMNAL_GM_LOCK(nal_data);
1159         while (nriov--) {
1160                 CERROR("deregister memory [%p]\n", riov->iov_base);
1161                 if (gm_deregister_memory(srxd->nal_data->gm_port,
1162                                          riov->iov_base, riov->iov_len)) {
1163                         CERROR("failed to deregister memory [%p]\n",
1164                                riov->iov_base);
1165                 }
1166                 riov++;
1167         }
1168         GMNAL_GM_UNLOCK(nal_data);
1169         PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
1170
1171         /*
1172          *      repost the receive buffer (return receive token)
1173          */
1174         GMNAL_GM_LOCK(nal_data);
1175         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
1176                                            srxd->gmsize, GM_LOW_PRIORITY, 0);   
1177         GMNAL_GM_UNLOCK(nal_data);
1178         
1179         return;
1180 }
1181
1182
1183 /*
1184  *      Called on target node.
1185  *      After pulling data from a source node
1186  *      send an ack message to indicate the large transmit is complete.
1187  */
1188 void 
1189 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1190 {
1191
1192         gmnal_stxd_t    *stxd;
1193         gmnal_msghdr_t *msghdr;
1194         void            *buffer = NULL;
1195         unsigned int    local_nid;
1196         gm_status_t     gm_status = GM_SUCCESS;
1197
1198         CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
1199                srxd->gm_source_node);
1200
1201         GMNAL_GM_LOCK(nal_data);
1202         gm_status = gm_global_id_to_node_id(nal_data->gm_port, 
1203                                             srxd->gm_source_node, &local_nid);
1204         GMNAL_GM_UNLOCK(nal_data);
1205         if (gm_status != GM_SUCCESS) {
1206                 CERROR("Failed to obtain local id\n");
1207                 return;
1208         }
1209         CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1210
1211         stxd = gmnal_get_stxd(nal_data, 1);
1212         CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1213
1214         stxd->nal_data = nal_data;
1215         stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1216
1217         /*
1218          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
1219          *      Then copy the data in
1220          */
1221         buffer = stxd->buffer;
1222         msghdr = (gmnal_msghdr_t*)buffer;
1223
1224         /*
1225          *      Add in the address of the original stxd from the sender node
1226          *      so it knows which thread to notify.
1227          */
1228         msghdr->magic = GMNAL_MAGIC;
1229         msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1230         msghdr->sender_node_id = nal_data->gm_global_nid;
1231         msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
1232         CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1233
1234         CDEBUG(D_INFO, "sending\n");
1235         stxd->msg_size= sizeof(gmnal_msghdr_t);
1236
1237
1238         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1239                "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1240                "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
1241                stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1242         GMNAL_GM_LOCK(nal_data);
1243         stxd->gm_priority = GM_LOW_PRIORITY;
1244         stxd->gm_target_node = local_nid;
1245         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
1246                                       stxd->gm_size, stxd->msg_size,
1247                                       GM_LOW_PRIORITY, local_nid,
1248                                       gmnal_large_tx_ack_callback,
1249                                       (void*)stxd);
1250
1251         GMNAL_GM_UNLOCK(nal_data);
1252         CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1253
1254         return;
1255 }
1256
1257
1258 /*
1259  *      A callback to indicate the small transmit operation is compete
1260  *      Check for errors and try to deal with them.
1261  *      Call lib_finalise to inform the client application that the
1262  *      send is complete and the memory can be reused.
1263  *      Return the stxd when finished with it (returns a send token)
1264  */
1265 void
1266 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
1267                              gm_status_t status)
1268 {
1269         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
1270         gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
1271
1272         if (!stxd) {
1273                 CERROR("send completion event for unknown stxd\n");
1274                 return;
1275         }
1276         CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1277                stxd, status);
1278         gmnal_return_stxd(stxd->nal_data, stxd);
1279
1280         GMNAL_GM_UNLOCK(nal_data);
1281         return;
1282 }
1283
1284 /*
1285  *      Indicates the large transmit operation is compete.
1286  *      Called on transmit side (means data has been pulled  by receiver 
1287  *      or failed).
1288  *      Call lib_finalise to inform the client application that the send 
1289  *      is complete, deregister the memory and return the stxd. 
1290  *      Finally, report the rx buffer that the ack message was delivered in.
1291  */
1292 void 
1293 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1294 {
1295         lib_nal_t       *libnal = nal_data->libnal;
1296         gmnal_stxd_t    *stxd = NULL;
1297         gmnal_msghdr_t  *msghdr = NULL;
1298         void            *buffer = NULL;
1299         struct  iovec   *iov;
1300
1301
1302         CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
1303
1304         buffer = srxd->buffer;
1305         msghdr = (gmnal_msghdr_t*)buffer;
1306         stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
1307
1308         CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1309
1310         lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
1311
1312         /*
1313          *      extract the iovec from the stxd, deregister the memory.
1314          *      free the space used to store the iovec
1315          */
1316         iov = stxd->iov;
1317         while(stxd->niov--) {
1318                 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1319                        iov->iov_base, iov->iov_len);
1320                 GMNAL_GM_LOCK(nal_data);
1321                 gm_deregister_memory(nal_data->gm_port, iov->iov_base, 
1322                                      iov->iov_len);
1323                 GMNAL_GM_UNLOCK(nal_data);
1324                 iov++;
1325         }
1326
1327         /*
1328          *      return the send token
1329          *      TO DO It is bad to hold onto the send token so long?
1330          */
1331         gmnal_return_stxd(nal_data, stxd);
1332
1333
1334         /*
1335          *      requeue the receive buffer 
1336          */
1337         gmnal_rx_requeue_buffer(nal_data, srxd);
1338         
1339
1340         return;
1341 }