1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * This file contains all gmnal send and receive functions
29 * The caretaker thread
30 * This is main thread of execution for the NAL side
31 * This guy waits in gm_blocking_receive and gets
32 * woken up when the myrinet adaptor gets an interrupt.
33 * Hands off receive operations to the receive thread
34 * This thread Looks after gm_callbacks etc inline.
/*
 * gmnal_ct_thread -- caretaker thread main loop.
 * Blocks in gm_blocking_receive_no_spin() under the GM lock; hands each
 * GM_RECV_EVENT to the receive-thread work queue via gmnal_add_rxtwe()
 * and passes every other event back to gm_unknown().  Loops while
 * ctthread_flag == GMNAL_CTTHREAD_STARTED; on exit sets the flag to
 * GMNAL_THREAD_RESET and returns GMNAL_STATUS_OK.
 *
 * NOTE(review): the embedded original line numbers jump (44->48, 62->65,
 * 66->69, ...), so the extraction dropped lines here (return type, braces,
 * the GM_RECV_EVENT case label, break statements).  Verify against the
 * upstream gmnal source before changing any logic.
 */
37 gmnal_ct_thread(void *arg)
39 gmnal_data_t *nal_data;
40 gm_recv_event_t *rxevent = NULL;
41 gm_recv_t *recv = NULL;
/* presumably guarded by "if (!arg)" on a dropped line -- TODO confirm */
44 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
48 nal_data = (gmnal_data_t*)arg;
49 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
/* name this kernel thread for ps/top visibility */
51 sprintf(current->comm, "gmnal_ct");
55 nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
57 GMNAL_GM_LOCK(nal_data);
58 while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
59 CDEBUG(D_NET, "waiting\n");
60 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
/* re-check the flag after waking: shutdown may have been requested */
61 if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
62 CDEBUG(D_INFO, "time to exit\n");
65 CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
66 switch (GM_RECV_EVENT_TYPE(rxevent)) {
69 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
70 recv = (gm_recv_t*)&rxevent->recv;
/* drop the GM lock while queueing so rx threads can use the port */
71 GMNAL_GM_UNLOCK(nal_data);
72 gmnal_add_rxtwe(nal_data, recv);
73 GMNAL_GM_LOCK(nal_data);
74 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
76 case(_GM_SLEEP_EVENT):
78 * Blocking receive above just returns
79 * immediately with _GM_SLEEP_EVENT
80 * Don't know what this is
82 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
83 GMNAL_GM_UNLOCK(nal_data);
84 gm_unknown(nal_data->gm_port, rxevent);
85 GMNAL_GM_LOCK(nal_data);
86 CDEBUG(D_INFO, "Awake from gm_unknown\n");
91 * Don't know what this is
92 * gm_unknown will make sense of it
93 * Should be able to do something with
94 * FAST_RECV_EVENTS here.
96 CDEBUG(D_NET, "Passing event to gm_unknown\n");
97 GMNAL_GM_UNLOCK(nal_data);
98 gm_unknown(nal_data->gm_port, rxevent);
99 GMNAL_GM_LOCK(nal_data);
100 CDEBUG(D_INFO, "Processed unknown event\n");
103 GMNAL_GM_UNLOCK(nal_data);
/* tell the shutdown path this thread has fully wound down */
104 nal_data->ctthread_flag = GMNAL_THREAD_RESET;
105 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
106 return(GMNAL_STATUS_OK);
111 * process a receive event
/*
 * gmnal_rx_thread -- worker thread that drains the receive work queue.
 * Pulls gmnal_rxtwe_t entries with gmnal_get_rxtwe(), dispatches on the
 * gmnal message-header type to gmnal_pre_receive() (or gmnal_rx_bad()
 * for unknown types), then frees the work entry.  Thread start/exit is
 * tracked by shifting a bit in/out of nal_data->rxthread_flag under
 * rxthread_flag_lock.  Returns GMNAL_STATUS_OK on exit.
 *
 * NOTE(review): embedded line numbers jump (117->121, 149->151, ...);
 * declarations of rank/buffer, the !arg guard, braces and break
 * statements were dropped by the extraction -- confirm upstream.
 */
113 int gmnal_rx_thread(void *arg)
115 gmnal_data_t *nal_data;
117 gmnal_rxtwe_t *we = NULL;
/* presumably guarded by "if (!arg)" on a dropped line -- TODO confirm */
121 CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
125 nal_data = (gmnal_data_t*)arg;
126 CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
/* find this thread's rank so the comm name is unique per rx thread */
128 for (rank=0; rank<num_rx_threads; rank++)
129 if (nal_data->rxthread_pid[rank] == current->pid)
132 sprintf(current->comm, "gmnal_rx_%d", rank);
136 * set 1 bit for each thread started
137 * doesn't matter which bit
139 spin_lock(&nal_data->rxthread_flag_lock);
140 if (nal_data->rxthread_flag)
/* *2 + 1 shifts a new 1-bit in; the bit count tracks live threads */
141 nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
143 nal_data->rxthread_flag = 1;
144 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
145 spin_unlock(&nal_data->rxthread_flag_lock);
147 while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
148 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
149 we = gmnal_get_rxtwe(nal_data);
151 CDEBUG(D_INFO, "Receive thread time to exit\n");
/* dispatch on the gmnal header at the front of the receive buffer */
156 switch(((gmnal_msghdr_t*)buffer)->type) {
157 case(GMNAL_SMALL_MESSAGE):
158 gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE);
160 case(GMNAL_LARGE_MESSAGE_INIT):
161 gmnal_pre_receive(nal_data,we,GMNAL_LARGE_MESSAGE_INIT);
163 case(GMNAL_LARGE_MESSAGE_ACK):
164 gmnal_pre_receive(nal_data, we,GMNAL_LARGE_MESSAGE_ACK);
167 CERROR("Unsupported message type\n");
168 gmnal_rx_bad(nal_data, we, NULL);
/* the work entry was allocated by the caretaker; free it here */
170 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
/* shift our 1-bit back out to signal this thread has exited */
173 spin_lock(&nal_data->rxthread_flag_lock);
174 nal_data->rxthread_flag/=2;
175 CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
176 spin_unlock(&nal_data->rxthread_flag_lock);
177 CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
178 return(GMNAL_STATUS_OK);
184 * Start processing a small message receive
185 * Get here from gmnal_receive_thread
186 * Hand off to lib_parse, which calls cb_recv
187 * which hands back to gmnal_small_receive
188 * Deal with all endian stuff here.
/*
 * gmnal_pre_receive -- first-stage processing of a received message.
 * Splits the receive buffer into gmnal_msghdr_t + ptl_hdr_t, maps the
 * buffer back to its receive descriptor (srxd), short-circuits
 * GMNAL_LARGE_MESSAGE_ACK to gmnal_large_tx_ack_received(), and for
 * real messages fills in the srxd and hands the portals header to
 * lib_parse() (which calls back into the cb_recv path).
 * Returns GMNAL_STATUS_OK, or GMNAL_STATUS_FAIL if no srxd is found.
 *
 * NOTE(review): snode/sport/type/length and buffer are logged below but
 * their assignments fell on dropped lines (gaps 201->210, 221->224) --
 * verify against upstream before relying on them.
 */
191 gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
193 gmnal_srxd_t *srxd = NULL;
195 unsigned int snode, sport, type, length;
196 gmnal_msghdr_t *gmnal_msghdr;
197 ptl_hdr_t *portals_hdr;
200 CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
201 nal_data, we, gmnal_type);
/* gmnal header sits at the front; portals header immediately after */
210 gmnal_msghdr = (gmnal_msghdr_t*)buffer;
211 portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE);
213 CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
214 "type [%d], length [%d], buffer [%p]\n",
215 snode, sport, type, length, buffer);
216 CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
217 "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
218 gmnal_msghdr->magic, gmnal_msghdr->type);
219 CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
220 "dest_node ["LPD64"]\n", portals_hdr->src_nid,
221 portals_hdr->dest_nid);
224 * Get a receive descriptor for this message
226 srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
227 CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
/* presumably inside "if (!srxd)" on a dropped line -- TODO confirm */
229 CERROR("Failed to get receive descriptor\n");
230 /* I think passing a NULL srxd to lib_parse will crash
233 lib_parse(nal_data->libnal, portals_hdr, srxd);
234 return(GMNAL_STATUS_FAIL);
238 * no need to bother portals library with this
240 if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
241 gmnal_large_tx_ack_received(nal_data, srxd);
242 return(GMNAL_STATUS_OK);
245 srxd->nal_data = nal_data;
246 srxd->type = gmnal_type;
247 srxd->nsiov = gmnal_msghdr->niov;
248 srxd->gm_source_node = gmnal_msghdr->sender_node_id;
250 CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
251 buffer+GMNAL_MSGHDR_SIZE);
253 * control passes to lib, which calls cb_recv
254 * cb_recv is responsible for returning the buffer
257 rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
260 /* I just received garbage; take appropriate action... */
264 return(GMNAL_STATUS_OK);
270 * After a receive has been processed,
271 * hang out the receive buffer again.
272 * This implicitly returns a receive token.
/*
 * gmnal_rx_requeue_buffer -- re-post a receive buffer to GM.
 * Hands srxd->buffer back to the port with
 * gm_provide_receive_buffer_with_tag() under the GM lock, implicitly
 * returning a receive token.  Always returns GMNAL_STATUS_OK.
 */
