1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * This file contains all gmnal send and receive functions
29 * The caretaker thread
30 * This is main thread of execution for the NAL side
31 * This guy waits in gm_blocking_receive and gets
32 * woken up when the myrinet adaptor gets an interrupt.
33 * Hands off receive operations to the receive thread
34 * This thread Looks after gm_callbacks etc inline.
/*
 * NOTE(review): this listing carries embedded original line numbers and
 * has gaps (return types, braces, break statements and some case labels
 * are missing throughout the file) -- verify every function against the
 * pristine source before acting on any of these annotations.
 */
/*
 * gmnal_ct_thread:
 * Loops while ctthread_flag == GMNAL_CTTHREAD_STARTED, blocking in
 * gm_blocking_receive_no_spin().  GM_RECV_EVENTs are queued for the
 * receive threads via gmnal_add_rxtwe(); _GM_SLEEP_EVENT and anything
 * unrecognized are handed back to gm_unknown().  The GM lock is dropped
 * around both hand-offs.  On GMNAL_THREAD_STOP the thread marks itself
 * GMNAL_THREAD_RESET and returns GMNAL_STATUS_OK.
 */
37 gmnal_ct_thread(void *arg)
39 gmnal_data_t *nal_data;
40 gm_recv_event_t *rxevent = NULL;
41 gm_recv_t *recv = NULL;
/* guard against a NULL arg -- the enclosing "if (!arg)" is on a missing line */
44 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
48 nal_data = (gmnal_data_t*)arg;
49 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
53 nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
55 GMNAL_GM_LOCK(nal_data);
56 while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
57 CDEBUG(D_NET, "waiting\n");
58 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
59 if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
60 CDEBUG(D_INFO, "time to exit\n");
63 CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
64 switch (GM_RECV_EVENT_TYPE(rxevent)) {
/* NOTE(review): the case(GM_RECV_EVENT) label is on a missing line */
67 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
68 recv = (gm_recv_t*)&rxevent->recv;
/* drop the GM lock while queueing so rx threads can make progress */
69 GMNAL_GM_UNLOCK(nal_data);
70 gmnal_add_rxtwe(nal_data, recv);
71 GMNAL_GM_LOCK(nal_data);
72 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
74 case(_GM_SLEEP_EVENT):
76 * Blocking receive above just returns
77 * immediately with _GM_SLEEP_EVENT
78 * Don't know what this is
80 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
81 GMNAL_GM_UNLOCK(nal_data);
82 gm_unknown(nal_data->gm_port, rxevent);
83 GMNAL_GM_LOCK(nal_data);
84 CDEBUG(D_INFO, "Awake from gm_unknown\n");
89 * Don't know what this is
90 * gm_unknown will make sense of it
91 * Should be able to do something with
92 * FAST_RECV_EVENTS here.
94 CDEBUG(D_NET, "Passing event to gm_unknown\n");
95 GMNAL_GM_UNLOCK(nal_data);
96 gm_unknown(nal_data->gm_port, rxevent);
97 GMNAL_GM_LOCK(nal_data);
98 CDEBUG(D_INFO, "Processed unknown event\n");
101 GMNAL_GM_UNLOCK(nal_data);
/* tell gmnal_stop_ctthread() we are gone */
102 nal_data->ctthread_flag = GMNAL_THREAD_RESET;
103 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
104 return(GMNAL_STATUS_OK);
109 * process a receive event
/*
 * gmnal_rx_thread:
 * Worker thread.  Registers itself by shifting a 1-bit into
 * rxthread_flag (one bit per running thread), then loops dequeuing
 * receive-thread work elements with gmnal_get_rxtwe() and dispatching
 * on the gmnal message header type.  On exit it removes its bit
 * (rxthread_flag /= 2) and returns GMNAL_STATUS_OK.
 */
111 int gmnal_rx_thread(void *arg)
113 gmnal_data_t *nal_data;
115 gmnal_rxtwe_t *we = NULL;
/* NULL-arg guard; the "if (!arg)" condition is on a missing line */
118 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
122 nal_data = (gmnal_data_t*)arg;
123 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
127 * set 1 bit for each thread started
128 * doesn't matter which bit
130 spin_lock(&nal_data->rxthread_flag_lock);
131 if (nal_data->rxthread_flag)
132 nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
/* first thread in: claim the low bit (the "else" is on a missing line) */
134 nal_data->rxthread_flag = 1;
135 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
136 spin_unlock(&nal_data->rxthread_flag_lock);
138 while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
139 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
/* blocks until the caretaker thread queues a work element */
140 we = gmnal_get_rxtwe(nal_data);
142 CDEBUG(D_INFO, "Receive thread time to exit\n");
/* NOTE(review): "buffer" is not declared in any visible line --
 * presumably assigned from the work element on a missing line */
147 switch(((gmnal_msghdr_t*)buffer)->type) {
148 case(GMNAL_SMALL_MESSAGE):
149 gmnal_pre_receive(nal_data, we,
150 GMNAL_SMALL_MESSAGE);
152 case(GMNAL_LARGE_MESSAGE_INIT):
153 gmnal_pre_receive(nal_data, we,
154 GMNAL_LARGE_MESSAGE_INIT);
156 case(GMNAL_LARGE_MESSAGE_ACK):
157 gmnal_pre_receive(nal_data, we,
158 GMNAL_LARGE_MESSAGE_ACK);
/* default: unknown wire type -- requeue the buffer and carry on */
161 CDEBUG(D_ERROR, "Unsupported message type\n");
162 gmnal_rx_bad(nal_data, we, NULL);
/* the work element is owned by this thread once dequeued */
164 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
/* deregister this thread: drop our bit from the flag word */
167 spin_lock(&nal_data->rxthread_flag_lock);
168 nal_data->rxthread_flag/=2;
169 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
170 spin_unlock(&nal_data->rxthread_flag_lock);
171 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
172 return(GMNAL_STATUS_OK);
178 * Start processing a small message receive
179 * Get here from gmnal_receive_thread
180 * Hand off to lib_parse, which calls cb_recv
181 * which hands back to gmnal_small_receive
182 * Deal with all endian stuff here.
/*
 * gmnal_pre_receive:
 * Maps the wired receive buffer back to its descriptor (srxd), records
 * sender details from the gmnal message header, and either short-circuits
 * a LARGE_MESSAGE_ACK to gmnal_large_tx_ack_received() or hands the
 * portals header to lib_parse().  Returns GMNAL_STATUS_OK/FAIL.
 */
185 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
187 gmnal_srxd_t *srxd = NULL;
189 unsigned int snode, sport, type, length;
190 gmnal_msghdr_t *gmnal_msghdr;
191 ptl_hdr_t *portals_hdr;
193 CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
194 nal_data, we, gmnal_type);
/* NOTE(review): "buffer" and the assignments of snode/sport/type/length
 * (presumably unpacked from *we) are on missing lines -- confirm */
203 gmnal_msghdr = (gmnal_msghdr_t*)buffer;
/* the portals header sits immediately after the gmnal header */
204 portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
206 CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
207 "type [%d], length [%d], buffer [%p]\n",
208 snode, sport, type, length, buffer);
209 CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
210 "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
211 gmnal_msghdr->magic, gmnal_msghdr->type);
212 CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
213 "dest_node ["LPD64"]\n", portals_hdr->src_nid,
214 portals_hdr->dest_nid);
218 * Get a receive descriptor for this message
220 srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
221 CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
/* NOTE(review): srxd is dereferenced here, but the failure branch below
 * suggests it can be NULL -- the "if (!srxd)" guard is on a missing line */
