1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * This file contains all gmnal send and receive functions
29 * The caretaker thread
30 * This is main thread of execution for the NAL side
31 * This thread waits in gm_blocking_receive and gets
32 * woken up when the myrinet adaptor gets an interrupt.
33 * Hands off receive operations to the receive thread
34 * This thread Looks after gm_callbacks etc inline.
/*
 * gmnal_ct_thread - caretaker thread main loop.
 * Runs with the GM lock held while sleeping in gm_blocking_receive_no_spin();
 * drops the lock only around calls back into GM/queue helpers. RECV events
 * are handed to the rx thread via gmnal_add_rxtwe(); everything else is fed
 * to gm_unknown() so GM can run its callbacks.
 * NOTE(review): this dump has lines elided (braces, case labels, breaks,
 * early-exit handling) — comments describe only what is visible here.
 */
37 gmnal_ct_thread(void *arg)
39 gmnal_data_t *nal_data;
40 gm_recv_event_t *rxevent = NULL;
41 gm_recv_t *recv = NULL;
/* arg is the gmnal_data_t for this NAL instance; bail out if absent */
44 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
48 nal_data = (gmnal_data_t*)arg;
49 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
/* announce we are up; the loop below runs until the flag is changed */
53 nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
55 GMNAL_GM_LOCK(nal_data);
56 while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
57 CDEBUG(D_NET, "waiting\n");
/* sleep (lock held) until the Myrinet adaptor raises an event */
58 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
59 if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
60 CDEBUG(D_INFO, "time to exit\n");
63 CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
64 switch (GM_RECV_EVENT_TYPE(rxevent)) {
/* receive event: queue it for the rx thread (drop lock around the queue op) */
67 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
68 recv = (gm_recv_t*)&rxevent->recv;
69 GMNAL_GM_UNLOCK(nal_data);
70 gmnal_add_rxtwe(nal_data, recv);
71 GMNAL_GM_LOCK(nal_data);
72 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
74 case(_GM_SLEEP_EVENT):
76 * Blocking receive above just returns
77 * immediately with _GM_SLEEP_EVENT
78 * Don't know what this is
80 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
81 GMNAL_GM_UNLOCK(nal_data);
82 gm_unknown(nal_data->gm_port, rxevent);
83 GMNAL_GM_LOCK(nal_data);
84 CDEBUG(D_INFO, "Awake from gm_unknown\n");
89 * Don't know what this is
90 * gm_unknown will make sense of it
91 * Should be able to do something with
92 * FAST_RECV_EVENTS here.
94 CDEBUG(D_NET, "Passing event to gm_unknown\n");
95 GMNAL_GM_UNLOCK(nal_data);
96 gm_unknown(nal_data->gm_port, rxevent);
97 GMNAL_GM_LOCK(nal_data);
98 CDEBUG(D_INFO, "Processed unknown event\n");
/* loop exited: release lock, mark thread reset so teardown can proceed */
101 GMNAL_GM_UNLOCK(nal_data);
102 nal_data->ctthread_flag = GMNAL_THREAD_RESET;
103 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
104 return(GMNAL_STATUS_OK);
109 * process a receive event
/*
 * gmnal_rx_thread - receive worker thread.
 * Pops work entries (gmnal_rxtwe_t) queued by the caretaker thread and
 * dispatches them by message type to gmnal_pre_receive(); unknown types go
 * to gmnal_rx_bad(). rxthread_flag holds one set bit per running rx thread
 * (set on start, halved on exit) so shutdown can tell when all are gone.
 * NOTE(review): elided lines (braces, else, breaks, `buffer` setup from the
 * work entry) are not visible in this dump.
 */
111 int gmnal_rx_thread(void *arg)
113 gmnal_data_t *nal_data;
115 gmnal_rxtwe_t *we = NULL;
118 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
122 nal_data = (gmnal_data_t*)arg;
123 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
127 * set 1 bit for each thread started
128 * doesn't matter which bit
/* flag*2+1 shifts the existing bits left and sets the new low bit */
130 spin_lock(&nal_data->rxthread_flag_lock);
131 if (nal_data->rxthread_flag)
132 nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
134 nal_data->rxthread_flag = 1;
135 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
136 spin_unlock(&nal_data->rxthread_flag_lock);
138 while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
139 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
/* blocks until the caretaker queues a work entry (or shutdown) */
140 we = gmnal_get_rxtwe(nal_data);
142 CDEBUG(D_INFO, "Receive thread time to exit\n");
/* NOTE(review): `buffer` here presumably points at we's receive buffer —
 * its assignment is on an elided line; confirm against the full source. */
147 switch(((gmnal_msghdr_t*)buffer)->type) {
148 case(GMNAL_SMALL_MESSAGE):
149 gmnal_pre_receive(nal_data, we,
150 GMNAL_SMALL_MESSAGE);
152 case(GMNAL_LARGE_MESSAGE_INIT):
153 gmnal_pre_receive(nal_data, we,
154 GMNAL_LARGE_MESSAGE_INIT);
156 case(GMNAL_LARGE_MESSAGE_ACK):
157 gmnal_pre_receive(nal_data, we,
158 GMNAL_LARGE_MESSAGE_ACK);
161 CDEBUG(D_ERROR, "Unsupported message type\n");
162 gmnal_rx_bad(nal_data, we, NULL);
164 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
/* on exit: drop our bit (halving shifts the bit pattern right) */
167 spin_lock(&nal_data->rxthread_flag_lock);
168 nal_data->rxthread_flag/=2;
169 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
170 spin_unlock(&nal_data->rxthread_flag_lock);
171 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
172 return(GMNAL_STATUS_OK);
178 * Start processing a small message receive
179 * Get here from gmnal_receive_thread
180 * Hand off to lib_parse, which calls cb_recv
181 * which hands back to gmnal_small_receive
182 * Deal with all endian stuff here.
/*
 * gmnal_pre_receive - first-stage processing of a received message.
 * Maps the receive buffer back to its descriptor (srxd), short-circuits
 * large-message ACKs to gmnal_large_tx_ack_received(), and otherwise fills
 * in the srxd and hands the portals header to lib_parse(), which will call
 * back into the cb_recv path (gmnal_small_rx / gmnal_large_rx).
 * Returns GMNAL_STATUS_OK / GMNAL_STATUS_FAIL.
 * NOTE(review): `buffer`, snode/sport/type/length extraction and `rc`
 * declaration are on lines elided from this dump.
 */
185 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
187 gmnal_srxd_t *srxd = NULL;
189 unsigned int snode, sport, type, length;
190 gmnal_msghdr_t *gmnal_msghdr;
191 ptl_hdr_t *portals_hdr;
194 CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
195 nal_data, we, gmnal_type);
/* wire format: gmnal header immediately followed by the portals header */
204 gmnal_msghdr = (gmnal_msghdr_t*)buffer;
205 portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
207 CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
208 "type [%d], length [%d], buffer [%p]\n",
209 snode, sport, type, length, buffer);
210 CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
211 "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
212 gmnal_msghdr->magic, gmnal_msghdr->type);
213 CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
214 "dest_node ["LPD64"]\n", portals_hdr->src_nid,
215 portals_hdr->dest_nid);
219 * Get a receive descriptor for this message
221 srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
222 CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
224 CDEBUG(D_ERROR, "Failed to get receive descriptor\n");
225 /* I think passing a NULL srxd to lib_parse will crash
228 lib_parse(nal_data->libnal, portals_hdr, srxd);
229 return(GMNAL_STATUS_FAIL);
233 * no need to bother portals library with this
/* large-message ACK: purely internal handshake, handled without lib_parse */
235 if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
236 gmnal_large_tx_ack_received(nal_data, srxd);
237 return(GMNAL_STATUS_OK);
240 srxd->nal_data = nal_data;
241 srxd->type = gmnal_type;
242 srxd->nsiov = gmnal_msghdr->niov;
243 srxd->gm_source_node = gmnal_msghdr->sender_node_id;
245 CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
246 buffer+GMNAL_MSGHDR_SIZE);
248 * control passes to lib, which calls cb_recv
249 * cb_recv is responsible for returning the buffer
252 rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
255 /* I just received garbage; take appropriate action... */
259 return(GMNAL_STATUS_OK);
265 * After a receive has been processed,
266 * hang out the receive buffer again.
267 * This implicitly returns a receive token.
/*
 * gmnal_rx_requeue_buffer - hand a receive buffer back to GM.
 * Re-posting the buffer implicitly returns its receive token so GM can
 * deliver another message into it. Always returns GMNAL_STATUS_OK.
 */
