/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ * GPL HEADER START
*
- * This file is part of Lustre, http://www.lustre.org/
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
/*
* This file contains all gmnal send and receive functions
*/
-#include "gmnal.h"
+#include "gmlnd.h"
void
-gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
- ptl_nid_t dstnid, int type)
+gmnal_notify_peer_down(gmnal_tx_t *tx)
{
- gmnal_msg_t *msg = tx->tx_msg;
+ time_t then;
+ then = cfs_time_current_sec() -
+ cfs_duration_sec(cfs_time_current() -
+ tx->tx_launchtime);
+
+ lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
+}
+
+void
+gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
+ lnet_nid_t dstnid, int type)
+{
/* CAVEAT EMPTOR! this only sets the common message fields. */
msg->gmm_magic = GMNAL_MSG_MAGIC;
msg->gmm_version = GMNAL_MSG_VERSION;
msg->gmm_type = type;
- msg->gmm_srcnid = gmnalni->gmni_libnal->libnal_ni.ni_pid.nid;
+ msg->gmm_srcnid = gmni->gmni_ni->ni_nid;
msg->gmm_dstnid = dstnid;
}
int
-gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
+gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
- gmnal_msg_t *msg = rx->rx_msg;
+ gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
const int hdr_size = offsetof(gmnal_msg_t, gmm_u);
+ int buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
+ gmni->gmni_small_msgsize;
int flip;
+ /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
+
+ /* GM may not overflow our buffer */
+ LASSERT (rx->rx_recv_nob <= buffnob);
+
/* 6 bytes are enough to have received magic + version */
if (rx->rx_recv_nob < 6) {
- CERROR("Short message from gmid %u: %d\n",
+ CERROR("Short message from gmid %u: %d\n",
rx->rx_recv_gmid, rx->rx_recv_nob);
return -EPROTO;
}
        if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
                flip = 0;
} else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
flip = 1;
+ } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
+ msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
+ return EPROTO;
} else {
- CERROR("Bad magic from gmid %u: %08x\n",
+ CERROR("Bad magic from gmid %u: %08x\n",
rx->rx_recv_gmid, msg->gmm_magic);
return -EPROTO;
}
- if (msg->gmm_version !=
+ if (msg->gmm_version !=
(flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
- CERROR("Bad version from gmid %u: %d\n",
- rx->rx_recv_gmid, msg->gmm_version);
- return -EPROTO;
+ return EPROTO;
}
        if (rx->rx_recv_nob < hdr_size) {
                CERROR("Short message from gmid %u: %d\n",
                       rx->rx_recv_gmid, rx->rx_recv_nob);
                return -EPROTO;
        }

        if (flip) {
                __swab64s(&msg->gmm_srcnid);
                __swab64s(&msg->gmm_dstnid);
        }
-
- if (msg->gmm_srcnid == PTL_NID_ANY) {
- CERROR("Bad src nid from %u: "LPX64"\n",
- rx->rx_recv_gmid, msg->gmm_srcnid);
+
+ if (msg->gmm_srcnid == LNET_NID_ANY) {
+ CERROR("Bad src nid from %u: %s\n",
+ rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
return -EPROTO;
}
- if (msg->gmm_dstnid != gmnalni->gmni_libnal->libnal_ni.ni_pid.nid) {
- CERROR("Bad dst nid from %u: "LPX64"\n",
- rx->rx_recv_gmid, msg->gmm_dstnid);
+ if (gmni->gmni_ni->ni_nid != msg->gmm_dstnid) {
+ CERROR("Bad dst nid from %u: %s\n",
+ rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
return -EPROTO;
}
-
+
switch (msg->gmm_type) {
default:
- CERROR("Unknown message type from %u: %x\n",
+ CERROR("Unknown message type from %u: %x\n",
rx->rx_recv_gmid, msg->gmm_type);
return -EPROTO;
-
+
case GMNAL_MSG_IMMEDIATE:
if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
- CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",
- rx->rx_recv_gmid, rx->rx_recv_nob,
+ CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",
+ rx->rx_recv_gmid, rx->rx_recv_nob,
offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
return -EPROTO;
}
return 0;
}
+gmnal_tx_t *
+gmnal_get_tx(gmnal_ni_t *gmni)
+{
+ gmnal_tx_t *tx = NULL;
-/*
- * The caretaker thread
- * This is main thread of execution for the NAL side
- * This guy waits in gm_blocking_recvive and gets
- * woken up when the myrinet adaptor gets an interrupt.
- * Hands off receive operations to the receive thread
- * This thread Looks after gm_callbacks etc inline.
- */
-int
-gmnal_ct_thread(void *arg)
+ spin_lock(&gmni->gmni_tx_lock);
+
+ if (gmni->gmni_shutdown ||
+ list_empty(&gmni->gmni_idle_txs)) {
+ spin_unlock(&gmni->gmni_tx_lock);
+ return NULL;
+ }
+
+ tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
+ list_del(&tx->tx_list);
+
+ spin_unlock(&gmni->gmni_tx_lock);
+
+ LASSERT (tx->tx_lntmsg == NULL);
+ LASSERT (tx->tx_ltxb == NULL);
+ LASSERT (!tx->tx_credit);
+
+ return tx;
+}
+
+void
+gmnal_tx_done(gmnal_tx_t *tx, int rc)
{
- gmnal_ni_t *gmnalni = arg;
- gm_recv_event_t *rxevent = NULL;
- gm_recv_t *recv = NULL;
+ gmnal_ni_t *gmni = tx->tx_gmni;
+ int wake_sched = 0;
+ lnet_msg_t *lnetmsg = tx->tx_lntmsg;
- sprintf(current->comm, "gmnal_ct");
- kportal_daemonize("gmnalctd");
+ tx->tx_lntmsg = NULL;
- gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
+ spin_lock(&gmni->gmni_tx_lock);
- while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
+ if (tx->tx_ltxb != NULL) {
+ wake_sched = 1;
+ list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
+ tx->tx_ltxb = NULL;
+ }
- spin_lock(&gmnalni->gmni_gm_lock);
- rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
- spin_unlock(&gmnalni->gmni_gm_lock);
+ if (tx->tx_credit) {
+ wake_sched = 1;
+ gmni->gmni_tx_credits++;
+ tx->tx_credit = 0;
+ }
- if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
- CDEBUG(D_NET, "time to exit\n");
- break;
- }
+ list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
- CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
+ if (wake_sched)
+ gmnal_check_txqueues_locked(gmni);
- if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
- recv = (gm_recv_t*)&rxevent->recv;
- gmnal_enqueue_rx(gmnalni, recv);
- } else {
- gm_unknown(gmnalni->gmni_port, rxevent);
- }
- }
+ spin_unlock(&gmni->gmni_tx_lock);
- gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
- CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
- return 0;
+ /* Delay finalize until tx is free */
+ if (lnetmsg != NULL)
+ lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
}
-
-/*
- * process a receive event
- */
-int
-gmnal_rx_thread(void *arg)
+void
+gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
+ gm_status_t status)
{
- gmnal_ni_t *gmnalni = arg;
- char name[16];
- gmnal_rx_t *rx;
- int rank;
-
- for (rank=0; rank<num_rx_threads; rank++)
- if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
- break;
-
- snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
- kportal_daemonize(name);
-
- /*
- * set 1 bit for each thread started
- * doesn't matter which bit
- */
- spin_lock(&gmnalni->gmni_rxthread_flag_lock);
- if (gmnalni->gmni_rxthread_flag)
- gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
- else
- gmnalni->gmni_rxthread_flag = 1;
- spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
-
- while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
- CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
-
- rx = gmnal_dequeue_rx(gmnalni);
- if (rx == NULL) {
- CDEBUG(D_NET, "Receive thread time to exit\n");
- break;
- }
-
- /* We're connectionless: simply ignore packets on error */
-
- if (gmnal_unpack_msg(gmnalni, rx) == 0) {
-
- LASSERT (rx->rx_msg->gmm_type == GMNAL_MSG_IMMEDIATE);
- (void)lib_parse(gmnalni->gmni_libnal,
- &rx->rx_msg->gmm_u.immediate.gmim_hdr,
- rx);
- }
+ gmnal_tx_t *tx = (gmnal_tx_t*)context;
- gmnal_post_rx(gmnalni, rx);
- }
+ LASSERT(!in_interrupt());
- spin_lock(&gmnalni->gmni_rxthread_flag_lock);
- gmnalni->gmni_rxthread_flag /= 2;
- spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
+ CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",
+ tx, status, gmnal_gmstatus2str(status),
+ libcfs_nid2str(tx->tx_nid));
- CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
- return 0;
+ gmnal_tx_done(tx, -EIO);
}
void
-gmnal_post_rx(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
+gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
{
- CDEBUG(D_NET, "requeueing rx[%p] gmnalni[%p]\n", rx, gmnalni);
+ gmnal_tx_t *tx = (gmnal_tx_t*)context;
+ gmnal_ni_t *gmni = tx->tx_gmni;
- spin_lock(&gmnalni->gmni_gm_lock);
- gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, rx->rx_msg,
- rx->rx_gmsize, GM_LOW_PRIORITY, 0 );
- spin_unlock(&gmnalni->gmni_gm_lock);
+ LASSERT(!in_interrupt());
+
+ switch(status) {
+ case GM_SUCCESS:
+ gmnal_tx_done(tx, 0);
+ return;
+
+ case GM_SEND_DROPPED:
+ CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",
+ tx, libcfs_nid2str(tx->tx_nid));
+ /* Another tx failed and called gm_drop_sends() which made this
+ * one complete immediately */
+ gmnal_tx_done(tx, -EIO);
+ return;
+
+ default:
+ /* Some error; NB don't complete tx yet; we need its credit for
+ * gm_drop_sends() */
+ CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
+ tx, status, gmnal_gmstatus2str(status),
+ libcfs_nid2str(tx->tx_nid));
+
+ gmnal_notify_peer_down(tx);
+
+ spin_lock(&gmni->gmni_gm_lock);
+ gm_drop_sends(gmni->gmni_port,
+ tx->tx_ltxb != NULL ?
+ GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
+ tx->tx_gmlid, *gmnal_tunables.gm_port,
+ gmnal_drop_sends_callback, tx);
+ spin_unlock(&gmni->gmni_gm_lock);
+ return;
+ }
+
+ /* not reached */
+ LBUG();
}
-void
-gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
- gm_status_t status)
+void
+gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
{
- gmnal_tx_t *tx = (gmnal_tx_t*)context;
- gmnal_ni_t *gmnalni = tx->tx_gmni;
- lib_msg_t *libmsg = tx->tx_libmsg;
+ gmnal_tx_t *tx;
+ gmnal_txbuf_t *ltxb;
+ int gmsize;
+ int pri;
+ void *netaddr;
+
+ tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
+ list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
+
+ if (tx != NULL &&
+ (tx->tx_large_nob == 0 ||
+ !list_empty(&gmni->gmni_idle_ltxbs))) {
+
+ /* consume tx */
+ list_del(&tx->tx_list);
+
+ LASSERT (tx->tx_ltxb == NULL);
+
+ if (tx->tx_large_nob != 0) {
+ ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
+ gmnal_txbuf_t, txb_list);
+
+ /* consume large buffer */
+ list_del(<xb->txb_list);
+
+ spin_unlock(&gmni->gmni_tx_lock);
+
+ /* Unlocking here allows sends to get re-ordered,
+ * but we want to allow other CPUs to progress... */
+
+ tx->tx_ltxb = ltxb;
+
+ /* marshall message in tx_ltxb...
+ * 1. Copy what was marshalled so far (in tx_buf) */
+ memcpy(GMNAL_NETBUF_MSG(<xb->txb_buf),
+ GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
+
+ /* 2. Copy the payload */
+ if (tx->tx_large_iskiov)
+ lnet_copy_kiov2kiov(
+ gmni->gmni_large_pages,
+ ltxb->txb_buf.nb_kiov,
+ tx->tx_msgnob,
+ tx->tx_large_niov,
+ tx->tx_large_frags.kiov,
+ tx->tx_large_offset,
+ tx->tx_large_nob);
+ else
+ lnet_copy_iov2kiov(
+ gmni->gmni_large_pages,
+ ltxb->txb_buf.nb_kiov,
+ tx->tx_msgnob,
+ tx->tx_large_niov,
+ tx->tx_large_frags.iov,
+ tx->tx_large_offset,
+ tx->tx_large_nob);
+
+ tx->tx_msgnob += tx->tx_large_nob;
+
+ spin_lock(&gmni->gmni_tx_lock);
+ }
+
+ list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
+ }
+
+ if (!list_empty(&gmni->gmni_cred_txq) &&
+ gmni->gmni_tx_credits != 0) {
+
+ tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);
- CWARN("status for tx [%p] is [%d][%s]\n",
- tx, status, gmnal_gmstatus2str(status));
+ /* consume tx and 1 credit */
+ list_del(&tx->tx_list);
+ gmni->gmni_tx_credits--;
- gmnal_return_tx(gmnalni, tx);
- lib_finalize(gmnalni->gmni_libnal, NULL, libmsg, PTL_FAIL);
+ spin_unlock(&gmni->gmni_tx_lock);
+
+ /* Unlocking here allows sends to get re-ordered, but we want
+ * to allow other CPUs to progress... */
+
+ LASSERT(!tx->tx_credit);
+ tx->tx_credit = 1;
+
+ tx->tx_launchtime = cfs_time_current();
+
+ if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
+ LASSERT (tx->tx_ltxb == NULL);
+ netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
+ gmsize = gmni->gmni_small_gmsize;
+ pri = GMNAL_SMALL_PRIORITY;
+ } else {
+ LASSERT (tx->tx_ltxb != NULL);
+ netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
+ gmsize = gmni->gmni_large_gmsize;
+ pri = GMNAL_LARGE_PRIORITY;
+ }
+
+ spin_lock(&gmni->gmni_gm_lock);
+
+ gm_send_to_peer_with_callback(gmni->gmni_port,
+ netaddr, gmsize,
+ tx->tx_msgnob,
+ pri,
+ tx->tx_gmlid,
+ gmnal_tx_callback,
+ (void*)tx);
+
+ spin_unlock(&gmni->gmni_gm_lock);
+ spin_lock(&gmni->gmni_tx_lock);
+ }
}
-void
-gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
- gm_status_t status)
+void
+gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
- gmnal_tx_t *tx = (gmnal_tx_t*)context;
- gmnal_ni_t *gmnalni = tx->tx_gmni;
+ int gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
+ gmni->gmni_small_gmsize;
+ int pri = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
+ GMNAL_SMALL_PRIORITY;
+ void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
+
+ CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
+
+ spin_lock(&gmni->gmni_gm_lock);
+ gm_provide_receive_buffer_with_tag(gmni->gmni_port,
+ buffer, gmsize, pri, 0);
+ spin_unlock(&gmni->gmni_gm_lock);
+}
- CERROR("status for tx [%p] is [%d][%s]\n",
- tx, status, gmnal_gmstatus2str(status));
+void
+gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
+{
+ /* Future protocol version compatibility support!
+ * The next gmlnd-specific protocol rev will first send a message to
+ * check version; I reply with a stub message containing my current
+ * magic+version... */
+ gmnal_msg_t *msg;
+ gmnal_tx_t *tx = gmnal_get_tx(gmni);
+
+ if (tx == NULL) {
+ CERROR("Can't allocate tx to send version info to %u\n",
+ rx->rx_recv_gmid);
+ return;
+ }
+
+ LASSERT (tx->tx_lntmsg == NULL); /* no finalize */
+
+ tx->tx_nid = LNET_NID_ANY;
+ tx->tx_gmlid = rx->rx_recv_gmid;
+
+ msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
+ msg->gmm_magic = GMNAL_MSG_MAGIC;
+ msg->gmm_version = GMNAL_MSG_VERSION;
+
+ /* just send magic + version */
+ tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
+ tx->tx_large_nob = 0;
+
+ spin_lock(&gmni->gmni_tx_lock);
- spin_lock(&gmnalni->gmni_gm_lock);
- gm_resume_sending(gmnalni->gmni_port, tx->tx_gm_priority,
- tx->tx_gmlid, gm_port_id,
- gmnal_resume_sending_callback, tx);
- spin_unlock(&gmnalni->gmni_gm_lock);
+ list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+ gmnal_check_txqueues_locked(gmni);
+
+ spin_unlock(&gmni->gmni_tx_lock);
}
-void
-gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
+int
+gmnal_rx_thread(void *arg)
{
- gmnal_tx_t *tx = (gmnal_tx_t*)context;
- gmnal_ni_t *gmnalni = tx->tx_gmni;
- lib_nal_t *libnal = gmnalni->gmni_libnal;
- lib_msg_t *libmsg = tx->tx_libmsg;
- ptl_err_t rc;
-
- if (!tx) {
- CERROR("send completion event for unknown tx\n");
- return;
- }
-
- switch(status) {
- case(GM_SUCCESS):
- rc = PTL_OK;
- break;
+ gmnal_ni_t *gmni = arg;
+ gm_recv_event_t *rxevent = NULL;
+ gm_recv_t *recv = NULL;
+ gmnal_rx_t *rx;
+ int rc;
+
+ cfs_daemonize("gmnal_rxd");
+
+ while (!gmni->gmni_shutdown) {
+ rc = down_interruptible(&gmni->gmni_rx_mutex);
+ LASSERT (rc == 0 || rc == -EINTR);
+ if (rc != 0)
+ continue;
+
+ spin_lock(&gmni->gmni_gm_lock);
+ rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
+ spin_unlock(&gmni->gmni_gm_lock);
+
+ switch (GM_RECV_EVENT_TYPE(rxevent)) {
+ default:
+ gm_unknown(gmni->gmni_port, rxevent);
+ up(&gmni->gmni_rx_mutex);
+ continue;
+
+ case GM_FAST_RECV_EVENT:
+ case GM_FAST_PEER_RECV_EVENT:
+ case GM_PEER_RECV_EVENT:
+ case GM_FAST_HIGH_RECV_EVENT:
+ case GM_FAST_HIGH_PEER_RECV_EVENT:
+ case GM_HIGH_PEER_RECV_EVENT:
+ case GM_RECV_EVENT:
+ case GM_HIGH_RECV_EVENT:
+ break;
+ }
- case(GM_SEND_DROPPED):
- rc = PTL_FAIL;
- break;
-
- default:
- CERROR("Error %d(%s), nid "LPD64"\n",
- status, gmnal_gmstatus2str(status), tx->tx_nid);
+ recv = &rxevent->recv;
+ rx = gm_hash_find(gmni->gmni_rx_hash,
+ gm_ntohp(recv->buffer));
+ LASSERT (rx != NULL);
+
+ rx->rx_recv_nob = gm_ntoh_u32(recv->length);
+ rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
+ rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
+ rx->rx_recv_type = gm_ntoh_u8(recv->type);
+
+ switch (GM_RECV_EVENT_TYPE(rxevent)) {
+ case GM_FAST_RECV_EVENT:
+ case GM_FAST_PEER_RECV_EVENT:
+ case GM_FAST_HIGH_RECV_EVENT:
+ case GM_FAST_HIGH_PEER_RECV_EVENT:
+ LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
+
+ memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
+ gm_ntohp(recv->message), rx->rx_recv_nob);
+ break;
+ }
- spin_lock(&gmnalni->gmni_gm_lock);
- gm_drop_sends(gmnalni->gmni_port, tx->tx_gm_priority,
- tx->tx_gmlid, gm_port_id,
- gmnal_drop_sends_callback, tx);
- spin_unlock(&gmnalni->gmni_gm_lock);
- return;
- }
+ up(&gmni->gmni_rx_mutex);
+
+ CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
+ GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
+ gm_ntohp(recv->buffer), rx->rx_recv_nob);
+
+ /* We're connectionless: simply drop packets with
+ * errors */
+ rc = gmnal_unpack_msg(gmni, rx);
+
+ if (rc == 0) {
+ gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
- gmnal_return_tx(gmnalni, tx);
- lib_finalize(libnal, NULL, libmsg, rc);
- return;
+ LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+ rc = lnet_parse(gmni->gmni_ni,
+ &msg->gmm_u.immediate.gmim_hdr,
+ msg->gmm_srcnid, rx, 0);
+ } else if (rc > 0) {
+ gmnal_version_reply(gmni, rx);
+ rc = -EPROTO; /* repost rx */
+ }
+
+ if (rc < 0) /* parse failure */
+ gmnal_post_rx(gmni, rx);
+ }
+
+ CDEBUG(D_NET, "exiting\n");
+ atomic_dec(&gmni->gmni_nthreads);
+ return 0;
}
-ptl_err_t
-gmnal_post_tx (gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
- lib_msg_t *libmsg, ptl_nid_t nid, int nob)
+void
+gmnal_stop_threads(gmnal_ni_t *gmni)
{
- gm_status_t gm_status;
-
- CDEBUG(D_NET, "send %d bytes to "LPU64"\n", nob, nid);
-
- LASSERT ((nid >> 32) == 0);
-
- gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid,
- &tx->tx_gmlid);
- if (gm_status != GM_SUCCESS) {
- CERROR("Failed to obtain local id\n");
- gmnal_return_tx(gmnalni, tx);
- return PTL_FAIL;
- }
-
- CDEBUG(D_NET, "Local Node_id is [%u][%x]\n",
- tx->tx_gmlid, tx->tx_gmlid);
-
- tx->tx_nid = nid;
- tx->tx_libmsg = libmsg;
- tx->tx_gm_priority = GM_LOW_PRIORITY;
- tx->tx_msg_size = nob;
-
- CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
- "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
- "tx [%p]\n", gmnalni->gmni_port, tx->tx_msg,
- tx->tx_gm_size, tx->tx_msg_size,
- tx->tx_nid, tx->tx_gmlid, tx);
-
- spin_lock(&gmnalni->gmni_gm_lock);
- gm_send_to_peer_with_callback(gmnalni->gmni_port, tx->tx_msg,
- tx->tx_gm_size, tx->tx_msg_size,
- tx->tx_gm_priority, tx->tx_gmlid,
- gmnal_tx_callback, (void*)tx);
- spin_unlock(&gmnalni->gmni_gm_lock);
-
- return PTL_OK;
+ int count = 2;
+
+ gmni->gmni_shutdown = 1;
+ mb();
+
+ /* wake rxthread owning gmni_rx_mutex with an alarm. */
+ spin_lock(&gmni->gmni_gm_lock);
+ gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
+ spin_unlock(&gmni->gmni_gm_lock);
+
+ while (atomic_read(&gmni->gmni_nthreads) != 0) {
+ count++;
+ if ((count & (count - 1)) == 0)
+ CWARN("Waiting for %d threads to stop\n",
+ atomic_read(&gmni->gmni_nthreads));
+ gmnal_yield(1);
+ }
+}
+
+int
+gmnal_start_threads(gmnal_ni_t *gmni)
+{
+ int i;
+ int pid;
+
+ LASSERT (!gmni->gmni_shutdown);
+ LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);
+
+ gm_initialize_alarm(&gmni->gmni_alarm);
+
+ for (i = 0; i < num_online_cpus(); i++) {
+
+ pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
+ if (pid < 0) {
+ CERROR("rx thread failed to start: %d\n", pid);
+ gmnal_stop_threads(gmni);
+ return pid;
+ }
+
+ atomic_inc(&gmni->gmni_nthreads);
+ }
+
+ return 0;
}