1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 * This file contains all gmnal send and receive functions
/*
 * gmnal_pack_msg: fill in the header fields common to every outgoing
 * message: magic, protocol version, source nid (taken from the local
 * lib NI's pid) and destination nid.
 * NOTE(review): this listing is elided -- the return type, braces and
 * the store of 'type' into the header are not visible here; presumably
 * 'type' is written to msg->gmm_type on a missing line -- confirm
 * against the full source.
 */
29 gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
30 ptl_nid_t dstnid, int type)
32 gmnal_msg_t *msg = tx->tx_msg;
34 /* CAVEAT EMPTOR! this only sets the common message fields. */
35 msg->gmm_magic = GMNAL_MSG_MAGIC;
36 msg->gmm_version = GMNAL_MSG_VERSION;
/* source nid comes from our own network interface identity */
38 msg->gmm_srcnid = gmnalni->gmni_libnal->libnal_ni.ni_pid.nid;
39 msg->gmm_dstnid = dstnid;
/*
 * gmnal_unpack_msg: sanity-check and byte-swap an incoming message
 * header before it is handed to lib_parse().  Checks, in order:
 * minimum length, magic (which also detects a byte-flipped peer),
 * protocol version, full common-header length, source/destination
 * nids, and finally per-type payload length.
 * NOTE(review): this listing is elided -- the error 'return's after
 * each CERROR, the assignment of 'flip', and the closing braces are
 * not visible; each failed check presumably bails out with an error
 * status -- confirm against the full source.
 */