270 gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
272 CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
274 CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
276 GMNAL_GM_LOCK(nal_data);
277 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
278 srxd->gmsize, GM_LOW_PRIORITY, 0 );
279 GMNAL_GM_UNLOCK(nal_data);
281 return(GMNAL_STATUS_OK);
286 * Handle a bad message
287 * A bad message is one we don't expect or can't interpret
/*
 * gmnal_rx_bad - dispose of a message we cannot interpret.
 * Looks up (or is given) the buffer's descriptor and requeues the buffer;
 * if no descriptor can be found the buffer is lost and FAIL is returned.
 * NOTE(review): the null-check / buffer-lookup argument lines are elided
 * in this dump.
 */
290 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
292 CDEBUG(D_TRACE, "Can't handle message\n");
295 srxd = gmnal_rxbuffer_to_srxd(nal_data,
298 gmnal_rx_requeue_buffer(nal_data, srxd);
300 CDEBUG(D_ERROR, "Can't find a descriptor for this buffer\n");
304 return(GMNAL_STATUS_FAIL);
307 return(GMNAL_STATUS_OK);
313 * Process a small message receive.
314 * Get here from gmnal_receive_thread, gmnal_pre_receive
316 * Put data from prewired receive buffer into users buffer(s)
317 * Hang out the receive buffer again for another receive
/*
 * gmnal_small_rx - cb_recv for small messages.
 * Called back from lib_parse(); copies payload from the pre-wired receive
 * buffer (past the gmnal + portals headers) into the user's iovec, honouring
 * the initial `offset`, then finalizes the message and re-posts the buffer.
 * NOTE(review): the iovec iteration (loop header, iov++/niov-- advance) sits
 * on lines elided from this dump; `buffer` is declared on an elided line.
 */
321 gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
322 unsigned int niov, struct iovec *iov, size_t offset, size_t mlen, size_t rlen)
324 gmnal_srxd_t *srxd = NULL;
326 gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
329 CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
/* private must be the srxd handed to lib_parse; fail the message if not */
332 CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
333 lib_finalize(libnal, private, cookie, PTL_FAIL);
337 srxd = (gmnal_srxd_t*)private;
338 buffer = srxd->buffer;
/* skip the two wire headers to reach the payload */
339 buffer += sizeof(gmnal_msghdr_t);
340 buffer += sizeof(ptl_hdr_t);
/* consume whole iov elements covered by offset, then partial, then full */
343 if (offset >= iov->iov_len) {
344 offset -= iov->iov_len;
345 } else if (offset > 0) {
346 CDEBUG(D_INFO, "processing [%p] base [%p] len %d, "
347 "offset %d, len ["LPSZ"]\n", iov,
348 iov->iov_base + offset, iov->iov_len, offset,
349 iov->iov_len - offset);
350 gm_bcopy(buffer, iov->iov_base + offset,
351 iov->iov_len - offset);
353 buffer += iov->iov_len - offset;
355 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov,
357 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
358 buffer += iov->iov_len;
365 * let portals library know receive is complete
367 CDEBUG(D_PORTALS, "calling lib_finalize\n");
368 lib_finalize(libnal, private, cookie, PTL_OK);
370 * return buffer so it can be used again
372 CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
373 GMNAL_GM_LOCK(nal_data);
374 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
375 srxd->gmsize, GM_LOW_PRIORITY, 0);
376 GMNAL_GM_UNLOCK(nal_data);
383 * Start a small transmit.
384 * Get a send token (and wired transmit buffer).
385 * Copy data from senders buffer to wired buffer and
386 * initiate gm_send from the wired buffer.
387 * The callback function informs when the send is complete.
/*
 * gmnal_small_tx - transmit a small message.
 * Gets a send token/wired buffer (stxd), builds gmnal header + portals
 * header + payload in it, resolves the target's GM node id, and fires
 * gm_send_to_peer_with_callback(); gmnal_small_tx_callback completes the
 * send (lib_finalize + stxd return).
 * NOTE(review): iovec loop header, `buffer`/`tot_size` declarations and
 * the &local_nid argument line are elided in this dump.
 */
