1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
32 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
35 * This file is part of Lustre, http://www.lustre.org/
36 * Lustre is a trademark of Sun Microsystems, Inc.
40 * This file contains all gmnal send and receive functions
/* Report to LNet that the peer this tx was addressed to appears dead.
 * NOTE(review): intermediate lines of this function are not visible in
 * this chunk; comments below describe only what is shown. */
46 gmnal_notify_peer_down(gmnal_tx_t *tx)
/* Back-date the failure: wall-clock "now" minus how long the tx has
 * been in flight (presumably since tx->tx_launchtime — the subtrahend
 * is truncated here; confirm against the full source). */
50 then = cfs_time_current_sec() -
51 cfs_duration_sec(cfs_time_current() -
/* alive == 0: tell LNet the peer was last known alive at 'then' */
54 lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
/* Initialise the header fields common to every outgoing gmnal message:
 * magic, protocol version, and source/destination NIDs.  Type-specific
 * (union) fields are left for the caller to fill in. */
58 gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
59 lnet_nid_t dstnid, int type)
61 /* CAVEAT EMPTOR! this only sets the common message fields. */
62 msg->gmm_magic = GMNAL_MSG_MAGIC;
63 msg->gmm_version = GMNAL_MSG_VERSION;
/* source NID is this NI's own identity; dstnid is the caller's target */
65 msg->gmm_srcnid = gmni->gmni_ni->ni_nid;
66 msg->gmm_dstnid = dstnid;
/* Validate an incoming message in 'rx': length, magic, version,
 * endianness, src/dst NIDs and message type.  Byte-swaps the header
 * in place when the sender's endianness differs.
 * NOTE(review): several lines (returns, 'flip' assignments, case exits)
 * are elided in this chunk; comments describe only visible logic. */
70 gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
72 gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
/* common header size: everything before the per-type union */
73 const int hdr_size = offsetof(gmnal_msg_t, gmm_u);
74 int buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
75 gmni->gmni_small_msgsize;
78 /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
80 /* GM may not overflow our buffer */
81 LASSERT (rx->rx_recv_nob <= buffnob);
83 /* 6 bytes are enough to have received magic + version */
84 if (rx->rx_recv_nob < 6) {
85 CERROR("Short message from gmid %u: %d\n",
86 rx->rx_recv_gmid, rx->rx_recv_nob);
/* Magic determines sender endianness: same-endian, byte-swapped,
 * or the generic LNet protocol magic (old/uninitialised peer). */
90 if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
92 } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
94 } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
95 msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
98 CERROR("Bad magic from gmid %u: %08x\n",
99 rx->rx_recv_gmid, msg->gmm_magic);
/* version check honours the previously-detected byte order ('flip'
 * is presumably set in elided lines above — confirm in full source) */
