1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * This file contains all gmnal send and receive functions
29 * The caretaker thread
30 * This is main thread of execution for the NAL side
31 * This guy waits in gm_blocking_receive and gets
32 * woken up when the myrinet adaptor gets an interrupt.
33 * Hands off receive operations to the receive thread
34 * This thread Looks after gm_callbacks etc inline.
37 gmnal_ct_thread(void *arg)
40 gm_recv_event_t *rxevent = NULL;
41 gm_recv_t *recv = NULL;
44 CDEBUG(D_NET, "NO gmnalni. Exiting\n");
48 gmnalni = (gmnal_ni_t*)arg;
49 CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
51 sprintf(current->comm, "gmnal_ct");
53 kportal_daemonize("gmnalctd");
55 gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
57 spin_lock(&gmnalni->gmni_gm_lock);
58 while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
59 CDEBUG(D_NET, "waiting\n");
60 rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
61 if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
62 CDEBUG(D_NET, "time to exit\n");
65 CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent(rxevent));
66 switch (GM_RECV_EVENT_TYPE(rxevent)) {
69 CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
70 recv = (gm_recv_t*)&rxevent->recv;
71 spin_unlock(&gmnalni->gmni_gm_lock);
72 gmnal_add_rxtwe(gmnalni, recv);
73 spin_lock(&gmnalni->gmni_gm_lock);
74 CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
76 case(_GM_SLEEP_EVENT):
78 * Blocking receive above just returns
79 * immediatly with _GM_SLEEP_EVENT
80 * Don't know what this is
82 CDEBUG(D_NET, "Sleeping in gm_unknown\n");
83 spin_unlock(&gmnalni->gmni_gm_lock);
84 gm_unknown(gmnalni->gmni_port, rxevent);
85 spin_lock(&gmnalni->gmni_gm_lock);
86 CDEBUG(D_NET, "Awake from gm_unknown\n");
91 * Don't know what this is
92 * gm_unknown will make sense of it
93 * Should be able to do something with
94 * FAST_RECV_EVENTS here.
96 CDEBUG(D_NET, "Passing event to gm_unknown\n");
97 spin_unlock(&gmnalni->gmni_gm_lock);
98 gm_unknown(gmnalni->gmni_port, rxevent);
99 spin_lock(&gmnalni->gmni_gm_lock);
100 CDEBUG(D_NET, "Processed unknown event\n");
103 spin_unlock(&gmnalni->gmni_gm_lock);
104 gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
105 CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
112 * process a receive event
115 gmnal_rx_thread(void *arg)
        /* Receive thread main loop: pulls work elements queued by the
         * caretaker thread and dispatches them by gmnal message type.
         * NOTE(review): this listing has gaps in the embedded line numbers;
         * declarations of rank/name/buffer and several braces/breaks are
         * elided from view. */
120 gmnal_rxtwe_t *we = NULL;
        /* presumably the NULL-arg guard (not visible here) leads to this */
124 CDEBUG(D_NET, "NO gmnalni. Exiting\n");
128 gmnalni = (gmnal_ni_t*)arg;
129 CDEBUG(D_NET, "gmnalni is [%p]\n", arg);
        /* find this thread's index by matching our pid against the table
         * of rx thread pids, and name the thread accordingly */
131 for (rank=0; rank<num_rx_threads; rank++)
132 if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
135 snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
136 kportal_daemonize(name);
139 * set 1 bit for each thread started
140 * doesn't matter which bit
142 spin_lock(&gmnalni->gmni_rxthread_flag_lock);
143 if (gmnalni->gmni_rxthread_flag)
        /* shift in another 1 bit: flag becomes a run of N ones for N
         * started threads */
144 gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
146 gmnalni->gmni_rxthread_flag = 1;
147 CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
148 spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
        /* main service loop: block for a work element, then dispatch */
150 while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
151 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
152 we = gmnal_get_rxtwe(gmnalni);
154 CDEBUG(D_NET, "Receive thread time to exit\n");
        /* dispatch on the gmnal message type in the receive buffer
         * (buffer setup from 'we' is elided from this listing) */
159 switch(((gmnal_msghdr_t*)buffer)->gmm_type) {
160 case(GMNAL_SMALL_MESSAGE):
161 gmnal_pre_receive(gmnalni, we, GMNAL_SMALL_MESSAGE);
164 #warning better handling
165 CERROR("Unsupported message type\n");
166 gmnal_rx_bad(gmnalni, we);
        /* the work element was allocated by the producer; free it here */
168 PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
        /* on exit, clear one bit so shutdown can tell when all rx
         * threads have gone */
171 spin_lock(&gmnalni->gmni_rxthread_flag_lock);
172 gmnalni->gmni_rxthread_flag/=2;
173 CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
174 spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
175 CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
183 * Start processing a small message receive
184 * Get here from gmnal_receive_thread
185 * Hand off to lib_parse, which calls cb_recv
186 * which hands back to gmnal_small_receive
187 * Deal with all endian stuff here.
190 gmnal_pre_receive(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we, int gmnal_type)
        /* Start processing a received small message: locate the gmnal and
         * portals headers in the receive buffer, fill in the matching
         * receive descriptor, and hand the portals header to lib_parse.
         * NOTE(review): lines are elided from this listing (e.g. where
         * 'buffer' is taken from 'we', and the NULL-srxd early return). */
192 gmnal_srxd_t *srxd = NULL;
194 gmnal_msghdr_t *gmnal_msghdr;
195 ptl_hdr_t *portals_hdr;
197 CDEBUG(D_NET, "gmnalni [%p], we[%p] type [%d]\n",
198 gmnalni, we, gmnal_type);
        /* the wire layout is: gmnal_msghdr_t immediately followed by the
         * portals header */
202 gmnal_msghdr = (gmnal_msghdr_t*)buffer;
203 portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t));
205 CDEBUG(D_NET, "rx_event:: Sender node [%d], Sender Port [%d], "
206 "type [%d], length [%d], buffer [%p]\n",
207 we->snode, we->sport, we->type, we->length, buffer);
208 CDEBUG(D_NET, "gmnal_msghdr:: Sender node [%u], magic [%d], "
209 "gmnal_type [%d]\n", gmnal_msghdr->gmm_sender_gmid,
210 gmnal_msghdr->gmm_magic, gmnal_msghdr->gmm_type);
211 CDEBUG(D_NET, "portals_hdr:: Sender node ["LPD64"], "
212 "dest_node ["LPD64"]\n", portals_hdr->src_nid,
213 portals_hdr->dest_nid);
216 * Get a receive descriptor for this message
218 srxd = gmnal_rxbuffer_to_srxd(gmnalni, buffer);
219 CDEBUG(D_NET, "Back from gmnal_rxbuffer_to_srxd\n");
        /* presumably reached when srxd == NULL (guard elided here) */