222 srxd->nal_data = nal_data;
224 CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
225 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
226 return(GMNAL_STATUS_FAIL);
230 * no need to bother portals library with this
/* a large-tx ack is a NAL-internal message: complete the sender side */
232 if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
233 gmnal_large_tx_ack_received(nal_data, srxd);
234 return(GMNAL_STATUS_OK);
237 srxd->type = gmnal_type;
238 srxd->nsiov = gmnal_msghdr->niov;
239 srxd->gm_source_node = gmnal_msghdr->sender_node_id;
241 CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
242 buffer+GMNAL_MSGHDR_SIZE);
244 * control passes to lib, which calls cb_recv
245 * cb_recv is responsible for returning the buffer
248 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
250 return(GMNAL_STATUS_OK);
256 * After a receive has been processed,
257 * hang out the receive buffer again.
258 * This implicitly returns a receive token.
/*
 * gmnal_rx_requeue_buffer:
 * Re-provide srxd->buffer to GM under the GM lock so the adaptor can
 * reuse it for the next incoming message.  Always GMNAL_STATUS_OK.
 */
261 gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
263 CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
265 CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
267 GMNAL_GM_LOCK(nal_data);
268 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
269 srxd->gmsize, GM_LOW_PRIORITY, 0 );
270 GMNAL_GM_UNLOCK(nal_data);
272 return(GMNAL_STATUS_OK);
277 * Handle a bad message
278 * A bad message is one we don't expect or can't interpret
/*
 * gmnal_rx_bad:
 * If no srxd was supplied, look it up from the buffer; requeue the
 * receive buffer if found, otherwise report failure.  Several branch
 * lines (the "if (!srxd)" guard and the lookup's second argument) are
 * missing from this listing.
 */
281 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
283 CDEBUG(D_TRACE, "Can't handle message\n");
/* NOTE(review): second argument (the buffer) continues on a missing line */
286 srxd = gmnal_rxbuffer_to_srxd(nal_data,
289 gmnal_rx_requeue_buffer(nal_data, srxd);
291 CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n");
295 return(GMNAL_STATUS_FAIL);
298 return(GMNAL_STATUS_OK);
304 * Process a small message receive.
305 * Get here from gmnal_receive_thread, gmnal_pre_receive
307 * Put data from prewired receive buffer into users buffer(s)
308 * Hang out the receive buffer again for another receive
/*
 * gmnal_small_rx (cb_recv):
 * Copies the payload that follows the gmnal + portals headers in the
 * wired receive buffer into the caller's iovec(s), finalizes the message
 * with the portals library, then re-provides the buffer to GM.
 */
312 gmnal_small_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
313 unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
315 gmnal_srxd_t *srxd = NULL;
317 gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
320 CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
/* no private context means no buffer to copy from -- fail the message */
323 CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
324 lib_finalize(nal_cb, private, cookie, PTL_FAIL);
328 srxd = (gmnal_srxd_t*)private;
329 buffer = srxd->buffer;
/* skip the two headers to reach the payload */
330 buffer += sizeof(gmnal_msghdr_t);
331 buffer += sizeof(ptl_hdr_t);
/* per-iovec copy loop; the "while(niov--)" line is missing from this listing */
334 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov,
336 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
337 buffer += iov->iov_len;
343 * let portals library know receive is complete
345 CDEBUG(D_PORTALS, "calling lib_finalize\n");
346 lib_finalize(nal_cb, private, cookie, PTL_OK);
348 * return buffer so it can be used again
350 CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
351 GMNAL_GM_LOCK(nal_data);
352 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
353 srxd->gmsize, GM_LOW_PRIORITY, 0);
354 GMNAL_GM_UNLOCK(nal_data);
361 * Start a small transmit.
362 * Get a send token (and wired transmit buffer).
363 * Copy data from senders buffer to wired buffer and
364 * initiate gm_send from the wired buffer.
365 * The callback function informs when the send is complete.
/*
 * gmnal_small_tx (cb_send, small path):
 * Resolves global_nid to a GM node id, takes a send token/wired buffer
 * (gmnal_get_stxd), packs [gmnal_msghdr_t][ptl_hdr_t][payload iovecs]
 * into it, and fires gm_send_to_peer_with_callback().  Completion and
 * token return happen in gmnal_small_tx_callback().
 */