390 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
391 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
392 unsigned int niov, struct iovec *iov, size_t offset, int size)
394 gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
395 gmnal_stxd_t *stxd = NULL;
397 gmnal_msghdr_t *msghdr = NULL;
399 unsigned int local_nid;
400 gm_status_t gm_status = GM_SUCCESS;
402 CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
403 "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] niov [%d] "
404 "iov [%p] size [%d]\n", libnal, private, cookie, hdr, type,
405 global_nid, pid, niov, iov, size);
407 CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
408 hdr->dest_nid, hdr->src_nid);
411 CDEBUG(D_ERROR, "no nal_data\n");
412 return(GMNAL_STATUS_FAIL);
414 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
/* map portals global nid to the GM-local node id used for addressing */
417 GMNAL_GM_LOCK(nal_data);
418 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
420 GMNAL_GM_UNLOCK(nal_data);
421 if (gm_status != GM_SUCCESS) {
422 CDEBUG(D_ERROR, "Failed to obtain local id\n");
423 return(GMNAL_STATUS_FAIL);
425 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
/* blocking get of a send descriptor: also acquires a GM send token */
427 stxd = gmnal_get_stxd(nal_data, 1);
428 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
430 stxd->type = GMNAL_SMALL_MESSAGE;
431 stxd->cookie = cookie;
434 * Copy gmnal_msg_hdr and portals header to the transmit buffer
435 * Then copy the data in
437 buffer = stxd->buffer;
438 msghdr = (gmnal_msghdr_t*)buffer;
440 msghdr->magic = GMNAL_MAGIC;
441 msghdr->type = GMNAL_SMALL_MESSAGE;
442 msghdr->sender_node_id = nal_data->gm_global_nid;
443 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
445 buffer += sizeof(gmnal_msghdr_t);
447 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
448 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
450 buffer += sizeof(ptl_hdr_t);
/* walk the iovec: skip elements covered by offset, copy the remainder */
453 if (offset >= iov->iov_len) {
454 offset -= iov->iov_len;
455 } else if (offset > 0) {
456 CDEBUG(D_INFO, "processing iov [%p] base [%p] len ["LPSZ"] to [%p]\n",
457 iov, iov->iov_base + offset, iov->iov_len - offset, buffer);
458 gm_bcopy(iov->iov_base + offset, buffer, iov->iov_len - offset);
459 buffer+= iov->iov_len - offset;
462 CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n",
463 iov, iov->iov_len, buffer);
464 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
465 buffer+= iov->iov_len;
470 CDEBUG(D_INFO, "sending\n");
/* wire size = payload + both headers */
471 tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
472 stxd->msg_size = tot_size;
475 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
476 "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
477 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
478 stxd->msg_size, global_nid, local_nid, stxd);
480 GMNAL_GM_LOCK(nal_data);
/* remember priority/target so the callback can resend or drop on error */
481 stxd->gm_priority = GM_LOW_PRIORITY;
482 stxd->gm_target_node = local_nid;
483 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
484 stxd->gm_size, stxd->msg_size,
485 GM_LOW_PRIORITY, local_nid,
486 gmnal_small_tx_callback, (void*)stxd);
487 GMNAL_GM_UNLOCK(nal_data);
488 CDEBUG(D_INFO, "done\n");
495 * A callback to indicate the small transmit operation is complete
496 * Check for errors and try to deal with them.
497 * Call lib_finalise to inform the client application that the send
498 * is complete and the memory can be reused.
499 * Return the stxd when finished with it (returns a send token)
/*
 * gmnal_small_tx_callback - GM completion callback for a small-message send.
 * On GM_SUCCESS (fall-through at the bottom): return the stxd (send token)
 * and lib_finalize the message — except for LARGE_MESSAGE_INIT sends, whose
 * stxd must stay alive until the receiver's ACK arrives.
 * On error: GM_SEND_DROPPED is resent directly; GM_SEND_TIMED_OUT goes
 * through gm_drop_sends(); all other statuses fall through to
 * gm_resume_sending() to unwedge the port.
 * NOTE(review): case labels for the success path, break statements and the
 * resend argument lines are elided in this dump.
 */
