1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 * This file contains all lgmnal send and receive functions
30 lgmnal_requeue_rxbuffer(lgmnal_data_t *nal_data, lgmnal_srxd_t *srxd)
32 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_requeue_rxbuffer\n"));
34 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data));
36 LGMNAL_GM_LOCK(nal_data);
37 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
38 srxd->gmsize, GM_LOW_PRIORITY, 0 );
39 LGMNAL_GM_UNLOCK(nal_data);
41 return(LGMNAL_STATUS_OK);
46 * Handle a bad message
47 * A bad message is one we don't expect or can't interpret
50 lgmnal_badrx_message(lgmnal_data_t *nal_data, gm_recv_t *recv, lgmnal_srxd_t *srxd)
52 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("Can't handle message\n"));
55 srxd = lgmnal_rxbuffer_to_srxd(nal_data, gm_ntohp(recv->buffer));
57 lgmnal_requeue_rxbuffer(nal_data, srxd);
59 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Can't find a descriptor for this buffer\n"));
63 return(LGMNAL_STATUS_FAIL);
66 return(LGMNAL_STATUS_OK);
71 * Start processing a small message receive
72 * Get here from lgmnal_receive_thread
73 * Hand off to lib_parse, which calls cb_recv
74 * which hands back to lgmnal_small_receive2
75 * Deal with all endian stuff here (if we can!)
78 lgmnal_small_receive1(lgmnal_data_t *nal_data, gm_recv_t *recv)
80 lgmnal_srxd_t *srxd = NULL;
82 unsigned int snode, sport, type, length;
83 lgmnal_msghdr_t *lgmnal_msghdr;
84 ptl_hdr_t *portals_hdr;
86 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive1 nal_data [%p], recv [%p]\n", nal_data, recv));
88 buffer = gm_ntohp(recv->buffer);;
89 snode = (int)gm_ntoh_u16(recv->sender_node_id);
90 sport = (int)gm_ntoh_u8(recv->sender_port_id);
91 type = (int)gm_ntoh_u8(recv->type);
92 buffer = gm_ntohp(recv->buffer);
93 length = (int) gm_ntohl(recv->length);
95 lgmnal_msghdr = (lgmnal_msghdr_t*)buffer;
96 portals_hdr = (ptl_hdr_t*)(buffer+LGMNAL_MSGHDR_SIZE);
98 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("rx_event:: Sender node [%d], Sender Port [%d], type [%d], length [%d], buffer [%p]\n",
99 snode, sport, type, length, buffer));
100 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lgmnal_msghdr:: Sender node [%u], magic [%lx], type [%d]\n",
101 lgmnal_msghdr->sender_node_id, lgmnal_msghdr->magic, lgmnal_msghdr->type));
102 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: Sender node [%ul], dest_node [%ul]\n",
103 portals_hdr->src_nid, portals_hdr->dest_nid));
107 * Get a transmit descriptor for this message
109 srxd = lgmnal_rxbuffer_to_srxd(nal_data, buffer);
110 LGMNAL_PRINT(LGMNAL_DEBUG, ("Back from lgmnal_rxbuffer_to_srxd\n"));
112 LGMNAL_PRINT(LGMNAL_DEBUG, ("Failed to get receive descriptor for this buffer\n"));
113 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
114 return(LGMNAL_STATUS_FAIL);
116 srxd->type = LGMNAL_SMALL_MESSAGE;
118 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling lib_parse buffer is [%p]\n", buffer+LGMNAL_MSGHDR_SIZE));
120 * control passes to lib, which calls cb_recv
121 * cb_recv is responsible for returning the buffer
124 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
126 return(LGMNAL_STATUS_OK);
130 * Get here from lgmnal_receive_thread, lgmnal_small_receive1
132 * Put data from prewired receive buffer into users buffer(s)
133 * Hang out the receive buffer again for another receive
137 lgmnal_small_receive2(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, unsigned int niov,
138 struct iovec *iov, size_t mlen, size_t rlen)
140 lgmnal_srxd_t *srxd = NULL;
142 lgmnal_data_t *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
145 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive2 niov [%d] mlen[%d]\n", niov, mlen));
148 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("lgmnal_small_receive2 no context\n"));
149 lib_finalize(nal_cb, private, cookie);
153 srxd = (lgmnal_srxd_t*)private;
154 buffer = srxd->buffer;
155 buffer += sizeof(lgmnal_msghdr_t);
156 buffer += sizeof(ptl_hdr_t);
159 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing [%p] len [%d]\n", iov, iov->iov_len));
160 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
161 buffer += iov->iov_len;
167 * let portals library know receive is complete
169 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling lib_finalize\n"));
170 if (lib_finalize(nal_cb, private, cookie) != PTL_OK) {
171 /* TO DO what to do with failed lib_finalise? */
172 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lib_finalize failed\n"));
175 * return buffer so it can be used again
177 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling gm_provide_receive_buffer\n"));
178 LGMNAL_GM_LOCK(nal_data);
179 gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, srxd->gmsize, GM_LOW_PRIORITY, 0);
180 LGMNAL_GM_UNLOCK(nal_data);
188 * The recevive thread
189 * This guy wait in gm_blocking_recvive and gets
190 * woken up when the myrinet adaptor gets an interrupt.
191 * Hands off processing of small messages and blocks again
194 lgmnal_receive_thread(void *arg)
196 lgmnal_data_t *nal_data;
197 gm_recv_event_t *rxevent = NULL;
198 gm_recv_t *recv = NULL;
202 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread. NO nal_data. Exiting\n", arg));
206 nal_data = (lgmnal_data_t*)arg;
207 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread nal_data is [%p]\n", arg));
209 nal_data->rxthread_flag = LGMNAL_THREAD_STARTED;
210 while (nal_data->rxthread_flag == LGMNAL_THREAD_STARTED) {
211 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: lgmnal_receive_threads waiting for LGMNAL_CONTINUE flag\n"));
212 set_current_state(TASK_INTERRUPTIBLE);
213 schedule_timeout(1024);
217 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: calling daemonize\n"));
219 LGMNAL_GM_LOCK(nal_data);
220 while(nal_data->rxthread_flag == LGMNAL_THREAD_CONTINUE) {
221 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Receive thread waiting\n"));
222 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
223 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: receive thread got [%s]\n", lgmnal_rxevent(rxevent)));
224 if (nal_data->rxthread_flag != LGMNAL_THREAD_CONTINUE) {
225 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Receive thread time to exit\n"));
228 switch (GM_RECV_EVENT_TYPE(rxevent)) {
231 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: GM_RECV_EVENT\n"));
232 recv = (gm_recv_t*)&(rxevent->recv);
233 buffer = gm_ntohp(recv->buffer);
234 if (((lgmnal_msghdr_t*)buffer)->type == LGMNAL_SMALL_MESSAGE) {
235 LGMNAL_GM_UNLOCK(nal_data);
236 lgmnal_small_receive1(nal_data, recv);
237 LGMNAL_GM_LOCK(nal_data);
239 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: Unsupported message type\n"));
240 lgmnal_badrx_message(nal_data, recv, NULL);
243 case(_GM_SLEEP_EVENT):
245 * Blocking receive above just returns
246 * immediatly with _GM_SLEEP_EVENT
247 * Don't know what this is
249 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Sleeping in gm_unknown\n"));
250 LGMNAL_GM_UNLOCK(nal_data);
251 gm_unknown(nal_data->gm_port, rxevent);
252 LGMNAL_GM_LOCK(nal_data);
253 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Awake from gm_unknown\n"));
258 * Don't know what this is
259 * gm_unknown will make sense of it
261 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Passing event to gm_unknown\n"));
262 gm_unknown(nal_data->gm_port, rxevent);
263 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Processed unknown event\n"));
269 LGMNAL_GM_UNLOCK(nal_data);
270 nal_data->rxthread_flag = LGMNAL_THREAD_STOPPED;
271 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: The lgmnal_receive_thread nal_data [%p] is exiting\n", nal_data));
272 return(LGMNAL_STATUS_OK);
277 lgmnal_small_transmit(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
278 ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
280 lgmnal_data_t *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
281 lgmnal_stxd_t *stxd = NULL;
283 lgmnal_msghdr_t *msghdr = NULL;
285 unsigned int local_nid;
286 gm_status_t gm_status = GM_SUCCESS;
288 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_transmit nal_cb [%p] private [%p] cookie [%p] hdr [%p] type [%d] global_nid [%u][%x] pid [%d] niov [%d] iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type, global_nid, global_nid, pid, niov, iov, size));
290 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: dest_nid [%lu], src_nid [%lu]\n", hdr->dest_nid, hdr->src_nid));
293 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_data\n"));
294 return(LGMNAL_STATUS_FAIL);
296 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("nal_data [%p]\n", nal_data));
299 LGMNAL_GM_LOCK(nal_data);
300 gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, &local_nid);
301 LGMNAL_GM_UNLOCK(nal_data);
302 if (gm_status != GM_SUCCESS) {
303 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Failed to obtain local id\n"));
304 return(LGMNAL_STATUS_FAIL);
306 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Local Node_id is [%u][%x]\n", local_nid, local_nid));
308 stxd = lgmnal_get_stxd(nal_data, 1);
309 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
311 stxd->type = LGMNAL_SMALL_MESSAGE;
312 stxd->cookie = cookie;
315 * Copy lgmnal_msg_hdr and portals header to the transmit buffer
316 * Then copy the data in
318 buffer = stxd->buffer;
319 msghdr = (lgmnal_msghdr_t*)buffer;
321 msghdr->magic = LGMNAL_MAGIC;
322 msghdr->type = LGMNAL_SMALL_MESSAGE;
323 msghdr->sender_node_id = nal_data->gm_global_nid;
324 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
326 buffer += sizeof(lgmnal_msghdr_t);
327 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Advancing buffer pointer by [%x] to [%p]\n", sizeof(lgmnal_msghdr_t), buffer));
329 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing portals hdr at [%p]\n", buffer));
330 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
332 buffer += sizeof(ptl_hdr_t);
335 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing iov [%p] len [%d] to [%p]\n", iov, iov->iov_len, buffer));
336 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
337 buffer+= iov->iov_len;
341 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending\n"));
342 tot_size = size+sizeof(ptl_hdr_t)+sizeof(lgmnal_msghdr_t);
345 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling gm_send_to_peer port [%p] buffer [%p] gmsize [%d] msize [%d] global_nid [%u][%x] local_nid[%d] stxd [%p]\n",
346 nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, global_nid, global_nid, local_nid, stxd));
347 LGMNAL_GM_LOCK(nal_data);
348 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, GM_LOW_PRIORITY, local_nid, lgmnal_small_tx_done, (void*)stxd);
350 LGMNAL_GM_UNLOCK(nal_data);
351 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
358 lgmnal_small_tx_done(gm_port_t *gm_port, void *context, gm_status_t status)
360 lgmnal_stxd_t *stxd = (lgmnal_stxd_t*)context;
361 lib_msg_t *cookie = stxd->cookie;
362 lgmnal_data_t *nal_data = (lgmnal_data_t*)stxd->nal_data;
363 nal_cb_t *nal_cb = nal_data->nal_cb;
366 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("send completion event for unknown stxd\n"));
369 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Result of send stxd [%p] is [%s]\n", stxd, lgmnal_gm_error(status)));
370 /* TO DO figure out which sends are worth retrying and get a send token to retry */
371 if (lib_finalize(nal_cb, stxd, cookie) != PTL_OK) {
372 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Call to lib_finalize failed for stxd [%p]\n", stxd));
374 lgmnal_return_stxd(nal_data, stxd);
380 lgmnal_large_tx1_done(gm_port_t *gm_port, void *context, gm_status_t status)
386 * Begin a large transmit
389 lgmnal_large_transmit1(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
390 ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
393 lgmnal_data_t *nal_data;
394 lgmnal_stxd_t *stxd = NULL;
396 lgmnal_msghdr_t *msghdr = NULL;
397 unsigned int local_nid;
398 int mlen = 0; /* the size of the init message data */
401 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_large_transmit1 nal_cb [%p] private [%p], cookie [%p] hdr [%p], type [%d] global_nid [%u], pid [%d],
402 niov [%d], iov [%p], size [%d]\n",
403 nal_cb, private, cookie, hdr, type, global_nid, pid, niov, iov, size));
406 nal_data = (lgmnal_data_t*)nal_cb->nal_data;
408 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_cb.\n"));
409 return(LGMNAL_STATUS_FAIL);
414 * TO DO large transmit uses stxd. Should it have control descriptor?
416 stxd = lgmnal_get_stxd(nal_data, 1);
417 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
419 stxd->type = LGMNAL_LARGE_MESSAGE_INIT;
420 stxd->cookie = cookie;
423 * Copy lgmnal_msg_hdr and portals header to the transmit buffer
424 * Then copy the iov in
426 buffer = stxd->buffer;
427 msghdr = (lgmnal_msghdr_t*)buffer;
429 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
431 msghdr->magic = LGMNAL_MAGIC;
432 msghdr->type = LGMNAL_LARGE_MESSAGE_INIT;
433 msghdr->sender_node_id = nal_data->gm_global_nid;
435 buffer += sizeof(lgmnal_msghdr_t);
436 mlen = sizeof(lgmnal_msghdr_t);
439 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing portals hdr at [%p]\n", buffer));
441 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
442 buffer += sizeof(ptl_hdr_t);
443 mlen += sizeof(ptl_hdr_t);
446 * Store the iovs in the stxd for we can get them later
449 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Copying iov [%p] to [%p]\n", iov, stxd->iov));
450 gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
454 * Send the init message to the target
456 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending mlen [%d]\n", mlen));
457 LGMNAL_GM_LOCK(nal_data);
458 gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, mlen, GM_LOW_PRIORITY, local_nid, lgmnal_large_tx1_done, (void*)stxd);
459 LGMNAL_GM_UNLOCK(nal_data);
461 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
/*
 *      Symbols exported to the rest of the lgmnal module
 */

/* receive side */
EXPORT_SYMBOL(lgmnal_receive_thread);
EXPORT_SYMBOL(lgmnal_small_receive1);
EXPORT_SYMBOL(lgmnal_small_receive2);
EXPORT_SYMBOL(lgmnal_requeue_rxbuffer);
EXPORT_SYMBOL(lgmnal_badrx_message);

/* transmit side */
EXPORT_SYMBOL(lgmnal_small_transmit);
EXPORT_SYMBOL(lgmnal_small_tx_done);
EXPORT_SYMBOL(lgmnal_large_transmit1);
EXPORT_SYMBOL(lgmnal_large_tx1_done);