368 gmnal_small_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
369 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
370 unsigned int niov, struct iovec *iov, int size)
372 gmnal_data_t *nal_data = (gmnal_data_t*)nal_cb->nal_data;
373 gmnal_stxd_t *stxd = NULL;
375 gmnal_msghdr_t *msghdr = NULL;
377 unsigned int local_nid;
378 gm_status_t gm_status = GM_SUCCESS;
380 CDEBUG(D_TRACE, "gmnal_small_tx nal_cb [%p] private [%p] cookie [%p] "
381 "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] niov [%d] "
382 "iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type,
383 global_nid, pid, niov, iov, size);
385 CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
386 hdr->dest_nid, hdr->src_nid);
/* the "if (!nal_data)" guard line is missing from this listing */
389 CDEBUG(D_ERROR, "no nal_data\n");
390 return(GMNAL_STATUS_FAIL);
392 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
/* translate the portals NID into the GM-local node id for this fabric */
395 GMNAL_GM_LOCK(nal_data);
396 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
398 GMNAL_GM_UNLOCK(nal_data);
399 if (gm_status != GM_SUCCESS) {
400 CDEBUG(D_ERROR, "Failed to obtain local id\n");
401 return(GMNAL_STATUS_FAIL);
403 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
/* blocks until a send token (and its wired buffer) is available */
405 stxd = gmnal_get_stxd(nal_data, 1);
406 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
408 stxd->type = GMNAL_SMALL_MESSAGE;
409 stxd->cookie = cookie;
412 * Copy gmnal_msg_hdr and portals header to the transmit buffer
413 * Then copy the data in
415 buffer = stxd->buffer;
416 msghdr = (gmnal_msghdr_t*)buffer;
418 msghdr->magic = GMNAL_MAGIC;
419 msghdr->type = GMNAL_SMALL_MESSAGE;
420 msghdr->sender_node_id = nal_data->gm_global_nid;
421 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
423 buffer += sizeof(gmnal_msghdr_t);
425 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
426 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
428 buffer += sizeof(ptl_hdr_t);
/* payload copy loop; the "while(niov--)" line is missing from this listing */
431 CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n",
432 iov, iov->iov_len, buffer);
433 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
434 buffer+= iov->iov_len;
438 CDEBUG(D_INFO, "sending\n");
/* wire size = payload + both headers; "tot_size" is declared on a missing line */
439 tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
440 stxd->msg_size = tot_size;
443 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
444 "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
445 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
446 stxd->msg_size, global_nid, local_nid, stxd);
448 GMNAL_GM_LOCK(nal_data);
/* stashed so the callback can retry/drop against the same target */
449 stxd->gm_priority = GM_LOW_PRIORITY;
450 stxd->gm_target_node = local_nid;
451 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
452 stxd->gm_size, stxd->msg_size,
453 GM_LOW_PRIORITY, local_nid,
454 gmnal_small_tx_callback, (void*)stxd);
455 GMNAL_GM_UNLOCK(nal_data);
456 CDEBUG(D_INFO, "done\n");
463 * A callback to indicate the small transmit operation is complete
464 * Check for errors and try to deal with them.
465 * Call lib_finalise to inform the client application that the send
466 * is complete and the memory can be reused.
467 * Return the stxd when finished with it (returns a send token)
/*
 * gmnal_small_tx_callback:
 * GM send-completion callback.  On failure it switches on the GM status:
 * dropped sends are resent, timeouts trigger gm_drop_sends(), and the
 * remaining statuses fall through to gm_resume_sending().  On success it
 * returns the stxd (send token) and lib_finalize()s the message, except
 * for LARGE_MESSAGE_INIT sends whose iovec the stxd must keep alive.
 */
470 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
472 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
473 lib_msg_t *cookie = stxd->cookie;
474 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
475 nal_cb_t *nal_cb = nal_data->nal_cb;
/* the "if (!stxd)" guard line is missing from this listing */
478 CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
481 if (status != GM_SUCCESS) {
482 CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s]\n",
483 stxd, gmnal_gm_error(status));
/* error dispatch: "switch(status)" and the first case labels are on
 * missing lines */
492 case(GM_SEND_DROPPED):
494 * do a resend on the dropped ones
496 CDEBUG(D_ERROR, "send stxd [%p] was dropped "
497 "resending\n", context);
498 GMNAL_GM_LOCK(nal_data);
/* resend with the parameters recorded on the stxd at submit time;
 * the middle argument lines are missing from this listing */