502 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
504 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
505 lib_msg_t *cookie = stxd->cookie;
506 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
507 lib_nal_t *libnal = nal_data->libnal;
510 CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
513 if (status != GM_SUCCESS) {
514 CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s]\n",
515 stxd, gmnal_gm_error(status));
524 case(GM_SEND_DROPPED):
526 * do a resend on the dropped ones
528 CDEBUG(D_ERROR, "send stxd [%p] was dropped "
529 "resending\n", context);
530 GMNAL_GM_LOCK(nal_data);
531 gm_send_to_peer_with_callback(nal_data->gm_port,
536 stxd->gm_target_node,
537 gmnal_small_tx_callback,
539 GMNAL_GM_UNLOCK(nal_data);
543 case(GM_SEND_TIMED_OUT):
/* timed out: cancel queued sends; the drop callback resends */
547 CDEBUG(D_INFO, "calling gm_drop_sends\n");
548 GMNAL_GM_LOCK(nal_data);
549 gm_drop_sends(nal_data->gm_port, stxd->gm_priority,
550 stxd->gm_target_node, GMNAL_GM_PORT,
551 gmnal_drop_sends_callback, context);
552 GMNAL_GM_UNLOCK(nal_data);
/* all remaining statuses share the resume-sending recovery below */
561 case(GM_INTERRUPTED):
563 case(GM_INPUT_BUFFER_TOO_SMALL):
564 case(GM_OUTPUT_BUFFER_TOO_SMALL):
566 case(GM_MEMORY_FAULT):
567 case(GM_INVALID_PARAMETER):
568 case(GM_OUT_OF_MEMORY):
569 case(GM_INVALID_COMMAND):
570 case(GM_PERMISSION_DENIED):
571 case(GM_INTERNAL_ERROR):
573 case(GM_UNSUPPORTED_DEVICE):
574 case(GM_SEND_REJECTED):
575 case(GM_SEND_TARGET_PORT_CLOSED):
576 case(GM_SEND_TARGET_NODE_UNREACHABLE):
577 case(GM_SEND_PORT_CLOSED):
578 case(GM_NODE_ID_NOT_YET_SET):
579 case(GM_STILL_SHUTTING_DOWN):
581 case(GM_NO_SUCH_DEVICE):
583 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
584 case(GM_UNTRANSLATED_SYSTEM_ERROR):
585 case(GM_ACCESS_DENIED):
586 case(GM_NO_DRIVER_SUPPORT):
587 case(GM_PTE_REF_CNT_OVERFLOW):
588 case(GM_NOT_SUPPORTED_IN_KERNEL):
589 case(GM_NOT_SUPPORTED_ON_ARCH):
592 case(GM_DATA_CORRUPTED):
593 case(GM_HARDWARE_FAULT):
594 case(GM_SEND_ORPHANED):
595 case(GM_MINOR_OVERFLOW):
596 case(GM_PAGE_TABLE_FULL):
598 case(GM_INVALID_PORT_NUMBER):
599 case(GM_DEV_NOT_FOUND):
600 case(GM_FIRMWARE_NOT_RUNNING):
601 case(GM_YP_NO_MATCH):
603 CDEBUG(D_ERROR, "Unknown send error\n");
/* let further sends to this node proceed after the error */
604 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
605 stxd->gm_target_node, GMNAL_GM_PORT,
606 gmnal_resume_sending_callback, context);
613 * If this is a large message init,
614 * we're not finished with the data yet,
615 * so can't call lib_finalise.
616 * However, we're also holding on to a
617 * stxd here (to keep track of the source
618 * iovec only). Should use another structure
619 * to keep track of iovec and return stxd to
622 if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
623 CDEBUG(D_INFO, "large transmit done\n");
/* normal small-message completion: give back the token, finish the msg */
626 gmnal_return_stxd(nal_data, stxd);
627 lib_finalize(libnal, stxd, cookie, PTL_OK);
632 * After an error on the port
633 * call this to allow future sends to complete
/*
 * gmnal_resume_sending_callback - invoked after gm_resume_sending().
 * The failed send is abandoned: its stxd (and send token) is simply
 * returned to the pool.
 * NOTE(review): the second parameter line (gm_status_t status) and the
 * braces are elided in this dump; `nal_data` is declared but the visible
 * code uses stxd->nal_data directly.
 */
635 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
638 gmnal_data_t *nal_data;
639 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
640 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
641 gmnal_return_stxd(stxd->nal_data, stxd);
/*
 * gmnal_drop_sends_callback - invoked after gm_drop_sends() (send timeout
 * recovery). On success the dropped message is resent with the original
 * small-tx callback; otherwise the failure is only logged.
 * NOTE(review): the second GMNAL_GM_LOCK below (orig line 661), after the
 * resend, looks like it should be GMNAL_GM_UNLOCK — as written it would
 * recursively take the already-held GM lock. Confirm against the upstream
 * Lustre gmnal source before relying on this path.
 * NOTE(review): the priority and context argument lines of the resend call
 * are elided in this dump.
 */
