/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ * GPL HEADER START
*
- * This file is part of Lustre, http://www.lustre.org/
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2008 Sun Microsystems, Inc. All rights reserved
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
/*
* This file contains all gmnal send and receive functions
*/
-#include "gmnal.h"
+#include "gmlnd.h"
-/*
- * The caretaker thread
- * This is main thread of execution for the NAL side
- * This guy waits in gm_blocking_recvive and gets
- * woken up when the myrinet adaptor gets an interrupt.
- * Hands off receive operations to the receive thread
- * This thread Looks after gm_callbacks etc inline.
- */
-int
-gmnal_ct_thread(void *arg)
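+/*
+ * Notify LNET that the peer this tx was destined for has gone down.
+ * The failure time is taken as "now" minus however long the tx has
+ * been in flight since it was launched.
+ */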
+void
+gmnal_notify_peer_down(gmnal_tx_t *tx)
{
- gmnal_data_t *nal_data;
- gm_recv_event_t *rxevent = NULL;
- gm_recv_t *recv = NULL;
-
- if (!arg) {
- CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
- return(-1);
- }
-
- nal_data = (gmnal_data_t*)arg;
- CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
-
- sprintf(current->comm, "gmnal_ct");
-
- kportal_daemonize("gmnalctd");
-
- nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
-
- spin_lock(&nal_data->gm_lock);
- while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) {
- CDEBUG(D_NET, "waiting\n");
- rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
- if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) {
- CDEBUG(D_INFO, "time to exit\n");
- break;
- }
- CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent));
- switch (GM_RECV_EVENT_TYPE(rxevent)) {
-
- case(GM_RECV_EVENT):
- CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
- recv = (gm_recv_t*)&rxevent->recv;
- spin_unlock(&nal_data->gm_lock);
- gmnal_add_rxtwe(nal_data, recv);
- spin_lock(&nal_data->gm_lock);
- CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
- break;
- case(_GM_SLEEP_EVENT):
- /*
- * Blocking receive above just returns
- * immediatly with _GM_SLEEP_EVENT
- * Don't know what this is
- */
- CDEBUG(D_NET, "Sleeping in gm_unknown\n");
- spin_unlock(&nal_data->gm_lock);
- gm_unknown(nal_data->gm_port, rxevent);
- spin_lock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "Awake from gm_unknown\n");
- break;
-
- default:
- /*
- * Don't know what this is
- * gm_unknown will make sense of it
- * Should be able to do something with
- * FAST_RECV_EVENTS here.
- */
- CDEBUG(D_NET, "Passing event to gm_unknown\n");
- spin_unlock(&nal_data->gm_lock);
- gm_unknown(nal_data->gm_port, rxevent);
- spin_lock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "Processed unknown event\n");
- }
- }
- spin_unlock(&nal_data->gm_lock);
- nal_data->ctthread_flag = GMNAL_THREAD_RESET;
- CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
- return(GMNAL_STATUS_OK);
-}
+ time_t then;
+ then = cfs_time_current_sec() -
+ cfs_duration_sec(cfs_time_current() -
+ tx->tx_launchtime);
-/*
- * process a receive event
- */
-int gmnal_rx_thread(void *arg)
-{
- char name[16];
- gmnal_data_t *nal_data;
- void *buffer;
- gmnal_rxtwe_t *we = NULL;
- int rank;
-
- if (!arg) {
- CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
- return(-1);
- }
-
- nal_data = (gmnal_data_t*)arg;
- CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
-
- for (rank=0; rank<num_rx_threads; rank++)
- if (nal_data->rxthread_pid[rank] == current->pid)
- break;
-
- snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
-
- kportal_daemonize(name);
- /*
- * set 1 bit for each thread started
- * doesn't matter which bit
- */
- spin_lock(&nal_data->rxthread_flag_lock);
- if (nal_data->rxthread_flag)
- nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1;
- else
- nal_data->rxthread_flag = 1;
- CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
- spin_unlock(&nal_data->rxthread_flag_lock);
-
- while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) {
- CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
- we = gmnal_get_rxtwe(nal_data);
- if (!we) {
- CDEBUG(D_INFO, "Receive thread time to exit\n");
- break;
- }
-
- buffer = we->buffer;
- switch(((gmnal_msghdr_t*)buffer)->type) {
- case(GMNAL_SMALL_MESSAGE):
- gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE);
- break;
- case(GMNAL_LARGE_MESSAGE_INIT):
- gmnal_pre_receive(nal_data,we,GMNAL_LARGE_MESSAGE_INIT);
- break;
- case(GMNAL_LARGE_MESSAGE_ACK):
- gmnal_pre_receive(nal_data, we,GMNAL_LARGE_MESSAGE_ACK);
- break;
- default:
- CERROR("Unsupported message type\n");
- gmnal_rx_bad(nal_data, we, NULL);
- }
- PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
- }
-
- spin_lock(&nal_data->rxthread_flag_lock);
- nal_data->rxthread_flag/=2;
- CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag);
- spin_unlock(&nal_data->rxthread_flag_lock);
- CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data);
- return(GMNAL_STATUS_OK);
+ lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
}
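+/*
+ * Fill in the common header fields (magic, version, type, source and
+ * destination NIDs) of an outgoing message; type-specific fields are
+ * set by the caller.
+ */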
+void
+gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
+ lnet_nid_t dstnid, int type)
+{
+ /* CAVEAT EMPTOR! this only sets the common message fields. */
+ msg->gmm_magic = GMNAL_MSG_MAGIC;
+ msg->gmm_version = GMNAL_MSG_VERSION;
+ msg->gmm_type = type;
+ msg->gmm_srcnid = gmni->gmni_ni->ni_nid;
+ msg->gmm_dstnid = dstnid;
+}
-
-/*
- * Start processing a small message receive
- * Get here from gmnal_receive_thread
- * Hand off to lib_parse, which calls cb_recv
- * which hands back to gmnal_small_receive
- * Deal with all endian stuff here.
- */
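+/*
+ * Check and byte-swap the header of a newly received message.
+ * Returns 0 on success, -EPROTO on a corrupt or misdirected message,
+ * and +ve (EPROTO) on a recognisable magic/version mismatch so the
+ * caller can reply with its own magic and version.
+ */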
int
-gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type)
+gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
- gmnal_srxd_t *srxd = NULL;
- void *buffer = NULL;
- unsigned int snode, sport, type, length;
- gmnal_msghdr_t *gmnal_msghdr;
- ptl_hdr_t *portals_hdr;
- int rc;
+ gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+ const int hdr_size = offsetof(gmnal_msg_t, gmm_u);
+ int buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
+ gmni->gmni_small_msgsize;
+ int flip;
+
+ /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
+
+ /* GM may not overflow our buffer */
+ LASSERT (rx->rx_recv_nob <= buffnob);
+
+ /* 6 bytes are enough to have received magic + version */
+ if (rx->rx_recv_nob < 6) {
+ CERROR("Short message from gmid %u: %d\n",
+ rx->rx_recv_gmid, rx->rx_recv_nob);
+ return -EPROTO;
+ }
- CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n",
- nal_data, we, gmnal_type);
-
- buffer = we->buffer;
- snode = we->snode;
- sport = we->sport;
- type = we->type;
- buffer = we->buffer;
- length = we->length;
-
- gmnal_msghdr = (gmnal_msghdr_t*)buffer;
- portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t));
-
- CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], "
- "type [%d], length [%d], buffer [%p]\n",
- snode, sport, type, length, buffer);
- CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], "
- "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id,
- gmnal_msghdr->magic, gmnal_msghdr->type);
- CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], "
- "dest_node ["LPD64"]\n", portals_hdr->src_nid,
- portals_hdr->dest_nid);
-
- /*
- * Get a receive descriptor for this message
- */
- srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer);
- CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n");
- if (!srxd) {
- CERROR("Failed to get receive descriptor\n");
- /* I think passing a NULL srxd to lib_parse will crash
- * gmnal_recv() */
- LBUG();
- lib_parse(nal_data->libnal, portals_hdr, srxd);
- return(GMNAL_STATUS_FAIL);
- }
-
- /*
- * no need to bother portals library with this
- */
- if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) {
- gmnal_large_tx_ack_received(nal_data, srxd);
- return(GMNAL_STATUS_OK);
- }
-
- srxd->nal_data = nal_data;
- srxd->type = gmnal_type;
- srxd->nsiov = gmnal_msghdr->niov;
- srxd->gm_source_node = gmnal_msghdr->sender_node_id;
-
- CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
- buffer+sizeof(gmnal_msghdr_t));
- /*
- * control passes to lib, which calls cb_recv
- * cb_recv is responsible for returning the buffer
- * for future receive
- */
- rc = lib_parse(nal_data->libnal, portals_hdr, srxd);
-
- if (rc != PTL_OK) {
- /* I just received garbage; return the srxd for use */
- CWARN("Returning srxd and discarding message, "
- "lib_parse didn't like it.\n");
- return(gmnal_rx_bad(nal_data, we, srxd));
+ if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
+ flip = 0;
+ } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
+ flip = 1;
+ } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
+ msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
+ return EPROTO;
+ } else {
+ CERROR("Bad magic from gmid %u: %08x\n",
+ rx->rx_recv_gmid, msg->gmm_magic);
+ return -EPROTO;
}
- return(GMNAL_STATUS_OK);
-}
+ if (msg->gmm_version !=
+ (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
+ return EPROTO;
+ }
+ if (rx->rx_recv_nob < hdr_size) {
+ CERROR("Short message from %u: %d\n",
+ rx->rx_recv_gmid, rx->rx_recv_nob);
+ return -EPROTO;
+ }
+ if (flip) {
+ /* leave magic unflipped as a clue to peer endianness */
+ __swab16s(&msg->gmm_version);
+ __swab16s(&msg->gmm_type);
+ __swab64s(&msg->gmm_srcnid);
+ __swab64s(&msg->gmm_dstnid);
+ }
-/*
- * After a receive has been processed,
- * hang out the receive buffer again.
- * This implicitly returns a receive token.
- */
-int
-gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
+ if (msg->gmm_srcnid == LNET_NID_ANY) {
+ CERROR("Bad src nid from %u: %s\n",
+ rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
+ return -EPROTO;
+ }
+
+ if (gmni->gmni_ni->ni_nid != msg->gmm_dstnid) {
+ CERROR("Bad dst nid from %u: %s\n",
+ rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
+ return -EPROTO;
+ }
+
+ switch (msg->gmm_type) {
+ default:
+ CERROR("Unknown message type from %u: %x\n",
+ rx->rx_recv_gmid, msg->gmm_type);
+ return -EPROTO;
+
+ case GMNAL_MSG_IMMEDIATE:
+ if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
+ CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",
+ rx->rx_recv_gmid, rx->rx_recv_nob,
+ offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
+ return -EPROTO;
+ }
+ break;
+ }
+ return 0;
+}
+
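+/*
+ * Grab an idle tx descriptor, or return NULL if none are free or the
+ * NI is shutting down.
+ */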
+gmnal_tx_t *
+gmnal_get_tx(gmnal_ni_t *gmni)
{
- CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n");
+ gmnal_tx_t *tx = NULL;
- CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data);
+ spin_lock(&gmni->gmni_tx_lock);
- spin_lock(&nal_data->gm_lock);
- gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
- srxd->gmsize, GM_LOW_PRIORITY, 0 );
- spin_unlock(&nal_data->gm_lock);
+ if (gmni->gmni_shutdown ||
+ list_empty(&gmni->gmni_idle_txs)) {
+ spin_unlock(&gmni->gmni_tx_lock);
+ return NULL;
+ }
- return(GMNAL_STATUS_OK);
-}
+ tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
+ list_del(&tx->tx_list);
+ spin_unlock(&gmni->gmni_tx_lock);
-/*
- * Handle a bad message
- * A bad message is one we don't expect or can't interpret
- */
-int
-gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
-{
- CDEBUG(D_TRACE, "Can't handle message\n");
-
- if (!srxd)
- srxd = gmnal_rxbuffer_to_srxd(nal_data,
- we->buffer);
- if (srxd) {
- gmnal_rx_requeue_buffer(nal_data, srxd);
- } else {
- CERROR("Can't find a descriptor for this buffer\n");
- /*
- * get rid of it ?
- */
- return(GMNAL_STATUS_FAIL);
- }
-
- return(GMNAL_STATUS_OK);
+ LASSERT (tx->tx_lntmsg == NULL);
+ LASSERT (tx->tx_ltxb == NULL);
+ LASSERT (!tx->tx_credit);
+
+ return tx;
}
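+/*
+ * Complete a tx: return its large buffer and send credit (if any) to
+ * the idle pools, requeue the descriptor, and only then finalize the
+ * LNET message it was carrying.
+ */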
+void
+gmnal_tx_done(gmnal_tx_t *tx, int rc)
+{
+ gmnal_ni_t *gmni = tx->tx_gmni;
+ int wake_sched = 0;
+ lnet_msg_t *lnetmsg = tx->tx_lntmsg;
+ tx->tx_lntmsg = NULL;
-/*
- * Process a small message receive.
- * Get here from gmnal_receive_thread, gmnal_pre_receive
- * lib_parse, cb_recv
- * Put data from prewired receive buffer into users buffer(s)
- * Hang out the receive buffer again for another receive
- * Call lib_finalize
- */
-ptl_err_t
-gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
-{
- gmnal_srxd_t *srxd = NULL;
- gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
-
-
- if (!private) {
- CERROR("gmnal_small_rx no context\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- }
-
- srxd = (gmnal_srxd_t*)private;
-
- /*
- * let portals library know receive is complete
- */
- CDEBUG(D_PORTALS, "calling lib_finalize\n");
- lib_finalize(libnal, private, cookie, PTL_OK);
- /*
- * return buffer so it can be used again
- */
- CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
- spin_lock(&nal_data->gm_lock);
- gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
- srxd->gmsize, GM_LOW_PRIORITY, 0);
- spin_unlock(&nal_data->gm_lock);
-
- return(PTL_OK);
-}
+ spin_lock(&gmni->gmni_tx_lock);
+ if (tx->tx_ltxb != NULL) {
+ wake_sched = 1;
+ list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
+ tx->tx_ltxb = NULL;
+ }
-/*
- * Start a small transmit.
- * Use the given send token (and wired transmit buffer).
- * Copy headers to wired buffer and initiate gm_send from the wired buffer.
- * The callback function informs when the send is complete.
- */
-ptl_err_t
-gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
- gmnal_stxd_t *stxd, int size)
-{
- gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data;
- void *buffer = NULL;
- gmnal_msghdr_t *msghdr = NULL;
- int tot_size = 0;
- unsigned int local_nid;
- gm_status_t gm_status = GM_SUCCESS;
-
- CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
- "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
- "size [%d]\n", libnal, private, cookie, hdr, type,
- global_nid, pid, stxd, size);
-
- CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
- hdr->dest_nid, hdr->src_nid);
-
- if (!nal_data) {
- CERROR("no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
-
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
- &local_nid);
- spin_unlock(&nal_data->gm_lock);
- if (gm_status != GM_SUCCESS) {
- CERROR("Failed to obtain local id\n");
- return(PTL_FAIL);
- }
- CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
-
- stxd->type = GMNAL_SMALL_MESSAGE;
- stxd->cookie = cookie;
-
- /*
- * Copy gmnal_msg_hdr and portals header to the transmit buffer
- * Then send the message, as the data has previously been copied in
- * (HP SFS 1380).
- */
- buffer = stxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
-
- msghdr->magic = GMNAL_MAGIC;
- msghdr->type = GMNAL_SMALL_MESSAGE;
- msghdr->sender_node_id = nal_data->gm_global_nid;
- CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
- buffer += sizeof(gmnal_msghdr_t);
-
- CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
- gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
-
- buffer += sizeof(ptl_hdr_t);
-
- CDEBUG(D_INFO, "sending\n");
- tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
- stxd->msg_size = tot_size;
-
-
- CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
- "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] "
- "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
- stxd->msg_size, global_nid, local_nid, stxd);
-
- spin_lock(&nal_data->gm_lock);
- stxd->gm_priority = GM_LOW_PRIORITY;
- stxd->gm_target_node = local_nid;
- gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
- stxd->gm_size, stxd->msg_size,
- GM_LOW_PRIORITY, local_nid,
- gmnal_small_tx_callback, (void*)stxd);
- spin_unlock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "done\n");
-
- return(PTL_OK);
-}
+ if (tx->tx_credit) {
+ wake_sched = 1;
+ gmni->gmni_tx_credits++;
+ tx->tx_credit = 0;
+ }
+ list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
-/*
- * A callback to indicate the small transmit operation is compete
- * Check for erros and try to deal with them.
- * Call lib_finalise to inform the client application that the send
- * is complete and the memory can be reused.
- * Return the stxd when finished with it (returns a send token)
- */
-void
-gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
-{
- gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- lib_msg_t *cookie = stxd->cookie;
- gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
- lib_nal_t *libnal = nal_data->libnal;
- unsigned gnid = 0;
- gm_status_t gm_status = 0;
-
- if (!stxd) {
- CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
- return;
- }
- if (status != GM_SUCCESS) {
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_node_id_to_global_id(nal_data->gm_port,
- stxd->gm_target_node,&gnid);
- spin_unlock(&nal_data->gm_lock);
- if (gm_status != GM_SUCCESS) {
- CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
- gm_status);
- gnid = 0;
- }
- CERROR("Result of send stxd [%p] is [%s] to [%u]\n",
- stxd, gmnal_gm_error(status), gnid);
- }
-
- switch(status) {
- case(GM_SUCCESS):
- break;
-
-
-
- case(GM_SEND_DROPPED):
- /*
- * do a resend on the dropped ones
- */
- CERROR("send stxd [%p] dropped, resending\n", context);
- spin_lock(&nal_data->gm_lock);
- gm_send_to_peer_with_callback(nal_data->gm_port,
- stxd->buffer,
- stxd->gm_size,
- stxd->msg_size,
- stxd->gm_priority,
- stxd->gm_target_node,
- gmnal_small_tx_callback,
- context);
- spin_unlock(&nal_data->gm_lock);
- return;
- case(GM_TIMED_OUT):
- case(GM_SEND_TIMED_OUT):
- /*
- * drop these ones
- */
- CDEBUG(D_INFO, "calling gm_drop_sends\n");
- spin_lock(&nal_data->gm_lock);
- gm_drop_sends(nal_data->gm_port, stxd->gm_priority,
- stxd->gm_target_node, gm_port_id,
- gmnal_drop_sends_callback, context);
- spin_unlock(&nal_data->gm_lock);
-
- return;
-
-
- /*
- * abort on these ?
- */
- case(GM_TRY_AGAIN):
- case(GM_INTERRUPTED):
- case(GM_FAILURE):
- case(GM_INPUT_BUFFER_TOO_SMALL):
- case(GM_OUTPUT_BUFFER_TOO_SMALL):
- case(GM_BUSY):
- case(GM_MEMORY_FAULT):
- case(GM_INVALID_PARAMETER):
- case(GM_OUT_OF_MEMORY):
- case(GM_INVALID_COMMAND):
- case(GM_PERMISSION_DENIED):
- case(GM_INTERNAL_ERROR):
- case(GM_UNATTACHED):
- case(GM_UNSUPPORTED_DEVICE):
- case(GM_SEND_REJECTED):
- case(GM_SEND_TARGET_PORT_CLOSED):
- case(GM_SEND_TARGET_NODE_UNREACHABLE):
- case(GM_SEND_PORT_CLOSED):
- case(GM_NODE_ID_NOT_YET_SET):
- case(GM_STILL_SHUTTING_DOWN):
- case(GM_CLONE_BUSY):
- case(GM_NO_SUCH_DEVICE):
- case(GM_ABORTED):
- case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
- case(GM_UNTRANSLATED_SYSTEM_ERROR):
- case(GM_ACCESS_DENIED):
- case(GM_NO_DRIVER_SUPPORT):
- case(GM_PTE_REF_CNT_OVERFLOW):
- case(GM_NOT_SUPPORTED_IN_KERNEL):
- case(GM_NOT_SUPPORTED_ON_ARCH):
- case(GM_NO_MATCH):
- case(GM_USER_ERROR):
- case(GM_DATA_CORRUPTED):
- case(GM_HARDWARE_FAULT):
- case(GM_SEND_ORPHANED):
- case(GM_MINOR_OVERFLOW):
- case(GM_PAGE_TABLE_FULL):
- case(GM_UC_ERROR):
- case(GM_INVALID_PORT_NUMBER):
- case(GM_DEV_NOT_FOUND):
- case(GM_FIRMWARE_NOT_RUNNING):
- case(GM_YP_NO_MATCH):
- default:
- gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
- stxd->gm_target_node, gm_port_id,
- gmnal_resume_sending_callback, context);
- return;
+ if (wake_sched)
+ gmnal_check_txqueues_locked(gmni);
- }
-
- /*
- * TO DO
- * If this is a large message init,
- * we're not finished with the data yet,
- * so can't call lib_finalise.
- * However, we're also holding on to a
- * stxd here (to keep track of the source
- * iovec only). Should use another structure
- * to keep track of iovec and return stxd to
- * free list earlier.
- */
- if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) {
- CDEBUG(D_INFO, "large transmit done\n");
- return;
- }
- gmnal_return_stxd(nal_data, stxd);
- lib_finalize(libnal, stxd, cookie, PTL_OK);
- return;
-}
+ spin_unlock(&gmni->gmni_tx_lock);
-/*
- * After an error on the port
- * call this to allow future sends to complete
- */
-void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
- gm_status_t status)
-{
- gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
- CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
- gmnal_return_stxd(stxd->nal_data, stxd);
- lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
- return;
+ /* Delay finalize until tx is free */
+ if (lnetmsg != NULL)
+ lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
}
-
-void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
- gm_status_t status)
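+/*
+ * Completion callback for gm_drop_sends(): the failed tx can now be
+ * finished off with -EIO.
+ */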
+void
+gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
+ gm_status_t status)
{
- gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- gmnal_data_t *nal_data = stxd->nal_data;
-
- CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context);
- if (status == GM_SUCCESS) {
- spin_lock(&nal_data->gm_lock);
- gm_send_to_peer_with_callback(gm_port, stxd->buffer,
- stxd->gm_size, stxd->msg_size,
- stxd->gm_priority,
- stxd->gm_target_node,
- gmnal_small_tx_callback,
- context);
- spin_unlock(&nal_data->gm_lock);
- } else {
- CERROR("send_to_peer status for stxd [%p] is "
- "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
- /* Recycle the stxd */
- gmnal_return_stxd(nal_data, stxd);
- lib_finalize(nal_data->libnal, stxd, stxd->cookie, PTL_FAIL);
- }
-
- return;
-}
+ gmnal_tx_t *tx = (gmnal_tx_t*)context;
+ LASSERT(!in_interrupt());
-/*
- * Begine a large transmit.
- * Do a gm_register of the memory pointed to by the iovec
- * and send details to the receiver. The receiver does a gm_get
- * to pull the data and sends and ack when finished. Upon receipt of
- * this ack, deregister the memory. Only 1 send token is required here.
- */
-int
-gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov, size_t offset, int size)
-{
+ CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",
+ tx, status, gmnal_gmstatus2str(status),
+ libcfs_nid2str(tx->tx_nid));
- gmnal_data_t *nal_data;
- gmnal_stxd_t *stxd = NULL;
- void *buffer = NULL;
- gmnal_msghdr_t *msghdr = NULL;
- unsigned int local_nid;
- int mlen = 0; /* the size of the init message data */
- struct iovec *iov_dup = NULL;
- gm_status_t gm_status;
- int niov_dup;
-
-
- CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] "
- "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], "
- "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type,
- global_nid, pid, niov, iov, size);
-
- if (libnal)
- nal_data = (gmnal_data_t*)libnal->libnal_data;
- else {
- CERROR("no libnal.\n");
- return(GMNAL_STATUS_FAIL);
- }
-
-
- /*
- * Get stxd and buffer. Put local address of data in buffer,
- * send local addresses to target,
- * wait for the target node to suck the data over.
- * The stxd is used to ren
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-
- stxd->type = GMNAL_LARGE_MESSAGE_INIT;
- stxd->cookie = cookie;
-
- /*
- * Copy gmnal_msg_hdr and portals header to the transmit buffer
- * Then copy the iov in
- */
- buffer = stxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
-
- CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
- msghdr->magic = GMNAL_MAGIC;
- msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
- msghdr->sender_node_id = nal_data->gm_global_nid;
- msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
- msghdr->niov = niov ;
- buffer += sizeof(gmnal_msghdr_t);
- mlen = sizeof(gmnal_msghdr_t);
- CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-
- CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer);
-
- gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
- buffer += sizeof(ptl_hdr_t);
- mlen += sizeof(ptl_hdr_t);
- CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
- while (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- niov--;
- iov++;
- }
-
- LASSERT(offset >= 0);
- /*
- * Store the iovs in the stxd for we can get
- * them later if we need them
- */
- stxd->iov[0].iov_base = iov->iov_base + offset;
- stxd->iov[0].iov_len = iov->iov_len - offset;
- CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
- if (niov > 1)
- gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
- stxd->niov = niov;
-
- /*
- * copy the iov to the buffer so target knows
- * where to get the data from
- */
- CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
- gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
- mlen += stxd->niov*(sizeof(struct iovec));
- CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
- /*
- * register the memory so the NIC can get hold of the data
- * This is a slow process. it'd be good to overlap it
- * with something else.
- */
- iov = stxd->iov;
- iov_dup = iov;
- niov_dup = niov;
- while(niov--) {
- CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
- iov->iov_base, iov->iov_len);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_register_memory(nal_data->gm_port,
- iov->iov_base, iov->iov_len);
- if (gm_status != GM_SUCCESS) {
- spin_unlock(&nal_data->gm_lock);
- CERROR("gm_register_memory returns [%d][%s] "
- "for memory [%p] len ["LPSZ"]\n",
- gm_status, gmnal_gm_error(gm_status),
- iov->iov_base, iov->iov_len);
- spin_lock(&nal_data->gm_lock);
- while (iov_dup != iov) {
- gm_deregister_memory(nal_data->gm_port,
- iov_dup->iov_base,
- iov_dup->iov_len);
- iov_dup++;
- }
- spin_unlock(&nal_data->gm_lock);
- gmnal_return_stxd(nal_data, stxd);
- return(PTL_FAIL);
- }
-
- spin_unlock(&nal_data->gm_lock);
- iov++;
- }
-
- /*
- * Send the init message to the target
- */
- CDEBUG(D_INFO, "sending mlen [%d]\n", mlen);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid,
- &local_nid);
- if (gm_status != GM_SUCCESS) {
- spin_unlock(&nal_data->gm_lock);
- CERROR("Failed to obtain local id\n");
- gmnal_return_stxd(nal_data, stxd);
- /* TO DO deregister memory on failure */
- return(GMNAL_STATUS_FAIL);
- }
- CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid);
- gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
- stxd->gm_size, mlen, GM_LOW_PRIORITY,
- local_nid, gmnal_large_tx_callback,
- (void*)stxd);
- spin_unlock(&nal_data->gm_lock);
-
- CDEBUG(D_INFO, "done\n");
-
- return(PTL_OK);
+ gmnal_tx_done(tx, -EIO);
}
-/*
- * Callback function indicates that send of buffer with
- * large message iovec has completed (or failed).
- */
-void
-gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
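+/*
+ * GM send completion callback.  On success or a dropped send the tx
+ * is completed immediately; on any other error the peer is flagged
+ * down and gm_drop_sends() is issued, keeping the tx (and its credit)
+ * until gmnal_drop_sends_callback() runs.
+ */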
+void
+gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
{
- gmnal_small_tx_callback(gm_port, context, status);
+ gmnal_tx_t *tx = (gmnal_tx_t*)context;
+ gmnal_ni_t *gmni = tx->tx_gmni;
-}
+ LASSERT(!in_interrupt());
+ switch(status) {
+ case GM_SUCCESS:
+ gmnal_tx_done(tx, 0);
+ return;
+ case GM_SEND_DROPPED:
+ CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",
+ tx, libcfs_nid2str(tx->tx_nid));
+ /* Another tx failed and called gm_drop_sends() which made this
+ * one complete immediately */
+ gmnal_tx_done(tx, -EIO);
+ return;
-/*
- * Have received a buffer that contains an iovec of the sender.
- * Do a gm_register_memory of the receivers buffer and then do a get
- * data from the sender.
- */
-int
-gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- unsigned int nriov, struct iovec *riov, size_t offset,
- size_t mlen, size_t rlen)
-{
- gmnal_data_t *nal_data = libnal->libnal_data;
- gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- void *buffer = NULL;
- struct iovec *riov_dup;
- int nriov_dup;
- gmnal_msghdr_t *msghdr = NULL;
- gm_status_t gm_status;
-
- CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], "
- "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, nriov, riov, mlen, rlen);
-
- if (!srxd) {
- CERROR("gmnal_large_rx no context\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- }
-
- buffer = srxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
- buffer += sizeof(gmnal_msghdr_t);
- buffer += sizeof(ptl_hdr_t);
-
- /*
- * Store the senders stxd address in the srxd for this message
- * The gmnal_large_message_ack needs it to notify the sender
- * the pull of data is complete
- */
- srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
- /*
- * Register the receivers memory
- * get the data,
- * tell the sender that we got the data
- * then tell the receiver we got the data
- * TO DO
- * If the iovecs match, could interleave
- * gm_registers and gm_gets for each element
- */
- while (offset >= riov->iov_len) {
- offset -= riov->iov_len;
- riov++;
- nriov--;
- }
- LASSERT (nriov >= 0);
- LASSERT (offset >= 0);
- /*
- * do this so the final gm_get callback can deregister the memory
- */
- PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
-
- srxd->riov[0].iov_base = riov->iov_base + offset;
- srxd->riov[0].iov_len = riov->iov_len - offset;
- if (nriov > 1)
- gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
- srxd->nriov = nriov;
-
- riov = srxd->riov;
- nriov_dup = nriov;
- riov_dup = riov;
- while(nriov--) {
- CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n",
- riov->iov_base, riov->iov_len);
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_register_memory(nal_data->gm_port,
- riov->iov_base, riov->iov_len);
- if (gm_status != GM_SUCCESS) {
- spin_unlock(&nal_data->gm_lock);
- CERROR("gm_register_memory returns [%d][%s] "
- "for memory [%p] len ["LPSZ"]\n",
- gm_status, gmnal_gm_error(gm_status),
- riov->iov_base, riov->iov_len);
- spin_lock(&nal_data->gm_lock);
- while (riov_dup != riov) {
- gm_deregister_memory(nal_data->gm_port,
- riov_dup->iov_base,
- riov_dup->iov_len);
- riov_dup++;
- }
- spin_lock(&nal_data->gm_lock);
- /*
- * give back srxd and buffer. Send NACK to sender
- */
- PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
- return(PTL_FAIL);
- }
- spin_unlock(&nal_data->gm_lock);
- riov++;
- }
-
- /*
- * now do gm_get to get the data
- */
- srxd->cookie = cookie;
- if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer,
- nriov_dup, riov_dup) != GMNAL_STATUS_OK) {
- CERROR("can't get the data");
- }
-
- CDEBUG(D_INFO, "lgmanl_large_rx done\n");
-
- return(PTL_OK);
-}
+ default:
+ /* Some error; NB don't complete tx yet; we need its credit for
+ * gm_drop_sends() */
+ CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
+ tx, status, gmnal_gmstatus2str(status),
+ libcfs_nid2str(tx->tx_nid));
+
+ gmnal_notify_peer_down(tx);
+
+ spin_lock(&gmni->gmni_gm_lock);
+ gm_drop_sends(gmni->gmni_port,
+ tx->tx_ltxb != NULL ?
+ GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
+ tx->tx_gmlid, *gmnal_tunables.gm_port,
+ gmnal_drop_sends_callback, tx);
+ spin_unlock(&gmni->gmni_gm_lock);
+ return;
+ }
+ /* not reached */
+ LBUG();
+}
-/*
- * Perform a number of remote gets as part of receiving
- * a large message.
- * The final one to complete (i.e. the last callback to get called)
- * tidies up.
- * gm_get requires a send token.
- */
-int
-gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
- int nriov, struct iovec *riov)
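+/*
+ * Scheduler, called with gmni_tx_lock held.  Moves a tx from the
+ * buffer queue to the credit queue (copying any large payload into a
+ * pre-mapped large tx buffer first), then launches the next tx on the
+ * credit queue if a send credit is available.  The lock is dropped
+ * around the payload copy and the gm send itself.
+ */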
+void
+gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
{
+ gmnal_tx_t *tx;
+ gmnal_txbuf_t *ltxb;
+ int gmsize;
+ int pri;
+ void *netaddr;
+
+ tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
+ list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
+
+ if (tx != NULL &&
+ (tx->tx_large_nob == 0 ||
+ !list_empty(&gmni->gmni_idle_ltxbs))) {
+
+ /* consume tx */
+ list_del(&tx->tx_list);
+
+ LASSERT (tx->tx_ltxb == NULL);
+
+ if (tx->tx_large_nob != 0) {
+ ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
+ gmnal_txbuf_t, txb_list);
+
+ /* consume large buffer */
+ list_del(&ltxb->txb_list);
+
+ spin_unlock(&gmni->gmni_tx_lock);
+
+ /* Unlocking here allows sends to get re-ordered,
+ * but we want to allow other CPUs to progress... */
+
+ tx->tx_ltxb = ltxb;
+
+ /* marshall message in tx_ltxb...
+ * 1. Copy what was marshalled so far (in tx_buf) */
+ memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
+ GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
+
+ /* 2. Copy the payload */
+ if (tx->tx_large_iskiov)
+ lnet_copy_kiov2kiov(
+ gmni->gmni_large_pages,
+ ltxb->txb_buf.nb_kiov,
+ tx->tx_msgnob,
+ tx->tx_large_niov,
+ tx->tx_large_frags.kiov,
+ tx->tx_large_offset,
+ tx->tx_large_nob);
+ else
+ lnet_copy_iov2kiov(
+ gmni->gmni_large_pages,
+ ltxb->txb_buf.nb_kiov,
+ tx->tx_msgnob,
+ tx->tx_large_niov,
+ tx->tx_large_frags.iov,
+ tx->tx_large_offset,
+ tx->tx_large_nob);
+
+ tx->tx_msgnob += tx->tx_large_nob;
+
+ spin_lock(&gmni->gmni_tx_lock);
+ }
+
+ list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
+ }
- int ncalls = 0;
+ if (!list_empty(&gmni->gmni_cred_txq) &&
+ gmni->gmni_tx_credits != 0) {
- CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], "
- "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov);
+ tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);
+ /* consume tx and 1 credit */
+ list_del(&tx->tx_list);
+ gmni->gmni_tx_credits--;
- ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov);
- if (ncalls < 0) {
- CERROR("there's something wrong with the iovecs\n");
- return(GMNAL_STATUS_FAIL);
- }
- CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls);
- spin_lock_init(&srxd->callback_lock);
- srxd->ncallbacks = ncalls;
- srxd->callback_status = 0;
+ spin_unlock(&gmni->gmni_tx_lock);
- ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov);
- if (ncalls < 0) {
- CERROR("there's something wrong with the iovecs\n");
- return(GMNAL_STATUS_FAIL);
- }
+ /* Unlocking here allows sends to get re-ordered, but we want
+ * to allow other CPUs to progress... */
- return(GMNAL_STATUS_OK);
+ LASSERT(!tx->tx_credit);
+ tx->tx_credit = 1;
-}
+ tx->tx_launchtime = cfs_time_current();
+ if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
+ LASSERT (tx->tx_ltxb == NULL);
+ netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
+ gmsize = gmni->gmni_small_gmsize;
+ pri = GMNAL_SMALL_PRIORITY;
+ } else {
+ LASSERT (tx->tx_ltxb != NULL);
+ netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
+ gmsize = gmni->gmni_large_gmsize;
+ pri = GMNAL_LARGE_PRIORITY;
+ }
-/*
- * pull data from source node (source iovec) to a local iovec.
- * The iovecs may not match which adds the complications below.
- * Count the number of gm_gets that will be required so the callbacks
- * can determine who is the last one.
- */
-int
-gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
- struct iovec *siov, int nriov, struct iovec *riov)
-{
+ spin_lock(&gmni->gmni_gm_lock);
- int ncalls = 0;
- int slen = siov->iov_len, rlen = riov->iov_len;
- char *sbuf = siov->iov_base, *rbuf = riov->iov_base;
- unsigned long sbuf_long;
- gm_remote_ptr_t remote_ptr = 0;
- unsigned int source_node;
- gmnal_ltxd_t *ltxd = NULL;
- gmnal_data_t *nal_data = srxd->nal_data;
-
- CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data);
- if (do_copy) {
- if (!nal_data) {
- CERROR("Bad args No nal_data\n");
- return(GMNAL_STATUS_FAIL);
- }
- spin_lock(&nal_data->gm_lock);
- if (gm_global_id_to_node_id(nal_data->gm_port,
- srxd->gm_source_node,
- &source_node) != GM_SUCCESS) {
-
- CERROR("cannot resolve global_id [%u] "
- "to local node_id\n", srxd->gm_source_node);
- spin_unlock(&nal_data->gm_lock);
- return(GMNAL_STATUS_FAIL);
- }
- spin_unlock(&nal_data->gm_lock);
- /*
- * We need a send token to use gm_get
- * getting an stxd gets us a send token.
- * the stxd is used as the context to the
- * callback function (so stxd can be returned).
- * Set pointer in stxd to srxd so callback count in srxd
- * can be decremented to find last callback to complete
- */
- CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n",
- srxd->gm_source_node, source_node);
- }
-
- do {
- CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n",
- sbuf, slen, rbuf, rlen);
- if (slen > rlen) {
- ncalls++;
- if (do_copy) {
- CDEBUG(D_INFO, "slen>rlen\n");
- ltxd = gmnal_get_ltxd(nal_data);
- ltxd->srxd = srxd;
- spin_lock(&nal_data->gm_lock);
- /*
- * funny business to get rid
- * of compiler warning
- */
- sbuf_long = (unsigned long) sbuf;
- remote_ptr = (gm_remote_ptr_t)sbuf_long;
- gm_get(nal_data->gm_port, remote_ptr, rbuf,
- rlen, GM_LOW_PRIORITY, source_node,
- gm_port_id,
- gmnal_remote_get_callback, ltxd);
- spin_unlock(&nal_data->gm_lock);
- }
- /*
- * at the end of 1 iov element
- */
- sbuf+=rlen;
- slen-=rlen;
- riov++;
- nriov--;
- rbuf = riov->iov_base;
- rlen = riov->iov_len;
- } else if (rlen > slen) {
- ncalls++;
- if (do_copy) {
- CDEBUG(D_INFO, "slen<rlen\n");
- ltxd = gmnal_get_ltxd(nal_data);
- ltxd->srxd = srxd;
- spin_lock(&nal_data->gm_lock);
- sbuf_long = (unsigned long) sbuf;
- remote_ptr = (gm_remote_ptr_t)sbuf_long;
- gm_get(nal_data->gm_port, remote_ptr, rbuf,
- slen, GM_LOW_PRIORITY, source_node,
- gm_port_id,
- gmnal_remote_get_callback, ltxd);
- spin_unlock(&nal_data->gm_lock);
- }
- /*
- * at end of siov element
- */
- rbuf+=slen;
- rlen-=slen;
- siov++;
- sbuf = siov->iov_base;
- slen = siov->iov_len;
- } else {
- ncalls++;
- if (do_copy) {
- CDEBUG(D_INFO, "rlen=slen\n");
- ltxd = gmnal_get_ltxd(nal_data);
- ltxd->srxd = srxd;
- spin_lock(&nal_data->gm_lock);
- sbuf_long = (unsigned long) sbuf;
- remote_ptr = (gm_remote_ptr_t)sbuf_long;
- gm_get(nal_data->gm_port, remote_ptr, rbuf,
- rlen, GM_LOW_PRIORITY, source_node,
- gm_port_id,
- gmnal_remote_get_callback, ltxd);
- spin_unlock(&nal_data->gm_lock);
- }
- /*
- * at end of siov and riov element
- */
- siov++;
- sbuf = siov->iov_base;
- slen = siov->iov_len;
- riov++;
- nriov--;
- rbuf = riov->iov_base;
- rlen = riov->iov_len;
- }
-
- } while (nriov);
- return(ncalls);
+ gm_send_to_peer_with_callback(gmni->gmni_port,
+ netaddr, gmsize,
+ tx->tx_msgnob,
+ pri,
+ tx->tx_gmlid,
+ gmnal_tx_callback,
+ (void*)tx);
+
+ spin_unlock(&gmni->gmni_gm_lock);
+ spin_lock(&gmni->gmni_tx_lock);
+ }
}
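+/*
+ * (Re)post a receive buffer to GM, sized and prioritised according to
+ * whether it is a small or large rx descriptor.
+ */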
+void
+gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
+{
+ int gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
+ gmni->gmni_small_gmsize;
+ int pri = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
+ GMNAL_SMALL_PRIORITY;
+ void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
+
+ CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
+
+ spin_lock(&gmni->gmni_gm_lock);
+ gm_provide_receive_buffer_with_tag(gmni->gmni_port,
+ buffer, gmsize, pri, 0);
+ spin_unlock(&gmni->gmni_gm_lock);
+}
-/*
- * The callback function that is invoked after each gm_get call completes.
- * Multiple callbacks may be invoked for 1 transaction, only the final
- * callback has work to do.
- */
void
-gmnal_remote_get_callback(gm_port_t *gm_port, void *context,
- gm_status_t status)
+gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
+ /* Future protocol version compatibility support!
+ * The next gmlnd-specific protocol rev will first send a message to
+ * check version; I reply with a stub message containing my current
+ * magic+version... */
+ gmnal_msg_t *msg;
+ gmnal_tx_t *tx = gmnal_get_tx(gmni);
+
+ if (tx == NULL) {
+ CERROR("Can't allocate tx to send version info to %u\n",
+ rx->rx_recv_gmid);
+ return;
+ }
- gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context;
- gmnal_srxd_t *srxd = ltxd->srxd;
- lib_nal_t *libnal = srxd->nal_data->libnal;
- int lastone;
- struct iovec *riov;
- int nriov;
- gmnal_data_t *nal_data;
-
- CDEBUG(D_TRACE, "called for context [%p]\n", context);
-
- if (status != GM_SUCCESS) {
- CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status));
- }
-
- spin_lock(&srxd->callback_lock);
- srxd->ncallbacks--;
- srxd->callback_status |= status;
- lastone = srxd->ncallbacks?0:1;
- spin_unlock(&srxd->callback_lock);
- nal_data = srxd->nal_data;
-
- /*
- * everyone returns a send token
- */
- gmnal_return_ltxd(nal_data, ltxd);
-
- if (!lastone) {
- CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd);
- return;
- }
-
- /*
- * Let our client application proceed
- */
- CERROR("final callback context[%p]\n", srxd);
- lib_finalize(libnal, srxd, srxd->cookie, PTL_OK);
-
- /*
- * send an ack to the sender to let him know we got the data
- */
- gmnal_large_tx_ack(nal_data, srxd);
-
- /*
- * Unregister the memory that was used
- * This is a very slow business (slower then register)
- */
- nriov = srxd->nriov;
- riov = srxd->riov;
- spin_lock(&nal_data->gm_lock);
- while (nriov--) {
- CERROR("deregister memory [%p]\n", riov->iov_base);
- if (gm_deregister_memory(srxd->nal_data->gm_port,
- riov->iov_base, riov->iov_len)) {
- CERROR("failed to deregister memory [%p]\n",
- riov->iov_base);
- }
- riov++;
- }
- spin_unlock(&nal_data->gm_lock);
- PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov);
-
- /*
- * repost the receive buffer (return receive token)
- */
- spin_lock(&nal_data->gm_lock);
- gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
- srxd->gmsize, GM_LOW_PRIORITY, 0);
- spin_unlock(&nal_data->gm_lock);
-
- return;
-}
+ LASSERT (tx->tx_lntmsg == NULL); /* no finalize */
+ tx->tx_nid = LNET_NID_ANY;
+ tx->tx_gmlid = rx->rx_recv_gmid;
-/*
- * Called on target node.
- * After pulling data from a source node
- * send an ack message to indicate the large transmit is complete.
- */
-void
-gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
-{
+ msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
+ msg->gmm_magic = GMNAL_MSG_MAGIC;
+ msg->gmm_version = GMNAL_MSG_VERSION;
+
+ /* just send magic + version */
+ tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
+ tx->tx_large_nob = 0;
+
+ spin_lock(&gmni->gmni_tx_lock);
+
+ list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+ gmnal_check_txqueues_locked(gmni);
- gmnal_stxd_t *stxd;
- gmnal_msghdr_t *msghdr;
- void *buffer = NULL;
- unsigned int local_nid;
- gm_status_t gm_status = GM_SUCCESS;
-
- CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd,
- srxd->gm_source_node);
-
- spin_lock(&nal_data->gm_lock);
- gm_status = gm_global_id_to_node_id(nal_data->gm_port,
- srxd->gm_source_node, &local_nid);
- spin_unlock(&nal_data->gm_lock);
- if (gm_status != GM_SUCCESS) {
- CERROR("Failed to obtain local id\n");
- return;
- }
- CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
-
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd);
-
- stxd->nal_data = nal_data;
- stxd->type = GMNAL_LARGE_MESSAGE_ACK;
-
- /*
- * Copy gmnal_msg_hdr and portals header to the transmit buffer
- * Then copy the data in
- */
- buffer = stxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
-
- /*
- * Add in the address of the original stxd from the sender node
- * so it knows which thread to notify.
- */
- msghdr->magic = GMNAL_MAGIC;
- msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
- msghdr->sender_node_id = nal_data->gm_global_nid;
- msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
- CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
-
- CDEBUG(D_INFO, "sending\n");
- stxd->msg_size= sizeof(gmnal_msghdr_t);
-
-
- CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
- "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] "
- "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size,
- stxd->msg_size, srxd->gm_source_node, local_nid, stxd);
- spin_lock(&nal_data->gm_lock);
- stxd->gm_priority = GM_LOW_PRIORITY;
- stxd->gm_target_node = local_nid;
- gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer,
- stxd->gm_size, stxd->msg_size,
- GM_LOW_PRIORITY, local_nid,
- gmnal_large_tx_ack_callback,
- (void*)stxd);
-
- spin_unlock(&nal_data->gm_lock);
- CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n");
-
- return;
+ spin_unlock(&gmni->gmni_tx_lock);
}
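+/*
+ * Receive thread; one instance runs per online CPU.  Each thread in
+ * turn (serialised by gmni_rx_mutex) blocks in
+ * gm_blocking_receive_no_spin(), matches the completed buffer back to
+ * its rx descriptor, unpacks the header and hands the message to
+ * lnet_parse(); on any error the buffer is simply reposted.
+ */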
+int
+gmnal_rx_thread(void *arg)
+{
+ gmnal_ni_t *gmni = arg;
+ gm_recv_event_t *rxevent = NULL;
+ gm_recv_t *recv = NULL;
+ gmnal_rx_t *rx;
+ int rc;
+
+ cfs_daemonize("gmnal_rxd");
+
+ while (!gmni->gmni_shutdown) {
+ rc = down_interruptible(&gmni->gmni_rx_mutex);
+ LASSERT (rc == 0 || rc == -EINTR);
+ if (rc != 0)
+ continue;
+
+ spin_lock(&gmni->gmni_gm_lock);
+ rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
+ spin_unlock(&gmni->gmni_gm_lock);
+
+ switch (GM_RECV_EVENT_TYPE(rxevent)) {
+ default:
+ gm_unknown(gmni->gmni_port, rxevent);
+ up(&gmni->gmni_rx_mutex);
+ continue;
+
+ case GM_FAST_RECV_EVENT:
+ case GM_FAST_PEER_RECV_EVENT:
+ case GM_PEER_RECV_EVENT:
+ case GM_FAST_HIGH_RECV_EVENT:
+ case GM_FAST_HIGH_PEER_RECV_EVENT:
+ case GM_HIGH_PEER_RECV_EVENT:
+ case GM_RECV_EVENT:
+ case GM_HIGH_RECV_EVENT:
+ break;
+ }
+
+ recv = &rxevent->recv;
+ rx = gm_hash_find(gmni->gmni_rx_hash,
+ gm_ntohp(recv->buffer));
+ LASSERT (rx != NULL);
+
+ rx->rx_recv_nob = gm_ntoh_u32(recv->length);
+ rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
+ rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
+ rx->rx_recv_type = gm_ntoh_u8(recv->type);
+
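+ /* GM passes the payload of FAST events in recv->message
+ * rather than in the posted buffer, so copy it into the rx
+ * buffer before handling it */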
+ switch (GM_RECV_EVENT_TYPE(rxevent)) {
+ case GM_FAST_RECV_EVENT:
+ case GM_FAST_PEER_RECV_EVENT:
+ case GM_FAST_HIGH_RECV_EVENT:
+ case GM_FAST_HIGH_PEER_RECV_EVENT:
+ LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
+
+ memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
+ gm_ntohp(recv->message), rx->rx_recv_nob);
+ break;
+ }
+
+ up(&gmni->gmni_rx_mutex);
+
+ CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
+ GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
+ gm_ntohp(recv->buffer), rx->rx_recv_nob);
+
+ /* We're connectionless: simply drop packets with
+ * errors */
+ rc = gmnal_unpack_msg(gmni, rx);
+
+ if (rc == 0) {
+ gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+
+ LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+ rc = lnet_parse(gmni->gmni_ni,
+ &msg->gmm_u.immediate.gmim_hdr,
+ msg->gmm_srcnid, rx, 0);
+ } else if (rc > 0) {
+ gmnal_version_reply(gmni, rx);
+ rc = -EPROTO; /* repost rx */
+ }
+
+ if (rc < 0) /* parse failure */
+ gmnal_post_rx(gmni, rx);
+ }
+
+ CDEBUG(D_NET, "exiting\n");
+ atomic_dec(&gmni->gmni_nthreads);
+ return 0;
+}
-/*
- * A callback to indicate the small transmit operation is compete
- * Check for errors and try to deal with them.
- * Call lib_finalise to inform the client application that the
- * send is complete and the memory can be reused.
- * Return the stxd when finished with it (returns a send token)
- */
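+/*
+ * Flag shutdown and wake the rx threads out of their blocking receive
+ * with a GM alarm, then wait for them all to exit.
+ */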
void
-gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context,
- gm_status_t status)
+gmnal_stop_threads(gmnal_ni_t *gmni)
{
- gmnal_stxd_t *stxd = (gmnal_stxd_t*)context;
- gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data;
-
- if (!stxd) {
- CERROR("send completion event for unknown stxd\n");
- return;
- }
- CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n",
- stxd, status);
- gmnal_return_stxd(stxd->nal_data, stxd);
-
- spin_unlock(&nal_data->gm_lock);
- return;
+ int count = 2;
+
+ gmni->gmni_shutdown = 1;
+ mb();
+
+ /* wake rxthread owning gmni_rx_mutex with an alarm. */
+ spin_lock(&gmni->gmni_gm_lock);
+ gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
+ spin_unlock(&gmni->gmni_gm_lock);
+
+ while (atomic_read(&gmni->gmni_nthreads) != 0) {
+ count++;
+ if ((count & (count - 1)) == 0)
+ CWARN("Waiting for %d threads to stop\n",
+ atomic_read(&gmni->gmni_nthreads));
+ gmnal_yield(1);
+ }
}
-/*
- * Indicates the large transmit operation is compete.
- * Called on transmit side (means data has been pulled by receiver
- * or failed).
- * Call lib_finalise to inform the client application that the send
- * is complete, deregister the memory and return the stxd.
- * Finally, report the rx buffer that the ack message was delivered in.
- */
-void
-gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
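+/*
+ * Start one rx thread per online CPU.  On failure, any threads
+ * already started are shut down again.
+ */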
+int
+gmnal_start_threads(gmnal_ni_t *gmni)
{
- lib_nal_t *libnal = nal_data->libnal;
- gmnal_stxd_t *stxd = NULL;
- gmnal_msghdr_t *msghdr = NULL;
- void *buffer = NULL;
- struct iovec *iov;
-
-
- CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer);
-
- buffer = srxd->buffer;
- msghdr = (gmnal_msghdr_t*)buffer;
- stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
-
- CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);
-
- lib_finalize(libnal, stxd, stxd->cookie, PTL_OK);
-
- /*
- * extract the iovec from the stxd, deregister the memory.
- * free the space used to store the iovec
- */
- iov = stxd->iov;
- while(stxd->niov--) {
- CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n",
- iov->iov_base, iov->iov_len);
- spin_lock(&nal_data->gm_lock);
- gm_deregister_memory(nal_data->gm_port, iov->iov_base,
- iov->iov_len);
- spin_unlock(&nal_data->gm_lock);
- iov++;
- }
-
- /*
- * return the send token
- * TO DO It is bad to hold onto the send token so long?
- */
- gmnal_return_stxd(nal_data, stxd);
-
-
- /*
- * requeue the receive buffer
- */
- gmnal_rx_requeue_buffer(nal_data, srxd);
-
-
- return;
+ int i;
+ int pid;
+
+ LASSERT (!gmni->gmni_shutdown);
+ LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);
+
+ gm_initialize_alarm(&gmni->gmni_alarm);
+
+ for (i = 0; i < num_online_cpus(); i++) {
+
+ pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
+ if (pid < 0) {
+ CERROR("rx thread failed to start: %d\n", pid);
+ gmnal_stop_threads(gmni);
+ return pid;
+ }
+
+ atomic_inc(&gmni->gmni_nthreads);
+ }
+
+ return 0;
}