499 gm_send_to_peer_with_callback(nal_data->gm_port,
504 stxd->gm_target_node,
505 gmnal_small_tx_callback,
507 GMNAL_GM_UNLOCK(nal_data);
511 case(GM_SEND_TIMED_OUT):
/* drop all queued sends to this target; the drop callback resends */
515 CDEBUG(D_INFO, "calling gm_drop_sends\n");
516 GMNAL_GM_LOCK(nal_data);
517 gm_drop_sends(nal_data->gm_port, stxd->gm_priority,
518 stxd->gm_target_node, GMNAL_GM_PORT,
519 gmnal_drop_sends_callback, context);
520 GMNAL_GM_UNLOCK(nal_data);
/* everything below funnels into gm_resume_sending() at the bottom */
529 case(GM_INTERRUPTED):
531 case(GM_INPUT_BUFFER_TOO_SMALL):
532 case(GM_OUTPUT_BUFFER_TOO_SMALL):
534 case(GM_MEMORY_FAULT):
535 case(GM_INVALID_PARAMETER):
536 case(GM_OUT_OF_MEMORY):
537 case(GM_INVALID_COMMAND):
538 case(GM_PERMISSION_DENIED):
539 case(GM_INTERNAL_ERROR):
541 case(GM_UNSUPPORTED_DEVICE):
542 case(GM_SEND_REJECTED):
543 case(GM_SEND_TARGET_PORT_CLOSED):
544 case(GM_SEND_TARGET_NODE_UNREACHABLE):
545 case(GM_SEND_PORT_CLOSED):
546 case(GM_NODE_ID_NOT_YET_SET):
547 case(GM_STILL_SHUTTING_DOWN):
549 case(GM_NO_SUCH_DEVICE):
551 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
552 case(GM_UNTRANSLATED_SYSTEM_ERROR):
553 case(GM_ACCESS_DENIED):
554 case(GM_NO_DRIVER_SUPPORT):
555 case(GM_PTE_REF_CNT_OVERFLOW):
556 case(GM_NOT_SUPPORTED_IN_KERNEL):
557 case(GM_NOT_SUPPORTED_ON_ARCH):
560 case(GM_DATA_CORRUPTED):
561 case(GM_HARDWARE_FAULT):
562 case(GM_SEND_ORPHANED):
563 case(GM_MINOR_OVERFLOW):
564 case(GM_PAGE_TABLE_FULL):
566 case(GM_INVALID_PORT_NUMBER):
567 case(GM_DEV_NOT_FOUND):
568 case(GM_FIRMWARE_NOT_RUNNING):
569 case(GM_YP_NO_MATCH):
571 CDEBUG(D_ERROR, "Unknown send error\n");
/* unblock the port so subsequent sends to this target can proceed;
 * NOTE(review): no GM lock is visibly held around this call, unlike
 * the other gm_* calls in this function -- confirm against source */
572 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
573 stxd->gm_target_node, GMNAL_GM_PORT,
574 gmnal_resume_sending_callback, context);
581 * If this is a large message init,
582 * we're not finished with the data yet,
583 * so can't call lib_finalise.
584 * However, we're also holding on to a
585 * stxd here (to keep track of the source
586 * iovec only). Should use another structure
587 * to keep track of iovec and return stxd to
590 if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
591 CDEBUG(D_INFO, "large transmit done\n");
/* success path: release the send token, then complete the message */
594 gmnal_return_stxd(nal_data, stxd);
595 lib_finalize(nal_cb, stxd, cookie, PTL_OK);
600 * After an error on the port
601 * call this to allow future sends to complete
/*
 * gmnal_resume_sending_callback:
 * Invoked by gm_resume_sending(); the failed send will never complete,
 * so just give its send token (stxd) back to the pool.
 * NOTE(review): the local "nal_data" is declared but never visibly used.
 */
603 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
606 gmnal_data_t *nal_data;
607 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
608 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
609 gmnal_return_stxd(stxd->nal_data, stxd);
/*
 * gmnal_drop_sends_callback:
 * Invoked by gm_drop_sends() after a send timeout.  If the drop
 * succeeded, resubmit the original send recorded on the stxd; otherwise
 * just log the failure.
 */