275 gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
277 CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
279 CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
281 GMNAL_GM_LOCK(nal_data);
282 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
283 srxd->gmsize, GM_LOW_PRIORITY, 0 );
284 GMNAL_GM_UNLOCK(nal_data);
286 return(GMNAL_STATUS_OK);
291 * Handle a bad message
292 * A bad message is one we don't expect or can't interpret
/*
 * gmnal_rx_bad -- dispose of a message we cannot interpret.
 * If no srxd was supplied, looks one up from the buffer and requeues
 * it; returns GMNAL_STATUS_FAIL when no descriptor can be found,
 * GMNAL_STATUS_OK otherwise.
 *
 * NOTE(review): the "if (!srxd)" / else structure fell on dropped lines
 * (gaps 297->300, 303->305) -- confirm control flow upstream.
 */
295 gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
297 CDEBUG(D_TRACE, "Can't handle message\n");
300 srxd = gmnal_rxbuffer_to_srxd(nal_data,
303 gmnal_rx_requeue_buffer(nal_data, srxd);
305 CERROR("Can't find a descriptor for this buffer\n");
309 return(GMNAL_STATUS_FAIL);
312 return(GMNAL_STATUS_OK);
318 * Process a small message receive.
319 * Get here from gmnal_receive_thread, gmnal_pre_receive
321 * Put data from prewired receive buffer into users buffer(s)
322 * Hang out the receive buffer again for another receive
/*
 * gmnal_small_rx -- complete a small-message receive (cb_recv path).
 * Recovers the srxd from `private`, tells the portals library the
 * receive is done via lib_finalize(PTL_OK), then re-posts the receive
 * buffer to GM so it can be used again.  If `private` is NULL the
 * message is finalized with PTL_FAIL instead.
 *
 * NOTE(review): the "if (!private)" guard and the data-copy into the
 * user buffers fell on dropped lines (gaps 329->333, 338->341) --
 * confirm upstream.
 */
326 gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
328 gmnal_srxd_t *srxd = NULL;
329 gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
333 CERROR("gmnal_small_rx no context\n");
334 lib_finalize(libnal, private, cookie, PTL_FAIL);
338 srxd = (gmnal_srxd_t*)private;
341 * let portals library know receive is complete
343 CDEBUG(D_PORTALS, "calling lib_finalize\n");
344 lib_finalize(libnal, private, cookie, PTL_OK);
346 * return buffer so it can be used again
348 CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
349 GMNAL_GM_LOCK(nal_data);
350 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
351 srxd->gmsize, GM_LOW_PRIORITY, 0);
352 GMNAL_GM_UNLOCK(nal_data);
359 * Start a small transmit.
360 * Use the given send token (and wired transmit buffer).
361 * Copy headers to wired buffer and initiate gm_send from the wired buffer.
362 * The callback function informs when the send is complete.
/*
 * gmnal_small_tx -- start a small transmit from a pre-wired buffer.
 * Resolves global_nid to a local GM node id, fills the stxd's wired
 * buffer with a gmnal_msghdr_t followed by the portals header (payload
 * data was copied in earlier by the caller), then fires the message
 * with gm_send_to_peer_with_callback(); gmnal_small_tx_callback()
 * runs on completion and returns the send token.
 *
 * NOTE(review): the stxd->niov / payload setup and the return value
 * fall on dropped lines (gaps 415->417, 426->429, 442->...) --
 * confirm upstream before editing.
 */
