/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 * This file contains all gmnal send and receive functions
 * The caretaker thread
 * This is the main thread of execution for the NAL side
 * This guy waits in gm_blocking_receive and gets
 * woken up when the Myrinet adaptor gets an interrupt.
 * Hands off receive operations to the receive thread
 * This thread looks after gm_callbacks etc. inline.
gmnal_ct_thread(void *arg)
        gmnal_data_t    *nal_data;
        gm_recv_event_t *rxevent = NULL;
        gm_recv_t       *recv = NULL;
                CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
        nal_data = (gmnal_data_t*)arg;
        CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
        sprintf(current->comm, "gmnal_ct");
        kportal_daemonize("gmnalctd");
        nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
        spin_lock(&nal_data->gm_lock);
        while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
                CDEBUG(D_NET, "waiting\n");
                rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
                if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
                        CDEBUG(D_INFO, "time to exit\n");
                CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                        CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
                        recv = (gm_recv_t*)&rxevent->recv;
                        spin_unlock(&nal_data->gm_lock);
                        gmnal_add_rxtwe(nal_data, recv);
                        spin_lock(&nal_data->gm_lock);
                        CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
                case(_GM_SLEEP_EVENT):
                         * Blocking receive above just returns
                         * immediately with _GM_SLEEP_EVENT
                         * Don't know what this is
                        CDEBUG(D_NET, "Sleeping in gm_unknown\n");
                        spin_unlock(&nal_data->gm_lock);
                        gm_unknown(nal_data->gm_port, rxevent);
                        spin_lock(&nal_data->gm_lock);
                        CDEBUG(D_INFO, "Awake from gm_unknown\n");
                         * Don't know what this is
                         * gm_unknown will make sense of it
                         * Should be able to do something with
                         * FAST_RECV_EVENTS here.
                        CDEBUG(D_NET, "Passing event to gm_unknown\n");
                        spin_unlock(&nal_data->gm_lock);
                        gm_unknown(nal_data->gm_port, rxevent);
                        spin_lock(&nal_data->gm_lock);
                        CDEBUG(D_INFO, "Processed unknown event\n");
        spin_unlock(&nal_data->gm_lock);
        nal_data->ctthread_flag = GMNAL_THREAD_RESET;
        CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
        return(GMNAL_STATUS_OK);
 * process a receive event
int gmnal_rx_thread(void *arg)
        gmnal_data_t    *nal_data;
        gmnal_rxtwe_t   *we = NULL;
                CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
        nal_data = (gmnal_data_t*)arg;
        CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
        for (rank = 0; rank < num_rx_threads; rank++)
                if (nal_data->rxthread_pid[rank] == current->pid)
        snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
        kportal_daemonize(name);
         * set 1 bit for each thread started
         * doesn't matter which bit
        spin_lock(&nal_data->rxthread_flag_lock);
        if (nal_data->rxthread_flag)
                nal_data->rxthread_flag = nal_data->rxthread_flag*2 + 1;
                nal_data->rxthread_flag = 1;
        CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
        spin_unlock(&nal_data->rxthread_flag_lock);
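        /*
         * Illustrative sketch only (not part of the NAL): the flag above is
         * used as a unary thread counter.  Starting a thread sets one more
         * low-order bit (flag = flag*2 + 1), an exiting thread clears one
         * (flag /= 2), so with three receive threads the flag runs
         * 1 -> 3 -> 7 on startup and 7 -> 3 -> 1 -> 0 on shutdown; zero
         * means every receive thread has exited.
         */
#if 0
        {
                long flag = 0;
                int  i;

                for (i = 0; i < 3; i++)         /* three threads start     */
                        flag = flag ? flag*2 + 1 : 1;
                                                /* flag is now 7 (0b111)    */
                for (i = 0; i < 3; i++)         /* three threads exit       */
                        flag /= 2;
                                                /* flag is now 0            */
        }
#endif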
        while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
                CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
                we = gmnal_get_rxtwe(nal_data);
                        CDEBUG(D_INFO, "Receive thread time to exit\n");
                switch(((gmnal_msghdr_t*)buffer)->type) {
                case(GMNAL_SMALL_MESSAGE):
                        gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE);
                case(GMNAL_LARGE_MESSAGE_INIT):
                        gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_INIT);
                case(GMNAL_LARGE_MESSAGE_ACK):
                        gmnal_pre_receive(nal_data, we, GMNAL_LARGE_MESSAGE_ACK);
                        CERROR("Unsupported message type\n");
                        gmnal_rx_bad(nal_data, we, NULL);
                PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
        spin_lock(&nal_data->rxthread_flag_lock);
        nal_data->rxthread_flag /= 2;
        CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
        spin_unlock(&nal_data->rxthread_flag_lock);
        CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
        return(GMNAL_STATUS_OK);
 * Start processing a small message receive
 * Get here from gmnal_rx_thread
 * Hand off to lib_parse, which calls cb_recv
 * which hands back to gmnal_small_rx
 * Deal with all endian stuff here.
gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
        gmnal_srxd_t    *srxd = NULL;
        unsigned int    snode, sport, type, length;
        gmnal_msghdr_t  *gmnal_msghdr;
        ptl_hdr_t       *portals_hdr;
        CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
               nal_data, we, gmnal_type);
        gmnal_msghdr = (gmnal_msghdr_t*)buffer;
        portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t));
        CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
               "type [%d], length [%d], buffer [%p]\n",
               snode, sport, type, length, buffer);
        CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
               "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
               gmnal_msghdr->magic, gmnal_msghdr->type);
        CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
               "dest_node ["LPD64"]\n", portals_hdr->src_nid,
               portals_hdr->dest_nid);
         * Get a receive descriptor for this message
        srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
        CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
                CERROR("Failed to get receive descriptor\n");
                /* I think passing a NULL srxd to lib_parse will crash
                lib_parse(nal_data->libnal, portals_hdr, srxd);
                return(GMNAL_STATUS_FAIL);
         * no need to bother portals library with this
        if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
                gmnal_large_tx_ack_received(nal_data, srxd);
                return(GMNAL_STATUS_OK);
        srxd->nal_data = nal_data;
        srxd->type = gmnal_type;
        srxd->nsiov = gmnal_msghdr->niov;
        srxd->gm_source_node = gmnal_msghdr->sender_node_id;
        CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
               buffer+sizeof(gmnal_msghdr_t));
         * control passes to lib, which calls cb_recv
         * cb_recv is responsible for returning the buffer
        rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
                /* I just received garbage; return the srxd for use */
                CWARN("Returning srxd and discarding message, "
                      "lib_parse didn't like it.\n");
                return(gmnal_rx_bad(nal_data, we, srxd));
        return(GMNAL_STATUS_OK);
 * After a receive has been processed,
 * hang out the receive buffer again.
 * This implicitly returns a receive token.
gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
        CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
        CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
        spin_lock(&nal_data->gm_lock);
        gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
                                           srxd->gmsize, GM_LOW_PRIORITY, 0);
        spin_unlock(&nal_data->gm_lock);
        return(GMNAL_STATUS_OK);
 * Handle a bad message
 * A bad message is one we don't expect or can't interpret
gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
        CDEBUG(D_TRACE, "Can't handle message\n");
                srxd = gmnal_rxbuffer_to_srxd(nal_data,
                        gmnal_rx_requeue_buffer(nal_data, srxd);
                        CERROR("Can't find a descriptor for this buffer\n");
                        return(GMNAL_STATUS_FAIL);
        return(GMNAL_STATUS_OK);
 * Process a small message receive.
 * Get here from gmnal_rx_thread, gmnal_pre_receive
 * Put data from prewired receive buffer into user's buffer(s)
 * Hang out the receive buffer again for another receive
gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
        gmnal_srxd_t    *srxd = NULL;
        gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
                CERROR("gmnal_small_rx no context\n");
                lib_finalize(libnal, private, cookie, PTL_FAIL);
        srxd = (gmnal_srxd_t*)private;
         * let portals library know receive is complete
        CDEBUG(D_PORTALS, "calling lib_finalize\n");
        lib_finalize(libnal, private, cookie, PTL_OK);
         * return buffer so it can be used again
        CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
        spin_lock(&nal_data->gm_lock);
        gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
                                           srxd->gmsize, GM_LOW_PRIORITY, 0);
        spin_unlock(&nal_data->gm_lock);
 * Start a small transmit.
 * Use the given send token (and wired transmit buffer).
 * Copy headers to wired buffer and initiate gm_send from the wired buffer.
 * The callback function informs when the send is complete.
gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
               gmnal_stxd_t *stxd, int size)
        gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
        gmnal_msghdr_t  *msghdr = NULL;
        unsigned int    local_nid;
        gm_status_t     gm_status = GM_SUCCESS;
        CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
               "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
               "size [%d]\n", libnal, private, cookie, hdr, type,
               global_nid, pid, stxd, size);
        CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
               hdr->dest_nid, hdr->src_nid);
                CERROR("no nal_data\n");
        CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
        spin_lock(&nal_data->gm_lock);
        gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
        spin_unlock(&nal_data->gm_lock);
        if (gm_status != GM_SUCCESS) {
                CERROR("Failed to obtain local id\n");
        CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
        stxd->type = GMNAL_SMALL_MESSAGE;
        stxd->cookie = cookie;
         * Copy gmnal_msg_hdr and portals header to the transmit buffer
         * Then send the message, as the data has previously been copied in
        buffer = stxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
        msghdr->magic = GMNAL_MAGIC;
        msghdr->type = GMNAL_SMALL_MESSAGE;
        msghdr->sender_node_id = nal_data->gm_global_nid;
        CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
        buffer += sizeof(gmnal_msghdr_t);
        CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
        gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
        buffer += sizeof(ptl_hdr_t);
        CDEBUG(D_INFO, "sending\n");
        tot_size = size + sizeof(ptl_hdr_t) + sizeof(gmnal_msghdr_t);
        stxd->msg_size = tot_size;
        CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
               "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
               "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
               stxd->msg_size, global_nid, local_nid, stxd);
        spin_lock(&nal_data->gm_lock);
        stxd->gm_priority = GM_LOW_PRIORITY;
        stxd->gm_target_node = local_nid;
        gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
                                      stxd->gm_size, stxd->msg_size,
                                      GM_LOW_PRIORITY, local_nid,
                                      gmnal_small_tx_callback, (void*)stxd);
        spin_unlock(&nal_data->gm_lock);
        CDEBUG(D_INFO, "done\n");
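/*
 * Illustrative sketch only, not part of the send path: the wired transmit
 * buffer filled in above carries the GM NAL header, then the portals
 * header, then the payload (already copied in), so the size handed to GM
 * works out as below.  The helper name is hypothetical.
 */
#if 0
static inline int gmnal_small_msg_size(int payload_size)
{
        /* wire layout: [gmnal_msghdr_t][ptl_hdr_t][payload] */
        return sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) + payload_size;
}
#endif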
 * A callback to indicate the small transmit operation is complete
 * Check for errors and try to deal with them.
 * Call lib_finalize to inform the client application that the send
 * is complete and the memory can be reused.
 * Return the stxd when finished with it (returns a send token)
gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
        lib_msg_t       *cookie = stxd->cookie;
        gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
        lib_nal_t       *libnal = nal_data->libnal;
        gm_status_t     gm_status = 0;
                CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
        if (status != GM_SUCCESS) {
                spin_lock(&nal_data->gm_lock);
                gm_status = gm_node_id_to_global_id(nal_data->gm_port,
                                                    stxd->gm_target_node,
                                                    &gnid);
                spin_unlock(&nal_data->gm_lock);
                if (gm_status != GM_SUCCESS) {
                        CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
                CERROR("Result of send stxd [%p] is [%s] to [%u]\n",
                       stxd, gmnal_gm_error(status), gnid);
        case(GM_SEND_DROPPED):
                 * do a resend on the dropped ones
                CERROR("send stxd [%p] dropped, resending\n", context);
                spin_lock(&nal_data->gm_lock);
                gm_send_to_peer_with_callback(nal_data->gm_port,
                                              stxd->gm_target_node,
                                              gmnal_small_tx_callback,
                spin_unlock(&nal_data->gm_lock);
        case(GM_SEND_TIMED_OUT):
                CDEBUG(D_INFO, "calling gm_drop_sends\n");
                spin_lock(&nal_data->gm_lock);
                gm_drop_sends(nal_data->gm_port, stxd->gm_priority,
                              stxd->gm_target_node, gm_port_id,
                              gmnal_drop_sends_callback, context);
                spin_unlock(&nal_data->gm_lock);
        case(GM_INTERRUPTED):
        case(GM_INPUT_BUFFER_TOO_SMALL):
        case(GM_OUTPUT_BUFFER_TOO_SMALL):
        case(GM_MEMORY_FAULT):
        case(GM_INVALID_PARAMETER):
        case(GM_OUT_OF_MEMORY):
        case(GM_INVALID_COMMAND):
        case(GM_PERMISSION_DENIED):
        case(GM_INTERNAL_ERROR):
        case(GM_UNSUPPORTED_DEVICE):
        case(GM_SEND_REJECTED):
        case(GM_SEND_TARGET_PORT_CLOSED):
        case(GM_SEND_TARGET_NODE_UNREACHABLE):
        case(GM_SEND_PORT_CLOSED):
        case(GM_NODE_ID_NOT_YET_SET):
        case(GM_STILL_SHUTTING_DOWN):
        case(GM_NO_SUCH_DEVICE):
        case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
        case(GM_UNTRANSLATED_SYSTEM_ERROR):
        case(GM_ACCESS_DENIED):
        case(GM_NO_DRIVER_SUPPORT):
        case(GM_PTE_REF_CNT_OVERFLOW):
        case(GM_NOT_SUPPORTED_IN_KERNEL):
        case(GM_NOT_SUPPORTED_ON_ARCH):
        case(GM_DATA_CORRUPTED):
        case(GM_HARDWARE_FAULT):
        case(GM_SEND_ORPHANED):
        case(GM_MINOR_OVERFLOW):
        case(GM_PAGE_TABLE_FULL):
        case(GM_INVALID_PORT_NUMBER):
        case(GM_DEV_NOT_FOUND):
        case(GM_FIRMWARE_NOT_RUNNING):
        case(GM_YP_NO_MATCH):
                gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
                                  stxd->gm_target_node, gm_port_id,
                                  gmnal_resume_sending_callback, context);
         * If this is a large message init,
         * we're not finished with the data yet,
         * so can't call lib_finalize.
         * However, we're also holding on to a
         * stxd here (to keep track of the source
         * iovec only). Should use another structure
         * to keep track of iovec and return stxd to
        if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
                CDEBUG(D_INFO, "large transmit done\n");
        gmnal_return_stxd(nal_data, stxd);
        lib_finalize(libnal, stxd, cookie, PTL_OK);
 * After an error on the port
 * call this to allow future sends to complete
void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
        gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
        CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
        gmnal_return_stxd(stxd->nal_data, stxd);
        lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
        gmnal_data_t    *nal_data = stxd->nal_data;
        CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
        if (status == GM_SUCCESS) {
                spin_lock(&nal_data->gm_lock);
                gm_send_to_peer_with_callback(gm_port, stxd->buffer,
                                              stxd->gm_size, stxd->msg_size,
                                              stxd->gm_target_node,
                                              gmnal_small_tx_callback,
                spin_unlock(&nal_data->gm_lock);
                CERROR("send_to_peer status for stxd [%p] is "
                       "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
                /* Recycle the stxd */
                gmnal_return_stxd(nal_data, stxd);
                lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
 * Begin a large transmit.
 * Do a gm_register of the memory pointed to by the iovec
 * and send details to the receiver. The receiver does a gm_get
 * to pull the data and sends an ack when finished. Upon receipt of
 * this ack, deregister the memory. Only 1 send token is required here.
gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
               unsigned int niov, struct iovec *iov, size_t offset, int size)
        gmnal_data_t    *nal_data;
        gmnal_stxd_t    *stxd = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
        unsigned int    local_nid;
        int             mlen = 0;       /* the size of the init message data */
        struct iovec    *iov_dup = NULL;
        gm_status_t     gm_status;
        CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
               "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
               "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type,
               global_nid, pid, niov, iov, size);
        nal_data = (gmnal_data_t*)libnal->libnal_data;
                CERROR("no libnal.\n");
                return(GMNAL_STATUS_FAIL);
         * Get stxd and buffer. Put local address of data in buffer,
         * send local addresses to target,
         * wait for the target node to suck the data over.
         * The stxd is used to ren
        stxd = gmnal_get_stxd(nal_data, 1);
        CDEBUG(D_INFO, "stxd [%p]\n", stxd);
        stxd->type = GMNAL_LARGE_MESSAGE_INIT;
        stxd->cookie = cookie;
         * Copy gmnal_msg_hdr and portals header to the transmit buffer
         * Then copy the iov in
        buffer = stxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
        CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
        msghdr->magic = GMNAL_MAGIC;
        msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
        msghdr->sender_node_id = nal_data->gm_global_nid;
        msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
        msghdr->niov = niov;
        buffer += sizeof(gmnal_msghdr_t);
        mlen = sizeof(gmnal_msghdr_t);
        CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
        CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
        gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
        buffer += sizeof(ptl_hdr_t);
        mlen += sizeof(ptl_hdr_t);
        CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
        LASSERT(offset >= 0);
         * Store the iovs in the stxd so we can get
         * them later if we need them
        stxd->iov[0].iov_base = iov->iov_base + offset;
        stxd->iov[0].iov_len = iov->iov_len - offset;
        CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n",
               iov, stxd->iov, niov);
        gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
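        /*
         * Illustrative sketch only (hypothetical values, not part of the
         * NAL): the loop above walks whole iovec elements off the front
         * until the remaining offset falls inside the current element,
         * which then becomes iov[0] of the stored vector with its base and
         * length adjusted.  For three 4096-byte elements and offset 5000,
         * the first element is skipped and the stored iov[0] starts 904
         * bytes into the second element with 3192 bytes left.
         */
#if 0
        {
                struct iovec v[3] = { { .iov_len = 4096 },
                                      { .iov_len = 4096 },
                                      { .iov_len = 4096 } };
                struct iovec *cur = v;
                size_t off = 5000;

                while (off >= cur->iov_len) {   /* skip whole elements */
                        off -= cur->iov_len;
                        cur++;
                }
                /* cur == &v[1], off == 904, remaining length 3192 */
        }
#endif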
         * copy the iov to the buffer so target knows
         * where to get the data from
        CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
        gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
        mlen += stxd->niov*(sizeof(struct iovec));
        CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
         * register the memory so the NIC can get hold of the data
         * This is a slow process. It'd be good to overlap it
         * with something else.
        CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
               iov->iov_base, iov->iov_len);
        spin_lock(&nal_data->gm_lock);
        gm_status = gm_register_memory(nal_data->gm_port,
                                       iov->iov_base, iov->iov_len);
        if (gm_status != GM_SUCCESS) {
                spin_unlock(&nal_data->gm_lock);
                CERROR("gm_register_memory returns [%d][%s] "
                       "for memory [%p] len ["LPSZ"]\n",
                       gm_status, gmnal_gm_error(gm_status),
                       iov->iov_base, iov->iov_len);
                spin_lock(&nal_data->gm_lock);
                while (iov_dup != iov) {
                        gm_deregister_memory(nal_data->gm_port,
                spin_unlock(&nal_data->gm_lock);
                gmnal_return_stxd(nal_data, stxd);
        spin_unlock(&nal_data->gm_lock);
         * Send the init message to the target
        CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
        spin_lock(&nal_data->gm_lock);
        gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
        if (gm_status != GM_SUCCESS) {
                spin_unlock(&nal_data->gm_lock);
                CERROR("Failed to obtain local id\n");
                gmnal_return_stxd(nal_data, stxd);
                /* TO DO deregister memory on failure */
                return(GMNAL_STATUS_FAIL);
        CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
        gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
                                      stxd->gm_size, mlen, GM_LOW_PRIORITY,
                                      local_nid, gmnal_large_tx_callback,
        spin_unlock(&nal_data->gm_lock);
        CDEBUG(D_INFO, "done\n");
 * Callback function indicates that send of buffer with
 * large message iovec has completed (or failed).
gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
        gmnal_small_tx_callback(gm_port, context, status);
 * Have received a buffer that contains an iovec of the sender.
 * Do a gm_register_memory of the receiver's buffer and then do a gm_get
 * of the data from the sender.
gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
               unsigned int nriov, struct iovec *riov, size_t offset,
               size_t mlen, size_t rlen)
        gmnal_data_t    *nal_data = libnal->libnal_data;
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        struct iovec    *riov_dup;
        gmnal_msghdr_t  *msghdr = NULL;
        gm_status_t     gm_status;
        CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
               "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
               libnal, private, cookie, nriov, riov, mlen, rlen);
                CERROR("gmnal_large_rx no context\n");
                lib_finalize(libnal, private, cookie, PTL_FAIL);
        buffer = srxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
        buffer += sizeof(gmnal_msghdr_t);
        buffer += sizeof(ptl_hdr_t);
         * Store the sender's stxd address in the srxd for this message
         * The gmnal_large_tx_ack needs it to notify the sender
         * the pull of data is complete
        srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
         * Register the receiver's memory
         * tell the sender that we got the data
         * then tell the receiver we got the data
         * If the iovecs match, could interleave
         * gm_registers and gm_gets for each element
        while (offset >= riov->iov_len) {
                offset -= riov->iov_len;
        LASSERT(nriov >= 0);
        LASSERT(offset >= 0);
         * do this so the final gm_get callback can deregister the memory
        PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
        srxd->riov[0].iov_base = riov->iov_base + offset;
        srxd->riov[0].iov_len = riov->iov_len - offset;
        gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
        CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
               riov->iov_base, riov->iov_len);
        spin_lock(&nal_data->gm_lock);
        gm_status = gm_register_memory(nal_data->gm_port,
                                       riov->iov_base, riov->iov_len);
        if (gm_status != GM_SUCCESS) {
                spin_unlock(&nal_data->gm_lock);
                CERROR("gm_register_memory returns [%d][%s] "
                       "for memory [%p] len ["LPSZ"]\n",
                       gm_status, gmnal_gm_error(gm_status),
                       riov->iov_base, riov->iov_len);
                spin_lock(&nal_data->gm_lock);
                while (riov_dup != riov) {
                        gm_deregister_memory(nal_data->gm_port,
                spin_lock(&nal_data->gm_lock);
                 * give back srxd and buffer. Send NACK to sender
                PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
                spin_unlock(&nal_data->gm_lock);
         * now do gm_get to get the data
        srxd->cookie = cookie;
        if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
                             nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
                CERROR("can't get the data");
        CDEBUG(D_INFO, "gmnal_large_rx done\n");
 * Perform a number of remote gets as part of receiving
 * The final one to complete (i.e. the last callback to get called)
 * gm_get requires a send token.
gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
                 int nriov, struct iovec *riov)
        CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
               "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
        ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
                CERROR("there's something wrong with the iovecs\n");
                return(GMNAL_STATUS_FAIL);
        CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
        spin_lock_init(&srxd->callback_lock);
        srxd->ncallbacks = ncalls;
        srxd->callback_status = 0;
        ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
                CERROR("there's something wrong with the iovecs\n");
                return(GMNAL_STATUS_FAIL);
        return(GMNAL_STATUS_OK);
 * pull data from source node (source iovec) to a local iovec.
 * The iovecs may not match which adds the complications below.
 * Count the number of gm_gets that will be required so the callbacks
 * can determine who is the last one.
gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
              struct iovec *siov, int nriov, struct iovec *riov)
        int             slen = siov->iov_len, rlen = riov->iov_len;
        char            *sbuf = siov->iov_base, *rbuf = riov->iov_base;
        unsigned long   sbuf_long;
        gm_remote_ptr_t remote_ptr = 0;
        unsigned int    source_node;
        gmnal_ltxd_t    *ltxd = NULL;
        gmnal_data_t    *nal_data = srxd->nal_data;
        CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
                CERROR("Bad args No nal_data\n");
                return(GMNAL_STATUS_FAIL);
        spin_lock(&nal_data->gm_lock);
        if (gm_global_id_to_node_id(nal_data->gm_port,
                                    srxd->gm_source_node,
                                    &source_node) != GM_SUCCESS) {
                CERROR("cannot resolve global_id [%u] "
                       "to local node_id\n", srxd->gm_source_node);
                spin_unlock(&nal_data->gm_lock);
                return(GMNAL_STATUS_FAIL);
        spin_unlock(&nal_data->gm_lock);
         * We need a send token to use gm_get
         * getting an ltxd gets us a send token.
         * the ltxd is used as the context to the
         * callback function (so the ltxd can be returned).
         * Set pointer in ltxd to srxd so callback count in srxd
         * can be decremented to find last callback to complete
        CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
               srxd->gm_source_node, source_node);
                CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
                       sbuf, slen, rbuf, rlen);
                        CDEBUG(D_INFO, "slen>rlen\n");
                        ltxd = gmnal_get_ltxd(nal_data);
                        spin_lock(&nal_data->gm_lock);
                         * funny business to get rid
                         * of compiler warning
                        sbuf_long = (unsigned long) sbuf;
                        remote_ptr = (gm_remote_ptr_t)sbuf_long;
                        gm_get(nal_data->gm_port, remote_ptr, rbuf,
                               rlen, GM_LOW_PRIORITY, source_node,
                               gmnal_remote_get_callback, ltxd);
                        spin_unlock(&nal_data->gm_lock);
                         * at the end of 1 iov element
                        rbuf = riov->iov_base;
                        rlen = riov->iov_len;
                } else if (rlen > slen) {
                        CDEBUG(D_INFO, "slen<rlen\n");
                        ltxd = gmnal_get_ltxd(nal_data);
                        spin_lock(&nal_data->gm_lock);
                        sbuf_long = (unsigned long) sbuf;
                        remote_ptr = (gm_remote_ptr_t)sbuf_long;
                        gm_get(nal_data->gm_port, remote_ptr, rbuf,
                               slen, GM_LOW_PRIORITY, source_node,
                               gmnal_remote_get_callback, ltxd);
                        spin_unlock(&nal_data->gm_lock);
                         * at end of siov element
                        sbuf = siov->iov_base;
                        slen = siov->iov_len;
                        CDEBUG(D_INFO, "rlen=slen\n");
                        ltxd = gmnal_get_ltxd(nal_data);
                        spin_lock(&nal_data->gm_lock);
                        sbuf_long = (unsigned long) sbuf;
                        remote_ptr = (gm_remote_ptr_t)sbuf_long;
                        gm_get(nal_data->gm_port, remote_ptr, rbuf,
                               rlen, GM_LOW_PRIORITY, source_node,
                               gmnal_remote_get_callback, ltxd);
                        spin_unlock(&nal_data->gm_lock);
                         * at end of siov and riov element
                        sbuf = siov->iov_base;
                        slen = siov->iov_len;
                        rbuf = riov->iov_base;
                        rlen = riov->iov_len;
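/*
 * Illustrative sketch only, not part of gmnal_copyiov: with mismatched
 * iovecs each gm_get above transfers min(slen, rlen) bytes and then
 * advances whichever side (or both) was exhausted, so the counting pass
 * (do_copy == 0) and the copying pass (do_copy == 1) agree on the number
 * of gm_get calls.  The helper and its arguments are hypothetical.
 */
#if 0
static int gmnal_count_fragments(size_t *slens, int nsiov,
                                 size_t *rlens, int nriov)
{
        int    s = 0, r = 0, ncalls = 0;
        size_t slen = slens[0], rlen = rlens[0];

        while (s < nsiov && r < nriov) {
                size_t chunk = slen < rlen ? slen : rlen;

                ncalls++;                       /* one gm_get per chunk */
                slen -= chunk;
                rlen -= chunk;
                if (slen == 0 && ++s < nsiov)
                        slen = slens[s];
                if (rlen == 0 && ++r < nriov)
                        rlen = rlens[r];
        }
        /* e.g. one 8K source element against two 4K sinks gives 2 calls */
        return ncalls;
}
#endif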
 * The callback function that is invoked after each gm_get call completes.
 * Multiple callbacks may be invoked for 1 transaction; only the final
 * callback has work to do.
gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
        gmnal_ltxd_t    *ltxd = (gmnal_ltxd_t*)context;
        gmnal_srxd_t    *srxd = ltxd->srxd;
        lib_nal_t       *libnal = srxd->nal_data->libnal;
        gmnal_data_t    *nal_data;
1126 if (status != GM_SUCCESS) {
1127 CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status));
1130 spin_lock(&srxd->callback_lock);
1132 srxd->callback_status |= status;
1133 lastone = srxd->ncallbacks?0:1;
1134 spin_unlock(&srxd->callback_lock);
1135 nal_data = srxd->nal_data;
1138 * everyone returns a send token
1140 gmnal_return_ltxd(nal_data, ltxd);
                CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
         * Let our client application proceed
        CERROR("final callback context[%p]\n", srxd);
        lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
         * send an ack to the sender to let him know we got the data
        gmnal_large_tx_ack(nal_data, srxd);
         * Unregister the memory that was used
         * This is a very slow business (slower than register)
        nriov = srxd->nriov;
        spin_lock(&nal_data->gm_lock);
                CERROR("deregister memory [%p]\n", riov->iov_base);
                if (gm_deregister_memory(srxd->nal_data->gm_port,
                                         riov->iov_base, riov->iov_len)) {
                        CERROR("failed to deregister memory [%p]\n",
        spin_unlock(&nal_data->gm_lock);
        PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
         * repost the receive buffer (return receive token)
        spin_lock(&nal_data->gm_lock);
        gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
                                           srxd->gmsize, GM_LOW_PRIORITY, 0);
        spin_unlock(&nal_data->gm_lock);
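/*
 * Illustrative sketch only (helper name hypothetical): the callback above
 * relies on a "last one out does the work" pattern.  Each gm_get
 * completion takes callback_lock and drops the outstanding-callback count;
 * only the caller that sees it hit zero finalizes the message, acks the
 * sender, deregisters the memory and reposts the receive buffer.
 */
#if 0
static int gmnal_last_remote_get(gmnal_srxd_t *srxd)
{
        int last;

        spin_lock(&srxd->callback_lock);
        last = (--srxd->ncallbacks == 0);       /* one fewer gm_get pending */
        spin_unlock(&srxd->callback_lock);
        return last;
}
#endif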
 * Called on target node.
 * After pulling data from a source node
 * send an ack message to indicate the large transmit is complete.
gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
        gmnal_msghdr_t  *msghdr;
        void            *buffer = NULL;
        unsigned int    local_nid;
        gm_status_t     gm_status = GM_SUCCESS;
        CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
               srxd->gm_source_node);
        spin_lock(&nal_data->gm_lock);
        gm_status = gm_global_id_to_node_id(nal_data->gm_port,
                                            srxd->gm_source_node, &local_nid);
        spin_unlock(&nal_data->gm_lock);
        if (gm_status != GM_SUCCESS) {
                CERROR("Failed to obtain local id\n");
        CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
        stxd = gmnal_get_stxd(nal_data, 1);
        CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
        stxd->nal_data = nal_data;
        stxd->type = GMNAL_LARGE_MESSAGE_ACK;
         * Copy gmnal_msg_hdr and portals header to the transmit buffer
         * Then copy the data in
        buffer = stxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
         * Add in the address of the original stxd from the sender node
         * so it knows which thread to notify.
        msghdr->magic = GMNAL_MAGIC;
        msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
        msghdr->sender_node_id = nal_data->gm_global_nid;
        msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
        CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
        CDEBUG(D_INFO, "sending\n");
        stxd->msg_size = sizeof(gmnal_msghdr_t);
        CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
               "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
               "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
               stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
        spin_lock(&nal_data->gm_lock);
        stxd->gm_priority = GM_LOW_PRIORITY;
        stxd->gm_target_node = local_nid;
        gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
                                      stxd->gm_size, stxd->msg_size,
                                      GM_LOW_PRIORITY, local_nid,
                                      gmnal_large_tx_ack_callback,
        spin_unlock(&nal_data->gm_lock);
        CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
 * A callback to indicate the large transmit ack operation is complete
 * Check for errors and try to deal with them.
 * Call lib_finalize to inform the client application that the
 * send is complete and the memory can be reused.
 * Return the stxd when finished with it (returns a send token)
gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
        gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
        gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
                CERROR("send completion event for unknown stxd\n");
        CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
        gmnal_return_stxd(stxd->nal_data, stxd);
        spin_unlock(&nal_data->gm_lock);
 * Indicates the large transmit operation is complete.
 * Called on transmit side (means the data has been pulled by the receiver)
 * Call lib_finalize to inform the client application that the send
 * is complete, deregister the memory and return the stxd.
 * Finally, repost the rx buffer that the ack message was delivered in.
gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
        lib_nal_t       *libnal = nal_data->libnal;
        gmnal_stxd_t    *stxd = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
        void            *buffer = NULL;
        CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
        buffer = srxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
        stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
        CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
        lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
         * extract the iovec from the stxd, deregister the memory.
         * free the space used to store the iovec
        while (stxd->niov--) {
                CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
                       iov->iov_base, iov->iov_len);
                spin_lock(&nal_data->gm_lock);
                gm_deregister_memory(nal_data->gm_port, iov->iov_base,
                spin_unlock(&nal_data->gm_lock);
         * return the send token
         * TO DO Is it bad to hold onto the send token so long?
        gmnal_return_stxd(nal_data, stxd);
         * requeue the receive buffer
        gmnal_rx_requeue_buffer(nal_data, srxd);