646 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
649 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
650 gmnal_data_t *nal_data = stxd->nal_data;
652 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
653 if (status == GM_SUCCESS) {
654 GMNAL_GM_LOCK(nal_data);
655 gm_send_to_peer_with_callback(gm_port, stxd->buffer,
656 stxd->gm_size, stxd->msg_size,
658 stxd->gm_target_node,
659 gmnal_small_tx_callback,
661 GMNAL_GM_LOCK(nal_data);
663 CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
664 "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
673 * Begin a large transmit.
674 * Do a gm_register of the memory pointed to by the iovec
675 * and send details to the receiver. The receiver does a gm_get
676 * to pull the data and sends and ack when finished. Upon receipt of
677 * this ack, deregister the memory. Only 1 send token is required here.
/*
 * gmnal_large_tx - initiate a large (rendezvous) transmit.
 * Builds a LARGE_MESSAGE_INIT message carrying the source iovec, registers
 * the source memory with GM so the receiver can gm_get() it directly, and
 * sends the init message. The receiver pulls the data and sends an ACK;
 * gmnal_large_tx_ack_received() then deregisters the memory and finalizes.
 * NOTE(review): iovec-walk advance lines, iov_dup initialization, the
 * per-iov registration loop header and the stxd context argument of the
 * send are elided in this dump.
 */
680 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
681 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
682 unsigned int niov, struct iovec *iov, size_t offset, int size)
685 gmnal_data_t *nal_data;
686 gmnal_stxd_t *stxd = NULL;
688 gmnal_msghdr_t *msghdr = NULL;
689 unsigned int local_nid;
690 int mlen = 0; /* the size of the init message data */
691 struct iovec *iov_dup = NULL;
692 gm_status_t gm_status;
696 CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
697 "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
698 "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type,
699 global_nid, pid, niov, iov, size);
702 nal_data = (gmnal_data_t*)libnal->libnal_data;
704 CDEBUG(D_ERROR, "no libnal.\n");
705 return(GMNAL_STATUS_FAIL);
710 * Get stxd and buffer. Put local address of data in buffer,
711 * send local addresses to target,
712 * wait for the target node to suck the data over.
713 * The stxd is used to ren
/* NOTE(review): comment above is truncated in the source dump */
715 stxd = gmnal_get_stxd(nal_data, 1);
716 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
718 stxd->type = GMNAL_LARGE_MESSAGE_INIT;
719 stxd->cookie = cookie;
722 * Copy gmnal_msg_hdr and portals header to the transmit buffer
723 * Then copy the iov in
725 buffer = stxd->buffer;
726 msghdr = (gmnal_msghdr_t*)buffer;
728 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
730 msghdr->magic = GMNAL_MAGIC;
731 msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
732 msghdr->sender_node_id = nal_data->gm_global_nid;
734 msghdr->niov = niov ;
735 buffer += sizeof(gmnal_msghdr_t);
736 mlen = sizeof(gmnal_msghdr_t);
737 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
740 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
742 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
743 buffer += sizeof(ptl_hdr_t);
744 mlen += sizeof(ptl_hdr_t);
745 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
/* skip whole iov elements consumed by offset before snapshotting */
747 while (offset >= iov->iov_len) {
748 offset -= iov->iov_len;
753 LASSERT(offset >= 0);
755 * Store the iovs in the stxd so we can get
756 * them later if we need them
758 stxd->iov[0].iov_base = iov->iov_base + offset;
759 stxd->iov[0].iov_len = iov->iov_len - offset;
760 CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
762 gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
766 * copy the iov to the buffer so target knows
767 * where to get the data from
769 CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
770 gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
771 mlen += stxd->niov*(sizeof(struct iovec));
772 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
775 * register the memory so the NIC can get hold of the data
776 * This is a slow process. it'd be good to overlap it
777 * with something else.
783 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
784 iov->iov_base, iov->iov_len);
785 GMNAL_GM_LOCK(nal_data);
786 gm_status = gm_register_memory(nal_data->gm_port,
787 iov->iov_base, iov->iov_len);
788 if (gm_status != GM_SUCCESS) {
789 GMNAL_GM_UNLOCK(nal_data);
790 CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
791 "for memory [%p] len ["LPSZ"]\n",
792 gm_status, gmnal_gm_error(gm_status),
793 iov->iov_base, iov->iov_len);
/* unwind: deregister everything registered so far (iov_dup..iov) */
794 GMNAL_GM_LOCK(nal_data);
795 while (iov_dup != iov) {
796 gm_deregister_memory(nal_data->gm_port,
801 GMNAL_GM_UNLOCK(nal_data);
802 gmnal_return_stxd(nal_data, stxd);
806 GMNAL_GM_UNLOCK(nal_data);
811 * Send the init message to the target
813 CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
814 GMNAL_GM_LOCK(nal_data);
815 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
817 if (gm_status != GM_SUCCESS) {
818 GMNAL_GM_UNLOCK(nal_data);
819 CDEBUG(D_ERROR, "Failed to obtain local id\n");
820 gmnal_return_stxd(nal_data, stxd);
821 /* TO DO deregister memory on failure */
822 return(GMNAL_STATUS_FAIL);
824 CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
825 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
826 stxd->gm_size, mlen, GM_LOW_PRIORITY,
827 local_nid, gmnal_large_tx_callback,
829 GMNAL_GM_UNLOCK(nal_data);
831 CDEBUG(D_INFO, "done\n");
837 * Callback function indicates that send of buffer with
838 * large message iovec has completed (or failed).
/*
 * gmnal_large_tx_callback - completion callback for the large-message init
 * send; identical handling to the small-message case, so just delegate.
 */
841 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
843 gmnal_small_tx_callback(gm_port, context, status);
850 * Have received a buffer that contains an iovec of the sender.
851 * Do a gm_register_memory of the receivers buffer and then do a get
852 * data from the sender.
/*
 * gmnal_large_rx - cb_recv for a large (rendezvous) message.
 * The received buffer holds the sender's iovec and stxd handle. Saves the
 * sender's stxd for the later ACK, snapshots the receiver's iovec (adjusted
 * for `offset`) into srxd->riov, registers the receive memory with GM, and
 * kicks off the gm_get pulls via gmnal_remote_get().
 * NOTE(review): riov_dup/nriov_dup initialization, the iovec-advance lines
 * and the per-riov registration loop header are elided in this dump.
 */
855 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
856 unsigned int nriov, struct iovec *riov, size_t offset,
857 size_t mlen, size_t rlen)
859 gmnal_data_t *nal_data = libnal->libnal_data;
860 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
862 struct iovec *riov_dup;
864 gmnal_msghdr_t *msghdr = NULL;
865 gm_status_t gm_status;
867 CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
868 "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
869 libnal, private, cookie, nriov, riov, mlen, rlen);
872 CDEBUG(D_ERROR, "gmnal_large_rx no context\n");
873 lib_finalize(libnal, private, cookie, PTL_FAIL);
877 buffer = srxd->buffer;
878 msghdr = (gmnal_msghdr_t*)buffer;
/* sender's iovec follows the two wire headers */
879 buffer += sizeof(gmnal_msghdr_t);
880 buffer += sizeof(ptl_hdr_t);
883 * Store the senders stxd address in the srxd for this message
884 * The gmnal_large_message_ack needs it to notify the sender
885 * the pull of data is complete
887 srxd->source_stxd = msghdr->stxd;
890 * Register the receivers memory
892 * tell the sender that we got the data
893 * then tell the receiver we got the data
895 * If the iovecs match, could interleave
896 * gm_registers and gm_gets for each element
898 while (offset >= riov->iov_len) {
899 offset -= riov->iov_len;
903 LASSERT (nriov >= 0);
904 LASSERT (offset >= 0);
906 * do this so the final gm_get callback can deregister the memory
908 PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
910 srxd->riov[0].iov_base = riov->iov_base + offset;
911 srxd->riov[0].iov_len = riov->iov_len - offset;
913 gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
920 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
921 riov->iov_base, riov->iov_len);
922 GMNAL_GM_LOCK(nal_data);
923 gm_status = gm_register_memory(nal_data->gm_port,
924 riov->iov_base, riov->iov_len);
925 if (gm_status != GM_SUCCESS) {
926 GMNAL_GM_UNLOCK(nal_data);
927 CDEBUG(D_ERROR, "gm_register_memory returns [%d][%s] "
928 "for memory [%p] len ["LPSZ"]\n",
929 gm_status, gmnal_gm_error(gm_status),
930 riov->iov_base, riov->iov_len);
/* unwind: deregister what was registered so far (riov_dup..riov) */
931 GMNAL_GM_LOCK(nal_data);
932 while (riov_dup != riov) {
933 gm_deregister_memory(nal_data->gm_port,
/* NOTE(review): LOCK here at orig line 938 after LOCK at 931 —
 * lock handling around this unwind path looks suspect; lines are
 * elided, confirm against the upstream source. */
938 GMNAL_GM_LOCK(nal_data);
940 * give back srxd and buffer. Send NACK to sender
942 PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
945 GMNAL_GM_UNLOCK(nal_data);
950 * now do gm_get to get the data
952 srxd->cookie = cookie;
953 if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
954 nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
955 CDEBUG(D_ERROR, "can't get the data");
958 CDEBUG(D_INFO, "lgmanl_large_rx done\n");
965 * Perform a number of remote gets as part of receiving
967 * The final one to complete (i.e. the last callback to get called)
969 * gm_get requires a send token.
/*
 * gmnal_remote_get - pull the sender's data with a series of gm_get calls.
 * Two passes over gmnal_copyiov(): pass 0 only counts the gm_gets needed
 * (so the callbacks know which is the last one), pass 1 issues them.
 * srxd->ncallbacks/callback_lock track completion across callbacks.
 * Returns GMNAL_STATUS_OK/FAIL.
 */
972 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
973 int nriov, struct iovec *riov)
978 CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
979 "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
/* pass 0: dry run, just count the gm_get calls required */
982 ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
984 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
985 return(GMNAL_STATUS_FAIL);
987 CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
988 spin_lock_init(&srxd->callback_lock);
989 srxd->ncallbacks = ncalls;
990 srxd->callback_status = 0;
/* pass 1: actually issue the gm_gets */
992 ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
994 CDEBUG(D_ERROR, "there's something wrong with the iovecs\n");
995 return(GMNAL_STATUS_FAIL);
998 return(GMNAL_STATUS_OK);
1004 * pull data from source node (source iovec) to a local iovec.
1005 * The iovecs may not match which adds the complications below.
1006 * Count the number of gm_gets that will be required so the callbacks
1007 * can determine which is the last one.
/*
 * gmnal_copyiov - walk mismatched source/receive iovecs, issuing one gm_get
 * per overlapping chunk (when do_copy != 0) or just counting the gm_gets
 * (when do_copy == 0). Each issued gm_get consumes a send token via an
 * ltxd, whose callback (gmnal_remote_get_callback) returns it.
 * Returns the number of gm_get calls, or a failure status.
 * NOTE(review): the loop header over the iovecs, the ncalls counter
 * updates, iov advances (siov++/riov++) and several call-argument lines
 * (GMNAL_GM_PORT) are elided in this dump.
 */
1010 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
1011 struct iovec *siov, int nriov, struct iovec *riov)
1015 int slen = siov->iov_len, rlen = riov->iov_len;
1016 char *sbuf = siov->iov_base, *rbuf = riov->iov_base;
1017 unsigned long sbuf_long;
1018 gm_remote_ptr_t remote_ptr = 0;
1019 unsigned int source_node;
1020 gmnal_ltxd_t *ltxd = NULL;
1021 gmnal_data_t *nal_data = srxd->nal_data;
1023 CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
1026 CDEBUG(D_ERROR, "Bad args No nal_data\n");
1027 return(GMNAL_STATUS_FAIL);
/* resolve the sender's GM-local node id once, up front */
1029 GMNAL_GM_LOCK(nal_data);
1030 if (gm_global_id_to_node_id(nal_data->gm_port,
1031 srxd->gm_source_node,
1032 &source_node) != GM_SUCCESS) {
1034 CDEBUG(D_ERROR, "cannot resolve global_id [%u] "
1035 "to local node_id\n", srxd->gm_source_node);
1036 GMNAL_GM_UNLOCK(nal_data);
1037 return(GMNAL_STATUS_FAIL);
1039 GMNAL_GM_UNLOCK(nal_data);
1041 * We need a send token to use gm_get
1042 * getting an stxd gets us a send token.
1043 * the stxd is used as the context to the
1044 * callback function (so stxd can be returned).
1045 * Set pointer in stxd to srxd so callback count in srxd
1046 * can be decremented to find last callback to complete
1048 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
1049 srxd->gm_source_node, source_node);
1053 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
1054 sbuf, slen, rbuf, rlen);
/* case 1: source chunk larger than receive chunk — pull rlen bytes,
 * advance to the next receive iov element */
1058 CDEBUG(D_INFO, "slen>rlen\n");
1059 ltxd = gmnal_get_ltxd(nal_data);
1061 GMNAL_GM_LOCK(nal_data);
1063 * funny business to get rid
1064 * of compiler warning
1066 sbuf_long = (unsigned long) sbuf;
1067 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1068 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1069 rlen, GM_LOW_PRIORITY, source_node,
1071 gmnal_remote_get_callback, ltxd);
1072 GMNAL_GM_UNLOCK(nal_data);
1075 * at the end of 1 iov element
1081 rbuf = riov->iov_base;
1082 rlen = riov->iov_len;
/* case 2: receive chunk larger — pull slen bytes, advance source iov */
1083 } else if (rlen > slen) {
1086 CDEBUG(D_INFO, "slen<rlen\n");
1087 ltxd = gmnal_get_ltxd(nal_data);
1089 GMNAL_GM_LOCK(nal_data);
1090 sbuf_long = (unsigned long) sbuf;
1091 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1092 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1093 slen, GM_LOW_PRIORITY, source_node,
1095 gmnal_remote_get_callback, ltxd);
1096 GMNAL_GM_UNLOCK(nal_data);
1099 * at end of siov element
1104 sbuf = siov->iov_base;
1105 slen = siov->iov_len;
/* case 3: chunks equal — pull rlen bytes, advance both iovs */
1109 CDEBUG(D_INFO, "rlen=slen\n");
1110 ltxd = gmnal_get_ltxd(nal_data);
1112 GMNAL_GM_LOCK(nal_data);
1113 sbuf_long = (unsigned long) sbuf;
1114 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1115 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1116 rlen, GM_LOW_PRIORITY, source_node,
1118 gmnal_remote_get_callback, ltxd);
1119 GMNAL_GM_UNLOCK(nal_data);
1122 * at end of siov and riov element
1125 sbuf = siov->iov_base;
1126 slen = siov->iov_len;
1129 rbuf = riov->iov_base;
1130 rlen = riov->iov_len;
1139 * The callback function that is invoked after each gm_get call completes.
1140 * Multiple callbacks may be invoked for 1 transaction, only the final
1141 * callback has work to do.
/*
 * gmnal_remote_get_callback - completion callback for each gm_get issued by
 * gmnal_copyiov(). Every invocation returns its ltxd (send token); only the
 * final one (ncallbacks reaches 0 under callback_lock) finalizes the
 * message, ACKs the sender, deregisters the receive memory, frees the
 * stored iovec and re-posts the receive buffer.
 * NOTE(review): the ncallbacks decrement, `lastone` test, `riov` setup and
 * the riov++ loop advance are on lines elided from this dump.
 */