365 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
366 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
367 gmnal_stxd_t *stxd, int size)
369 gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
371 gmnal_msghdr_t *msghdr = NULL;
373 unsigned int local_nid;
374 gm_status_t gm_status = GM_SUCCESS;
376 CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
377 "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
378 "size [%d]\n", libnal, private, cookie, hdr, type,
379 global_nid, pid, stxd, size);
381 CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
382 hdr->dest_nid, hdr->src_nid);
/* presumably guarded by "if (!nal_data)" on a dropped line -- TODO confirm */
385 CERROR("no nal_data\n");
388 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
/* GM addresses peers by local node id, not the portals global nid */
391 GMNAL_GM_LOCK(nal_data);
392 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
394 GMNAL_GM_UNLOCK(nal_data);
395 if (gm_status != GM_SUCCESS) {
396 CERROR("Failed to obtain local id\n");
399 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
401 stxd->type = GMNAL_SMALL_MESSAGE;
402 stxd->cookie = cookie;
405 * Copy gmnal_msg_hdr and portals header to the transmit buffer
406 * Then send the message, as the data has previously been copied in
409 buffer = stxd->buffer;
410 msghdr = (gmnal_msghdr_t*)buffer;
412 msghdr->magic = GMNAL_MAGIC;
413 msghdr->type = GMNAL_SMALL_MESSAGE;
414 msghdr->sender_node_id = nal_data->gm_global_nid;
415 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
417 buffer += sizeof(gmnal_msghdr_t);
419 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
420 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
422 buffer += sizeof(ptl_hdr_t);
424 CDEBUG(D_INFO, "sending\n");
/* wire size = payload + both headers */
425 tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
426 stxd->msg_size = tot_size;
429 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
430 "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
431 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
432 stxd->msg_size, global_nid, local_nid, stxd);
434 GMNAL_GM_LOCK(nal_data);
/* remembered so the callback can retry/drop against the same target */
435 stxd->gm_priority = GM_LOW_PRIORITY;
436 stxd->gm_target_node = local_nid;
437 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
438 stxd->gm_size, stxd->msg_size,
439 GM_LOW_PRIORITY, local_nid,
440 gmnal_small_tx_callback, (void*)stxd);
441 GMNAL_GM_UNLOCK(nal_data);
442 CDEBUG(D_INFO, "done\n");
449 * A callback to indicate the small transmit operation is complete
450 * Check for errors and try to deal with them.
451 * Call lib_finalise to inform the client application that the send
452 * is complete and the memory can be reused.
453 * Return the stxd when finished with it (returns a send token)
/*
 * gmnal_small_tx_callback -- GM completion callback for small sends.
 * On GM_SUCCESS (fall-through to the bottom): returns the stxd (send
 * token) and calls lib_finalize(PTL_OK), except for
 * GMNAL_LARGE_MESSAGE_INIT where the stxd must be kept until the ack.
 * On failure, dispatches on the gm_status_t: GM_SEND_DROPPED is resent
 * directly, GM_SEND_TIMED_OUT goes through gm_drop_sends() (its
 * callback resends), and the remaining statuses fall into groups that
 * end with gm_resume_sending() to unwedge the port.
 *
 * NOTE(review): large gaps in the embedded numbering (464->466,
 * 480->489, 515->524, 568->575) mean the switch header, several case
 * labels, break statements and the grouping comments were dropped --
 * the exact case-to-action mapping must be confirmed upstream.
 */