614 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
617 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
618 gmnal_data_t *nal_data = stxd->nal_data;
620 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
621 if (status == GM_SUCCESS) {
622 GMNAL_GM_LOCK(nal_data);
623 gm_send_to_peer_with_callback(gm_port, stxd->buffer,
624 stxd->gm_size, stxd->msg_size,
626 stxd->gm_target_node,
627 gmnal_small_tx_callback,
/* NOTE(review): a second GMNAL_GM_LOCK here would self-deadlock --
 * this looks like it should be GMNAL_GM_UNLOCK; confirm against
 * pristine source */
629 GMNAL_GM_LOCK(nal_data);
631 CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
632 "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
641 * Begin a large transmit.
642 * Do a gm_register of the memory pointed to by the iovec
643 * and send details to the receiver. The receiver does a gm_get
644 * to pull the data and sends and ack when finished. Upon receipt of
645 * this ack, deregister the memory. Only 1 send token is required here.
/*
 * gmnal_large_tx (cb_send, large path):
 * Packs [gmnal_msghdr_t][ptl_hdr_t][sender iovec array] into a wired
 * buffer, gm_register()s each source iovec element so the receiver can
 * gm_get() the data directly, and sends the init message.  On a
 * registration failure it rolls back already-registered elements
 * (walking iov_dup up to the failing iov) and returns the stxd.
 */
648 gmnal_large_tx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
649 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
650 unsigned int niov, struct iovec *iov, int size)
653 gmnal_data_t *nal_data;
654 gmnal_stxd_t *stxd = NULL;
656 gmnal_msghdr_t *msghdr = NULL;
657 unsigned int local_nid;
658 int mlen = 0; /* the size of the init message data */
/* NOTE(review): iov_dup starts NULL here but is compared against iov in
 * the rollback loop below -- presumably set to the first registered
 * element on a missing line; confirm */
659 struct iovec *iov_dup = NULL;
660 gm_status_t gm_status;
664 CDEBUG(D_TRACE, "gmnal_large_tx nal_cb [%p] private [%p], cookie [%p] "
665 "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
666 "iov [%p], size [%d]\n", nal_cb, private, cookie, hdr, type,
667 global_nid, pid, niov, iov, size);
670 nal_data = (gmnal_data_t*)nal_cb->nal_data;
672 CDEBUG(D_ERROR, "no nal_cb.\n");
673 return(GMNAL_STATUS_FAIL);
678 * Get stxd and buffer. Put local address of data in buffer,
679 * send local addresses to target,
680 * wait for the target node to suck the data over.
681 * The stxd is used to ren
683 stxd = gmnal_get_stxd(nal_data, 1);
684 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
686 stxd->type = GMNAL_LARGE_MESSAGE_INIT;
687 stxd->cookie = cookie;
690 * Copy gmnal_msg_hdr and portals header to the transmit buffer
691 * Then copy the iov in
693 buffer = stxd->buffer;
694 msghdr = (gmnal_msghdr_t*)buffer;
696 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
698 msghdr->magic = GMNAL_MAGIC;
699 msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
700 msghdr->sender_node_id = nal_data->gm_global_nid;
/* the receiver needs niov to size its gm_get plan */
702 msghdr->niov = niov ;
703 buffer += sizeof(gmnal_msghdr_t);
704 mlen = sizeof(gmnal_msghdr_t);
705 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
708 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
710 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
711 buffer += sizeof(ptl_hdr_t);
712 mlen += sizeof(ptl_hdr_t);
713 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
716 * copy the iov to the buffer so target knows
717 * where to get the data from
719 CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
720 gm_bcopy(iov, buffer, niov*sizeof(struct iovec));
721 mlen += niov*(sizeof(struct iovec));
722 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
726 * Store the iovs in the stxd so we can get
727 * them later if we need them
729 CDEBUG(D_NET, "Copying iov [%p] to [%p]\n", iov, stxd->iov);
730 gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
735 * register the memory so the NIC can get hold of the data
736 * This is a slow process. it'd be good to overlap it
737 * with something else.
/* per-element registration loop; the "while(niov--)" line is missing */
742 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
743 iov->iov_base, iov->iov_len);
744 GMNAL_GM_LOCK(nal_data);
745 gm_status = gm_register_memory(nal_data->gm_port,
746 iov->iov_base, iov->iov_len);
747 if (gm_status != GM_SUCCESS) {
748 GMNAL_GM_UNLOCK(nal_data);
749 CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
750 "for memory [%p] len ["LPSZ"]\n",
751 gm_status, gmnal_gm_error(gm_status),
752 iov->iov_base, iov->iov_len);
/* roll back every element registered so far */
753 GMNAL_GM_LOCK(nal_data);
754 while (iov_dup != iov) {
755 gm_deregister_memory(nal_data->gm_port,
760 GMNAL_GM_UNLOCK(nal_data);
761 gmnal_return_stxd(nal_data, stxd);
765 GMNAL_GM_UNLOCK(nal_data);
770 * Send the init message to the target
772 CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
773 GMNAL_GM_LOCK(nal_data);
774 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
776 if (gm_status != GM_SUCCESS) {
777 GMNAL_GM_UNLOCK(nal_data);
778 CDEBUG(D_ERROR, "Failed to obtain local id\n");
779 gmnal_return_stxd(nal_data, stxd);
780 /* TO DO deregister memory on failure */
781 return(GMNAL_STATUS_FAIL);
783 CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
/* init message carries only headers + iovec array (mlen), not payload */
784 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
785 stxd->gm_size, mlen, GM_LOW_PRIORITY,
786 local_nid, gmnal_large_tx_callback,
788 GMNAL_GM_UNLOCK(nal_data);
790 CDEBUG(D_INFO, "done\n");
796 * Callback function indicates that send of buffer with
797 * large message iovec has completed (or failed).
/* Thin wrapper: large-init send completion uses the same error handling
 * and token/finalize logic as the small-message path. */
800 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
802 gmnal_small_tx_callback(gm_port, context, status);
809 * Have received a buffer that contains an iovec of the sender.
810 * Do a gm_register_memory of the receivers buffer and then do a get
811 * data from the sender.
/*
 * gmnal_large_rx (cb_recv, large path):
 * The wired buffer holds the sender's iovec after the two headers.
 * Saves the sender's stxd pointer (needed for the later ack), registers
 * each receive iovec element, snapshots the receive iovec into
 * srxd->riov for the final callback to deregister, then starts the
 * gm_get pulls via gmnal_remote_get().
 */
814 gmnal_large_rx(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
815 unsigned int nriov, struct iovec *riov, size_t mlen,
818 gmnal_data_t *nal_data = nal_cb->nal_data;
819 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
/* NOTE(review): riov_dup (and nriov_dup, used below) have no visible
 * initialization -- presumably assigned on missing lines; confirm */
821 struct iovec *riov_dup;
823 gmnal_msghdr_t *msghdr = NULL;
824 gm_status_t gm_status;
826 CDEBUG(D_TRACE, "gmnal_large_rx :: nal_cb[%p], private[%p], "
827 "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
828 nal_cb, private, cookie, nriov, riov, mlen, rlen);
/* no srxd context: cannot pull data, fail the message */
831 CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
832 lib_finalize(nal_cb, private, cookie, PTL_FAIL);
836 buffer = srxd->buffer;
837 msghdr = (gmnal_msghdr_t*)buffer;
/* after the headers, "buffer" points at the sender's iovec array */
838 buffer += sizeof(gmnal_msghdr_t);
839 buffer += sizeof(ptl_hdr_t);
842 * Store the senders stxd address in the srxd for this message
843 * The gmnal_large_message_ack needs it to notify the sender
844 * the pull of data is complete
846 srxd->source_stxd = msghdr->stxd;
849 * Register the receivers memory
851 * tell the sender that we got the data
852 * then tell the receiver we got the data
854 * If the iovecs match, could interleave
855 * gm_registers and gm_gets for each element
/* per-element registration loop; the loop header line is missing */
860 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
861 riov->iov_base, riov->iov_len);
862 GMNAL_GM_LOCK(nal_data);
863 gm_status = gm_register_memory(nal_data->gm_port,
864 riov->iov_base, riov->iov_len);
865 if (gm_status != GM_SUCCESS) {
866 GMNAL_GM_UNLOCK(nal_data);
867 CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
868 "for memory [%p] len ["LPSZ"]\n",
869 gm_status, gmnal_gm_error(gm_status),
870 riov->iov_base, riov->iov_len);
/* roll back elements already registered */
871 GMNAL_GM_LOCK(nal_data);
872 while (riov_dup != riov) {
873 gm_deregister_memory(nal_data->gm_port,
/* NOTE(review): a LOCK here (after the rollback loop, before the
 * UNLOCK below) looks unbalanced -- likely an artifact of missing
 * lines or a locking bug; confirm against pristine source */
878 GMNAL_GM_LOCK(nal_data);
880 * give back srxd and buffer. Send NACK to sender
884 GMNAL_GM_UNLOCK(nal_data);
888 * do this so the final gm_get callback can deregister the memory
890 PORTAL_ALLOC(srxd->riov, nriov_dup*(sizeof(struct iovec)));
891 gm_bcopy(riov_dup, srxd->riov, nriov_dup*(sizeof(struct iovec)));
892 srxd->nriov = nriov_dup;
895 * now do gm_get to get the data
897 srxd->cookie = cookie;
/* the sender's iovec is read straight out of the wired buffer */
898 if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
899 nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
900 CDEBUG(D_ERROR, "can't get the data");
903 CDEBUG(D_INFO, "lgmanl_large_rx done\n");
910 * Perform a number of remote gets as part of receiving
912 * The final one to complete (i.e. the last callback to get called)
914 * gm_get requires a send token.
/*
 * gmnal_remote_get:
 * Two-pass driver over gmnal_copyiov(): pass 0 only counts how many
 * gm_get calls the iovec overlap will need (so the callbacks can detect
 * the last completion via srxd->ncallbacks), pass 1 actually issues
 * them.  Returns GMNAL_STATUS_OK/FAIL.
 */
917 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
918 int nriov, struct iovec *riov)
923 CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
924 "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
/* pass 0: dry run, count the gm_get calls required */
927 ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
929 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
930 return(GMNAL_STATUS_FAIL);
932 CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
933 spin_lock_init(&srxd->callback_lock);
934 srxd->ncallbacks = ncalls;
935 srxd->callback_status = 0;
/* pass 1: issue the gm_get calls for real */
937 ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
939 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
940 return(GMNAL_STATUS_FAIL);
943 return(GMNAL_STATUS_OK);
949 * pull data from source node (source iovec) to a local iovec.
950 * The iovecs may not match which adds the complications below.
951 * Count the number of gm_gets that will be required to the callbacks
952 * can determine who is the last one.
/*
 * gmnal_copyiov:
 * Walks the source and receive iovecs in parallel, issuing one gm_get
 * per overlapping segment (when do_copy != 0) and counting segments
 * either way.  Three cases per step: source element longer than receive
 * element (consume rbuf, advance riov), shorter (consume sbuf, advance
 * siov), or equal (advance both).  Each gm_get borrows a send token via
 * gmnal_get_ltxd(); completions land in gmnal_remote_get_callback.
 */
955 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
956 struct iovec *siov, int nriov, struct iovec *riov)
960 int slen = siov->iov_len, rlen = riov->iov_len;
961 char *sbuf = siov->iov_base, *rbuf = riov->iov_base;
962 unsigned long sbuf_long;
963 gm_remote_ptr_t remote_ptr = 0;
964 unsigned int source_node;
965 gmnal_ltxd_t *ltxd = NULL;
966 gmnal_data_t *nal_data = srxd->nal_data;
968 CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
971 CDEBUG(D_ERROR, "Bad args No nal_data\n");
972 return(GMNAL_STATUS_FAIL);
/* translate the sender's global id to a GM node id once, up front */
974 GMNAL_GM_LOCK(nal_data);
975 if (gm_global_id_to_node_id(nal_data->gm_port,
976 srxd->gm_source_node,
977 &source_node) != GM_SUCCESS) {
979 CDEBUG(D_ERROR, "cannot resolve global_id [%u] "
980 "to local node_id\n", srxd->gm_source_node);
981 GMNAL_GM_UNLOCK(nal_data);
982 return(GMNAL_STATUS_FAIL);
984 GMNAL_GM_UNLOCK(nal_data);
986 * We need a send token to use gm_get
987 * getting an stxd gets us a send token.
988 * the stxd is used as the context to the
989 * callback function (so stxd can be returned).
990 * Set pointer in stxd to srxd so callback count in srxd
991 * can be decremented to find last callback to complete
993 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
994 srxd->gm_source_node, source_node);
/* main walk; the "while" header and "ncalls" bookkeeping are on
 * missing lines */
998 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
999 sbuf, slen, rbuf, rlen);
/* case 1: source element longer -- fill the whole receive element */
1003 CDEBUG(D_INFO, "slen>rlen\n");
1004 ltxd = gmnal_get_ltxd(nal_data);
1006 GMNAL_GM_LOCK(nal_data);
1008 * funny business to get rid
1009 * of compiler warning
1011 sbuf_long = (unsigned long) sbuf;
1012 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1013 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1014 rlen, GM_LOW_PRIORITY, source_node,
1016 gmnal_remote_get_callback, ltxd);
1017 GMNAL_GM_UNLOCK(nal_data);
1020 * at the end of 1 iov element
/* advance to the next receive element (riov++ etc. on missing lines) */
1026 rbuf = riov->iov_base;
1027 rlen = riov->iov_len;
1028 } else if (rlen > slen) {
/* case 2: receive element longer -- drain the whole source element */
1031 CDEBUG(D_INFO, "slen<rlen\n");
1032 ltxd = gmnal_get_ltxd(nal_data);
1034 GMNAL_GM_LOCK(nal_data);
1035 sbuf_long = (unsigned long) sbuf;
1036 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1037 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1038 slen, GM_LOW_PRIORITY, source_node,
1040 gmnal_remote_get_callback, ltxd);
1041 GMNAL_GM_UNLOCK(nal_data);
1044 * at end of siov element
1049 sbuf = siov->iov_base;
1050 slen = siov->iov_len;
/* case 3: elements the same size -- advance both sides */
1054 CDEBUG(D_INFO, "rlen=slen\n");
1055 ltxd = gmnal_get_ltxd(nal_data);
1057 GMNAL_GM_LOCK(nal_data);
1058 sbuf_long = (unsigned long) sbuf;
1059 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1060 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1061 rlen, GM_LOW_PRIORITY, source_node,
1063 gmnal_remote_get_callback, ltxd);
1064 GMNAL_GM_UNLOCK(nal_data);
1067 * at end of siov and riov element
1070 sbuf = siov->iov_base;
1071 slen = siov->iov_len;
1074 rbuf = riov->iov_base;
1075 rlen = riov->iov_len;
1084 * The callback function that is invoked after each gm_get call completes.
1085 * Multiple callbacks may be invoked for 1 transaction, only the final
1086 * callback has work to do.
/*
 * gmnal_remote_get_callback:
 * Every invocation returns its ltxd (send token) and decrements
 * srxd->ncallbacks under callback_lock; only the invocation that drops
 * the count to zero finalizes the message, acks the sender, deregisters
 * the receive iovecs, frees srxd->riov, and re-provides the wired
 * receive buffer.
 */
