/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
 *
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

/*
 * This file contains all gmnal send and receive functions
 */

#include "gmlnd.h"
void
gmnal_notify_peer_down(gmnal_tx_t *tx)
{
        struct timeval     now;
        time_t             then;

        do_gettimeofday (&now);
        then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ;

        /* tell LNET the peer died (alive == 0) as of 'then' */
        lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
}
void
gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
               lnet_nid_t dstnid, int type)
{
        /* CAVEAT EMPTOR! this only sets the common message fields. */
        msg->gmm_magic    = GMNAL_MSG_MAGIC;
        msg->gmm_version  = GMNAL_MSG_VERSION;
        msg->gmm_type     = type;
        msg->gmm_srcnid   = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid,
                                                  dstnid);
        msg->gmm_dstnid   = dstnid;
}
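/*
 * Illustrative sketch, compiled out: how a hypothetical caller might use
 * gmnal_pack_msg().  Since it only fills the common header fields, the
 * caller still sets the type-specific union members, tx_msgnob and the
 * payload bookkeeping itself; compare gmnal_version_reply() below.  The
 * function name here is invented for illustration only.
 */
#if 0
static void
gmnal_example_queue_immediate(gmnal_ni_t *gmni, lnet_nid_t nid,
                              unsigned int gmlid)
{
        gmnal_tx_t *tx = gmnal_get_tx(gmni);

        if (tx == NULL)                 /* shutting down or no idle txs */
                return;

        tx->tx_nid = nid;
        tx->tx_gmlid = gmlid;

        gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf),
                       nid, GMNAL_MSG_IMMEDIATE);

        /* caller's job: fill gmm_u.immediate and account for it here */
        tx->tx_msgnob = offsetof(gmnal_msg_t,
                                 gmm_u.immediate.gmim_payload[0]);
        tx->tx_large_nob = 0;

        /* queue for buffer + credit; the scheduler does the rest */
        spin_lock(&gmni->gmni_tx_lock);
        list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
        gmnal_check_txqueues_locked(gmni);
        spin_unlock(&gmni->gmni_tx_lock);
}
#endif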
int
gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
        const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
        int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
                                                gmni->gmni_small_msgsize;
        int          flip;

        /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */

        /* GM may not overflow our buffer */
        LASSERT (rx->rx_recv_nob <= buffnob);

        /* 6 bytes are enough to have received magic + version */
        if (rx->rx_recv_nob < 6) {
                CERROR("Short message from gmid %u: %d\n",
                       rx->rx_recv_gmid, rx->rx_recv_nob);
                return -EPROTO;
        }

        if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
                flip = 0;
        } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
                flip = 1;
        } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
                   msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
                return EPROTO;          /* +ve: protocol version mismatch */
        } else {
                CERROR("Bad magic from gmid %u: %08x\n",
                       rx->rx_recv_gmid, msg->gmm_magic);
                return -EPROTO;
        }

        if (msg->gmm_version !=
            (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
                return EPROTO;          /* +ve: protocol version mismatch */
        }

        if (rx->rx_recv_nob < hdr_size) {
                CERROR("Short message from %u: %d\n",
                       rx->rx_recv_gmid, rx->rx_recv_nob);
                return -EPROTO;
        }

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->gmm_version);
                __swab16s(&msg->gmm_type);
                __swab64s(&msg->gmm_srcnid);
                __swab64s(&msg->gmm_dstnid);
        }

        if (msg->gmm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid from %u: %s\n",
                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
                return -EPROTO;
        }

        if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid,
                                     msg->gmm_dstnid)) {
                CERROR("Bad dst nid from %u: %s\n",
                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
                return -EPROTO;
        }

        switch (msg->gmm_type) {
        default:
                CERROR("Unknown message type from %u: %x\n",
                       rx->rx_recv_gmid, msg->gmm_type);
                return -EPROTO;

        case GMNAL_MSG_IMMEDIATE:
                if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
                        CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",
                               rx->rx_recv_gmid, rx->rx_recv_nob,
                               offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
                        return -EPROTO;
                }
                break;
        }

        return 0;
}
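/*
 * Sketch of how a caller dispatches on the tri-state return convention
 * above (0 success, -ve failure, +ve version mismatch); this mirrors
 * what gmnal_rx_thread() actually does further down.  Compiled out.
 */
#if 0
        rc = gmnal_unpack_msg(gmni, rx);
        if (rc == 0) {
                /* valid message: hand the embedded header to LNET */
        } else if (rc > 0) {
                /* version mismatch: tell the peer what we speak */
                gmnal_version_reply(gmni, rx);
        } else {
                /* garbage: we're connectionless, just repost the rx */
        }
#endif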
gmnal_tx_t *
gmnal_get_tx(gmnal_ni_t *gmni)
{
        gmnal_tx_t *tx = NULL;

        spin_lock(&gmni->gmni_tx_lock);

        /* no new sends during shutdown; NULL also when the pool is empty */
        if (gmni->gmni_shutdown ||
            list_empty(&gmni->gmni_idle_txs)) {
                spin_unlock(&gmni->gmni_tx_lock);
                return NULL;
        }

        tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
        list_del(&tx->tx_list);

        spin_unlock(&gmni->gmni_tx_lock);

        /* an idle tx must be clean: no message, no large buffer, no credit */
        LASSERT (tx->tx_lntmsg == NULL);
        LASSERT (tx->tx_ltxb == NULL);
        LASSERT (!tx->tx_credit);

        return tx;
}
void
gmnal_tx_done(gmnal_tx_t *tx, int rc)
{
        gmnal_ni_t *gmni = tx->tx_gmni;
        int         wake_sched = 0;
        lnet_msg_t *lnetmsg = tx->tx_lntmsg;

        tx->tx_lntmsg = NULL;

        spin_lock(&gmni->gmni_tx_lock);

        if (tx->tx_ltxb != NULL) {
                wake_sched = 1;
                list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
                tx->tx_ltxb = NULL;
        }

        if (tx->tx_credit) {
                wake_sched = 1;
                gmni->gmni_tx_credits++;
                tx->tx_credit = 0;
        }

        list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);

        if (wake_sched)
                gmnal_check_txqueues_locked(gmni);

        spin_unlock(&gmni->gmni_tx_lock);

        /* Delay finalize until tx is free */
        if (lnetmsg != NULL)
                lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
}
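/*
 * Note on the ordering above: the tx (plus its large buffer and credit)
 * is recycled under gmni_tx_lock *before* lnet_finalize() runs,
 * presumably because finalizing can re-enter the send path, which may
 * need the very resources this tx was holding.  It also reports the
 * completion status 'rc' to LNET, so callers that complete with -EIO
 * (see the GM callbacks below) propagate the failure.
 */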
void
gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
                          gm_status_t status)
{
        gmnal_tx_t *tx = (gmnal_tx_t*)context;

        LASSERT(!in_interrupt());

        CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",
               tx, status, gmnal_gmstatus2str(status),
               libcfs_nid2str(tx->tx_nid));

        gmnal_tx_done(tx, -EIO);
}
void
gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
{
        gmnal_tx_t *tx = (gmnal_tx_t*)context;
        gmnal_ni_t *gmni = tx->tx_gmni;

        LASSERT(!in_interrupt());

        switch(status) {
        case GM_SUCCESS:
                gmnal_tx_done(tx, 0);
                return;

        case GM_SEND_DROPPED:
                CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",
                       tx, libcfs_nid2str(tx->tx_nid));
                /* Another tx failed and called gm_drop_sends() which made this
                 * one complete immediately */
                gmnal_tx_done(tx, -EIO);
                return;

        default:
                /* Some error; NB don't complete tx yet; we need its credit for
                 * gm_drop_sends() */
                CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
                       tx, status, gmnal_gmstatus2str(status),
                       libcfs_nid2str(tx->tx_nid));

                gmnal_notify_peer_down(tx);

                spin_lock(&gmni->gmni_gm_lock);
                gm_drop_sends(gmni->gmni_port,
                              tx->tx_ltxb != NULL ?
                              GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
                              tx->tx_gmlid, *gmnal_tunables.gm_port,
                              gmnal_drop_sends_callback, tx);
                spin_unlock(&gmni->gmni_gm_lock);
                return;
        }
}
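/*
 * Failure path in brief: on any error other than GM_SEND_DROPPED the tx
 * keeps its credit, the peer is reported down, and gm_drop_sends()
 * flushes sends still queued to that peer at the same priority.  GM
 * then completes this tx through gmnal_drop_sends_callback() and any
 * concurrently queued txs with GM_SEND_DROPPED, so everything funnels
 * back into gmnal_tx_done() with -EIO.
 */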
void
gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
{
        gmnal_tx_t    *tx;
        gmnal_txbuf_t *ltxb = NULL;
        int            gmsize;
        int            pri;
        void          *netaddr;

        tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
             list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);

        if (tx != NULL &&
            (tx->tx_large_nob == 0 ||
             !list_empty(&gmni->gmni_idle_ltxbs))) {

                /* consume tx */
                list_del(&tx->tx_list);

                LASSERT (tx->tx_ltxb == NULL);

                if (tx->tx_large_nob != 0) {
                        ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
                                          gmnal_txbuf_t, txb_list);

                        /* consume large buffer */
                        list_del(&ltxb->txb_list);

                        spin_unlock(&gmni->gmni_tx_lock);

                        /* Unlocking here allows sends to get re-ordered,
                         * but we want to allow other CPUs to progress... */

                        tx->tx_ltxb = ltxb;

                        /* marshall message in tx_ltxb...
                         * 1. Copy what was marshalled so far (in tx_buf) */
                        memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
                               GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);

                        /* 2. Copy the payload */
                        if (tx->tx_large_iskiov)
                                lnet_copy_kiov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_nfrags,
                                        tx->tx_large_frags.kiov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);
                        else
                                lnet_copy_iov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_nfrags,
                                        tx->tx_large_frags.iov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);

                        tx->tx_msgnob += tx->tx_large_nob;

                        spin_lock(&gmni->gmni_tx_lock);
                }

                list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
        }

        if (!list_empty(&gmni->gmni_cred_txq) &&
            gmni->gmni_tx_credits != 0) {

                tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);

                /* consume tx and 1 credit */
                list_del(&tx->tx_list);
                gmni->gmni_tx_credits--;

                spin_unlock(&gmni->gmni_tx_lock);

                /* Unlocking here allows sends to get re-ordered, but we want
                 * to allow other CPUs to progress... */

                LASSERT(!tx->tx_credit);
                tx->tx_credit = 1;

                tx->tx_launchtime = jiffies;

                if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
                        LASSERT (tx->tx_ltxb == NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
                        gmsize = gmni->gmni_small_gmsize;
                        pri = GMNAL_SMALL_PRIORITY;
                } else {
                        LASSERT (tx->tx_ltxb != NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
                        gmsize = gmni->gmni_large_gmsize;
                        pri = GMNAL_LARGE_PRIORITY;
                }

                spin_lock(&gmni->gmni_gm_lock);

                gm_send_to_peer_with_callback(gmni->gmni_port,
                                              netaddr, gmsize,
                                              tx->tx_msgnob,
                                              pri,
                                              tx->tx_gmlid,
                                              gmnal_tx_callback,
                                              (void*)tx);

                spin_unlock(&gmni->gmni_gm_lock);
                spin_lock(&gmni->gmni_tx_lock);
        }
}
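/*
 * Flow control in brief: gmni_buf_txq holds txs waiting for a large
 * marshalling buffer, gmni_cred_txq holds marshalled txs waiting for a
 * GM send credit.  Each launch consumes one credit; gmnal_tx_done()
 * returns the credit (and any large buffer) and re-runs this scheduler,
 * so every resource released pulls the next queued tx onto the wire.
 */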
void
gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
                                        gmni->gmni_small_gmsize;
        int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
                                        GMNAL_SMALL_PRIORITY;
        void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);

        CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);

        spin_lock(&gmni->gmni_gm_lock);
        gm_provide_receive_buffer_with_tag(gmni->gmni_port,
                                           buffer, gmsize, pri, 0);
        spin_unlock(&gmni->gmni_gm_lock);
}
void
gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        /* Future protocol version compatibility support!
         * The next gmlnd-specific protocol rev will first send a message to
         * check version; I reply with a stub message containing my current
         * magic+version... */
        gmnal_msg_t *msg;
        gmnal_tx_t  *tx = gmnal_get_tx(gmni);

        if (tx == NULL) {
                CERROR("Can't allocate tx to send version info to %u\n",
                       rx->rx_recv_gmid);
                return;
        }

        LASSERT (tx->tx_lntmsg == NULL); /* no finalize */

        tx->tx_nid = LNET_NID_ANY;
        tx->tx_gmlid = rx->rx_recv_gmid;

        msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
        msg->gmm_magic   = GMNAL_MSG_MAGIC;
        msg->gmm_version = GMNAL_MSG_VERSION;

        /* just send magic + version */
        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
        tx->tx_large_nob = 0;

        spin_lock(&gmni->gmni_tx_lock);

        list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
        gmnal_check_txqueues_locked(gmni);

        spin_unlock(&gmni->gmni_tx_lock);
}
int
gmnal_rx_thread(void *arg)
{
        gmnal_ni_t      *gmni = arg;
        gm_recv_event_t *rxevent = NULL;
        gm_recv_t       *recv = NULL;
        int              rc;
        gmnal_rx_t      *rx;

        cfs_daemonize("gmnal_rxd");

        down(&gmni->gmni_rx_mutex);

        while (!gmni->gmni_shutdown) {

                spin_lock(&gmni->gmni_gm_lock);
                rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
                spin_unlock(&gmni->gmni_gm_lock);

                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                default:
                        gm_unknown(gmni->gmni_port, rxevent);
                        continue;

                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                case GM_HIGH_PEER_RECV_EVENT:
                case GM_RECV_EVENT:
                case GM_HIGH_RECV_EVENT:
                        break;
                }

                recv = &rxevent->recv;
                rx = gm_hash_find(gmni->gmni_rx_hash,
                                  gm_ntohp(recv->buffer));
                LASSERT (rx != NULL);

                rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
                rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
                rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
                rx->rx_recv_type = gm_ntoh_u8(recv->type);

                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                        /* "fast" events deliver the payload in a GM-owned
                         * buffer, so copy it into our own */
                        LASSERT (rx->rx_recv_nob <= PAGE_SIZE);

                        memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
                               gm_ntohp(recv->message), rx->rx_recv_nob);
                        break;
                }

                up(&gmni->gmni_rx_mutex);

                CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
                        GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
                        gm_ntohp(recv->buffer), rx->rx_recv_nob);

                /* We're connectionless: simply drop packets with
                 * errors */
                rc = gmnal_unpack_msg(gmni, rx);

                if (rc == 0) {
                        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);

                        LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
                        rc = lnet_parse(gmni->gmni_ni,
                                        &msg->gmm_u.immediate.gmim_hdr,
                                        msg->gmm_srcnid, rx, 0);
                } else if (rc > 0) {
                        gmnal_version_reply(gmni, rx);
                        rc = -EPROTO;           /* repost rx */
                }

                if (rc < 0)     /* parse failure */
                        gmnal_post_rx(gmni, rx);

                down(&gmni->gmni_rx_mutex);
        }

        up(&gmni->gmni_rx_mutex);

        CDEBUG(D_NET, "exiting\n");
        atomic_dec(&gmni->gmni_nthreads);
        return 0;
}
void
gmnal_stop_threads(gmnal_ni_t *gmni)
{
        int     count = 2;

        gmni->gmni_shutdown = 1;
        mb();                   /* make shutdown visible to rx threads */

        /* wake rxthread owning gmni_rx_mutex with an alarm. */
        spin_lock(&gmni->gmni_gm_lock);
        gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
        spin_unlock(&gmni->gmni_gm_lock);

        while (atomic_read(&gmni->gmni_nthreads) != 0) {
                count++;
                if ((count & (count - 1)) == 0)
                        CWARN("Waiting for %d threads to stop\n",
                              atomic_read(&gmni->gmni_nthreads));
                gmnal_yield(1);
        }
}
int
gmnal_start_threads(gmnal_ni_t *gmni)
{
        int     i;
        int     pid;

        LASSERT (!gmni->gmni_shutdown);
        LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);

        gm_initialize_alarm(&gmni->gmni_alarm);

        for (i = 0; i < num_online_cpus(); i++) {

                pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
                if (pid < 0) {
                        CERROR("rx thread failed to start: %d\n", pid);
                        gmnal_stop_threads(gmni);
                        return pid;
                }

                atomic_inc(&gmni->gmni_nthreads);
        }

        return 0;
}
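/*
 * Illustrative sketch, compiled out: the intended thread lifecycle as a
 * hypothetical caller (e.g. NI startup/shutdown code) would drive it.
 * The 'failed' label is invented for illustration only.
 */
#if 0
        if (gmnal_start_threads(gmni) != 0)     /* one rxd per online CPU */
                goto failed;

        /* ... network interface is live; rx threads block in GM ... */

        gmnal_stop_threads(gmni);       /* sets gmni_shutdown, fires the
                                         * alarm to wake any rxd blocked in
                                         * gm_blocking_receive_no_spin(),
                                         * then waits for all to exit */
#endif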