1144 gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
1148 gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context;
1149 gmnal_srxd_t *srxd = ltxd->srxd;
1150 lib_nal_t *libnal = srxd->nal_data->libnal;
1154 gmnal_data_t *nal_data;
1156 CDEBUG(D_TRACE, "called for context [%p]\n", context);
1158 if (status != GM_SUCCESS) {
1159 CDEBUG(D_ERROR, "reports error [%d][%s]\n", status,
1160 gmnal_gm_error(status));
/* OR all statuses together; a non-zero aggregate means some pull failed */
1163 spin_lock(&srxd->callback_lock);
1165 srxd->callback_status |= status;
1166 lastone = srxd->ncallbacks?0:1;
1167 spin_unlock(&srxd->callback_lock);
1168 nal_data = srxd->nal_data;
1171 * everyone returns a send token
1173 gmnal_return_ltxd(nal_data, ltxd);
1176 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1181 * Let our client application proceed
1183 CDEBUG(D_ERROR, "final callback context[%p]\n", srxd);
1184 lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
1187 * send an ack to the sender to let him know we got the data
1189 gmnal_large_tx_ack(nal_data, srxd);
1192 * Unregister the memory that was used
1193 * This is a very slow business (slower then register)
1195 nriov = srxd->nriov;
1197 GMNAL_GM_LOCK(nal_data);
1199 CDEBUG(D_ERROR, "deregister memory [%p]\n", riov->iov_base);
1200 if (gm_deregister_memory(srxd->nal_data->gm_port,
1201 riov->iov_base, riov->iov_len)) {
1202 CDEBUG(D_ERROR, "failed to deregister memory [%p]\n",
1207 GMNAL_GM_UNLOCK(nal_data);
1208 PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
1211 * repost the receive buffer (return receive token)
1213 GMNAL_GM_LOCK(nal_data);
1214 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
1215 srxd->gmsize, GM_LOW_PRIORITY, 0);
1216 GMNAL_GM_UNLOCK(nal_data);
1223 * Called on target node.
1224 * After pulling data from a source node
1225 * send an ack message to indicate the large transmit is complete.
/*
 * gmnal_large_tx_ack - (target node) tell the sender its data has been
 * pulled. Builds a LARGE_MESSAGE_ACK carrying the sender's original stxd
 * pointer (so the sender knows which transmit to complete) and sends it
 * back to the source node.
 * NOTE(review): stxd context argument lines of the send are elided in this
 * dump.
 */
1228 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1232 gmnal_msghdr_t *msghdr;
1233 void *buffer = NULL;
1234 unsigned int local_nid;
1235 gm_status_t gm_status = GM_SUCCESS;
1237 CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
1238 srxd->gm_source_node);
/* ACK goes back to whoever sent us the large-message init */
1240 GMNAL_GM_LOCK(nal_data);
1241 gm_status = gm_global_id_to_node_id(nal_data->gm_port,
1242 srxd->gm_source_node, &local_nid);
1243 GMNAL_GM_UNLOCK(nal_data);
1244 if (gm_status != GM_SUCCESS) {
1245 CDEBUG(D_ERROR, "Failed to obtain local id\n");
1248 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1250 stxd = gmnal_get_stxd(nal_data, 1);
1251 CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1253 stxd->nal_data = nal_data;
1254 stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1257 * Copy gmnal_msg_hdr and portals header to the transmit buffer
1258 * Then copy the data in
1260 buffer = stxd->buffer;
1261 msghdr = (gmnal_msghdr_t*)buffer;
1264 * Add in the address of the original stxd from the sender node
1265 * so it knows which thread to notify.
1267 msghdr->magic = GMNAL_MAGIC;
1268 msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1269 msghdr->sender_node_id = nal_data->gm_global_nid;
1270 msghdr->stxd = srxd->source_stxd;
1271 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1273 CDEBUG(D_INFO, "sending\n");
/* ACK payload is the gmnal header alone */
1274 stxd->msg_size= sizeof(gmnal_msghdr_t);
1277 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1278 "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1279 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
1280 stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1281 GMNAL_GM_LOCK(nal_data);
1282 stxd->gm_priority = GM_LOW_PRIORITY;
1283 stxd->gm_target_node = local_nid;
1284 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
1285 stxd->gm_size, stxd->msg_size,
1286 GM_LOW_PRIORITY, local_nid,
1287 gmnal_large_tx_ack_callback,
1290 GMNAL_GM_UNLOCK(nal_data);
1291 CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1298 * A callback to indicate the small transmit operation is complete
1299 * Check for errors and try to deal with them.
1300 * Call lib_finalise to inform the client application that the
1301 * send is complete and the memory can be reused.
1302 * Return the stxd when finished with it (returns a send token)
/*
 * gmnal_large_tx_ack_callback - completion callback for the ACK send on the
 * target node. Simply returns the stxd (send token); the message it ACKed
 * was already finalized by the final gm_get callback.
 * NOTE(review): the status parameter line, braces and the matching
 * GMNAL_GM_LOCK for the trailing UNLOCK are elided in this dump.
 */
1305 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
1308 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
1309 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
1312 CDEBUG(D_ERROR, "send completion event for unknown stxd\n");
1315 CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1317 gmnal_return_stxd(stxd->nal_data, stxd);
1319 GMNAL_GM_UNLOCK(nal_data);
1324 * Indicates the large transmit operation is complete.
1325 * Called on transmit side (means data has been pulled by receiver
1327 * Call lib_finalise to inform the client application that the send
1328 * is complete, deregister the memory and return the stxd.
1329 * Finally, report the rx buffer that the ack message was delivered in.
/*
 * gmnal_large_tx_ack_received - (sender node) the receiver has pulled all
 * the data. Recover our original stxd from the ACK message, finalize the
 * transmit, deregister the source iovec memory saved in the stxd, return
 * the send token, and requeue the ACK's receive buffer.
 * NOTE(review): `iov` setup from stxd->iov and the iov++ advance are on
 * lines elided from this dump; the function's tail is also past the end of
 * this chunk.
 */
1332 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1334 lib_nal_t *libnal = nal_data->libnal;
1335 gmnal_stxd_t *stxd = NULL;
1336 gmnal_msghdr_t *msghdr = NULL;
1337 void *buffer = NULL;
1341 CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
1343 buffer = srxd->buffer;
1344 msghdr = (gmnal_msghdr_t*)buffer;
/* the ACK carries back the pointer to our original send descriptor */
1345 stxd = msghdr->stxd;
1347 CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1349 lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
1352 * extract the iovec from the stxd, deregister the memory.
1353 * free the space used to store the iovec
1356 while(stxd->niov--) {
1357 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1358 iov->iov_base, iov->iov_len);
1359 GMNAL_GM_LOCK(nal_data);
1360 gm_deregister_memory(nal_data->gm_port, iov->iov_base,
1362 GMNAL_GM_UNLOCK(nal_data);
1367 * return the send token
1368 * TO DO It is bad to hold onto the send token so long?
1370 gmnal_return_stxd(nal_data, stxd);
1374 * requeue the receive buffer
1376 gmnal_rx_requeue_buffer(nal_data, srxd);