1089 gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
1093 gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context;
1094 gmnal_srxd_t *srxd = ltxd->srxd;
1095 nal_cb_t *nal_cb = srxd->nal_data->nal_cb;
1099 gmnal_data_t *nal_data;
1101 CDEBUG(D_TRACE, "called for context [%p]\n", context);
1103 if (status != GM_SUCCESS) {
1104 CDEBUG(D_ERROR, "reports error [%d][%s]\n", status,
1105 gmnal_gm_error(status));
1108 spin_lock(&srxd->callback_lock);
/* the "srxd->ncallbacks--" line is missing from this listing */
1110 srxd->callback_status |= status;
1111 lastone = srxd->ncallbacks?0:1;
1112 spin_unlock(&srxd->callback_lock);
1113 nal_data = srxd->nal_data;
1116 * everyone returns a send token
1118 gmnal_return_ltxd(nal_data, ltxd);
/* non-final callbacks are done here (the "if (!lastone)" guard is on a
 * missing line) */
1121 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1126 * Let our client application proceed
1128 CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
1129 lib_finalize(nal_cb, srxd, srxd->cookie, PTL_OK);
1132 * send an ack to the sender to let him know we got the data
1134 gmnal_large_tx_ack(nal_data, srxd);
1137 * Unregister the memory that was used
1138 * This is a very slow business (slower then register)
1140 nriov = srxd->nriov;
1142 GMNAL_GM_LOCK(nal_data);
/* per-element dereg loop over srxd->riov; loop header on missing lines */
1144 CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base);
1145 if (gm_deregister_memory(srxd->nal_data->gm_port,
1146 riov->iov_base, riov->iov_len)) {
1147 CDEBUG(D_ERROR, "failed to deregister memory [%p]\n",
1152 GMNAL_GM_UNLOCK(nal_data);
/* the iovec snapshot was PORTAL_ALLOCed in gmnal_large_rx */
1153 PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
1156 * repost the receive buffer (return receive token)
1158 GMNAL_GM_LOCK(nal_data);
1159 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
1160 srxd->gmsize, GM_LOW_PRIORITY, 0);
1161 GMNAL_GM_UNLOCK(nal_data);
1168 * Called on target node.
1169 * After pulling data from a source node
1170 * send an ack message to indicate the large transmit is complete.
/*
 * gmnal_large_tx_ack:
 * Builds a header-only GMNAL_LARGE_MESSAGE_ACK carrying the sender's
 * original stxd pointer (saved in srxd->source_stxd) and sends it back,
 * so the sender can deregister its iovecs and release its token.
 */
1173 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1177 gmnal_msghdr_t *msghdr;
1178 void *buffer = NULL;
1179 unsigned int local_nid;
1180 gm_status_t gm_status = GM_SUCCESS;
1182 CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
1183 srxd->gm_source_node);
1185 GMNAL_GM_LOCK(nal_data);
1186 gm_status = gm_global_id_to_node_id(nal_data->gm_port,
1187 srxd->gm_source_node, &local_nid);
1188 GMNAL_GM_UNLOCK(nal_data);
1189 if (gm_status != GM_SUCCESS) {
1190 CDEBUG(D_ERROR, "Failed to obtain local id\n");
1193 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1195 stxd = gmnal_get_stxd(nal_data, 1);
1196 CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1198 stxd->nal_data = nal_data;
1199 stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1202 * Copy gmnal_msg_hdr and portals header to the transmit buffer
1203 * Then copy the data in
1205 buffer = stxd->buffer;
1206 msghdr = (gmnal_msghdr_t*)buffer;
1209 * Add in the address of the original stxd from the sender node
1210 * so it knows which thread to notify.
1212 msghdr->magic = GMNAL_MAGIC;
1213 msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1214 msghdr->sender_node_id = nal_data->gm_global_nid;
1215 msghdr->stxd = srxd->source_stxd;
1216 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1218 CDEBUG(D_INFO, "sending\n");
/* the ack has no payload -- just the gmnal header */
1219 stxd->msg_size= sizeof(gmnal_msghdr_t);
1222 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1223 "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1224 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
1225 stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1226 GMNAL_GM_LOCK(nal_data);
1227 stxd->gm_priority = GM_LOW_PRIORITY;
1228 stxd->gm_target_node = local_nid;
1229 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
1230 stxd->gm_size, stxd->msg_size,
1231 GM_LOW_PRIORITY, local_nid,
1232 gmnal_large_tx_ack_callback,
1235 GMNAL_GM_UNLOCK(nal_data);
1236 CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1243 * A callback to indicate the ack transmit operation is complete
1244 * Check for errors and try to deal with them.
1245 * Call lib_finalise to inform the client application that the
1246 * send is complete and the memory can be reused.
1247 * Return the stxd when finished with it (returns a send token)
/*
 * gmnal_large_tx_ack_callback:
 * Completion handler for the header-only ack send: just return the
 * send token.  NOTE(review): the GMNAL_GM_UNLOCK at the end has no
 * visible matching LOCK -- presumably on a missing error-path line;
 * confirm against pristine source.
 */
1250 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
1253 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
1254 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
/* the "if (!stxd)" guard line is missing from this listing */
1257 CDEBUG(D_ERROR, "send completion event for unknown stxd\n");
1260 CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1262 gmnal_return_stxd(stxd->nal_data, stxd);
1264 GMNAL_GM_UNLOCK(nal_data);
1269 * Indicates the large transmit operation is complete.
1270 * Called on transmit side (means data has been pulled by receiver
1272 * Call lib_finalise to inform the client application that the send
1273 * is complete, deregister the memory and return the stxd.
1274 * Finally, repost the rx buffer that the ack message was delivered in.
/*
 * gmnal_large_tx_ack_received:
 * Runs on the original sender when the receiver's ack arrives.
 * Recovers the original stxd from the ack's message header, finalizes
 * the message, deregisters each source iovec element saved in the stxd,
 * releases the send token, and requeues the ack's receive buffer.
 */
1277 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1279 nal_cb_t *nal_cb = nal_data->nal_cb;
1280 gmnal_stxd_t *stxd = NULL;
1281 gmnal_msghdr_t *msghdr = NULL;
1282 void *buffer = NULL;
1286 CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
1288 buffer = srxd->buffer;
1289 msghdr = (gmnal_msghdr_t*)buffer;
/* the ack carries back the pointer to our original send descriptor */
1290 stxd = msghdr->stxd;
1292 CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1294 lib_finalize(nal_cb, stxd, stxd->cookie, PTL_OK);
1297 * extract the iovec from the stxd, deregister the memory.
1298 * free the space used to store the iovec
/* NOTE(review): "iov" has no visible assignment (presumably stxd->iov
 * on a missing line); confirm */
1301 while(stxd->niov--) {
1302 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1303 iov->iov_base, iov->iov_len);
1304 GMNAL_GM_LOCK(nal_data);
1305 gm_deregister_memory(nal_data->gm_port, iov->iov_base,
1307 GMNAL_GM_UNLOCK(nal_data);
1312 * return the send token
1313 * TO DO Is it bad to hold onto the send token so long?
1315 gmnal_return_stxd(nal_data, stxd);
1319 * requeue the receive buffer
1321 gmnal_rx_requeue_buffer(nal_data, srxd);