221 CERROR("Failed to get receive descriptor\n");
        /* record context the receive completion path will need */
225 srxd->rx_gmni = gmnalni;
226 srxd->rx_type = gmnal_type;
227 srxd->rx_nsiov = gmnal_msghdr->gmm_niov;
228 srxd->rx_sender_gmid = gmnal_msghdr->gmm_sender_gmid;
230 CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
231 buffer+sizeof(gmnal_msghdr_t));
        /* hand off to the portals library; it calls back into cb_recv */
233 (void)lib_parse(gmnalni->gmni_libnal, portals_hdr, srxd);
234 /* Ignore error; we're connectionless */
236 gmnal_rx_requeue_buffer(gmnalni, srxd);
242 * After a receive has been processed,
243 * hang out the receive buffer again.
244 * This implicitly returns a receive token.
247 gmnal_rx_requeue_buffer(gmnal_ni_t *gmnalni, gmnal_srxd_t *srxd)
        /* Hand the descriptor's receive buffer back to GM so it can be
         * filled by a future receive; this implicitly returns a receive
         * token.  The GM call is made under gmni_gm_lock, which serialises
         * access to the GM port. */
249 CDEBUG(D_NET, "requeueing srxd[%p] gmnalni[%p]\n", srxd, gmnalni);
251 spin_lock(&gmnalni->gmni_gm_lock);
252 gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, srxd->rx_buffer,
253 srxd->rx_gmsize, GM_LOW_PRIORITY, 0 );
254 spin_unlock(&gmnalni->gmni_gm_lock);
259 * Handle a bad message
260 * A bad message is one we don't expect or can't interpret
263 gmnal_rx_bad(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we)
        /* Handle a message we don't expect or can't interpret: find the
         * descriptor for its buffer and requeue the buffer so the slot is
         * not lost.  NOTE(review): the continuation of the srxd lookup and
         * the NULL guard are elided from this listing. */
265 gmnal_srxd_t *srxd = gmnal_rxbuffer_to_srxd(gmnalni,
268 CERROR("Can't find a descriptor for this buffer\n");
272 gmnal_rx_requeue_buffer(gmnalni, srxd);
278 * Start a small transmit.
279 * Use the given send token (and wired transmit buffer).
280 * Copy headers to wired buffer and initiate gm_send from the wired buffer.
281 * The callback function informs when the send is complete.
284 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
285 ptl_hdr_t *hdr, int type, ptl_nid_t nid,
286 gmnal_stxd_t *stxd, int size)
        /* Start a small transmit using the given send token (stxd) and its
         * wired buffer: resolve the destination's GM node id, copy the
         * gmnal and portals headers into the wired buffer (payload was
         * copied in earlier by the caller), then fire an async gm_send;
         * gmnal_small_tx_callback runs on completion.
         * NOTE(review): gaps in the embedded line numbers hide some
         * declarations (e.g. buffer/tot_size) and the error-path returns. */
