/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
 *
 * This file is part of Lustre, http://www.lustre.org/
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
/*
 * This file contains all gmnal send and receive functions
 */

#include "gmlnd.h"
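/* Tell LNet that the peer this tx was destined for is down.  The timestamp
 * passed is the time the tx was launched, so the notification can't
 * override fresher news about the peer. */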
void
gmnal_notify_peer_down(gmnal_tx_t *tx)
{
        struct timeval     now;
        time_t             then;

        do_gettimeofday (&now);
        then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ;

        lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
}
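/* Fill in the fields common to every GM message header: magic, version,
 * type and source/destination NIDs.  Type-specific fields in gmm_u are the
 * caller's responsibility; e.g. (a sketch, not code from this file) a
 * sender of an immediate message would follow up with something like:
 *
 *      gmnal_pack_msg(gmni, msg, target_nid, GMNAL_MSG_IMMEDIATE);
 *      msg->gmm_u.immediate.gmim_hdr = *hdr;
 *
 * where target_nid and hdr are the caller's. */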
void
gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
               lnet_nid_t dstnid, int type)
{
        /* CAVEAT EMPTOR! this only sets the common message fields. */
        msg->gmm_magic    = GMNAL_MSG_MAGIC;
        msg->gmm_version  = GMNAL_MSG_VERSION;
        msg->gmm_type     = type;
        msg->gmm_srcnid   = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid,
                                                  dstnid);
        msg->gmm_dstnid   = dstnid;
}
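/* Validate a newly received message header in place, byte-swapping the
 * fields if the sender's endianness differs from ours.  Returns 0 on
 * success, negative errno for a corrupt or misdirected message (caller
 * drops it), positive for a version mismatch (caller sends a version
 * reply). */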
int
gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
        const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
        int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
                                                gmni->gmni_small_msgsize;
        int          flip;

        /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */

        /* GM may not overflow our buffer */
        LASSERT (rx->rx_recv_nob <= buffnob);

        /* 6 bytes are enough to have received magic + version */
        if (rx->rx_recv_nob < 6) {
                CERROR("Short message from gmid %u: %d\n",
                       rx->rx_recv_gmid, rx->rx_recv_nob);
                return -EPROTO;
        }

        if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
                flip = 0;
        } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
                flip = 1;
        } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
                   msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
                /* unexpected protocol magic: reply with my version */
                return EPROTO;
        } else {
                CERROR("Bad magic from gmid %u: %08x\n",
                       rx->rx_recv_gmid, msg->gmm_magic);
                return -EPROTO;
        }

        if (msg->gmm_version !=
            (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
                return EPROTO;
        }

        if (rx->rx_recv_nob < hdr_size) {
                CERROR("Short message from %u: %d\n",
                       rx->rx_recv_gmid, rx->rx_recv_nob);
                return -EPROTO;
        }

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->gmm_version);
                __swab16s(&msg->gmm_type);
                __swab64s(&msg->gmm_srcnid);
                __swab64s(&msg->gmm_dstnid);
        }

        if (msg->gmm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid from %u: %s\n",
                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
                return -EPROTO;
        }

        if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid,
                                     msg->gmm_dstnid)) {
                CERROR("Bad dst nid from %u: %s\n",
                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
                return -EPROTO;
        }

        switch (msg->gmm_type) {
        default:
                CERROR("Unknown message type from %u: %x\n",
                       rx->rx_recv_gmid, msg->gmm_type);
                return -EPROTO;

        case GMNAL_MSG_IMMEDIATE:
                if (rx->rx_recv_nob <
                    offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
                        CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",
                               rx->rx_recv_gmid, rx->rx_recv_nob,
                               offsetof(gmnal_msg_t,
                                        gmm_u.immediate.gmim_payload[0]));
                        return -EPROTO;
                }
                break;
        }

        return 0;
}
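/* Grab an idle tx descriptor, or return NULL if the pool is empty or the
 * NI is shutting down. */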
gmnal_tx_t *
gmnal_get_tx(gmnal_ni_t *gmni)
{
        gmnal_tx_t *tx = NULL;

        spin_lock(&gmni->gmni_tx_lock);

        if (gmni->gmni_shutdown ||
            list_empty(&gmni->gmni_idle_txs)) {
                spin_unlock(&gmni->gmni_tx_lock);
                return NULL;
        }

        tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
        list_del(&tx->tx_list);

        spin_unlock(&gmni->gmni_tx_lock);

        LASSERT (tx->tx_lntmsg == NULL);
        LASSERT (tx->tx_ltxb == NULL);
        LASSERT (!tx->tx_credit);

        return tx;
}
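/* Return a completed tx's resources (large tx buffer, send credit, then
 * the descriptor itself) to their idle lists, kick the scheduler if
 * anything was freed, and only then finalize the LNet message. */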
void
gmnal_tx_done(gmnal_tx_t *tx, int rc)
{
        gmnal_ni_t *gmni = tx->tx_gmni;
        int         wake_sched = 0;
        lnet_msg_t *lnetmsg = tx->tx_lntmsg;

        tx->tx_lntmsg = NULL;

        spin_lock(&gmni->gmni_tx_lock);

        if (tx->tx_ltxb != NULL) {
                wake_sched = 1;
                list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
                tx->tx_ltxb = NULL;
        }

        if (tx->tx_credit) {
                wake_sched = 1;
                gmni->gmni_tx_credits++;
                tx->tx_credit = 0;
        }

        list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);

        if (wake_sched)
                gmnal_check_txqueues_locked(gmni);

        spin_unlock(&gmni->gmni_tx_lock);

        /* Delay finalize until tx is free */
        if (lnetmsg != NULL)
                lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
}
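/* Completion callback for sends flushed by gm_drop_sends(); they all
 * complete with failure. */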
void
gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
                          gm_status_t status)
{
        gmnal_tx_t *tx = (gmnal_tx_t*)context;

        LASSERT(!in_interrupt());

        CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",
               tx, status, gmnal_gmstatus2str(status),
               libcfs_nid2str(tx->tx_nid));

        gmnal_tx_done(tx, -EIO);
}
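/* GM calls this back for every send.  Note that on error the tx is not
 * completed immediately: it keeps its send credit until gm_drop_sends()
 * has flushed all the other traffic queued to the same peer and
 * priority. */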
void
gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
{
        gmnal_tx_t *tx = (gmnal_tx_t*)context;
        gmnal_ni_t *gmni = tx->tx_gmni;

        LASSERT(!in_interrupt());

        switch (status) {
        case GM_SUCCESS:
                gmnal_tx_done(tx, 0);
                return;

        case GM_SEND_DROPPED:
                CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",
                       tx, libcfs_nid2str(tx->tx_nid));
                /* Another tx failed and called gm_drop_sends() which made this
                 * one complete immediately */
                gmnal_tx_done(tx, -EIO);
                return;

        default:
                /* Some error; NB don't complete tx yet; we need its credit for
                 * gm_drop_sends() */
                CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
                       tx, status, gmnal_gmstatus2str(status),
                       libcfs_nid2str(tx->tx_nid));

                gmnal_notify_peer_down(tx);

                spin_lock(&gmni->gmni_gm_lock);
                gm_drop_sends(gmni->gmni_port,
                              tx->tx_ltxb != NULL ?
                              GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
                              tx->tx_gmlid, *gmnal_tunables.gm_port,
                              gmnal_drop_sends_callback, tx);
                spin_unlock(&gmni->gmni_gm_lock);
                return;
        }
}
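/* The tx scheduler, called with gmni_tx_lock held.  Stage at most one tx
 * from the buffer queue, marshalling its bulk payload into a large tx
 * buffer if it has one, then launch at most one tx from the credit queue
 * if a send credit is available.  The lock is dropped around the payload
 * copy and the GM send, so sends can get re-ordered. */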
void
gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
{
        gmnal_tx_t    *tx;
        gmnal_txbuf_t *ltxb;
        int            gmsize;
        int            pri;
        void          *netaddr;

        tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
             list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);

        if (tx != NULL &&
            (tx->tx_large_nob == 0 ||
             !list_empty(&gmni->gmni_idle_ltxbs))) {

                /* consume tx */
                list_del(&tx->tx_list);

                LASSERT (tx->tx_ltxb == NULL);

                if (tx->tx_large_nob != 0) {
                        ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
                                          gmnal_txbuf_t, txb_list);

                        /* consume large buffer */
                        list_del(&ltxb->txb_list);

                        spin_unlock(&gmni->gmni_tx_lock);

                        /* Unlocking here allows sends to get re-ordered,
                         * but we want to allow other CPUs to progress... */

                        tx->tx_ltxb = ltxb;

                        /* marshall message in tx_ltxb...
                         * 1. Copy what was marshalled so far (in tx_buf) */
                        memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
                               GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);

                        /* 2. Copy the payload */
                        if (tx->tx_large_iskiov)
                                lnet_copy_kiov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_niov,
                                        tx->tx_large_frags.kiov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);
                        else
                                lnet_copy_iov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_niov,
                                        tx->tx_large_frags.iov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);

                        tx->tx_msgnob += tx->tx_large_nob;

                        spin_lock(&gmni->gmni_tx_lock);
                }

                list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
        }

        if (!list_empty(&gmni->gmni_cred_txq) &&
            gmni->gmni_tx_credits != 0) {

                tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);

                /* consume tx and 1 credit */
                list_del(&tx->tx_list);
                gmni->gmni_tx_credits--;

                spin_unlock(&gmni->gmni_tx_lock);

                /* Unlocking here allows sends to get re-ordered, but we want
                 * to allow other CPUs to progress... */

                LASSERT(!tx->tx_credit);
                tx->tx_credit = 1;

                tx->tx_launchtime = jiffies;

                if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
                        LASSERT (tx->tx_ltxb == NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
                        gmsize = gmni->gmni_small_gmsize;
                        pri = GMNAL_SMALL_PRIORITY;
                } else {
                        LASSERT (tx->tx_ltxb != NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(
                                          &tx->tx_ltxb->txb_buf);
                        gmsize = gmni->gmni_large_gmsize;
                        pri = GMNAL_LARGE_PRIORITY;
                }

                spin_lock(&gmni->gmni_gm_lock);

                gm_send_to_peer_with_callback(gmni->gmni_port,
                                              netaddr, gmsize,
                                              tx->tx_msgnob, pri,
                                              tx->tx_gmlid,
                                              gmnal_tx_callback,
                                              (void*)tx);

                spin_unlock(&gmni->gmni_gm_lock);
                spin_lock(&gmni->gmni_tx_lock);
        }
}
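/* Hand a receive buffer back to GM so it can be filled by another
 * incoming message of the matching size and priority. */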
void
gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
                                        gmni->gmni_small_gmsize;
        int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
                                        GMNAL_SMALL_PRIORITY;
        void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);

        CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);

        spin_lock(&gmni->gmni_gm_lock);
        gm_provide_receive_buffer_with_tag(gmni->gmni_port,
                                           buffer, gmsize, pri, 0);
        spin_unlock(&gmni->gmni_gm_lock);
}
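/* Answer a version-mismatched peer with a stub message carrying only my
 * magic + version. */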
void
gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        /* Future protocol version compatibility support!
         * The next gmlnd-specific protocol rev will first send a message to
         * check version; I reply with a stub message containing my current
         * magic+version... */
        gmnal_msg_t *msg;
        gmnal_tx_t  *tx = gmnal_get_tx(gmni);

        if (tx == NULL) {
                CERROR("Can't allocate tx to send version info to %u\n",
                       rx->rx_recv_gmid);
                return;
        }

        LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */

        tx->tx_nid = LNET_NID_ANY;
        tx->tx_gmlid = rx->rx_recv_gmid;

        msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
        msg->gmm_magic   = GMNAL_MSG_MAGIC;
        msg->gmm_version = GMNAL_MSG_VERSION;

        /* just send magic + version */
        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
        tx->tx_large_nob = 0;

        spin_lock(&gmni->gmni_tx_lock);

        list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
        gmnal_check_txqueues_locked(gmni);

        spin_unlock(&gmni->gmni_tx_lock);
}
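/* Receiver thread; gmnal_start_threads() runs one per online CPU.  The rx
 * mutex serialises the threads in GM's blocking receive, so exactly one
 * thread waits for events while the others process the messages they have
 * already dequeued. */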
int
gmnal_rx_thread(void *arg)
{
        gmnal_ni_t      *gmni = arg;
        gm_recv_event_t *rxevent = NULL;
        gm_recv_t       *recv = NULL;
        gmnal_rx_t      *rx;
        int              rc;

        cfs_daemonize("gmnal_rxd");

        down(&gmni->gmni_rx_mutex);

        while (!gmni->gmni_shutdown) {

                spin_lock(&gmni->gmni_gm_lock);
                rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
                spin_unlock(&gmni->gmni_gm_lock);

                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                default:
                        gm_unknown(gmni->gmni_port, rxevent);
                        continue;

                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                case GM_HIGH_PEER_RECV_EVENT:
                case GM_RECV_EVENT:
                case GM_HIGH_RECV_EVENT:
                        break;
                }

                recv = &rxevent->recv;
                rx = gm_hash_find(gmni->gmni_rx_hash,
                                  gm_ntohp(recv->buffer));
                LASSERT (rx != NULL);

                rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
                rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
                rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
                rx->rx_recv_type = gm_ntoh_u8(recv->type);

                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                        /* GM delivered a fast receive in its own buffer;
                         * copy it into ours */
                        LASSERT (rx->rx_recv_nob <= PAGE_SIZE);

                        memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
                               gm_ntohp(recv->message), rx->rx_recv_nob);
                        break;
                }

                up(&gmni->gmni_rx_mutex);

                CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
                        GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
                        gm_ntohp(recv->buffer), rx->rx_recv_nob);

                /* We're connectionless: simply drop packets with
                 * errors */
                rc = gmnal_unpack_msg(gmni, rx);

                if (rc == 0) {
                        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);

                        LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
                        rc = lnet_parse(gmni->gmni_ni,
                                        &msg->gmm_u.immediate.gmim_hdr,
                                        msg->gmm_srcnid, rx, 0);
                } else if (rc > 0) {
                        gmnal_version_reply(gmni, rx);
                        rc = -EPROTO;           /* repost rx */
                }

                if (rc < 0)                     /* parse failure */
                        gmnal_post_rx(gmni, rx);

                down(&gmni->gmni_rx_mutex);
        }

        up(&gmni->gmni_rx_mutex);

        CDEBUG(D_NET, "exiting\n");
        atomic_dec(&gmni->gmni_nthreads);
        return 0;
}
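/* Ask the rx threads to exit: set the shutdown flag, fire a GM alarm to
 * wake the thread blocked in gm_blocking_receive_no_spin(), then spin,
 * warning periodically, until the last thread has gone. */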
void
gmnal_stop_threads(gmnal_ni_t *gmni)
{
        int     count = 2;

        gmni->gmni_shutdown = 1;
        mb();                           /* order shutdown and wakeup */

        /* wake rxthread owning gmni_rx_mutex with an alarm. */
        spin_lock(&gmni->gmni_gm_lock);
        gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
        spin_unlock(&gmni->gmni_gm_lock);

        while (atomic_read(&gmni->gmni_nthreads) != 0) {
                count++;
                if ((count & (count - 1)) == 0)
                        CWARN("Waiting for %d threads to stop\n",
                              atomic_read(&gmni->gmni_nthreads));
                gmnal_yield(1);
        }
}
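/* Spawn one rx thread per online CPU.  On any failure, stop the threads
 * already started and return the error. */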
int
gmnal_start_threads(gmnal_ni_t *gmni)
{
        int     i;
        int     pid;

        LASSERT (!gmni->gmni_shutdown);
        LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);

        gm_initialize_alarm(&gmni->gmni_alarm);

        for (i = 0; i < num_online_cpus(); i++) {

                pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
                if (pid < 0) {
                        CERROR("rx thread failed to start: %d\n", pid);
                        gmnal_stop_threads(gmni);
                        return pid;
                }

                atomic_inc(&gmni->gmni_nthreads);
        }

        return 0;
}