1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * This file contains all gmnal send and receive functions
29 * The caretaker thread
30 * This is main thread of execution for the NAL side
31 * This guy waits in gm_blocking_receive and gets
32 * woken up when the myrinet adaptor gets an interrupt.
33 * Hands off receive operations to the receive thread
34 * This thread Looks after gm_callbacks etc inline.
37 gmnal_ct_thread(void *arg)
39 gmnal_data_t *nal_data;
40 gm_recv_event_t *rxevent = NULL;
41 gm_recv_t *recv = NULL;
44 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
48 nal_data = (gmnal_data_t*)arg;
49 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
51 sprintf(current->comm, "gmnal_ct");
55 nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
57 GMNAL_GM_LOCK(nal_data);
58 while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
59 CDEBUG(D_NET, "waiting\n");
60 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
61 if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
62 CDEBUG(D_INFO, "time to exit\n");
65 CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
66 switch (GM_RECV_EVENT_TYPE(rxevent)) {
69 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
70 recv = (gm_recv_t*)&rxevent->recv;
71 GMNAL_GM_UNLOCK(nal_data);
72 gmnal_add_rxtwe(nal_data, recv);
73 GMNAL_GM_LOCK(nal_data);
74 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
76 case(_GM_SLEEP_EVENT):
78 * Blocking receive above just returns
79 * immediatly with _GM_SLEEP_EVENT
80 * Don't know what this is
82 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
83 GMNAL_GM_UNLOCK(nal_data);
84 gm_unknown(nal_data->gm_port, rxevent);
85 GMNAL_GM_LOCK(nal_data);
86 CDEBUG(D_INFO, "Awake from gm_unknown\n");
91 * Don't know what this is
92 * gm_unknown will make sense of it
93 * Should be able to do something with
94 * FAST_RECV_EVENTS here.
96 CDEBUG(D_NET, "Passing event to gm_unknown\n");
97 GMNAL_GM_UNLOCK(nal_data);
98 gm_unknown(nal_data->gm_port, rxevent);
99 GMNAL_GM_LOCK(nal_data);
100 CDEBUG(D_INFO, "Processed unknown event\n");
103 GMNAL_GM_UNLOCK(nal_data);
104 nal_data->ctthread_flag = GMNAL_THREAD_RESET;
105 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
106 return(GMNAL_STATUS_OK);
111 * process a receive event
113 int gmnal_rx_thread(void *arg)
115 gmnal_data_t *nal_data;
117 gmnal_rxtwe_t *we = NULL;
121 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
125 nal_data = (gmnal_data_t*)arg;
126 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
128 for (rank=0; rank<num_rx_threads; rank++)
129 if (nal_data->rxthread_pid[rank] == current->pid)
132 sprintf(current->comm, "gmnal_rx_%d", rank);
136 * set 1 bit for each thread started
137 * doesn't matter which bit
139 spin_lock(&nal_data->rxthread_flag_lock);
140 if (nal_data->rxthread_flag)
141 nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
143 nal_data->rxthread_flag = 1;
144 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
145 spin_unlock(&nal_data->rxthread_flag_lock);
147 while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
148 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
149 we = gmnal_get_rxtwe(nal_data);
151 CDEBUG(D_INFO, "Receive thread time to exit\n");
156 switch(((gmnal_msghdr_t*)buffer)->type) {
157 case(GMNAL_SMALL_MESSAGE):
158 gmnal_pre_receive(nal_data, we,
159 GMNAL_SMALL_MESSAGE);
161 case(GMNAL_LARGE_MESSAGE_INIT):
162 gmnal_pre_receive(nal_data, we,
163 GMNAL_LARGE_MESSAGE_INIT);
165 case(GMNAL_LARGE_MESSAGE_ACK):
166 gmnal_pre_receive(nal_data, we,
167 GMNAL_LARGE_MESSAGE_ACK);
170 CDEBUG(D_ERROR, "Unsupported message type\n");
171 gmnal_rx_bad(nal_data, we, NULL);
173 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
176 spin_lock(&nal_data->rxthread_flag_lock);
177 nal_data->rxthread_flag/=2;
178 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
179 spin_unlock(&nal_data->rxthread_flag_lock);
180 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
181 return(GMNAL_STATUS_OK);
187 * Start processing a small message receive
188 * Get here from gmnal_receive_thread
189 * Hand off to lib_parse, which calls cb_recv
190 * which hands back to gmnal_small_receive
191 * Deal with all endian stuff here.
194 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
196 gmnal_srxd_t *srxd = NULL;
198 unsigned int snode, sport, type, length;
199 gmnal_msghdr_t *gmnal_msghdr;
200 ptl_hdr_t *portals_hdr;
203 CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
204 nal_data, we, gmnal_type);
213 gmnal_msghdr = (gmnal_msghdr_t*)buffer;
214 portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
216 CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
217 "type [%d], length [%d], buffer [%p]\n",
218 snode, sport, type, length, buffer);
219 CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
220 "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
221 gmnal_msghdr->magic, gmnal_msghdr->type);
222 CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
223 "dest_node ["LPD64"]\n", portals_hdr->src_nid,
224 portals_hdr->dest_nid);
228 * Get a receive descriptor for this message
230 srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
231 CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
233 CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
234 /* I think passing a NULL srxd to lib_parse will crash
237 lib_parse(nal_data->libnal, portals_hdr, srxd);
238 return(GMNAL_STATUS_FAIL);
242 * no need to bother portals library with this
244 if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
245 gmnal_large_tx_ack_received(nal_data, srxd);
246 return(GMNAL_STATUS_OK);
249 srxd->nal_data = nal_data;
250 srxd->type = gmnal_type;
251 srxd->nsiov = gmnal_msghdr->niov;
252 srxd->gm_source_node = gmnal_msghdr->sender_node_id;
254 CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
255 buffer+GMNAL_MSGHDR_SIZE);
257 * control passes to lib, which calls cb_recv
258 * cb_recv is responsible for returning the buffer
261 rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
264 /* I just received garbage; take appropriate action... */
268 return(GMNAL_STATUS_OK);
274 * After a receive has been processed,
275 * hang out the receive buffer again.
276 * This implicitly returns a receive token.
279 gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
281 CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
283 CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
285 GMNAL_GM_LOCK(nal_data);
286 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
287 srxd->gmsize, GM_LOW_PRIORITY, 0 );
288 GMNAL_GM_UNLOCK(nal_data);
290 return(GMNAL_STATUS_OK);
295 * Handle a bad message
296 * A bad message is one we don't expect or can't interpret
299 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
301 CDEBUG(D_TRACE, "Can't handle message\n");
304 srxd = gmnal_rxbuffer_to_srxd(nal_data,
307 gmnal_rx_requeue_buffer(nal_data, srxd);
309 CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n");
313 return(GMNAL_STATUS_FAIL);
316 return(GMNAL_STATUS_OK);
322 * Process a small message receive.
323 * Get here from gmnal_receive_thread, gmnal_pre_receive
325 * Put data from prewired receive buffer into users buffer(s)
326 * Hang out the receive buffer again for another receive
330 gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
332 gmnal_srxd_t *srxd = NULL;
333 gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
337 CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
338 lib_finalize(libnal, private, cookie, PTL_FAIL);
342 srxd = (gmnal_srxd_t*)private;
345 * let portals library know receive is complete
347 CDEBUG(D_PORTALS, "calling lib_finalize\n");
348 lib_finalize(libnal, private, cookie, PTL_OK);
350 * return buffer so it can be used again
352 CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
353 GMNAL_GM_LOCK(nal_data);
354 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
355 srxd->gmsize, GM_LOW_PRIORITY, 0);
356 GMNAL_GM_UNLOCK(nal_data);
363 * Start a small transmit.
364 * Use the given send token (and wired transmit buffer).
365 * Copy headers to wired buffer and initiate gm_send from the wired buffer.
366 * The callback function informs when the send is complete.
369 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
370 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
371 gmnal_stxd_t *stxd, int size)
373 gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
375 gmnal_msghdr_t *msghdr = NULL;
377 unsigned int local_nid;
378 gm_status_t gm_status = GM_SUCCESS;
380 CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
381 "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
382 "size [%d]\n", libnal, private, cookie, hdr, type,
383 global_nid, pid, stxd, size);
385 CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
386 hdr->dest_nid, hdr->src_nid);
389 CDEBUG(D_ERROR, "no nal_data\n");
392 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
395 GMNAL_GM_LOCK(nal_data);
396 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
398 GMNAL_GM_UNLOCK(nal_data);
399 if (gm_status != GM_SUCCESS) {
400 CDEBUG(D_ERROR, "Failed to obtain local id\n");
403 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
405 stxd->type = GMNAL_SMALL_MESSAGE;
406 stxd->cookie = cookie;
409 * Copy gmnal_msg_hdr and portals header to the transmit buffer
410 * Then send the message, as the data has previously been copied in
413 buffer = stxd->buffer;
414 msghdr = (gmnal_msghdr_t*)buffer;
416 msghdr->magic = GMNAL_MAGIC;
417 msghdr->type = GMNAL_SMALL_MESSAGE;
418 msghdr->sender_node_id = nal_data->gm_global_nid;
419 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
421 buffer += sizeof(gmnal_msghdr_t);
423 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
424 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
426 buffer += sizeof(ptl_hdr_t);
428 CDEBUG(D_INFO, "sending\n");
429 tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
430 stxd->msg_size = tot_size;
433 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
434 "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
435 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
436 stxd->msg_size, global_nid, local_nid, stxd);
438 GMNAL_GM_LOCK(nal_data);
439 stxd->gm_priority = GM_LOW_PRIORITY;
440 stxd->gm_target_node = local_nid;
441 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
442 stxd->gm_size, stxd->msg_size,
443 GM_LOW_PRIORITY, local_nid,
444 gmnal_small_tx_callback, (void*)stxd);
445 GMNAL_GM_UNLOCK(nal_data);
446 CDEBUG(D_INFO, "done\n");
453 * A callback to indicate the small transmit operation is complete
454 * Check for errors and try to deal with them.
455 * Call lib_finalise to inform the client application that the send
456 * is complete and the memory can be reused.
457 * Return the stxd when finished with it (returns a send token)
460 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
462 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
463 lib_msg_t *cookie = stxd->cookie;
464 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
465 lib_nal_t *libnal = nal_data->libnal;
467 gm_status_t gm_status = 0;
470 CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
473 if (status != GM_SUCCESS) {
474 GMNAL_GM_LOCK(nal_data);
475 gm_status = gm_node_id_to_global_id(nal_data->gm_port,
476 stxd->gm_target_node,&gnid);
477 GMNAL_GM_UNLOCK(nal_data);
478 if (gm_status != GM_SUCCESS) {
479 CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
483 CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s] to [%u]\n",
484 stxd, gmnal_gm_error(status), gnid);
493 case(GM_SEND_DROPPED):
495 * do a resend on the dropped ones
497 CDEBUG(D_ERROR, "send stxd [%p] was dropped "
498 "resending\n", context);
499 GMNAL_GM_LOCK(nal_data);
500 gm_send_to_peer_with_callback(nal_data->gm_port,
505 stxd->gm_target_node,
506 gmnal_small_tx_callback,
508 GMNAL_GM_UNLOCK(nal_data);
512 case(GM_SEND_TIMED_OUT):
516 CDEBUG(D_INFO, "calling gm_drop_sends\n");
517 GMNAL_GM_LOCK(nal_data);
518 gm_drop_sends(nal_data->gm_port, stxd->gm_priority,
519 stxd->gm_target_node, GMNAL_GM_PORT_ID,
520 gmnal_drop_sends_callback, context);
521 GMNAL_GM_UNLOCK(nal_data);
530 case(GM_INTERRUPTED):
532 case(GM_INPUT_BUFFER_TOO_SMALL):
533 case(GM_OUTPUT_BUFFER_TOO_SMALL):
535 case(GM_MEMORY_FAULT):
536 case(GM_INVALID_PARAMETER):
537 case(GM_OUT_OF_MEMORY):
538 case(GM_INVALID_COMMAND):
539 case(GM_PERMISSION_DENIED):
540 case(GM_INTERNAL_ERROR):
542 case(GM_UNSUPPORTED_DEVICE):
543 case(GM_SEND_REJECTED):
544 case(GM_SEND_TARGET_PORT_CLOSED):
545 case(GM_SEND_TARGET_NODE_UNREACHABLE):
546 case(GM_SEND_PORT_CLOSED):
547 case(GM_NODE_ID_NOT_YET_SET):
548 case(GM_STILL_SHUTTING_DOWN):
550 case(GM_NO_SUCH_DEVICE):
552 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
553 case(GM_UNTRANSLATED_SYSTEM_ERROR):
554 case(GM_ACCESS_DENIED):
555 case(GM_NO_DRIVER_SUPPORT):
556 case(GM_PTE_REF_CNT_OVERFLOW):
557 case(GM_NOT_SUPPORTED_IN_KERNEL):
558 case(GM_NOT_SUPPORTED_ON_ARCH):
561 case(GM_DATA_CORRUPTED):
562 case(GM_HARDWARE_FAULT):
563 case(GM_SEND_ORPHANED):
564 case(GM_MINOR_OVERFLOW):
565 case(GM_PAGE_TABLE_FULL):
567 case(GM_INVALID_PORT_NUMBER):
568 case(GM_DEV_NOT_FOUND):
569 case(GM_FIRMWARE_NOT_RUNNING):
570 case(GM_YP_NO_MATCH):
572 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
573 stxd->gm_target_node, GMNAL_GM_PORT_ID,
574 gmnal_resume_sending_callback, context);
581 * If this is a large message init,
582 * we're not finished with the data yet,
583 * so can't call lib_finalise.
584 * However, we're also holding on to a
585 * stxd here (to keep track of the source
586 * iovec only). Should use another structure
587 * to keep track of iovec and return stxd to
590 if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
591 CDEBUG(D_INFO, "large transmit done\n");
594 gmnal_return_stxd(nal_data, stxd);
595 lib_finalize(libnal, stxd, cookie, PTL_OK);
600 * After an error on the port
601 * call this to allow future sends to complete
603 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
606 gmnal_data_t *nal_data;
607 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
608 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
609 gmnal_return_stxd(stxd->nal_data, stxd);
614 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
617 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
618 gmnal_data_t *nal_data = stxd->nal_data;
620 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
621 if (status == GM_SUCCESS) {
622 GMNAL_GM_LOCK(nal_data);
623 gm_send_to_peer_with_callback(gm_port, stxd->buffer,
624 stxd->gm_size, stxd->msg_size,
626 stxd->gm_target_node,
627 gmnal_small_tx_callback,
629 GMNAL_GM_UNLOCK(nal_data);
631 CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
632 "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
641 * Begin a large transmit.
642 * Do a gm_register of the memory pointed to by the iovec
643 * and send details to the receiver. The receiver does a gm_get
644 * to pull the data and sends and ack when finished. Upon receipt of
645 * this ack, deregister the memory. Only 1 send token is required here.
648 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
649 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
650 unsigned int niov, struct iovec *iov, size_t offset, int size)
653 gmnal_data_t *nal_data;
654 gmnal_stxd_t *stxd = NULL;
656 gmnal_msghdr_t *msghdr = NULL;
657 unsigned int local_nid;
658 int mlen = 0; /* the size of the init message data */
659 struct iovec *iov_dup = NULL;
660 gm_status_t gm_status;
664 CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
665 "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
666 "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type,
667 global_nid, pid, niov, iov, size);
670 nal_data = (gmnal_data_t*)libnal->libnal_data;
672 CDEBUG(D_ERROR, "no libnal.\n");
673 return(GMNAL_STATUS_FAIL);
678 * Get stxd and buffer. Put local address of data in buffer,
679 * send local addresses to target,
680 * wait for the target node to suck the data over.
681 * The stxd is used to ren
683 stxd = gmnal_get_stxd(nal_data, 1);
684 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
686 stxd->type = GMNAL_LARGE_MESSAGE_INIT;
687 stxd->cookie = cookie;
690 * Copy gmnal_msg_hdr and portals header to the transmit buffer
691 * Then copy the iov in
693 buffer = stxd->buffer;
694 msghdr = (gmnal_msghdr_t*)buffer;
696 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
698 msghdr->magic = GMNAL_MAGIC;
699 msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
700 msghdr->sender_node_id = nal_data->gm_global_nid;
701 msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
702 msghdr->niov = niov ;
703 buffer += sizeof(gmnal_msghdr_t);
704 mlen = sizeof(gmnal_msghdr_t);
705 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
708 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
710 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
711 buffer += sizeof(ptl_hdr_t);
712 mlen += sizeof(ptl_hdr_t);
713 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
715 while (offset >= iov->iov_len) {
716 offset -= iov->iov_len;
721 LASSERT(offset >= 0);
723 * Store the iovs in the stxd for we can get
724 * them later if we need them
726 stxd->iov[0].iov_base = iov->iov_base + offset;
727 stxd->iov[0].iov_len = iov->iov_len - offset;
728 CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
730 gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
734 * copy the iov to the buffer so target knows
735 * where to get the data from
737 CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
738 gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
739 mlen += stxd->niov*(sizeof(struct iovec));
740 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
743 * register the memory so the NIC can get hold of the data
744 * This is a slow process. it'd be good to overlap it
745 * with something else.
751 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
752 iov->iov_base, iov->iov_len);
753 GMNAL_GM_LOCK(nal_data);
754 gm_status = gm_register_memory(nal_data->gm_port,
755 iov->iov_base, iov->iov_len);
756 if (gm_status != GM_SUCCESS) {
757 GMNAL_GM_UNLOCK(nal_data);
758 CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
759 "for memory [%p] len ["LPSZ"]\n",
760 gm_status, gmnal_gm_error(gm_status),
761 iov->iov_base, iov->iov_len);
762 GMNAL_GM_LOCK(nal_data);
763 while (iov_dup != iov) {
764 gm_deregister_memory(nal_data->gm_port,
769 GMNAL_GM_UNLOCK(nal_data);
770 gmnal_return_stxd(nal_data, stxd);
774 GMNAL_GM_UNLOCK(nal_data);
779 * Send the init message to the target
781 CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
782 GMNAL_GM_LOCK(nal_data);
783 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
785 if (gm_status != GM_SUCCESS) {
786 GMNAL_GM_UNLOCK(nal_data);
787 CDEBUG(D_ERROR, "Failed to obtain local id\n");
788 gmnal_return_stxd(nal_data, stxd);
789 /* TO DO deregister memory on failure */
790 return(GMNAL_STATUS_FAIL);
792 CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
793 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
794 stxd->gm_size, mlen, GM_LOW_PRIORITY,
795 local_nid, gmnal_large_tx_callback,
797 GMNAL_GM_UNLOCK(nal_data);
799 CDEBUG(D_INFO, "done\n");
805 * Callback function indicates that send of buffer with
806 * large message iovec has completed (or failed).
809 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
811 gmnal_small_tx_callback(gm_port, context, status);
818 * Have received a buffer that contains an iovec of the sender.
819 * Do a gm_register_memory of the receivers buffer and then do a get
820 * data from the sender.
823 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
824 unsigned int nriov, struct iovec *riov, size_t offset,
825 size_t mlen, size_t rlen)
827 gmnal_data_t *nal_data = libnal->libnal_data;
828 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
830 struct iovec *riov_dup;
832 gmnal_msghdr_t *msghdr = NULL;
833 gm_status_t gm_status;
835 CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
836 "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
837 libnal, private, cookie, nriov, riov, mlen, rlen);
840 CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
841 lib_finalize(libnal, private, cookie, PTL_FAIL);
845 buffer = srxd->buffer;
846 msghdr = (gmnal_msghdr_t*)buffer;
847 buffer += sizeof(gmnal_msghdr_t);
848 buffer += sizeof(ptl_hdr_t);
851 * Store the senders stxd address in the srxd for this message
852 * The gmnal_large_message_ack needs it to notify the sender
853 * the pull of data is complete
855 srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
858 * Register the receivers memory
860 * tell the sender that we got the data
861 * then tell the receiver we got the data
863 * If the iovecs match, could interleave
864 * gm_registers and gm_gets for each element
866 while (offset >= riov->iov_len) {
867 offset -= riov->iov_len;
871 LASSERT (nriov >= 0);
872 LASSERT (offset >= 0);
874 * do this so the final gm_get callback can deregister the memory
876 PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
878 srxd->riov[0].iov_base = riov->iov_base + offset;
879 srxd->riov[0].iov_len = riov->iov_len - offset;
881 gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
888 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
889 riov->iov_base, riov->iov_len);
890 GMNAL_GM_LOCK(nal_data);
891 gm_status = gm_register_memory(nal_data->gm_port,
892 riov->iov_base, riov->iov_len);
893 if (gm_status != GM_SUCCESS) {
894 GMNAL_GM_UNLOCK(nal_data);
895 CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
896 "for memory [%p] len ["LPSZ"]\n",
897 gm_status, gmnal_gm_error(gm_status),
898 riov->iov_base, riov->iov_len);
899 GMNAL_GM_LOCK(nal_data);
900 while (riov_dup != riov) {
901 gm_deregister_memory(nal_data->gm_port,
906 GMNAL_GM_LOCK(nal_data);
908 * give back srxd and buffer. Send NACK to sender
910 PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
913 GMNAL_GM_UNLOCK(nal_data);
918 * now do gm_get to get the data
920 srxd->cookie = cookie;
921 if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
922 nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
923 CDEBUG(D_ERROR, "can't get the data");
926 CDEBUG(D_INFO, "lgmanl_large_rx done\n");
933 * Perform a number of remote gets as part of receiving
935 * The final one to complete (i.e. the last callback to get called)
937 * gm_get requires a send token.
940 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
941 int nriov, struct iovec *riov)
946 CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
947 "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
950 ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
952 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
953 return(GMNAL_STATUS_FAIL);
955 CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
956 spin_lock_init(&srxd->callback_lock);
957 srxd->ncallbacks = ncalls;
958 srxd->callback_status = 0;
960 ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
962 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
963 return(GMNAL_STATUS_FAIL);
966 return(GMNAL_STATUS_OK);
972 * pull data from source node (source iovec) to a local iovec.
973 * The iovecs may not match which adds the complications below.
974 * Count the number of gm_gets that will be required so the callbacks
975 * can determine who is the last one.
978 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
979 struct iovec *siov, int nriov, struct iovec *riov)
983 int slen = siov->iov_len, rlen = riov->iov_len;
984 char *sbuf = siov->iov_base, *rbuf = riov->iov_base;
985 unsigned long sbuf_long;
986 gm_remote_ptr_t remote_ptr = 0;
987 unsigned int source_node;
988 gmnal_ltxd_t *ltxd = NULL;
989 gmnal_data_t *nal_data = srxd->nal_data;
991 CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
994 CDEBUG(D_ERROR, "Bad args No nal_data\n");
995 return(GMNAL_STATUS_FAIL);
997 GMNAL_GM_LOCK(nal_data);
998 if (gm_global_id_to_node_id(nal_data->gm_port,
999 srxd->gm_source_node,
1000 &source_node) != GM_SUCCESS) {
1002 CDEBUG(D_ERROR, "cannot resolve global_id [%u] "
1003 "to local node_id\n", srxd->gm_source_node);
1004 GMNAL_GM_UNLOCK(nal_data);
1005 return(GMNAL_STATUS_FAIL);
1007 GMNAL_GM_UNLOCK(nal_data);
1009 * We need a send token to use gm_get
1010 * getting an stxd gets us a send token.
1011 * the stxd is used as the context to the
1012 * callback function (so stxd can be returned).
1013 * Set pointer in stxd to srxd so callback count in srxd
1014 * can be decremented to find last callback to complete
1016 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
1017 srxd->gm_source_node, source_node);
1021 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
1022 sbuf, slen, rbuf, rlen);
1026 CDEBUG(D_INFO, "slen>rlen\n");
1027 ltxd = gmnal_get_ltxd(nal_data);
1029 GMNAL_GM_LOCK(nal_data);
1031 * funny business to get rid
1032 * of compiler warning
1034 sbuf_long = (unsigned long) sbuf;
1035 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1036 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1037 rlen, GM_LOW_PRIORITY, source_node,
1039 gmnal_remote_get_callback, ltxd);
1040 GMNAL_GM_UNLOCK(nal_data);
1043 * at the end of 1 iov element
1049 rbuf = riov->iov_base;
1050 rlen = riov->iov_len;
1051 } else if (rlen > slen) {
1054 CDEBUG(D_INFO, "slen<rlen\n");
1055 ltxd = gmnal_get_ltxd(nal_data);
1057 GMNAL_GM_LOCK(nal_data);
1058 sbuf_long = (unsigned long) sbuf;
1059 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1060 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1061 slen, GM_LOW_PRIORITY, source_node,
1063 gmnal_remote_get_callback, ltxd);
1064 GMNAL_GM_UNLOCK(nal_data);
1067 * at end of siov element
1072 sbuf = siov->iov_base;
1073 slen = siov->iov_len;
1077 CDEBUG(D_INFO, "rlen=slen\n");
1078 ltxd = gmnal_get_ltxd(nal_data);
1080 GMNAL_GM_LOCK(nal_data);
1081 sbuf_long = (unsigned long) sbuf;
1082 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1083 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1084 rlen, GM_LOW_PRIORITY, source_node,
1086 gmnal_remote_get_callback, ltxd);
1087 GMNAL_GM_UNLOCK(nal_data);
1090 * at end of siov and riov element
1093 sbuf = siov->iov_base;
1094 slen = siov->iov_len;
1097 rbuf = riov->iov_base;
1098 rlen = riov->iov_len;
1107 * The callback function that is invoked after each gm_get call completes.
1108 * Multiple callbacks may be invoked for 1 transaction, only the final
1109 * callback has work to do.
1112 gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
1116 gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context;
1117 gmnal_srxd_t *srxd = ltxd->srxd;
1118 lib_nal_t *libnal = srxd->nal_data->libnal;
1122 gmnal_data_t *nal_data;
1124 CDEBUG(D_TRACE, "called for context [%p]\n", context);
1126 if (status != GM_SUCCESS) {
1127 CDEBUG(D_ERROR, "reports error [%d][%s]\n", status,
1128 gmnal_gm_error(status));
1131 spin_lock(&srxd->callback_lock);
1133 srxd->callback_status |= status;
1134 lastone = srxd->ncallbacks?0:1;
1135 spin_unlock(&srxd->callback_lock);
1136 nal_data = srxd->nal_data;
1139 * everyone returns a send token
1141 gmnal_return_ltxd(nal_data, ltxd);
1144 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1149 * Let our client application proceed
1151 CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
1152 lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
1155 * send an ack to the sender to let him know we got the data
1157 gmnal_large_tx_ack(nal_data, srxd);
1160 * Unregister the memory that was used
1161 * This is a very slow business (slower then register)
1163 nriov = srxd->nriov;
1165 GMNAL_GM_LOCK(nal_data);
1167 CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base);
1168 if (gm_deregister_memory(srxd->nal_data->gm_port,
1169 riov->iov_base, riov->iov_len)) {
1170 CDEBUG(D_ERROR, "failed to deregister memory [%p]\n",
1175 GMNAL_GM_UNLOCK(nal_data);
1176 PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
1179 * repost the receive buffer (return receive token)
1181 GMNAL_GM_LOCK(nal_data);
1182 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
1183 srxd->gmsize, GM_LOW_PRIORITY, 0);
1184 GMNAL_GM_UNLOCK(nal_data);
1191 * Called on target node.
1192 * After pulling data from a source node
1193 * send an ack message to indicate the large transmit is complete.
1196 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1200 gmnal_msghdr_t *msghdr;
1201 void *buffer = NULL;
1202 unsigned int local_nid;
1203 gm_status_t gm_status = GM_SUCCESS;
1205 CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
1206 srxd->gm_source_node);
1208 GMNAL_GM_LOCK(nal_data);
1209 gm_status = gm_global_id_to_node_id(nal_data->gm_port,
1210 srxd->gm_source_node, &local_nid);
1211 GMNAL_GM_UNLOCK(nal_data);
1212 if (gm_status != GM_SUCCESS) {
1213 CDEBUG(D_ERROR, "Failed to obtain local id\n");
1216 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1218 stxd = gmnal_get_stxd(nal_data, 1);
1219 CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1221 stxd->nal_data = nal_data;
1222 stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1225 * Copy gmnal_msg_hdr and portals header to the transmit buffer
1226 * Then copy the data in
1228 buffer = stxd->buffer;
1229 msghdr = (gmnal_msghdr_t*)buffer;
1232 * Add in the address of the original stxd from the sender node
1233 * so it knows which thread to notify.
1235 msghdr->magic = GMNAL_MAGIC;
1236 msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1237 msghdr->sender_node_id = nal_data->gm_global_nid;
1238 msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
1239 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1241 CDEBUG(D_INFO, "sending\n");
1242 stxd->msg_size= sizeof(gmnal_msghdr_t);
1245 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1246 "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1247 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
1248 stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1249 GMNAL_GM_LOCK(nal_data);
1250 stxd->gm_priority = GM_LOW_PRIORITY;
1251 stxd->gm_target_node = local_nid;
1252 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
1253 stxd->gm_size, stxd->msg_size,
1254 GM_LOW_PRIORITY, local_nid,
1255 gmnal_large_tx_ack_callback,
1258 GMNAL_GM_UNLOCK(nal_data);
1259 CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1266 * A callback to indicate the ack transmit operation is complete
1267 * Check for errors and try to deal with them.
1268 * Call lib_finalise to inform the client application that the
1269 * send is complete and the memory can be reused.
1270 * Return the stxd when finished with it (returns a send token)
1273 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
1276 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
1277 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
1280 CDEBUG(D_ERROR, "send completion event for unknown stxd\n");
1283 CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1285 gmnal_return_stxd(stxd->nal_data, stxd);
1287 GMNAL_GM_UNLOCK(nal_data);
1292 * Indicates the large transmit operation is complete.
1293 * Called on transmit side (means data has been pulled by receiver
1295 * Call lib_finalise to inform the client application that the send
1296 * is complete, deregister the memory and return the stxd.
1297 * Finally, report the rx buffer that the ack message was delivered in.
1300 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1302 lib_nal_t *libnal = nal_data->libnal;
1303 gmnal_stxd_t *stxd = NULL;
1304 gmnal_msghdr_t *msghdr = NULL;
1305 void *buffer = NULL;
1309 CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
1311 buffer = srxd->buffer;
1312 msghdr = (gmnal_msghdr_t*)buffer;
1313 stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
1315 CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1317 lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
1320 * extract the iovec from the stxd, deregister the memory.
1321 * free the space used to store the iovec
1324 while(stxd->niov--) {
1325 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1326 iov->iov_base, iov->iov_len);
1327 GMNAL_GM_LOCK(nal_data);
1328 gm_deregister_memory(nal_data->gm_port, iov->iov_base,
1330 GMNAL_GM_UNLOCK(nal_data);
1335 * return the send token
1336 * TO DO It is bad to hold onto the send token so long?
1338 gmnal_return_stxd(nal_data, stxd);
1342 * requeue the receive buffer
1344 gmnal_rx_requeue_buffer(nal_data, srxd);