456 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
458 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
459 lib_msg_t *cookie = stxd->cookie;
460 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
461 lib_nal_t *libnal = nal_data->libnal;
463 gm_status_t gm_status = 0;
/* presumably inside "if (!stxd)" on a dropped line -- TODO confirm */
466 CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
469 if (status != GM_SUCCESS) {
/* map the GM target node back to a global id purely for the error log */
470 GMNAL_GM_LOCK(nal_data);
471 gm_status = gm_node_id_to_global_id(nal_data->gm_port,
472 stxd->gm_target_node,&gnid);
473 GMNAL_GM_UNLOCK(nal_data);
474 if (gm_status != GM_SUCCESS) {
475 CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
479 CERROR("Result of send stxd [%p] is [%s] to [%u]\n",
480 stxd, gmnal_gm_error(status), gnid);
489 case(GM_SEND_DROPPED):
491 * do a resend on the dropped ones
493 CERROR("send stxd [%p] dropped, resending\n", context);
494 GMNAL_GM_LOCK(nal_data);
495 gm_send_to_peer_with_callback(nal_data->gm_port,
500 stxd->gm_target_node,
501 gmnal_small_tx_callback,
503 GMNAL_GM_UNLOCK(nal_data);
506 case(GM_SEND_TIMED_OUT):
/* drop the queued sends; gmnal_drop_sends_callback resends this stxd */
510 CDEBUG(D_INFO, "calling gm_drop_sends\n");
511 GMNAL_GM_LOCK(nal_data);
512 gm_drop_sends(nal_data->gm_port, stxd->gm_priority,
513 stxd->gm_target_node, GMNAL_GM_PORT_ID,
514 gmnal_drop_sends_callback, context);
515 GMNAL_GM_UNLOCK(nal_data);
/* the remaining statuses share a common recovery path below */
524 case(GM_INTERRUPTED):
526 case(GM_INPUT_BUFFER_TOO_SMALL):
527 case(GM_OUTPUT_BUFFER_TOO_SMALL):
529 case(GM_MEMORY_FAULT):
530 case(GM_INVALID_PARAMETER):
531 case(GM_OUT_OF_MEMORY):
532 case(GM_INVALID_COMMAND):
533 case(GM_PERMISSION_DENIED):
534 case(GM_INTERNAL_ERROR):
536 case(GM_UNSUPPORTED_DEVICE):
537 case(GM_SEND_REJECTED):
538 case(GM_SEND_TARGET_PORT_CLOSED):
539 case(GM_SEND_TARGET_NODE_UNREACHABLE):
540 case(GM_SEND_PORT_CLOSED):
541 case(GM_NODE_ID_NOT_YET_SET):
542 case(GM_STILL_SHUTTING_DOWN):
544 case(GM_NO_SUCH_DEVICE):
546 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
547 case(GM_UNTRANSLATED_SYSTEM_ERROR):
548 case(GM_ACCESS_DENIED):
549 case(GM_NO_DRIVER_SUPPORT):
550 case(GM_PTE_REF_CNT_OVERFLOW):
551 case(GM_NOT_SUPPORTED_IN_KERNEL):
552 case(GM_NOT_SUPPORTED_ON_ARCH):
555 case(GM_DATA_CORRUPTED):
556 case(GM_HARDWARE_FAULT):
557 case(GM_SEND_ORPHANED):
558 case(GM_MINOR_OVERFLOW):
559 case(GM_PAGE_TABLE_FULL):
561 case(GM_INVALID_PORT_NUMBER):
562 case(GM_DEV_NOT_FOUND):
563 case(GM_FIRMWARE_NOT_RUNNING):
564 case(GM_YP_NO_MATCH):
/* let GM resume sending after the error; callback returns the stxd */
566 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
567 stxd->gm_target_node, GMNAL_GM_PORT_ID,
568 gmnal_resume_sending_callback, context);
575 * If this is a large message init,
576 * we're not finished with the data yet,
577 * so can't call lib_finalise.
578 * However, we're also holding on to a
579 * stxd here (to keep track of the source
580 * iovec only). Should use another structure
581 * to keep track of iovec and return stxd to
584 if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
585 CDEBUG(D_INFO, "large transmit done\n");
588 gmnal_return_stxd(nal_data, stxd);
589 lib_finalize(libnal, stxd, cookie, PTL_OK);
594 * After an error on the port
595 * call this to allow future sends to complete
/*
 * gmnal_resume_sending_callback -- invoked once gm_resume_sending()
 * has unwedged the port after a send error; simply returns the failed
 * stxd (and its send token) to the free pool.
 */
597 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
600 gmnal_data_t *nal_data;
601 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
602 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
603 gmnal_return_stxd(stxd->nal_data, stxd);
/*
 * gmnal_drop_sends_callback -- invoked after gm_drop_sends() (queued
 * from the GM_SEND_TIMED_OUT path).  On GM_SUCCESS the original stxd
 * is resent with gm_send_to_peer_with_callback(); otherwise the
 * failure is logged.
 *
 * NOTE(review): the priority argument and context of the resend, and
 * the else branch, fall on dropped lines (gaps 618->620, 623->625) --
 * confirm upstream.
 */
608 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
611 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
612 gmnal_data_t *nal_data = stxd->nal_data;
614 CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
615 if (status == GM_SUCCESS) {
616 GMNAL_GM_LOCK(nal_data);
617 gm_send_to_peer_with_callback(gm_port, stxd->buffer,
618 stxd->gm_size, stxd->msg_size,
620 stxd->gm_target_node,
621 gmnal_small_tx_callback,
623 GMNAL_GM_UNLOCK(nal_data);
625 CERROR("send_to_peer status for stxd [%p] is "
626 "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
635 * Begin a large transmit.
636 * Do a gm_register of the memory pointed to by the iovec
637 * and send details to the receiver. The receiver does a gm_get
638 * to pull the data and sends and ack when finished. Upon receipt of
639 * this ack, deregister the memory. Only 1 send token is required here.
/*
 * gmnal_large_tx -- begin a large (rendezvous) transmit.
 * Builds an init message in the stxd's wired buffer: gmnal_msghdr_t
 * (carrying the stxd pointer and iov count), the portals header, and
 * the sender-side iovec.  The source iov is adjusted for `offset`,
 * copied into the stxd, and each element is gm_register_memory()'d so
 * the receiver can gm_get() the data; on registration failure the
 * already-registered elements are deregistered via the iov_dup cursor
 * and the stxd is returned.  Finally the init message is sent to the
 * target with gmnal_large_tx_callback as completion.
 *
 * NOTE(review): the iov/iov_dup advance statements, the registration
 * loop header, and several error-path lines fell on dropped lines
 * (gaps 710->715, 724->728, 739->745, 758->763) -- confirm upstream
 * before changing the cleanup logic.
 */
642 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
643 ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
644 unsigned int niov, struct iovec *iov, size_t offset, int size)
647 gmnal_data_t *nal_data;
648 gmnal_stxd_t *stxd = NULL;
650 gmnal_msghdr_t *msghdr = NULL;
651 unsigned int local_nid;
652 int mlen = 0; /* the size of the init message data */
653 struct iovec *iov_dup = NULL;
654 gm_status_t gm_status;
658 CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
659 "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
660 "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type,
661 global_nid, pid, niov, iov, size);
664 nal_data = (gmnal_data_t*)libnal->libnal_data;
666 CERROR("no libnal.\n");
667 return(GMNAL_STATUS_FAIL);
672 * Get stxd and buffer. Put local address of data in buffer,
673 * send local addresses to target,
674 * wait for the target node to suck the data over.
675 * The stxd is used to ren
677 stxd = gmnal_get_stxd(nal_data, 1);
678 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
680 stxd->type = GMNAL_LARGE_MESSAGE_INIT;
681 stxd->cookie = cookie;
684 * Copy gmnal_msg_hdr and portals header to the transmit buffer
685 * Then copy the iov in
687 buffer = stxd->buffer;
688 msghdr = (gmnal_msghdr_t*)buffer;
690 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
692 msghdr->magic = GMNAL_MAGIC;
693 msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
694 msghdr->sender_node_id = nal_data->gm_global_nid;
/* the receiver echoes this pointer back in the ack so we can find the stxd */
695 msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
696 msghdr->niov = niov ;
697 buffer += sizeof(gmnal_msghdr_t);
698 mlen = sizeof(gmnal_msghdr_t);
699 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
702 CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
704 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
705 buffer += sizeof(ptl_hdr_t);
706 mlen += sizeof(ptl_hdr_t);
707 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
/* skip whole iov elements consumed by the offset */
709 while (offset >= iov->iov_len) {
710 offset -= iov->iov_len;
715 LASSERT(offset >= 0);
717 * Store the iovs in the stxd for we can get
718 * them later if we need them
/* first element is partially consumed by the remaining offset */
720 stxd->iov[0].iov_base = iov->iov_base + offset;
721 stxd->iov[0].iov_len = iov->iov_len - offset;
722 CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
724 gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
728 * copy the iov to the buffer so target knows
729 * where to get the data from
731 CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
732 gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
733 mlen += stxd->niov*(sizeof(struct iovec));
734 CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
737 * register the memory so the NIC can get hold of the data
738 * This is a slow process. it'd be good to overlap it
739 * with something else.
745 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
746 iov->iov_base, iov->iov_len);
747 GMNAL_GM_LOCK(nal_data);
748 gm_status = gm_register_memory(nal_data->gm_port,
749 iov->iov_base, iov->iov_len);
750 if (gm_status != GM_SUCCESS) {
751 GMNAL_GM_UNLOCK(nal_data);
752 CERROR("gm_register_memory returns [%d][%s] "
753 "for memory [%p] len ["LPSZ"]\n",
754 gm_status, gmnal_gm_error(gm_status),
755 iov->iov_base, iov->iov_len);
/* unwind: deregister everything registered so far (iov_dup -> iov) */
756 GMNAL_GM_LOCK(nal_data);
757 while (iov_dup != iov) {
758 gm_deregister_memory(nal_data->gm_port,
763 GMNAL_GM_UNLOCK(nal_data);
764 gmnal_return_stxd(nal_data, stxd);
768 GMNAL_GM_UNLOCK(nal_data);
773 * Send the init message to the target
775 CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
776 GMNAL_GM_LOCK(nal_data);
777 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
779 if (gm_status != GM_SUCCESS) {
780 GMNAL_GM_UNLOCK(nal_data);
781 CERROR("Failed to obtain local id\n");
782 gmnal_return_stxd(nal_data, stxd);
783 /* TO DO deregister memory on failure */
784 return(GMNAL_STATUS_FAIL);
786 CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
787 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
788 stxd->gm_size, mlen, GM_LOW_PRIORITY,
789 local_nid, gmnal_large_tx_callback,
791 GMNAL_GM_UNLOCK(nal_data);
793 CDEBUG(D_INFO, "done\n");
799 * Callback function indicates that send of buffer with
800 * large message iovec has completed (or failed).
/*
 * gmnal_large_tx_callback -- completion callback for the large-message
 * init send; delegates all status handling to gmnal_small_tx_callback()
 * (which keeps the stxd alive for GMNAL_LARGE_MESSAGE_INIT sends).
 */
803 gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
805 gmnal_small_tx_callback(gm_port, context, status);
812 * Have received a buffer that contains an iovec of the sender.
813 * Do a gm_register_memory of the receivers buffer and then do a get
814 * data from the sender.
/*
 * gmnal_large_rx -- receiver side of a large (rendezvous) transfer.
 * Parses the sender's init message out of the srxd buffer (saving the
 * sender's stxd pointer for the later ack), adjusts the receive iovec
 * for `offset`, stashes a copy in srxd->riov, gm_register_memory()'s
 * each receive element (unwinding via riov_dup on failure), then kicks
 * off the data pull with gmnal_remote_get().
 *
 * NOTE(review): riov/riov_dup advancement, nriov_dup assignment, the
 * registration loop header and the return paths fall on dropped lines
 * (gaps 835->839, 875->882, 895->900, 907->912) -- confirm upstream.
 */
817 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
818 unsigned int nriov, struct iovec *riov, size_t offset,
819 size_t mlen, size_t rlen)
821 gmnal_data_t *nal_data = libnal->libnal_data;
822 gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
824 struct iovec *riov_dup;
826 gmnal_msghdr_t *msghdr = NULL;
827 gm_status_t gm_status;
829 CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
830 "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
831 libnal, private, cookie, nriov, riov, mlen, rlen);
/* presumably guarded by "if (!srxd)" on a dropped line -- TODO confirm */
834 CERROR("gmnal_large_rx no context\n");
835 lib_finalize(libnal, private, cookie, PTL_FAIL);
/* skip past our own headers; the sender's iovec follows them */
839 buffer = srxd->buffer;
840 msghdr = (gmnal_msghdr_t*)buffer;
841 buffer += sizeof(gmnal_msghdr_t);
842 buffer += sizeof(ptl_hdr_t);
845 * Store the senders stxd address in the srxd for this message
846 * The gmnal_large_message_ack needs it to notify the sender
847 * the pull of data is complete
849 srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
852 * Register the receivers memory
854 * tell the sender that we got the data
855 * then tell the receiver we got the data
857 * If the iovecs match, could interleave
858 * gm_registers and gm_gets for each element
/* skip whole riov elements consumed by the offset */
860 while (offset >= riov->iov_len) {
861 offset -= riov->iov_len;
865 LASSERT (nriov >= 0);
866 LASSERT (offset >= 0);
868 * do this so the final gm_get callback can deregister the memory
870 PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
/* first element is partially consumed by the remaining offset */
872 srxd->riov[0].iov_base = riov->iov_base + offset;
873 srxd->riov[0].iov_len = riov->iov_len - offset;
875 gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
882 CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
883 riov->iov_base, riov->iov_len);
884 GMNAL_GM_LOCK(nal_data);
885 gm_status = gm_register_memory(nal_data->gm_port,
886 riov->iov_base, riov->iov_len);
887 if (gm_status != GM_SUCCESS) {
888 GMNAL_GM_UNLOCK(nal_data);
889 CERROR("gm_register_memory returns [%d][%s] "
890 "for memory [%p] len ["LPSZ"]\n",
891 gm_status, gmnal_gm_error(gm_status),
892 riov->iov_base, riov->iov_len);
/* unwind: deregister everything registered so far (riov_dup -> riov) */
893 GMNAL_GM_LOCK(nal_data);
894 while (riov_dup != riov) {
895 gm_deregister_memory(nal_data->gm_port,
900 GMNAL_GM_LOCK(nal_data);
902 * give back srxd and buffer. Send NACK to sender
904 PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
907 GMNAL_GM_UNLOCK(nal_data);
912 * now do gm_get to get the data
914 srxd->cookie = cookie;
/* sender's iovec is read in place from the receive buffer */
915 if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
916 nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
917 CERROR("can't get the data");
920 CDEBUG(D_INFO, "lgmanl_large_rx done\n");
927 * Perform a number of remote gets as part of receiving
929 * The final one to complete (i.e. the last callback to get called)
931 * gm_get requires a send token.
/*
 * gmnal_remote_get -- pull the sender's data via one or more gm_get()s.
 * Two-pass scheme over gmnal_copyiov(): pass 0 (do_copy=0) only counts
 * how many gm_get calls will be needed so srxd->ncallbacks can be set
 * before any callback fires; pass 1 (do_copy=1) issues the gets.
 * Returns GMNAL_STATUS_FAIL if either pass rejects the iovecs,
 * GMNAL_STATUS_OK otherwise.
 *
 * NOTE(review): the "if (ncalls < 0)"-style guards around the CERRORs
 * fell on dropped lines (gaps 944->946, 954->956) -- confirm upstream.
 */
934 gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
935 int nriov, struct iovec *riov)
940 CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
941 "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
944 ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
946 CERROR("there's something wrong with the iovecs\n");
947 return(GMNAL_STATUS_FAIL);
949 CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
950 spin_lock_init(&srxd->callback_lock);
951 srxd->ncallbacks = ncalls;
952 srxd->callback_status = 0;
954 ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
956 CERROR("there's something wrong with the iovecs\n");
957 return(GMNAL_STATUS_FAIL);
960 return(GMNAL_STATUS_OK);
966 * pull data from source node (source iovec) to a local iovec.
967 * The iovecs may not match which adds the complications below.
968 * Count the number of gm_gets that will be required so the callbacks
969 * can determine who is the last one.
/*
 * gmnal_copyiov -- walk a (source iovec, receive iovec) pair, issuing
 * one gm_get() per overlapping chunk; the chunk length is the smaller
 * of the two current elements, so each iteration takes one of three
 * branches (slen>rlen, rlen>slen, equal) and advances the exhausted
 * side(s).  When do_copy is 0 the gm_get calls are skipped and only
 * the number of calls is counted (see gmnal_remote_get()).  Each get
 * borrows a ltxd (send token) as its callback context; completions go
 * to gmnal_remote_get_callback().
 *
 * NOTE(review): the loop header, the do_copy tests around each gm_get,
 * the element-advance statements and the return of the call count all
 * fell on dropped lines (gaps 1034->1037, 1057->1061, 1092->...) --
 * confirm against upstream before touching the advance logic.
 */
972 gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
973 struct iovec *siov, int nriov, struct iovec *riov)
977 int slen = siov->iov_len, rlen = riov->iov_len;
978 char *sbuf = siov->iov_base, *rbuf = riov->iov_base;
979 unsigned long sbuf_long;
980 gm_remote_ptr_t remote_ptr = 0;
981 unsigned int source_node;
982 gmnal_ltxd_t *ltxd = NULL;
983 gmnal_data_t *nal_data = srxd->nal_data;
985 CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
/* presumably inside "if (!nal_data)" on a dropped line -- TODO confirm */
988 CERROR("Bad args No nal_data\n");
989 return(GMNAL_STATUS_FAIL);
/* gm_get addresses the peer by local node id, so resolve it once */
991 GMNAL_GM_LOCK(nal_data);
992 if (gm_global_id_to_node_id(nal_data->gm_port,
993 srxd->gm_source_node,
994 &source_node) != GM_SUCCESS) {
996 CERROR("cannot resolve global_id [%u] "
997 "to local node_id\n", srxd->gm_source_node);
998 GMNAL_GM_UNLOCK(nal_data);
999 return(GMNAL_STATUS_FAIL);
1001 GMNAL_GM_UNLOCK(nal_data);
1003 * We need a send token to use gm_get
1004 * getting an stxd gets us a send token.
1005 * the stxd is used as the context to the
1006 * callback function (so stxd can be returned).
1007 * Set pointer in stxd to srxd so callback count in srxd
1008 * can be decremented to find last callback to complete
1010 CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
1011 srxd->gm_source_node, source_node);
1015 CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
1016 sbuf, slen, rbuf, rlen);
/* source element larger: pull rlen bytes, receive element exhausted */
1020 CDEBUG(D_INFO, "slen>rlen\n");
1021 ltxd = gmnal_get_ltxd(nal_data);
1023 GMNAL_GM_LOCK(nal_data);
1025 * funny business to get rid
1026 * of compiler warning
1028 sbuf_long = (unsigned long) sbuf;
1029 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1030 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1031 rlen, GM_LOW_PRIORITY, source_node,
1033 gmnal_remote_get_callback, ltxd);
1034 GMNAL_GM_UNLOCK(nal_data);
1037 * at the end of 1 iov element
1043 rbuf = riov->iov_base;
1044 rlen = riov->iov_len;
1045 } else if (rlen > slen) {
/* receive element larger: pull slen bytes, source element exhausted */
1048 CDEBUG(D_INFO, "slen<rlen\n");
1049 ltxd = gmnal_get_ltxd(nal_data);
1051 GMNAL_GM_LOCK(nal_data);
1052 sbuf_long = (unsigned long) sbuf;
1053 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1054 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1055 slen, GM_LOW_PRIORITY, source_node,
1057 gmnal_remote_get_callback, ltxd);
1058 GMNAL_GM_UNLOCK(nal_data);
1061 * at end of siov element
1066 sbuf = siov->iov_base;
1067 slen = siov->iov_len;
/* elements the same size: one get exhausts both */
1071 CDEBUG(D_INFO, "rlen=slen\n");
1072 ltxd = gmnal_get_ltxd(nal_data);
1074 GMNAL_GM_LOCK(nal_data);
1075 sbuf_long = (unsigned long) sbuf;
1076 remote_ptr = (gm_remote_ptr_t)sbuf_long;
1077 gm_get(nal_data->gm_port, remote_ptr, rbuf,
1078 rlen, GM_LOW_PRIORITY, source_node,
1080 gmnal_remote_get_callback, ltxd);
1081 GMNAL_GM_UNLOCK(nal_data);
1084 * at end of siov and riov element
1087 sbuf = siov->iov_base;
1088 slen = siov->iov_len;
1091 rbuf = riov->iov_base;
1092 rlen = riov->iov_len;
1101 * The callback function that is invoked after each gm_get call completes.
1102 * Multiple callbacks may be invoked for 1 transaction, only the final
1103 * callback has work to do.
/*
 * gmnal_remote_get_callback -- completion of one gm_get() chunk.
 * Every invocation returns its ltxd (send token) and, under
 * callback_lock, folds its status into srxd->callback_status and
 * checks whether it is the last outstanding callback.  Only the final
 * callback does the heavy lifting: lib_finalize(PTL_OK) to the client,
 * gmnal_large_tx_ack() to the sender, deregistration of all receive
 * iov elements, freeing srxd->riov, and re-posting the receive buffer.
 *
 * NOTE(review): the ncallbacks decrement, the early-return for
 * non-final callbacks, and the riov loop header/advance fell on
 * dropped lines (gaps 1124->1126, 1134->1137, 1158->1160, 1163->1168)
 * -- confirm upstream.  Also: errors only accumulate into
 * callback_status, yet the final lib_finalize always passes PTL_OK --
 * looks suspicious; verify intent upstream.
 */
1106 gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
1110 gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context;
1111 gmnal_srxd_t *srxd = ltxd->srxd;
1112 lib_nal_t *libnal = srxd->nal_data->libnal;
1116 gmnal_data_t *nal_data;
1118 CDEBUG(D_TRACE, "called for context [%p]\n", context);
1120 if (status != GM_SUCCESS) {
1121 CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status));
1124 spin_lock(&srxd->callback_lock);
1126 srxd->callback_status |= status;
/* ncallbacks is presumably decremented on a dropped line just above */
1127 lastone = srxd->ncallbacks?0:1;
1128 spin_unlock(&srxd->callback_lock);
1129 nal_data = srxd->nal_data;
1132 * everyone returns a send token
1134 gmnal_return_ltxd(nal_data, ltxd);
1137 CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
1142 * Let our client application proceed
1144 CERROR("final callback context[%p]\n", srxd);
1145 lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
1148 * send an ack to the sender to let him know we got the data
1150 gmnal_large_tx_ack(nal_data, srxd);
1153 * Unregister the memory that was used
1154 * This is a very slow business (slower then register)
1156 nriov = srxd->nriov;
1158 GMNAL_GM_LOCK(nal_data);
1160 CERROR("deregister memory [%p]\n", riov->iov_base);
1161 if (gm_deregister_memory(srxd->nal_data->gm_port,
1162 riov->iov_base, riov->iov_len)) {
1163 CERROR("failed to deregister memory [%p]\n",
1168 GMNAL_GM_UNLOCK(nal_data);
1169 PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
1172 * repost the receive buffer (return receive token)
1174 GMNAL_GM_LOCK(nal_data);
1175 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
1176 srxd->gmsize, GM_LOW_PRIORITY, 0);
1177 GMNAL_GM_UNLOCK(nal_data);
1184 * Called on target node.
1185 * After pulling data from a source node
1186 * send an ack message to indicate the large transmit is complete.
/*
 * gmnal_large_tx_ack -- sent by the receiver after all gm_get()s are
 * done.  Resolves the sender's node id, borrows an stxd, and sends a
 * header-only GMNAL_LARGE_MESSAGE_ACK carrying the sender's original
 * stxd pointer (srxd->source_stxd) back so the sender can finalize and
 * deregister; completion goes to gmnal_large_tx_ack_callback().
 *
 * NOTE(review): the return after the "Failed to obtain local id" error
 * and the callback-context argument of the send fell on dropped lines
 * (gaps 1206->1209, 1248->1251) -- confirm upstream.
 */
1189 gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1193 gmnal_msghdr_t *msghdr;
1194 void *buffer = NULL;
1195 unsigned int local_nid;
1196 gm_status_t gm_status = GM_SUCCESS;
1198 CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
1199 srxd->gm_source_node);
1201 GMNAL_GM_LOCK(nal_data);
1202 gm_status = gm_global_id_to_node_id(nal_data->gm_port,
1203 srxd->gm_source_node, &local_nid);
1204 GMNAL_GM_UNLOCK(nal_data);
1205 if (gm_status != GM_SUCCESS) {
1206 CERROR("Failed to obtain local id\n");
1209 CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
1211 stxd = gmnal_get_stxd(nal_data, 1);
1212 CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
1214 stxd->nal_data = nal_data;
1215 stxd->type = GMNAL_LARGE_MESSAGE_ACK;
1218 * Copy gmnal_msg_hdr and portals header to the transmit buffer
1219 * Then copy the data in
1221 buffer = stxd->buffer;
1222 msghdr = (gmnal_msghdr_t*)buffer;
1225 * Add in the address of the original stxd from the sender node
1226 * so it knows which thread to notify.
1228 msghdr->magic = GMNAL_MAGIC;
1229 msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
1230 msghdr->sender_node_id = nal_data->gm_global_nid;
1231 msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
1232 CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
1234 CDEBUG(D_INFO, "sending\n");
/* ack carries only the gmnal header -- no portals header, no payload */
1235 stxd->msg_size= sizeof(gmnal_msghdr_t);
1238 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
1239 "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
1240 "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
1241 stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
1242 GMNAL_GM_LOCK(nal_data);
1243 stxd->gm_priority = GM_LOW_PRIORITY;
1244 stxd->gm_target_node = local_nid;
1245 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
1246 stxd->gm_size, stxd->msg_size,
1247 GM_LOW_PRIORITY, local_nid,
1248 gmnal_large_tx_ack_callback,
1251 GMNAL_GM_UNLOCK(nal_data);
1252 CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
1259 * A callback to indicate the small transmit operation is complete
1260 * Check for errors and try to deal with them.
1261 * Call lib_finalise to inform the client application that the
1262 * send is complete and the memory can be reused.
1263 * Return the stxd when finished with it (returns a send token)
/*
 * gmnal_large_tx_ack_callback -- completion of the ack send on the
 * receiver side; returns the stxd (send token) to the pool.  No
 * lib_finalize here: the portals-level completion happened when the
 * data pull finished.
 *
 * NOTE(review): the "if (!stxd)" guard, the status CDEBUG argument and
 * the GMNAL_GM_LOCK matching the UNLOCK below fell on dropped lines
 * (gaps 1270->1273, 1276->1278) -- confirm upstream.
 */
1266 gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
1269 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
1270 gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
1273 CERROR("send completion event for unknown stxd\n");
1276 CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
1278 gmnal_return_stxd(stxd->nal_data, stxd);
1280 GMNAL_GM_UNLOCK(nal_data);
1285 * Indicates the large transmit operation is complete.
1286 * Called on transmit side (means data has been pulled by receiver
1288 * Call lib_finalise to inform the client application that the send
1289 * is complete, deregister the memory and return the stxd.
1290 * Finally, report the rx buffer that the ack message was delivered in.
1293 gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
1295 lib_nal_t *libnal = nal_data->libnal;
1296 gmnal_stxd_t *stxd = NULL;
1297 gmnal_msghdr_t *msghdr = NULL;
1298 void *buffer = NULL;
1302 CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
1304 buffer = srxd->buffer;
1305 msghdr = (gmnal_msghdr_t*)buffer;
1306 stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
1308 CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
1310 lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
1313 * extract the iovec from the stxd, deregister the memory.
1314 * free the space used to store the iovec
1317 while(stxd->niov--) {
1318 CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
1319 iov->iov_base, iov->iov_len);
1320 GMNAL_GM_LOCK(nal_data);
1321 gm_deregister_memory(nal_data->gm_port, iov->iov_base,
1323 GMNAL_GM_UNLOCK(nal_data);
1328 * return the send token
1329 * TO DO It is bad to hold onto the send token so long?
1331 gmnal_return_stxd(nal_data, stxd);
1335 * requeue the receive buffer
1337 gmnal_rx_requeue_buffer(nal_data, srxd);