288 gmnal_ni_t *gmnalni = (gmnal_ni_t*)libnal->libnal_data;
290 gmnal_msghdr_t *msghdr = NULL;
292 gm_status_t gm_status = GM_SUCCESS;
294 CDEBUG(D_NET, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
295 "hdr [%p] type [%d] nid ["LPU64"] stxd [%p] "
296 "size [%d]\n", libnal, private, cookie, hdr, type,
299 CDEBUG(D_NET, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
300 hdr->dest_nid, hdr->src_nid);
        /* GM global ids are 32 bit; the nid must fit */
302 LASSERT ((nid >> 32) == 0);
303 LASSERT (gmnalni != NULL);
        /* translate the portals nid (GM global id) to a GM local node id
         * for this port, under the GM lock */
305 spin_lock(&gmnalni->gmni_gm_lock);
306 gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid,
308 spin_unlock(&gmnalni->gmni_gm_lock);
310 if (gm_status != GM_SUCCESS) {
311 CERROR("Failed to obtain local id\n");
315 CDEBUG(D_NET, "Local Node_id is [%u][%x]\n",
316 stxd->tx_gmlid, stxd->tx_gmlid);
        /* stash completion context on the descriptor for the callback */
319 stxd->tx_cookie = cookie;
320 stxd->tx_type = GMNAL_SMALL_MESSAGE;
321 stxd->tx_gm_priority = GM_LOW_PRIORITY;
324 * Copy gmnal_msg_hdr and portals header to the transmit buffer
325 * Then send the message, as the data has previously been copied in
328 buffer = stxd->tx_buffer;
329 msghdr = (gmnal_msghdr_t*)buffer;
331 msghdr->gmm_magic = GMNAL_MAGIC;
332 msghdr->gmm_type = GMNAL_SMALL_MESSAGE;
333 msghdr->gmm_sender_gmid = gmnalni->gmni_global_gmid;
334 CDEBUG(D_NET, "processing msghdr at [%p]\n", buffer);
336 buffer += sizeof(gmnal_msghdr_t);
338 CDEBUG(D_NET, "processing portals hdr at [%p]\n", buffer);
339 gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
341 buffer += sizeof(ptl_hdr_t);
343 CDEBUG(D_NET, "sending\n");
        /* total on-wire size: payload plus both headers */
344 tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
345 stxd->tx_msg_size = tot_size;
347 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
348 "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
349 "stxd [%p]\n", gmnalni->gmni_port, stxd->tx_buffer,
350 stxd->tx_gm_size, stxd->tx_msg_size, nid, stxd->tx_gmlid,
        /* asynchronous send; completion (or failure) is reported to
         * gmnal_small_tx_callback with stxd as context */
353 spin_lock(&gmnalni->gmni_gm_lock);
354 gm_send_to_peer_with_callback(gmnalni->gmni_port, stxd->tx_buffer,
355 stxd->tx_gm_size, stxd->tx_msg_size,
356 stxd->tx_gm_priority, stxd->tx_gmlid,
357 gmnal_small_tx_callback, (void*)stxd);
358 spin_unlock(&gmnalni->gmni_gm_lock);
359 CDEBUG(D_NET, "done\n");
366 * A callback to indicate the small transmit operation is complete
367 * Check for errors and try to deal with them.
368 * Call lib_finalise to inform the client application that the send
369 * is complete and the memory can be reused.
370 * Return the stxd when finished with it (returns a send token)
373 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
        /* GM completion callback for gmnal_small_tx: on success, finalize
         * the message and return the stxd (send token); on GM_SEND_DROPPED
         * retry the send; on GM_SEND_TIMED_OUT drop queued sends so GM can
         * recover; for the remaining error statuses resume sending.
         * NOTE(review): many case labels, breaks and the enclosing switch
         * header are elided from this listing (gaps in embedded numbering),
         * so the exact grouping of the error cases is not fully visible. */
375 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
376 lib_msg_t *cookie = stxd->tx_cookie;
377 gmnal_ni_t *gmnalni = stxd->tx_gmni;
378 lib_nal_t *libnal = gmnalni->gmni_libnal;
        /* presumably reached when stxd == NULL (guard elided here) */
381 CDEBUG(D_NET, "send completion event for unknown stxd\n");
384 if (status != GM_SUCCESS)
385 CERROR("Result of send stxd [%p] is [%s] to ["LPU64"]\n",
386 stxd, gmnal_gm_error(status), stxd->tx_nid);
394 case(GM_SEND_DROPPED):
396 * do a resend on the dropped ones
398 CERROR("send stxd [%p] dropped, resending\n", context);
399 spin_lock(&gmnalni->gmni_gm_lock);
400 gm_send_to_peer_with_callback(gmnalni->gmni_port,
404 stxd->tx_gm_priority,
406 gmnal_small_tx_callback,
408 spin_unlock(&gmnalni->gmni_gm_lock);
411 case(GM_SEND_TIMED_OUT):
        /* flush sends queued to this peer; gmnal_drop_sends_callback
         * decides whether to resend or fail the message */
415 CDEBUG(D_NET, "calling gm_drop_sends\n");
416 spin_lock(&gmnalni->gmni_gm_lock);
417 gm_drop_sends(gmnalni->gmni_port, stxd->tx_gm_priority,
418 stxd->tx_gmlid, gm_port_id,
419 gmnal_drop_sends_callback, context);
420 spin_unlock(&gmnalni->gmni_gm_lock);
        /* the remaining GM statuses share one handling path below */
429 case(GM_INTERRUPTED):
431 case(GM_INPUT_BUFFER_TOO_SMALL):
432 case(GM_OUTPUT_BUFFER_TOO_SMALL):
434 case(GM_MEMORY_FAULT):
435 case(GM_INVALID_PARAMETER):
436 case(GM_OUT_OF_MEMORY):
437 case(GM_INVALID_COMMAND):
438 case(GM_PERMISSION_DENIED):
439 case(GM_INTERNAL_ERROR):
441 case(GM_UNSUPPORTED_DEVICE):
442 case(GM_SEND_REJECTED):
443 case(GM_SEND_TARGET_PORT_CLOSED):
444 case(GM_SEND_TARGET_NODE_UNREACHABLE):
445 case(GM_SEND_PORT_CLOSED):
446 case(GM_NODE_ID_NOT_YET_SET):
447 case(GM_STILL_SHUTTING_DOWN):
449 case(GM_NO_SUCH_DEVICE):
451 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
452 case(GM_UNTRANSLATED_SYSTEM_ERROR):
453 case(GM_ACCESS_DENIED):
454 case(GM_NO_DRIVER_SUPPORT):
455 case(GM_PTE_REF_CNT_OVERFLOW):
456 case(GM_NOT_SUPPORTED_IN_KERNEL):
457 case(GM_NOT_SUPPORTED_ON_ARCH):
460 case(GM_DATA_CORRUPTED):
461 case(GM_HARDWARE_FAULT):
462 case(GM_SEND_ORPHANED):
463 case(GM_MINOR_OVERFLOW):
464 case(GM_PAGE_TABLE_FULL):
466 case(GM_INVALID_PORT_NUMBER):
467 case(GM_DEV_NOT_FOUND):
468 case(GM_FIRMWARE_NOT_RUNNING):
469 case(GM_YP_NO_MATCH):
        /* unblock the port so future sends can complete; the callback
         * finalizes this message with failure */
471 gm_resume_sending(gmnalni->gmni_port, stxd->tx_gm_priority,
472 stxd->tx_gmlid, gm_port_id,
473 gmnal_resume_sending_callback, context);
        /* success path: recycle the send token and tell the portals
         * library the send completed */
478 gmnal_return_stxd(gmnalni, stxd);
479 lib_finalize(libnal, stxd, cookie, PTL_OK);
484 * After an error on the port
485 * call this to allow future sends to complete
487 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
        /* Called after gm_resume_sending has unblocked the port following a
         * send error: recycle the send token and report the message to the
         * portals library as failed. */
490 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
491 gmnal_ni_t *gmnalni = stxd->tx_gmni;
493 CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
494 gmnal_return_stxd(gmnalni, stxd);
495 lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
500 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
        /* Called after gm_drop_sends (issued on GM_SEND_TIMED_OUT): if the
         * drop succeeded, retry the send with the same descriptor;
         * otherwise recycle the send token and finalize the message as
         * failed.  NOTE(review): some argument lines of the resend call and
         * the else-branch header are elided from this listing. */
503 gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
504 gmnal_ni_t *gmnalni = stxd->tx_gmni;
506 CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
507 if (status == GM_SUCCESS) {
508 spin_lock(&gmnalni->gmni_gm_lock);
509 gm_send_to_peer_with_callback(gm_port, stxd->tx_buffer,
512 stxd->tx_gm_priority,
514 gmnal_small_tx_callback,
516 spin_unlock(&gmnalni->gmni_gm_lock);
518 CERROR("send_to_peer status for stxd [%p] is "
519 "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
520 /* Recycle the stxd */
521 gmnal_return_stxd(gmnalni, stxd);
522 lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);