103 if (msg->gmm_version !=
104 (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
/* must have received at least the whole common header */
108 if (rx->rx_recv_nob < hdr_size) {
109 CERROR("Short message from %u: %d\n",
110 rx->rx_recv_gmid, rx->rx_recv_nob);
115 /* leave magic unflipped as a clue to peer endianness */
116 __swab16s(&msg->gmm_version);
117 __swab16s(&msg->gmm_type);
118 __swab64s(&msg->gmm_srcnid);
119 __swab64s(&msg->gmm_dstnid);
/* sender must identify itself with a real NID */
122 if (msg->gmm_srcnid == LNET_NID_ANY) {
123 CERROR("Bad src nid from %u: %s\n",
124 rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
/* message must actually be addressed to this NI */
128 if (gmni->gmni_ni->ni_nid != msg->gmm_dstnid) {
129 CERROR("Bad dst nid from %u: %s\n",
130 rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
/* per-type payload validation */
134 switch (msg->gmm_type) {
136 CERROR("Unknown message type from %u: %x\n",
137 rx->rx_recv_gmid, msg->gmm_type);
140 case GMNAL_MSG_IMMEDIATE:
/* must contain at least the immediate header (zero-length payload ok) */
141 if (rx->rx_recv_nob <
142 offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
143 CERROR("Short IMMEDIATE from %u: %d(%lu)\n",
144 rx->rx_recv_gmid, rx->rx_recv_nob,
145 (long)offsetof(gmnal_msg_t,
146 gmm_u.immediate.gmim_payload[0]));
/* Take one idle tx descriptor off gmni_idle_txs under gmni_tx_lock.
 * Bails out early on shutdown or when the idle list is empty
 * (return statement elided in this chunk — presumably returns NULL,
 * matching the initialiser below; confirm in full source). */
155 gmnal_get_tx(gmnal_ni_t *gmni)
157 gmnal_tx_t *tx = NULL;
159 cfs_spin_lock(&gmni->gmni_tx_lock);
161 if (gmni->gmni_shutdown ||
162 cfs_list_empty(&gmni->gmni_idle_txs)) {
163 cfs_spin_unlock(&gmni->gmni_tx_lock);
/* pop the first idle tx */
167 tx = cfs_list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
168 cfs_list_del(&tx->tx_list);
170 cfs_spin_unlock(&gmni->gmni_tx_lock);
/* an idle tx must carry no lnet message, no large buffer, no credit */
172 LASSERT (tx->tx_lntmsg == NULL);
173 LASSERT (tx->tx_ltxb == NULL);
174 LASSERT (!tx->tx_credit);
/* Recycle a completed tx: return its large buffer (if any) and its
 * send credit, put the tx back on the idle list, kick the tx queues,
 * then finalize the LNet message OUTSIDE the lock.
 * 'rc' is the completion status passed through to lnet_finalize(). */
180 gmnal_tx_done(gmnal_tx_t *tx, int rc)
182 gmnal_ni_t *gmni = tx->tx_gmni;
/* detach the lnet message first so the tx is clean when re-idled */
184 lnet_msg_t *lnetmsg = tx->tx_lntmsg;
186 tx->tx_lntmsg = NULL;
188 cfs_spin_lock(&gmni->gmni_tx_lock);
/* return the large tx buffer to its idle pool */
190 if (tx->tx_ltxb != NULL) {
192 cfs_list_add_tail(&tx->tx_ltxb->txb_list,
193 &gmni->gmni_idle_ltxbs);
/* give back the send credit this tx consumed */
199 gmni->gmni_tx_credits++;
203 cfs_list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
/* freed resources may let queued txs proceed */
206 gmnal_check_txqueues_locked(gmni);
208 cfs_spin_unlock(&gmni->gmni_tx_lock);
210 /* Delay finalize until tx is free */
212 lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
/* GM callback invoked for each tx flushed by gm_drop_sends(): log the
 * drop status and complete the tx with -EIO. */
216 gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
219 gmnal_tx_t *tx = (gmnal_tx_t*)context;
/* gmnal_tx_done() takes locks / may finalize — must not be in interrupt */
221 LASSERT(!cfs_in_interrupt());
223 CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",
224 tx, status, gmnal_gmstatus2str(status),
225 libcfs_nid2str(tx->tx_nid));
227 gmnal_tx_done(tx, -EIO);
/* GM send-completion callback.  On success, completes the tx; on
 * GM_SEND_DROPPED, completes it with -EIO; on any other error, notifies
 * LNet the peer is down and drops all queued sends to that peer — the
 * tx itself is completed later via gmnal_drop_sends_callback().
 * NOTE(review): the success-case label and some case exits are elided
 * in this chunk. */
231 gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
233 gmnal_tx_t *tx = (gmnal_tx_t*)context;
234 gmnal_ni_t *gmni = tx->tx_gmni;
236 LASSERT(!cfs_in_interrupt());
/* successful send: complete with rc == 0 */
240 gmnal_tx_done(tx, 0);
243 case GM_SEND_DROPPED:
244 CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",
245 tx, libcfs_nid2str(tx->tx_nid));
246 /* Another tx failed and called gm_drop_sends() which made this
247 * one complete immediately */
248 gmnal_tx_done(tx, -EIO);
252 /* Some error; NB don't complete tx yet; we need its credit for
254 CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
255 tx, status, gmnal_gmstatus2str(status),
256 libcfs_nid2str(tx->tx_nid));
/* tell LNet this peer looks dead */
258 gmnal_notify_peer_down(tx);
/* flush every pending send to this peer at this priority; each one
 * (including this tx) completes via gmnal_drop_sends_callback() */
260 cfs_spin_lock(&gmni->gmni_gm_lock);
261 gm_drop_sends(gmni->gmni_port,
262 tx->tx_ltxb != NULL ?
263 GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
264 tx->tx_gmlid, *gmnal_tunables.gm_port,
265 gmnal_drop_sends_callback, tx);
266 cfs_spin_unlock(&gmni->gmni_gm_lock);
/* Advance the tx state machine; called with gmni_tx_lock held (and
 * returns with it held — the lock is dropped/retaken around the copy
 * and launch steps below).  Stage 1: txs on gmni_buf_txq waiting for a
 * large tx buffer get one (if needed), have their payload marshalled
 * into it, and move to gmni_cred_txq.  Stage 2: txs on gmni_cred_txq
 * consume a send credit and are launched via GM.
 * FIX(review): restored "&ltxb" on the two lines below that had been
 * mangled to "<xb" by HTML-entity corruption ("&lt" -> "<").
 * NOTE(review): many lines (loop heads, gm_send args) are elided in
 * this chunk; comments describe only visible logic. */
275 gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
/* peek at the first tx waiting for a buffer, if any */
283 tx = cfs_list_empty(&gmni->gmni_buf_txq) ? NULL :
284 cfs_list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
/* proceed only if no large buffer is needed, or one is available */
287 (tx->tx_large_nob == 0 ||
288 !cfs_list_empty(&gmni->gmni_idle_ltxbs))) {
291 cfs_list_del(&tx->tx_list);
293 LASSERT (tx->tx_ltxb == NULL);
295 if (tx->tx_large_nob != 0) {
296 ltxb = cfs_list_entry(gmni->gmni_idle_ltxbs.next,
297 gmnal_txbuf_t, txb_list);
299 /* consume large buffer */
300 cfs_list_del(&ltxb->txb_list);
302 cfs_spin_unlock(&gmni->gmni_tx_lock);
304 /* Unlocking here allows sends to get re-ordered,
305 * but we want to allow other CPUs to progress... */
309 /* marshall message in tx_ltxb...
310 * 1. Copy what was marshalled so far (in tx_buf) */
311 memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
312 GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
314 /* 2. Copy the payload */
315 if (tx->tx_large_iskiov)
317 gmni->gmni_large_pages,
318 ltxb->txb_buf.nb_kiov,
321 tx->tx_large_frags.kiov,
326 gmni->gmni_large_pages,
327 ltxb->txb_buf.nb_kiov,
330 tx->tx_large_frags.iov,
/* payload now lives in the large buffer: account it in msgnob */
334 tx->tx_msgnob += tx->tx_large_nob;
336 cfs_spin_lock(&gmni->gmni_tx_lock);
/* buffer secured: now wait for a send credit */
339 cfs_list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
/* Stage 2: launch credit-ready txs */
342 if (!cfs_list_empty(&gmni->gmni_cred_txq) &&
343 gmni->gmni_tx_credits != 0) {
345 tx = cfs_list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t,
348 /* consume tx and 1 credit */
349 cfs_list_del(&tx->tx_list);
350 gmni->gmni_tx_credits--;
352 cfs_spin_unlock(&gmni->gmni_tx_lock);
354 /* Unlocking here allows sends to get re-ordered, but we want
355 * to allow other CPUs to progress... */
357 LASSERT(!tx->tx_credit);
/* timestamp for back-dating failure in gmnal_notify_peer_down() */
360 tx->tx_launchtime = cfs_time_current();
/* small messages send from tx_buf; large ones from the attached ltxb */
362 if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
363 LASSERT (tx->tx_ltxb == NULL);
364 netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
365 gmsize = gmni->gmni_small_gmsize;
366 pri = GMNAL_SMALL_PRIORITY;
368 LASSERT (tx->tx_ltxb != NULL);
369 netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
370 gmsize = gmni->gmni_large_gmsize;
371 pri = GMNAL_LARGE_PRIORITY;
/* hand the send to GM; completion arrives in gmnal_tx_callback() */
374 cfs_spin_lock(&gmni->gmni_gm_lock);
376 gm_send_to_peer_with_callback(gmni->gmni_port,
384 cfs_spin_unlock(&gmni->gmni_gm_lock);
385 cfs_spin_lock(&gmni->gmni_tx_lock);
/* (Re)post a receive buffer to GM so it can land another incoming
 * message.  Size and priority tag must match what was used when the
 * buffer was first registered (large vs small pool). */
390 gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
392 int gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
393 gmni->gmni_small_gmsize;
394 int pri = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
395 GMNAL_SMALL_PRIORITY;
396 void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
398 CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
/* GM port access is serialised by gmni_gm_lock */
400 cfs_spin_lock(&gmni->gmni_gm_lock);
401 gm_provide_receive_buffer_with_tag(gmni->gmni_port,
402 buffer, gmsize, pri, 0);
403 cfs_spin_unlock(&gmni->gmni_gm_lock);
/* Send a minimal magic+version stub back to the sender of 'rx', so a
 * future-protocol peer can discover what this node speaks. */
407 gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
409 /* Future protocol version compatibility support!
410 * The next gmlnd-specific protocol rev will first send a message to
411 * check version; I reply with a stub message containing my current
412 * magic+version... */
414 gmnal_tx_t *tx = gmnal_get_tx(gmni);
/* no tx available (shutdown or pool exhausted): just log and give up */
417 CERROR("Can't allocate tx to send version info to %u\n",
422 LASSERT (tx->tx_lntmsg == NULL); /* no finalize */
/* reply goes straight back to the GM id we received from;
 * no LNet NID is known/needed for a version probe */
424 tx->tx_nid = LNET_NID_ANY;
425 tx->tx_gmlid = rx->rx_recv_gmid;
427 msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
428 msg->gmm_magic = GMNAL_MSG_MAGIC;
429 msg->gmm_version = GMNAL_MSG_VERSION;
431 /* just send magic + version */
432 tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
433 tx->tx_large_nob = 0;
/* queue it and kick the tx state machine */
435 cfs_spin_lock(&gmni->gmni_tx_lock);
437 cfs_list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
438 gmnal_check_txqueues_locked(gmni);
440 cfs_spin_unlock(&gmni->gmni_tx_lock);
/* Receive daemon: one instance per online CPU.  Loops until shutdown,
 * serialising access to GM's blocking receive via gmni_rx_mutex,
 * dispatching each received message into LNet and reposting the buffer
 * on failure.  NOTE(review): several lines (case labels, loop exits,
 * the thread's return) are elided in this chunk. */
444 gmnal_rx_thread(void *arg)
446 gmnal_ni_t *gmni = arg;
447 gm_recv_event_t *rxevent = NULL;
448 gm_recv_t *recv = NULL;
452 cfs_daemonize("gmnal_rxd");
454 while (!gmni->gmni_shutdown) {
/* only one thread at a time may block in GM's receive */
455 rc = down_interruptible(&gmni->gmni_rx_mutex);
456 LASSERT (rc == 0 || rc == -EINTR);
460 cfs_spin_lock(&gmni->gmni_gm_lock);
461 rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
462 cfs_spin_unlock(&gmni->gmni_gm_lock);
464 switch (GM_RECV_EVENT_TYPE(rxevent)) {
/* not a receive: let GM handle it, release the mutex, loop again */
466 gm_unknown(gmni->gmni_port, rxevent);
467 cfs_up(&gmni->gmni_rx_mutex);
470 case GM_FAST_RECV_EVENT:
471 case GM_FAST_PEER_RECV_EVENT:
472 case GM_PEER_RECV_EVENT:
473 case GM_FAST_HIGH_RECV_EVENT:
474 case GM_FAST_HIGH_PEER_RECV_EVENT:
475 case GM_HIGH_PEER_RECV_EVENT:
477 case GM_HIGH_RECV_EVENT:
/* map the GM buffer pointer back to our rx descriptor */
481 recv = &rxevent->recv;
482 rx = gm_hash_find(gmni->gmni_rx_hash,
483 gm_ntohp(recv->buffer));
484 LASSERT (rx != NULL);
486 rx->rx_recv_nob = gm_ntoh_u32(recv->length);
487 rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
488 rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
489 rx->rx_recv_type = gm_ntoh_u8(recv->type);
/* "FAST" events deliver the data in GM's own message area;
 * copy it into our rx buffer so handling is uniform */
491 switch (GM_RECV_EVENT_TYPE(rxevent)) {
492 case GM_FAST_RECV_EVENT:
493 case GM_FAST_PEER_RECV_EVENT:
494 case GM_FAST_HIGH_RECV_EVENT:
495 case GM_FAST_HIGH_PEER_RECV_EVENT:
496 LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
498 memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
499 gm_ntohp(recv->message), rx->rx_recv_nob);
/* event consumed: let another thread into GM receive */
503 cfs_up(&gmni->gmni_rx_mutex);
505 CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
506 GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
507 gm_ntohp(recv->buffer), rx->rx_recv_nob);
509 /* We're connectionless: simply drop packets with
511 rc = gmnal_unpack_msg(gmni, rx);
/* rc == 0: valid message — hand it to LNet */
514 gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
516 LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
517 rc = lnet_parse(gmni->gmni_ni,
518 &msg->gmm_u.immediate.gmim_hdr,
519 msg->gmm_srcnid, rx, 0);
/* rc > 0 from unpack: version mismatch — answer with our version */
521 gmnal_version_reply(gmni, rx);
522 rc = -EPROTO; /* repost rx */
525 if (rc < 0) /* parse failure */
526 gmnal_post_rx(gmni, rx);
529 CDEBUG(D_NET, "exiting\n");
/* signal gmnal_stop_threads() that this thread is gone */
530 cfs_atomic_dec(&gmni->gmni_nthreads);
/* Ask all rx threads to exit and wait until they have.  Setting
 * gmni_shutdown makes the thread loops terminate; the GM alarm wakes
 * whichever thread is blocked inside gm_blocking_receive_no_spin(). */
535 gmnal_stop_threads(gmnal_ni_t *gmni)
539 gmni->gmni_shutdown = 1;
542 /* wake rxthread owning gmni_rx_mutex with an alarm. */
543 cfs_spin_lock(&gmni->gmni_gm_lock);
544 gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
545 cfs_spin_unlock(&gmni->gmni_gm_lock);
/* poll until every thread has decremented gmni_nthreads */
547 while (cfs_atomic_read(&gmni->gmni_nthreads) != 0) {
/* warn at power-of-two iterations only, to avoid log spam */
549 if ((count & (count - 1)) == 0)
550 CWARN("Waiting for %d threads to stop\n",
551 cfs_atomic_read(&gmni->gmni_nthreads));
557 gmnal_start_threads(gmnal_ni_t *gmni)
562 LASSERT (!gmni->gmni_shutdown);
563 LASSERT (cfs_atomic_read(&gmni->gmni_nthreads) == 0);
565 gm_initialize_alarm(&gmni->gmni_alarm);
567 for (i = 0; i < cfs_num_online_cpus(); i++) {
569 pid = cfs_kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
571 CERROR("rx thread failed to start: %d\n", pid);
572 gmnal_stop_threads(gmni);
576 cfs_atomic_inc(&gmni->gmni_nthreads);