2 * This program was prepared by the Regents of the University of
3 * California at Los Alamos National Laboratory (the University) under
4 * contract number W-7405-ENG-36 with the U.S. Department of Energy
5 * (DoE). Neither the U.S. Government nor the
6 * University makes any warranty, express or implied, or assumes any
7 * liability or responsibility for the use of this software.
11 * This file contains all lgmnal send and receive functions
17 lgmnal_requeue_rxbuffer(lgmnal_data_t *nal_data, lgmnal_srxd_t *srxd)
19 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_requeue_rxbuffer\n"));
21 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data));
23 LGMNAL_GM_LOCK(nal_data);
24 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
25 srxd->gmsize, GM_LOW_PRIORITY, 0 );
26 LGMNAL_GM_UNLOCK(nal_data);
28 return(LGMNAL_STATUS_OK);
33 * Handle a bad message
34 * A bad message is one we don't expect or can't interpret
37 lgmnal_badrx_message(lgmnal_data_t *nal_data, gm_recv_t *recv, lgmnal_srxd_t *srxd)
39 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("Can't handle message\n"));
42 srxd = lgmnal_rxbuffer_to_srxd(nal_data, gm_ntohp(recv->buffer));
44 lgmnal_requeue_rxbuffer(nal_data, srxd);
46 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Can't find a descriptor for this buffer\n"));
50 return(LGMNAL_STATUS_FAIL);
53 return(LGMNAL_STATUS_OK);
58 * Start processing a small message receive
59 * Get here from lgmnal_receive_thread
60 * Hand off to lib_parse, which calls cb_recv
61 * which hands back to lgmnal_small_receive2
62 * Deal with all endian stuff here (if we can!)
65 lgmnal_small_receive1(lgmnal_data_t *nal_data, gm_recv_t *recv)
67 lgmnal_srxd_t *srxd = NULL;
69 unsigned int snode, sport, type, length;
70 lgmnal_msghdr_t *lgmnal_msghdr;
71 ptl_hdr_t *portals_hdr;
73 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive1 nal_data [%p], recv [%p]\n", nal_data, recv));
75 buffer = gm_ntohp(recv->buffer);;
76 snode = (int)gm_ntoh_u16(recv->sender_node_id);
77 sport = (int)gm_ntoh_u8(recv->sender_port_id);
78 type = (int)gm_ntoh_u8(recv->type);
79 buffer = gm_ntohp(recv->buffer);
80 length = (int) gm_ntohl(recv->length);
82 lgmnal_msghdr = (lgmnal_msghdr_t*)buffer;
83 portals_hdr = (ptl_hdr_t*)(buffer+LGMNAL_MSGHDR_SIZE);
85 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("rx_event:: Sender node [%d], Sender Port [%d], type [%d], length [%d], buffer [%p]\n",
86 snode, sport, type, length, buffer));
87 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lgmnal_msghdr:: Sender node [%u], magic [%lx], type [%d]\n",
88 lgmnal_msghdr->sender_node_id, lgmnal_msghdr->magic, lgmnal_msghdr->type));
89 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: Sender node [%ul], dest_node [%ul]\n",
90 portals_hdr->src_nid, portals_hdr->dest_nid));
94 * Get a transmit descriptor for this message
96 srxd = lgmnal_rxbuffer_to_srxd(nal_data, buffer);
97 LGMNAL_PRINT(LGMNAL_DEBUG, ("Back from lgmnal_rxbuffer_to_srxd\n"));
99 LGMNAL_PRINT(LGMNAL_DEBUG, ("Failed to get receive descriptor for this buffer\n"));
100 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
101 return(LGMNAL_STATUS_FAIL);
103 srxd->type = LGMNAL_SMALL_MESSAGE;
105 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling lib_parse buffer is [%p]\n", buffer+LGMNAL_MSGHDR_SIZE));
107 * control passes to lib, which calls cb_recv
108 * cb_recv is responsible for returning the buffer
111 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
113 return(LGMNAL_STATUS_OK);
117 * Get here from lgmnal_receive_thread, lgmnal_small_receive1
119 * Put data from prewired receive buffer into users buffer(s)
120 * Hang out the receive buffer again for another receive
124 lgmnal_small_receive2(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, unsigned int niov,
125 struct iovec *iov, size_t mlen, size_t rlen)
127 lgmnal_srxd_t *srxd = NULL;
129 lgmnal_data_t *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
132 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive2 niov [%d] mlen[%d]\n", niov, mlen));
135 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("lgmnal_small_receive2 no context\n"));
136 lib_finalize(nal_cb, private, cookie);
140 srxd = (lgmnal_srxd_t*)private;
141 buffer = srxd->buffer;
142 buffer += sizeof(lgmnal_msghdr_t);
143 buffer += sizeof(ptl_hdr_t);
146 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing [%p] len [%d]\n", iov, iov->iov_len));
147 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
148 buffer += iov->iov_len;
154 * let portals library know receive is complete
156 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling lib_finalize\n"));
157 if (lib_finalize(nal_cb, private, cookie) != PTL_OK) {
158 /* TO DO what to do with failed lib_finalise? */
159 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lib_finalize failed\n"));
162 * return buffer so it can be used again
164 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling gm_provide_receive_buffer\n"));
165 LGMNAL_GM_LOCK(nal_data);
166 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, srxd->gmsize, GM_LOW_PRIORITY, 0);
167 LGMNAL_GM_UNLOCK(nal_data);
175 * The recevive thread
176 * This guy wait in gm_blocking_recvive and gets
177 * woken up when the myrinet adaptor gets an interrupt.
178 * Hands off processing of small messages and blocks again
181 lgmnal_receive_thread(void *arg)
183 lgmnal_data_t *nal_data;
184 gm_recv_event_t *rxevent = NULL;
185 gm_recv_t *recv = NULL;
189 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread. NO nal_data. Exiting\n", arg));
193 nal_data = (lgmnal_data_t*)arg;
194 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread nal_data is [%p]\n", arg));
196 nal_data->rxthread_flag = LGMNAL_THREAD_STARTED;
197 while (nal_data->rxthread_flag == LGMNAL_THREAD_STARTED) {
198 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: lgmnal_receive_threads waiting for LGMNAL_CONTINUE flag\n"));
199 set_current_state(TASK_INTERRUPTIBLE);
200 schedule_timeout(1024);
204 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: calling daemonize\n"));
206 LGMNAL_GM_LOCK(nal_data);
207 while(nal_data->rxthread_flag == LGMNAL_THREAD_CONTINUE) {
208 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Receive thread waiting\n"));
209 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
210 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: receive thread got [%s]\n", lgmnal_rxevent(rxevent)));
211 if (nal_data->rxthread_flag != LGMNAL_THREAD_CONTINUE) {
212 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Receive thread time to exit\n"));
215 switch (GM_RECV_EVENT_TYPE(rxevent)) {
218 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: GM_RECV_EVENT\n"));
219 recv = (gm_recv_t*)&(rxevent->recv);
220 buffer = gm_ntohp(recv->buffer);
221 if (((lgmnal_msghdr_t*)buffer)->type == LGMNAL_SMALL_MESSAGE) {
222 LGMNAL_GM_UNLOCK(nal_data);
223 lgmnal_small_receive1(nal_data, recv);
224 LGMNAL_GM_LOCK(nal_data);
226 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: Unsupported message type\n"));
227 lgmnal_badrx_message(nal_data, recv, NULL);
230 case(_GM_SLEEP_EVENT):
232 * Blocking receive above just returns
233 * immediatly with _GM_SLEEP_EVENT
234 * Don't know what this is
236 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Sleeping in gm_unknown\n"));
237 LGMNAL_GM_UNLOCK(nal_data);
238 gm_unknown(nal_data->gm_port, rxevent);
239 LGMNAL_GM_LOCK(nal_data);
240 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Awake from gm_unknown\n"));
245 * Don't know what this is
246 * gm_unknown will make sense of it
248 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Passing event to gm_unknown\n"));
249 gm_unknown(nal_data->gm_port, rxevent);
250 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Processed unknown event\n"));
256 LGMNAL_GM_UNLOCK(nal_data);
257 nal_data->rxthread_flag = LGMNAL_THREAD_STOPPED;
258 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: The lgmnal_receive_thread nal_data [%p] is exiting\n", nal_data));
259 return(LGMNAL_STATUS_OK);
264 lgmnal_small_transmit(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
265 ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
267 lgmnal_data_t *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
268 lgmnal_stxd_t *stxd = NULL;
270 lgmnal_msghdr_t *msghdr = NULL;
272 unsigned int local_nid;
273 gm_status_t gm_status = GM_SUCCESS;
275 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_transmit nal_cb [%p] private [%p] cookie [%p] hdr [%p] type [%d] global_nid [%u][%x] pid [%d] niov [%d] iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type, global_nid, global_nid, pid, niov, iov, size));
277 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: dest_nid [%lu], src_nid [%lu]\n", hdr->dest_nid, hdr->src_nid));
280 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_data\n"));
281 return(LGMNAL_STATUS_FAIL);
283 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("nal_data [%p]\n", nal_data));
286 LGMNAL_GM_LOCK(nal_data);
287 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, &local_nid);
288 LGMNAL_GM_UNLOCK(nal_data);
289 if (gm_status != GM_SUCCESS) {
290 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Failed to obtain local id\n"));
291 return(LGMNAL_STATUS_FAIL);
293 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Local Node_id is [%u][%x]\n", local_nid, local_nid));
295 stxd = lgmnal_get_stxd(nal_data, 1);
296 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
298 stxd->type = LGMNAL_SMALL_MESSAGE;
299 stxd->cookie = cookie;
302 * Copy lgmnal_msg_hdr and portals header to the transmit buffer
303 * Then copy the data in
305 buffer = stxd->buffer;
306 msghdr = (lgmnal_msghdr_t*)buffer;
308 msghdr->magic = LGMNAL_MAGIC;
309 msghdr->type = LGMNAL_SMALL_MESSAGE;
310 msghdr->sender_node_id = nal_data->gm_global_nid;
311 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
313 buffer += sizeof(lgmnal_msghdr_t);
314 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Advancing buffer pointer by [%x] to [%p]\n", sizeof(lgmnal_msghdr_t), buffer));
316 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing portals hdr at [%p]\n", buffer));
317 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
319 buffer += sizeof(ptl_hdr_t);
322 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing iov [%p] len [%d] to [%p]\n", iov, iov->iov_len, buffer));
323 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
324 buffer+= iov->iov_len;
328 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending\n"));
329 tot_size = size+sizeof(ptl_hdr_t)+sizeof(lgmnal_msghdr_t);
332 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling gm_send_to_peer port [%p] buffer [%p] gmsize [%d] msize [%d] global_nid [%u][%x] local_nid[%d] stxd [%p]\n",
333 nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, global_nid, global_nid, local_nid, stxd));
334 LGMNAL_GM_LOCK(nal_data);
335 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, GM_LOW_PRIORITY, local_nid, lgmnal_small_tx_done, (void*)stxd);
337 LGMNAL_GM_UNLOCK(nal_data);
338 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
345 lgmnal_small_tx_done(gm_port_t *gm_port, void *context, gm_status_t status)
347 lgmnal_stxd_t *stxd = (lgmnal_stxd_t*)context;
348 lib_msg_t *cookie = stxd->cookie;
349 lgmnal_data_t *nal_data = (lgmnal_data_t*)stxd->nal_data;
350 nal_cb_t *nal_cb = nal_data->nal_cb;
353 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("send completion event for unknown stxd\n"));
356 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Result of send stxd [%p] is [%s]\n", stxd, lgmnal_gm_error(status)));
357 /* TO DO figure out which sends are worth retrying and get a send token to retry */
358 if (lib_finalize(nal_cb, stxd, cookie) != PTL_OK) {
359 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Call to lib_finalize failed for stxd [%p]\n", stxd));
361 lgmnal_return_stxd(nal_data, stxd);
367 lgmnal_large_tx1_done(gm_port_t *gm_port, void *context, gm_status_t status)
373 * Begin a large transmit
376 lgmnal_large_transmit1(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
377 ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
380 lgmnal_data_t *nal_data;
381 lgmnal_stxd_t *stxd = NULL;
383 lgmnal_msghdr_t *msghdr = NULL;
384 unsigned int local_nid;
385 int mlen = 0; /* the size of the init message data */
388 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_large_transmit1 nal_cb [%p] private [%p], cookie [%p] hdr [%p], type [%d] global_nid [%u], pid [%d],
389 niov [%d], iov [%p], size [%d]\n",
390 nal_cb, private, cookie, hdr, type, global_nid, pid, niov, iov, size));
393 nal_data = (lgmnal_data_t*)nal_cb->nal_data;
395 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_cb.\n"));
396 return(LGMNAL_STATUS_FAIL);
401 * TO DO large transmit uses stxd. Should it have control descriptor?
403 stxd = lgmnal_get_stxd(nal_data, 1);
404 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
406 stxd->type = LGMNAL_LARGE_MESSAGE_INIT;
407 stxd->cookie = cookie;
410 * Copy lgmnal_msg_hdr and portals header to the transmit buffer
411 * Then copy the iov in
413 buffer = stxd->buffer;
414 msghdr = (lgmnal_msghdr_t*)buffer;
416 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
418 msghdr->magic = LGMNAL_MAGIC;
419 msghdr->type = LGMNAL_LARGE_MESSAGE_INIT;
420 msghdr->sender_node_id = nal_data->gm_global_nid;
422 buffer += sizeof(lgmnal_msghdr_t);
423 mlen = sizeof(lgmnal_msghdr_t);
426 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing portals hdr at [%p]\n", buffer));
428 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
429 buffer += sizeof(ptl_hdr_t);
430 mlen += sizeof(ptl_hdr_t);
433 * Store the iovs in the stxd for we can get them later
436 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Copying iov [%p] to [%p]\n", iov, stxd->iov));
437 gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
441 * Send the init message to the target
443 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending mlen [%d]\n", mlen));
444 LGMNAL_GM_LOCK(nal_data);
445 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, mlen, GM_LOW_PRIORITY, local_nid, lgmnal_large_tx1_done, (void*)stxd);
446 LGMNAL_GM_UNLOCK(nal_data);
448 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
/*
 *	Symbols exported from this file, grouped by direction.
 */

/* receive side */
EXPORT_SYMBOL(lgmnal_receive_thread);
EXPORT_SYMBOL(lgmnal_small_receive1);
EXPORT_SYMBOL(lgmnal_small_receive2);
EXPORT_SYMBOL(lgmnal_requeue_rxbuffer);
EXPORT_SYMBOL(lgmnal_badrx_message);

/* transmit side */
EXPORT_SYMBOL(lgmnal_small_transmit);
EXPORT_SYMBOL(lgmnal_small_tx_done);
EXPORT_SYMBOL(lgmnal_large_transmit1);
EXPORT_SYMBOL(lgmnal_large_tx1_done);