43 gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
45 gmnal_msg_t *msg = rx->rx_msg;
/* size of the fixed header, up to (not including) the per-type union */
46 const int hdr_size = offsetof(gmnal_msg_t, gmm_u);
49 /* 6 bytes are enough to have received magic + version */
50 if (rx->rx_recv_nob < 6) {
51 CERROR("Short message from gmid %u: %d\n",
52 rx->rx_recv_gmid, rx->rx_recv_nob);
/* same-endian peer: header can be read directly */
56 if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
/* swapped magic => opposite-endian peer; 'flip' is presumably set
 * on a missing line so later fields get byte-swapped */
58 } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
61 CERROR("Bad magic from gmid %u: %08x\n",
62 rx->rx_recv_gmid, msg->gmm_magic);
/* version must match ours, allowing for the peer's endianness */
66 if (msg->gmm_version !=
67 (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
68 CERROR("Bad version from gmid %u: %d\n",
69 rx->rx_recv_gmid, msg->gmm_version);
/* must have received at least the whole common header */
73 if (rx->rx_recv_nob < hdr_size) {
74 CERROR("Short message from %u: %d\n",
75 rx->rx_recv_gmid, rx->rx_recv_nob);
80 /* leave magic unflipped as a clue to peer endianness */
81 __swab16s(&msg->gmm_version);
82 __swab16s(&msg->gmm_type);
83 __swab64s(&msg->gmm_srcnid);
84 __swab64s(&msg->gmm_dstnid);
/* a valid sender always identifies itself */
87 if (msg->gmm_srcnid == PTL_NID_ANY) {
88 CERROR("Bad src nid from %u: "LPX64"\n",
89 rx->rx_recv_gmid, msg->gmm_srcnid);
/* the message must actually be addressed to this NI */
93 if (msg->gmm_dstnid != gmnalni->gmni_libnal->libnal_ni.ni_pid.nid) {
94 CERROR("Bad dst nid from %u: "LPX64"\n",
95 rx->rx_recv_gmid, msg->gmm_dstnid);
/* per-type payload validation (default/unknown case elided above
 * the IMMEDIATE case in the original numbering) */
99 switch (msg->gmm_type) {
101 CERROR("Unknown message type from %u: %x\n",
102 rx->rx_recv_gmid, msg->gmm_type);
105 case GMNAL_MSG_IMMEDIATE:
/* need at least everything up to the start of the inline payload */
106 if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
107 CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",
108 rx->rx_recv_gmid, rx->rx_recv_nob,
109 offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
119 * The caretaker thread
120 * This is main thread of execution for the NAL side
121 * This guy waits in gm_blocking_receive and gets
122 * woken up when the myrinet adaptor gets an interrupt.
123 * Hands off receive operations to the receive thread
124 * This thread Looks after gm_callbacks etc inline.
/*
 * gmnal_ct_thread: the caretaker kernel thread.  Blocks in GM waiting
 * for receive events; GM_RECV_EVENTs are handed off to the receive
 * thread(s) via gmnal_enqueue_rx(), everything else is returned to GM
 * with gm_unknown() so GM can run its callbacks.  Exits when the flag
 * is set to GMNAL_THREAD_STOP.
 * NOTE(review): this listing is elided -- the return type, braces,
 * the loop-exit 'break' after the stop check and the 'else' before
 * gm_unknown() are not visible here.
 */
127 gmnal_ct_thread(void *arg)
129 gmnal_ni_t *gmnalni = arg;
130 gm_recv_event_t *rxevent = NULL;
131 gm_recv_t *recv = NULL;
133 sprintf(current->comm, "gmnal_ct");
134 kportal_daemonize("gmnalctd");
/* advertise that the caretaker is up and running */
136 gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
138 while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
/* all GM API calls in this NAL are serialized by gmni_gm_lock */
140 spin_lock(&gmnalni->gmni_gm_lock);
141 rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
142 spin_unlock(&gmnalni->gmni_gm_lock);
/* shutdown may have been requested while we were blocked in GM */
144 if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
145 CDEBUG(D_NET, "time to exit\n");
149 CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
/* real receive events go to the rx thread; anything else
 * must be passed back so GM can process it internally */
151 if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
152 recv = (gm_recv_t*)&rxevent->recv;
153 gmnal_enqueue_rx(gmnalni, recv);
155 gm_unknown(gmnalni->gmni_port, rxevent);
/* tell the stopper we have acknowledged the shutdown request */
159 gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
160 CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
166 * process a receive event
/*
 * gmnal_rx_thread: worker thread that processes received messages.
 * Dequeues rx descriptors queued by the caretaker thread, validates
 * them with gmnal_unpack_msg(), passes valid IMMEDIATE messages to
 * lib_parse(), then reposts the receive buffer to GM.
 * NOTE(review): this listing is elided -- the declarations of 'rank',
 * 'name' and 'rx', the loop-exit 'break' on a NULL dequeue, and the
 * closing braces are not visible here.
 */
169 gmnal_rx_thread(void *arg)
171 gmnal_ni_t *gmnalni = arg;
/* work out which rx thread we are from our pid, for the thread name */
176 for (rank=0; rank<num_rx_threads; rank++)
177 if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
180 snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
181 kportal_daemonize(name);
184 * set 1 bit for each thread started
185 * doesn't matter which bit
187 spin_lock(&gmnalni->gmni_rxthread_flag_lock);
188 if (gmnalni->gmni_rxthread_flag)
/* shift in another '1' bit: flag ends up with one bit per thread */
189 gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
191 gmnalni->gmni_rxthread_flag = 1;
192 spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
194 while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
195 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
/* blocks until the caretaker queues work (or shutdown) */
197 rx = gmnal_dequeue_rx(gmnalni);
199 CDEBUG(D_NET, "Receive thread time to exit\n");
203 /* We're connectionless: simply ignore packets on error */
205 if (gmnal_unpack_msg(gmnalni, rx) == 0) {
/* only IMMEDIATE messages exist at this point in the protocol */
207 LASSERT (rx->rx_msg->gmm_type == GMNAL_MSG_IMMEDIATE);
208 (void)lib_parse(gmnalni->gmni_libnal,
209 &rx->rx_msg->gmm_u.immediate.gmim_hdr,
/* give the buffer back to GM for the next incoming message */
213 gmnal_post_rx(gmnalni, rx);
/* clear our bit (see startup above) so the stopper can tell
 * when all rx threads have exited */
216 spin_lock(&gmnalni->gmni_rxthread_flag_lock);
217 gmnalni->gmni_rxthread_flag /= 2;
218 spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
220 CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
/*
 * gmnal_post_rx: hand a receive buffer back to GM so it can be reused
 * for the next incoming message.  GM API access is serialized by
 * gmni_gm_lock.
 * NOTE(review): return type and braces are elided from this listing.
 */
225 gmnal_post_rx(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
227 CDEBUG(D_NET, "requeueing rx[%p] gmnalni[%p]\n", rx, gmnalni);
229 spin_lock(&gmnalni->gmni_gm_lock);
230 gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, rx->rx_msg,
231 rx->rx_gmsize, GM_LOW_PRIORITY, 0 );
232 spin_unlock(&gmnalni->gmni_gm_lock);
/*
 * gmnal_resume_sending_callback: GM callback run after sends to the
 * peer have been resumed following a dropped send.  Releases the tx
 * descriptor and completes the Portals message with PTL_FAIL.
 * NOTE(review): the 'status' parameter line and braces are elided from
 * this listing.
 */
236 gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
239 gmnal_tx_t *tx = (gmnal_tx_t*)context;
240 gmnal_ni_t *gmnalni = tx->tx_gmni;
241 lib_msg_t *libmsg = tx->tx_libmsg;
243 CWARN("status for tx [%p] is [%d][%s]\n",
244 tx, status, gmnal_gmstatus2str(status));
/* the send failed: recycle the tx and fail the lib-level message */
246 gmnal_return_tx(gmnalni, tx);
247 lib_finalize(gmnalni->gmni_libnal, NULL, libmsg, PTL_FAIL);
/*
 * gmnal_drop_sends_callback: GM callback run after gm_drop_sends() has
 * flushed the queued sends to a failing peer.  Asks GM to resume
 * sending to that peer; final cleanup happens in
 * gmnal_resume_sending_callback().
 * NOTE(review): the 'status' parameter line and braces are elided from
 * this listing.
 */
251 gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
254 gmnal_tx_t *tx = (gmnal_tx_t*)context;
255 gmnal_ni_t *gmnalni = tx->tx_gmni;
257 CERROR("status for tx [%p] is [%d][%s]\n",
258 tx, status, gmnal_gmstatus2str(status));
260 gm_resume_sending(gmnalni->gmni_port, tx->tx_gm_priority,
261 tx->tx_gmlid, gm_port_id,
262 gmnal_resume_sending_callback, tx);
/*
 * gmnal_tx_callback: GM send-completion callback.  On success (elided
 * cases above GM_SEND_DROPPED) the tx is recycled and the Portals
 * message finalized; on a dropped/failed send the error is logged and
 * GM's drop/resume recovery sequence is started, which completes the
 * message from gmnal_resume_sending_callback() instead.
 * NOTE(review): this listing is elided -- the switch statement head,
 * the success case(s), the assignment of 'rc' and the intermediate
 * returns are not visible here.
 */
266 gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
268 gmnal_tx_t *tx = (gmnal_tx_t*)context;
269 gmnal_ni_t *gmnalni = tx->tx_gmni;
270 lib_nal_t *libnal = gmnalni->gmni_libnal;
271 lib_msg_t *libmsg = tx->tx_libmsg;
/* a completion must always match a tx we handed to GM */
275 CERROR("send completion event for unknown tx\n");
284 case(GM_SEND_DROPPED):
289 CERROR("Error %d(%s), nid "LPD64"\n",
290 status, gmnal_gmstatus2str(status), tx->tx_nid);
/* flush remaining sends to this peer; the drop-sends callback
 * chain handles recycling the tx and finalizing the message */
292 spin_lock(&gmnalni->gmni_gm_lock);
293 gm_drop_sends(gmnalni->gmni_port, tx->tx_gm_priority,
294 tx->tx_gmlid, gm_port_id,
295 gmnal_drop_sends_callback, tx);
296 spin_unlock(&gmnalni->gmni_gm_lock);
/* non-recovery path: recycle the tx and complete the message with
 * 'rc' (assigned on a line elided from this listing) */
300 gmnal_return_tx(gmnalni, tx);
301 lib_finalize(libnal, NULL, libmsg, rc);
/*
 * gmnal_post_tx: send a prepared tx of 'nob' bytes to 'nid' on behalf
 * of Portals message 'libmsg'.  Translates the Portals nid (assumed to
 * be a 32-bit GM global id -- see the LASSERT) into a GM local node id,
 * records the send parameters in the tx, and hands the buffer to GM;
 * completion is reported asynchronously via gmnal_tx_callback().
 * NOTE(review): this listing is elided and the function continues past
 * the end of the visible chunk -- the return type, braces, the output
 * argument of gm_global_id_to_node_id() (presumably &tx->tx_gmlid),
 * the error return and the final return are not visible here.
 */
306 gmnal_post_tx (gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
307 lib_msg_t *libmsg, ptl_nid_t nid, int nob)
309 gm_status_t gm_status;
311 CDEBUG(D_NET, "send %d bytes to "LPU64"\n", nob, nid);
/* nids on this NAL fit in 32 bits (they are GM global ids) */
313 LASSERT ((nid >> 32) == 0);
315 spin_lock(&gmnalni->gmni_gm_lock);
316 gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid,
318 spin_unlock(&gmnalni->gmni_gm_lock);
320 if (gm_status != GM_SUCCESS) {
321 CERROR("Failed to obtain local id\n");
/* can't address the peer: recycle the tx (error return elided) */
322 gmnal_return_tx(gmnalni, tx);
326 CDEBUG(D_NET, "Local Node_id is [%u][%x]\n",
327 tx->tx_gmlid, tx->tx_gmlid);
/* record the send parameters the completion callback will need */
330 tx->tx_libmsg = libmsg;
331 tx->tx_gm_priority = GM_LOW_PRIORITY;
332 tx->tx_msg_size = nob;
334 CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
335 "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
336 "tx [%p]\n", gmnalni->gmni_port, tx->tx_msg,
337 tx->tx_gm_size, tx->tx_msg_size,
338 tx->tx_nid, tx->tx_gmlid, tx);
/* hand the buffer to GM; gmnal_tx_callback() fires on completion */
340 spin_lock(&gmnalni->gmni_gm_lock);
341 gm_send_to_peer_with_callback(gmnalni->gmni_port, tx->tx_msg,
342 tx->tx_gm_size, tx->tx_msg_size,
343 tx->tx_gm_priority, tx->tx_gmlid,
344 gmnal_tx_callback, (void*)tx);
345 spin_unlock(&gmnalni->gmni_gm_lock);