X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Fklnds%2Fgmlnd%2Fgmlnd_comm.c;h=4b26f28d71c80f6652d7c3d96b1299127a26d3f4;hb=07c9951b928a98750f96a5f83dbd7c5b2a475760;hp=60e5d67a6da3a9f10403db792579c7a9a0683a0c;hpb=439addad84514e7ff6452710e6a7f15b80d7b589;p=fs%2Flustre-release.git diff --git a/lnet/klnds/gmlnd/gmlnd_comm.c b/lnet/klnds/gmlnd/gmlnd_comm.c index 60e5d67..4b26f28 100644 --- a/lnet/klnds/gmlnd/gmlnd_comm.c +++ b/lnet/klnds/gmlnd/gmlnd_comm.c @@ -23,1319 +23,541 @@ * This file contains all gmnal send and receive functions */ -#include "gmnal.h" +#include "gmlnd.h" -/* - * The caretaker thread - * This is main thread of execution for the NAL side - * This guy waits in gm_blocking_recvive and gets - * woken up when the myrinet adaptor gets an interrupt. - * Hands off receive operations to the receive thread - * This thread Looks after gm_callbacks etc inline. - */ -int -gmnal_ct_thread(void *arg) +void +gmnal_notify_peer_down(gmnal_tx_t *tx) { - gmnal_data_t *nal_data; - gm_recv_event_t *rxevent = NULL; - gm_recv_t *recv = NULL; + struct timeval now; + time_t then; - if (!arg) { - CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); - return(-1); - } + do_gettimeofday (&now); + then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ; - nal_data = (gmnal_data_t*)arg; - CDEBUG(D_TRACE, "nal_data is [%p]\n", arg); - - sprintf(current->comm, "gmnal_ct"); - - daemonize(); - - nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED; - - GMNAL_GM_LOCK(nal_data); - while(nal_data->ctthread_flag == GMNAL_CTTHREAD_STARTED) { - CDEBUG(D_NET, "waiting\n"); - rxevent = gm_blocking_receive_no_spin(nal_data->gm_port); - if (nal_data->ctthread_flag == GMNAL_THREAD_STOP) { - CDEBUG(D_INFO, "time to exit\n"); - break; - } - CDEBUG(D_INFO, "got [%s]\n", gmnal_rxevent(rxevent)); - switch (GM_RECV_EVENT_TYPE(rxevent)) { - - case(GM_RECV_EVENT): - CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n"); - recv = (gm_recv_t*)&rxevent->recv; - GMNAL_GM_UNLOCK(nal_data); - gmnal_add_rxtwe(nal_data, recv); - GMNAL_GM_LOCK(nal_data); - CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n"); - break; - case(_GM_SLEEP_EVENT): - /* - * Blocking receive above just returns - * immediatly with _GM_SLEEP_EVENT - * Don't know what this is - */ - CDEBUG(D_NET, "Sleeping in gm_unknown\n"); - GMNAL_GM_UNLOCK(nal_data); - gm_unknown(nal_data->gm_port, rxevent); - GMNAL_GM_LOCK(nal_data); - CDEBUG(D_INFO, "Awake from gm_unknown\n"); - break; - - default: - /* - * Don't know what this is - * gm_unknown will make sense of it - * Should be able to do something with - * FAST_RECV_EVENTS here. - */ - CDEBUG(D_NET, "Passing event to gm_unknown\n"); - GMNAL_GM_UNLOCK(nal_data); - gm_unknown(nal_data->gm_port, rxevent); - GMNAL_GM_LOCK(nal_data); - CDEBUG(D_INFO, "Processed unknown event\n"); - } - } - GMNAL_GM_UNLOCK(nal_data); - nal_data->ctthread_flag = GMNAL_THREAD_RESET; - CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data); - return(GMNAL_STATUS_OK); + lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then); } - -/* - * process a receive event - */ -int gmnal_rx_thread(void *arg) +void +gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg, + lnet_nid_t dstnid, int type) { - gmnal_data_t *nal_data; - void *buffer; - gmnal_rxtwe_t *we = NULL; - int rank; - - if (!arg) { - CDEBUG(D_TRACE, "NO nal_data. Exiting\n"); - return(-1); - } - - nal_data = (gmnal_data_t*)arg; - CDEBUG(D_TRACE, "nal_data is [%p]\n", arg); - - for (rank=0; rankrxthread_pid[rank] == current->pid) - break; - - sprintf(current->comm, "gmnal_rx_%d", rank); - - daemonize(); - /* - * set 1 bit for each thread started - * doesn't matter which bit - */ - spin_lock(&nal_data->rxthread_flag_lock); - if (nal_data->rxthread_flag) - nal_data->rxthread_flag=nal_data->rxthread_flag*2 + 1; - else - nal_data->rxthread_flag = 1; - CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag); - spin_unlock(&nal_data->rxthread_flag_lock); - - while(nal_data->rxthread_stop_flag != GMNAL_THREAD_STOP) { - CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n"); - we = gmnal_get_rxtwe(nal_data); - if (!we) { - CDEBUG(D_INFO, "Receive thread time to exit\n"); - break; - } - - buffer = we->buffer; - switch(((gmnal_msghdr_t*)buffer)->type) { - case(GMNAL_SMALL_MESSAGE): - gmnal_pre_receive(nal_data, we, GMNAL_SMALL_MESSAGE); - break; - case(GMNAL_LARGE_MESSAGE_INIT): - gmnal_pre_receive(nal_data,we,GMNAL_LARGE_MESSAGE_INIT); - break; - case(GMNAL_LARGE_MESSAGE_ACK): - gmnal_pre_receive(nal_data, we,GMNAL_LARGE_MESSAGE_ACK); - break; - default: - CERROR("Unsupported message type\n"); - gmnal_rx_bad(nal_data, we, NULL); - } - PORTAL_FREE(we, sizeof(gmnal_rxtwe_t)); - } - - spin_lock(&nal_data->rxthread_flag_lock); - nal_data->rxthread_flag/=2; - CDEBUG(D_INFO, "rxthread flag is [%ld]\n", nal_data->rxthread_flag); - spin_unlock(&nal_data->rxthread_flag_lock); - CDEBUG(D_INFO, "thread nal_data [%p] is exiting\n", nal_data); - return(GMNAL_STATUS_OK); + /* CAVEAT EMPTOR! this only sets the common message fields. */ + msg->gmm_magic = GMNAL_MSG_MAGIC; + msg->gmm_version = GMNAL_MSG_VERSION; + msg->gmm_type = type; + msg->gmm_srcnid = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid, + dstnid); + msg->gmm_dstnid = dstnid; } - - -/* - * Start processing a small message receive - * Get here from gmnal_receive_thread - * Hand off to lib_parse, which calls cb_recv - * which hands back to gmnal_small_receive - * Deal with all endian stuff here. - */ int -gmnal_pre_receive(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, int gmnal_type) +gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx) { - gmnal_srxd_t *srxd = NULL; - void *buffer = NULL; - unsigned int snode, sport, type, length; - gmnal_msghdr_t *gmnal_msghdr; - ptl_hdr_t *portals_hdr; - int rc; - - CDEBUG(D_INFO, "nal_data [%p], we[%p] type [%d]\n", - nal_data, we, gmnal_type); - - buffer = we->buffer; - snode = we->snode; - sport = we->sport; - type = we->type; - buffer = we->buffer; - length = we->length; - - gmnal_msghdr = (gmnal_msghdr_t*)buffer; - portals_hdr = (ptl_hdr_t*)(buffer+GMNAL_MSGHDR_SIZE); - - CDEBUG(D_INFO, "rx_event:: Sender node [%d], Sender Port [%d], " - "type [%d], length [%d], buffer [%p]\n", - snode, sport, type, length, buffer); - CDEBUG(D_INFO, "gmnal_msghdr:: Sender node [%u], magic [%d], " - "gmnal_type [%d]\n", gmnal_msghdr->sender_node_id, - gmnal_msghdr->magic, gmnal_msghdr->type); - CDEBUG(D_INFO, "portals_hdr:: Sender node ["LPD64"], " - "dest_node ["LPD64"]\n", portals_hdr->src_nid, - portals_hdr->dest_nid); - - /* - * Get a receive descriptor for this message - */ - srxd = gmnal_rxbuffer_to_srxd(nal_data, buffer); - CDEBUG(D_INFO, "Back from gmnal_rxbuffer_to_srxd\n"); - if (!srxd) { - CERROR("Failed to get receive descriptor\n"); - /* I think passing a NULL srxd to lib_parse will crash - * gmnal_recv() */ - LBUG(); - lib_parse(nal_data->libnal, portals_hdr, srxd); - return(GMNAL_STATUS_FAIL); - } + gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf); + const int hdr_size = offsetof(gmnal_msg_t, gmm_u); + int buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize : + gmni->gmni_small_msgsize; + int flip; + + /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */ + + /* GM may not overflow our buffer */ + LASSERT (rx->rx_recv_nob <= buffnob); + + /* 6 bytes are enough to have received magic + version */ + if (rx->rx_recv_nob < 6) { + CERROR("Short message from gmid %u: %d\n", + rx->rx_recv_gmid, rx->rx_recv_nob); + return -EPROTO; + } - /* - * no need to bother portals library with this - */ - if (gmnal_type == GMNAL_LARGE_MESSAGE_ACK) { - gmnal_large_tx_ack_received(nal_data, srxd); - return(GMNAL_STATUS_OK); - } + if (msg->gmm_magic == GMNAL_MSG_MAGIC) { + flip = 0; + } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) { + flip = 1; + } else if (msg->gmm_magic == LNET_PROTO_MAGIC || + msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) { + return EPROTO; + } else { + CERROR("Bad magic from gmid %u: %08x\n", + rx->rx_recv_gmid, msg->gmm_magic); + return -EPROTO; + } - srxd->nal_data = nal_data; - srxd->type = gmnal_type; - srxd->nsiov = gmnal_msghdr->niov; - srxd->gm_source_node = gmnal_msghdr->sender_node_id; - - CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n", - buffer+GMNAL_MSGHDR_SIZE); - /* - * control passes to lib, which calls cb_recv - * cb_recv is responsible for returning the buffer - * for future receive - */ - rc = lib_parse(nal_data->libnal, portals_hdr, srxd); - - if (rc != PTL_OK) { - /* I just received garbage; take appropriate action... */ - LBUG(); + if (msg->gmm_version != + (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) { + return EPROTO; } - return(GMNAL_STATUS_OK); -} + if (rx->rx_recv_nob < hdr_size) { + CERROR("Short message from %u: %d\n", + rx->rx_recv_gmid, rx->rx_recv_nob); + return -EPROTO; + } + if (flip) { + /* leave magic unflipped as a clue to peer endianness */ + __swab16s(&msg->gmm_version); + __swab16s(&msg->gmm_type); + __swab64s(&msg->gmm_srcnid); + __swab64s(&msg->gmm_dstnid); + } + + if (msg->gmm_srcnid == LNET_NID_ANY) { + CERROR("Bad src nid from %u: %s\n", + rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid)); + return -EPROTO; + } + if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid, + msg->gmm_dstnid)) { + CERROR("Bad dst nid from %u: %s\n", + rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid)); + return -EPROTO; + } + + switch (msg->gmm_type) { + default: + CERROR("Unknown message type from %u: %x\n", + rx->rx_recv_gmid, msg->gmm_type); + return -EPROTO; + + case GMNAL_MSG_IMMEDIATE: + if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) { + CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", + rx->rx_recv_gmid, rx->rx_recv_nob, + offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])); + return -EPROTO; + } + break; + } + return 0; +} -/* - * After a receive has been processed, - * hang out the receive buffer again. - * This implicitly returns a receive token. - */ -int -gmnal_rx_requeue_buffer(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) +gmnal_tx_t * +gmnal_get_tx(gmnal_ni_t *gmni) { - CDEBUG(D_TRACE, "gmnal_rx_requeue_buffer\n"); - - CDEBUG(D_NET, "requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data); + gmnal_tx_t *tx = NULL; - GMNAL_GM_LOCK(nal_data); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0 ); - GMNAL_GM_UNLOCK(nal_data); - - return(GMNAL_STATUS_OK); -} + spin_lock(&gmni->gmni_tx_lock); + if (gmni->gmni_shutdown || + list_empty(&gmni->gmni_idle_txs)) { + spin_unlock(&gmni->gmni_tx_lock); + return NULL; + } + + tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list); + list_del(&tx->tx_list); -/* - * Handle a bad message - * A bad message is one we don't expect or can't interpret - */ -int -gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd) -{ - CDEBUG(D_TRACE, "Can't handle message\n"); - - if (!srxd) - srxd = gmnal_rxbuffer_to_srxd(nal_data, - we->buffer); - if (srxd) { - gmnal_rx_requeue_buffer(nal_data, srxd); - } else { - CERROR("Can't find a descriptor for this buffer\n"); - /* - * get rid of it ? - */ - return(GMNAL_STATUS_FAIL); - } + spin_unlock(&gmni->gmni_tx_lock); - return(GMNAL_STATUS_OK); + LASSERT (tx->tx_lntmsg == NULL); + LASSERT (tx->tx_ltxb == NULL); + LASSERT (!tx->tx_credit); + + return tx; } - - -/* - * Process a small message receive. - * Get here from gmnal_receive_thread, gmnal_pre_receive - * lib_parse, cb_recv - * Put data from prewired receive buffer into users buffer(s) - * Hang out the receive buffer again for another receive - * Call lib_finalize - */ -ptl_err_t -gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie) +void +gmnal_tx_done(gmnal_tx_t *tx, int rc) { - gmnal_srxd_t *srxd = NULL; - gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data; + gmnal_ni_t *gmni = tx->tx_gmni; + int wake_sched = 0; + lnet_msg_t *lnetmsg = tx->tx_lntmsg; + + tx->tx_lntmsg = NULL; + + spin_lock(&gmni->gmni_tx_lock); + + if (tx->tx_ltxb != NULL) { + wake_sched = 1; + list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs); + tx->tx_ltxb = NULL; + } + + if (tx->tx_credit) { + wake_sched = 1; + gmni->gmni_tx_credits++; + tx->tx_credit = 0; + } + + list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs); + if (wake_sched) + gmnal_check_txqueues_locked(gmni); - if (!private) { - CERROR("gmnal_small_rx no context\n"); - lib_finalize(libnal, private, cookie, PTL_FAIL); - return(PTL_FAIL); - } + spin_unlock(&gmni->gmni_tx_lock); - srxd = (gmnal_srxd_t*)private; - - /* - * let portals library know receive is complete - */ - CDEBUG(D_PORTALS, "calling lib_finalize\n"); - lib_finalize(libnal, private, cookie, PTL_OK); - /* - * return buffer so it can be used again - */ - CDEBUG(D_NET, "calling gm_provide_receive_buffer\n"); - GMNAL_GM_LOCK(nal_data); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0); - GMNAL_GM_UNLOCK(nal_data); - - return(PTL_OK); + /* Delay finalize until tx is free */ + if (lnetmsg != NULL) + lnet_finalize(gmni->gmni_ni, lnetmsg, rc); } - -/* - * Start a small transmit. - * Use the given send token (and wired transmit buffer). - * Copy headers to wired buffer and initiate gm_send from the wired buffer. - * The callback function informs when the send is complete. - */ -ptl_err_t -gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, - gmnal_stxd_t *stxd, int size) +void +gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, + gm_status_t status) { - gmnal_data_t *nal_data = (gmnal_data_t*)libnal->libnal_data; - void *buffer = NULL; - gmnal_msghdr_t *msghdr = NULL; - int tot_size = 0; - unsigned int local_nid; - gm_status_t gm_status = GM_SUCCESS; - - CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] " - "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] " - "size [%d]\n", libnal, private, cookie, hdr, type, - global_nid, pid, stxd, size); - - CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n", - hdr->dest_nid, hdr->src_nid); - - if (!nal_data) { - CERROR("no nal_data\n"); - return(PTL_FAIL); - } else { - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - } - - GMNAL_GM_LOCK(nal_data); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, - &local_nid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CERROR("Failed to obtain local id\n"); - return(PTL_FAIL); - } - CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid); - - stxd->type = GMNAL_SMALL_MESSAGE; - stxd->cookie = cookie; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then send the message, as the data has previously been copied in - * (HP SFS 1380). - */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_SMALL_MESSAGE; - msghdr->sender_node_id = nal_data->gm_global_nid; - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - buffer += sizeof(gmnal_msghdr_t); - - CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer); - gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t)); + gmnal_tx_t *tx = (gmnal_tx_t*)context; - buffer += sizeof(ptl_hdr_t); + LASSERT(!in_interrupt()); + + CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n", + tx, status, gmnal_gmstatus2str(status), + libcfs_nid2str(tx->tx_nid)); - CDEBUG(D_INFO, "sending\n"); - tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t); - stxd->msg_size = tot_size; - - - CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] " - "gmsize [%lu] msize [%d] global_nid ["LPU64"] local_nid[%d] " - "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, - stxd->msg_size, global_nid, local_nid, stxd); - - GMNAL_GM_LOCK(nal_data); - stxd->gm_priority = GM_LOW_PRIORITY; - stxd->gm_target_node = local_nid; - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - GM_LOW_PRIORITY, local_nid, - gmnal_small_tx_callback, (void*)stxd); - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_INFO, "done\n"); - - return(PTL_OK); + gmnal_tx_done(tx, -EIO); } - -/* - * A callback to indicate the small transmit operation is compete - * Check for erros and try to deal with them. - * Call lib_finalise to inform the client application that the send - * is complete and the memory can be reused. - * Return the stxd when finished with it (returns a send token) - */ void -gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) +gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) { - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - lib_msg_t *cookie = stxd->cookie; - gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data; - lib_nal_t *libnal = nal_data->libnal; - unsigned gnid = 0; - gm_status_t gm_status = 0; - - if (!stxd) { - CDEBUG(D_TRACE, "send completion event for unknown stxd\n"); - return; - } - if (status != GM_SUCCESS) { - GMNAL_GM_LOCK(nal_data); - gm_status = gm_node_id_to_global_id(nal_data->gm_port, - stxd->gm_target_node,&gnid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n", - gm_status); - gnid = 0; - } - CERROR("Result of send stxd [%p] is [%s] to [%u]\n", - stxd, gmnal_gm_error(status), gnid); - } + gmnal_tx_t *tx = (gmnal_tx_t*)context; + gmnal_ni_t *gmni = tx->tx_gmni; - switch(status) { - case(GM_SUCCESS): - break; - - - - case(GM_SEND_DROPPED): - /* - * do a resend on the dropped ones - */ - CERROR("send stxd [%p] dropped, resending\n", context); - GMNAL_GM_LOCK(nal_data); - gm_send_to_peer_with_callback(nal_data->gm_port, - stxd->buffer, - stxd->gm_size, - stxd->msg_size, - stxd->gm_priority, - stxd->gm_target_node, - gmnal_small_tx_callback, - context); - GMNAL_GM_UNLOCK(nal_data); - return; - case(GM_TIMED_OUT): - case(GM_SEND_TIMED_OUT): - /* - * drop these ones - */ - CDEBUG(D_INFO, "calling gm_drop_sends\n"); - GMNAL_GM_LOCK(nal_data); - gm_drop_sends(nal_data->gm_port, stxd->gm_priority, - stxd->gm_target_node, GMNAL_GM_PORT_ID, - gmnal_drop_sends_callback, context); - GMNAL_GM_UNLOCK(nal_data); - - return; + LASSERT(!in_interrupt()); - - /* - * abort on these ? - */ - case(GM_TRY_AGAIN): - case(GM_INTERRUPTED): - case(GM_FAILURE): - case(GM_INPUT_BUFFER_TOO_SMALL): - case(GM_OUTPUT_BUFFER_TOO_SMALL): - case(GM_BUSY): - case(GM_MEMORY_FAULT): - case(GM_INVALID_PARAMETER): - case(GM_OUT_OF_MEMORY): - case(GM_INVALID_COMMAND): - case(GM_PERMISSION_DENIED): - case(GM_INTERNAL_ERROR): - case(GM_UNATTACHED): - case(GM_UNSUPPORTED_DEVICE): - case(GM_SEND_REJECTED): - case(GM_SEND_TARGET_PORT_CLOSED): - case(GM_SEND_TARGET_NODE_UNREACHABLE): - case(GM_SEND_PORT_CLOSED): - case(GM_NODE_ID_NOT_YET_SET): - case(GM_STILL_SHUTTING_DOWN): - case(GM_CLONE_BUSY): - case(GM_NO_SUCH_DEVICE): - case(GM_ABORTED): - case(GM_INCOMPATIBLE_LIB_AND_DRIVER): - case(GM_UNTRANSLATED_SYSTEM_ERROR): - case(GM_ACCESS_DENIED): - case(GM_NO_DRIVER_SUPPORT): - case(GM_PTE_REF_CNT_OVERFLOW): - case(GM_NOT_SUPPORTED_IN_KERNEL): - case(GM_NOT_SUPPORTED_ON_ARCH): - case(GM_NO_MATCH): - case(GM_USER_ERROR): - case(GM_DATA_CORRUPTED): - case(GM_HARDWARE_FAULT): - case(GM_SEND_ORPHANED): - case(GM_MINOR_OVERFLOW): - case(GM_PAGE_TABLE_FULL): - case(GM_UC_ERROR): - case(GM_INVALID_PORT_NUMBER): - case(GM_DEV_NOT_FOUND): - case(GM_FIRMWARE_NOT_RUNNING): - case(GM_YP_NO_MATCH): - default: - gm_resume_sending(nal_data->gm_port, stxd->gm_priority, - stxd->gm_target_node, GMNAL_GM_PORT_ID, - gmnal_resume_sending_callback, context); + switch(status) { + case GM_SUCCESS: + gmnal_tx_done(tx, 0); return; - } - - /* - * TO DO - * If this is a large message init, - * we're not finished with the data yet, - * so can't call lib_finalise. - * However, we're also holding on to a - * stxd here (to keep track of the source - * iovec only). Should use another structure - * to keep track of iovec and return stxd to - * free list earlier. - */ - if (stxd->type == GMNAL_LARGE_MESSAGE_INIT) { - CDEBUG(D_INFO, "large transmit done\n"); + case GM_SEND_DROPPED: + CDEBUG(D_NETERROR, "Dropped tx %p to %s\n", + tx, libcfs_nid2str(tx->tx_nid)); + /* Another tx failed and called gm_drop_sends() which made this + * one complete immediately */ + gmnal_tx_done(tx, -EIO); + return; + + default: + /* Some error; NB don't complete tx yet; we need its credit for + * gm_drop_sends() */ + CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n", + tx, status, gmnal_gmstatus2str(status), + libcfs_nid2str(tx->tx_nid)); + + gmnal_notify_peer_down(tx); + + spin_lock(&gmni->gmni_gm_lock); + gm_drop_sends(gmni->gmni_port, + tx->tx_ltxb != NULL ? + GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY, + tx->tx_gmlid, *gmnal_tunables.gm_port, + gmnal_drop_sends_callback, tx); + spin_unlock(&gmni->gmni_gm_lock); return; } - gmnal_return_stxd(nal_data, stxd); - lib_finalize(libnal, stxd, cookie, PTL_OK); - return; -} -/* - * After an error on the port - * call this to allow future sends to complete - */ -void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context, - gm_status_t status) -{ - gmnal_data_t *nal_data; - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context); - gmnal_return_stxd(stxd->nal_data, stxd); - return; + /* not reached */ + LBUG(); } - -void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, - gm_status_t status) +void +gmnal_check_txqueues_locked (gmnal_ni_t *gmni) { - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_data_t *nal_data = stxd->nal_data; - - CDEBUG(D_TRACE, "status is [%d] context is [%p]\n", status, context); - if (status == GM_SUCCESS) { - GMNAL_GM_LOCK(nal_data); - gm_send_to_peer_with_callback(gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - stxd->gm_priority, - stxd->gm_target_node, - gmnal_small_tx_callback, - context); - GMNAL_GM_UNLOCK(nal_data); - } else { - CERROR("send_to_peer status for stxd [%p] is " - "[%d][%s]\n", stxd, status, gmnal_gm_error(status)); - } + gmnal_tx_t *tx; + gmnal_txbuf_t *ltxb; + int gmsize; + int pri; + void *netaddr; + + tx = list_empty(&gmni->gmni_buf_txq) ? NULL : + list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list); + + if (tx != NULL && + (tx->tx_large_nob == 0 || + !list_empty(&gmni->gmni_idle_ltxbs))) { + + /* consume tx */ + list_del(&tx->tx_list); + + LASSERT (tx->tx_ltxb == NULL); + + if (tx->tx_large_nob != 0) { + ltxb = list_entry(gmni->gmni_idle_ltxbs.next, + gmnal_txbuf_t, txb_list); + + /* consume large buffer */ + list_del(<xb->txb_list); + + spin_unlock(&gmni->gmni_tx_lock); + + /* Unlocking here allows sends to get re-ordered, + * but we want to allow other CPUs to progress... */ + + tx->tx_ltxb = ltxb; + + /* marshall message in tx_ltxb... + * 1. Copy what was marshalled so far (in tx_buf) */ + memcpy(GMNAL_NETBUF_MSG(<xb->txb_buf), + GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob); + + /* 2. Copy the payload */ + if (tx->tx_large_iskiov) + lnet_copy_kiov2kiov( + gmni->gmni_large_pages, + ltxb->txb_buf.nb_kiov, + tx->tx_msgnob, + tx->tx_large_niov, + tx->tx_large_frags.kiov, + tx->tx_large_offset, + tx->tx_large_nob); + else + lnet_copy_iov2kiov( + gmni->gmni_large_pages, + ltxb->txb_buf.nb_kiov, + tx->tx_msgnob, + tx->tx_large_niov, + tx->tx_large_frags.iov, + tx->tx_large_offset, + tx->tx_large_nob); + + tx->tx_msgnob += tx->tx_large_nob; + + spin_lock(&gmni->gmni_tx_lock); + } + + list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq); + } + if (!list_empty(&gmni->gmni_cred_txq) && + gmni->gmni_tx_credits != 0) { - return; -} + tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list); + /* consume tx and 1 credit */ + list_del(&tx->tx_list); + gmni->gmni_tx_credits--; -/* - * Begine a large transmit. - * Do a gm_register of the memory pointed to by the iovec - * and send details to the receiver. The receiver does a gm_get - * to pull the data and sends and ack when finished. Upon receipt of - * this ack, deregister the memory. Only 1 send token is required here. - */ -int -gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, size_t offset, int size) -{ + spin_unlock(&gmni->gmni_tx_lock); - gmnal_data_t *nal_data; - gmnal_stxd_t *stxd = NULL; - void *buffer = NULL; - gmnal_msghdr_t *msghdr = NULL; - unsigned int local_nid; - int mlen = 0; /* the size of the init message data */ - struct iovec *iov_dup = NULL; - gm_status_t gm_status; - int niov_dup; - - - CDEBUG(D_TRACE, "gmnal_large_tx libnal [%p] private [%p], cookie [%p] " - "hdr [%p], type [%d] global_nid ["LPU64"], pid [%d], niov [%d], " - "iov [%p], size [%d]\n", libnal, private, cookie, hdr, type, - global_nid, pid, niov, iov, size); - - if (libnal) - nal_data = (gmnal_data_t*)libnal->libnal_data; - else { - CERROR("no libnal.\n"); - return(GMNAL_STATUS_FAIL); - } - - - /* - * Get stxd and buffer. Put local address of data in buffer, - * send local addresses to target, - * wait for the target node to suck the data over. - * The stxd is used to ren - */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - - stxd->type = GMNAL_LARGE_MESSAGE_INIT; - stxd->cookie = cookie; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then copy the iov in - */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_LARGE_MESSAGE_INIT; - msghdr->sender_node_id = nal_data->gm_global_nid; - msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd; - msghdr->niov = niov ; - buffer += sizeof(gmnal_msghdr_t); - mlen = sizeof(gmnal_msghdr_t); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - - CDEBUG(D_INFO, "processing portals hdr at [%p]\n", buffer); - - gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t)); - buffer += sizeof(ptl_hdr_t); - mlen += sizeof(ptl_hdr_t); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - while (offset >= iov->iov_len) { - offset -= iov->iov_len; - niov--; - iov++; - } - - LASSERT(offset >= 0); - /* - * Store the iovs in the stxd for we can get - * them later if we need them - */ - stxd->iov[0].iov_base = iov->iov_base + offset; - stxd->iov[0].iov_len = iov->iov_len - offset; - CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov); - if (niov > 1) - gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec)); - stxd->niov = niov; - - /* - * copy the iov to the buffer so target knows - * where to get the data from - */ - CDEBUG(D_INFO, "processing iov to [%p]\n", buffer); - gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec)); - mlen += stxd->niov*(sizeof(struct iovec)); - CDEBUG(D_INFO, "mlen is [%d]\n", mlen); - - /* - * register the memory so the NIC can get hold of the data - * This is a slow process. it'd be good to overlap it - * with something else. - */ - iov = stxd->iov; - iov_dup = iov; - niov_dup = niov; - while(niov--) { - CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", - iov->iov_base, iov->iov_len); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_register_memory(nal_data->gm_port, - iov->iov_base, iov->iov_len); - if (gm_status != GM_SUCCESS) { - GMNAL_GM_UNLOCK(nal_data); - CERROR("gm_register_memory returns [%d][%s] " - "for memory [%p] len ["LPSZ"]\n", - gm_status, gmnal_gm_error(gm_status), - iov->iov_base, iov->iov_len); - GMNAL_GM_LOCK(nal_data); - while (iov_dup != iov) { - gm_deregister_memory(nal_data->gm_port, - iov_dup->iov_base, - iov_dup->iov_len); - iov_dup++; - } - GMNAL_GM_UNLOCK(nal_data); - gmnal_return_stxd(nal_data, stxd); - return(PTL_FAIL); - } - - GMNAL_GM_UNLOCK(nal_data); - iov++; - } + /* Unlocking here allows sends to get re-ordered, but we want + * to allow other CPUs to progress... */ - /* - * Send the init message to the target - */ - CDEBUG(D_INFO, "sending mlen [%d]\n", mlen); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, - &local_nid); - if (gm_status != GM_SUCCESS) { - GMNAL_GM_UNLOCK(nal_data); - CERROR("Failed to obtain local id\n"); - gmnal_return_stxd(nal_data, stxd); - /* TO DO deregister memory on failure */ - return(GMNAL_STATUS_FAIL); - } - CDEBUG(D_INFO, "Local Node_id is [%d]\n", local_nid); - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, mlen, GM_LOW_PRIORITY, - local_nid, gmnal_large_tx_callback, - (void*)stxd); - GMNAL_GM_UNLOCK(nal_data); + LASSERT(!tx->tx_credit); + tx->tx_credit = 1; - CDEBUG(D_INFO, "done\n"); - - return(PTL_OK); -} + tx->tx_launchtime = jiffies; -/* - * Callback function indicates that send of buffer with - * large message iovec has completed (or failed). - */ -void -gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status) -{ - gmnal_small_tx_callback(gm_port, context, status); + if (tx->tx_msgnob <= gmni->gmni_small_msgsize) { + LASSERT (tx->tx_ltxb == NULL); + netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf); + gmsize = gmni->gmni_small_gmsize; + pri = GMNAL_SMALL_PRIORITY; + } else { + LASSERT (tx->tx_ltxb != NULL); + netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf); + gmsize = gmni->gmni_large_gmsize; + pri = GMNAL_LARGE_PRIORITY; + } -} + spin_lock(&gmni->gmni_gm_lock); + gm_send_to_peer_with_callback(gmni->gmni_port, + netaddr, gmsize, + tx->tx_msgnob, + pri, + tx->tx_gmlid, + gmnal_tx_callback, + (void*)tx); + spin_unlock(&gmni->gmni_gm_lock); + spin_lock(&gmni->gmni_tx_lock); + } +} -/* - * Have received a buffer that contains an iovec of the sender. - * Do a gm_register_memory of the receivers buffer and then do a get - * data from the sender. - */ -int -gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - unsigned int nriov, struct iovec *riov, size_t offset, - size_t mlen, size_t rlen) +void +gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx) { - gmnal_data_t *nal_data = libnal->libnal_data; - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - void *buffer = NULL; - struct iovec *riov_dup; - int nriov_dup; - gmnal_msghdr_t *msghdr = NULL; - gm_status_t gm_status; - - CDEBUG(D_TRACE, "gmnal_large_rx :: libnal[%p], private[%p], " - "cookie[%p], niov[%d], iov[%p], mlen["LPSZ"], rlen["LPSZ"]\n", - libnal, private, cookie, nriov, riov, mlen, rlen); - - if (!srxd) { - CERROR("gmnal_large_rx no context\n"); - lib_finalize(libnal, private, cookie, PTL_FAIL); - return(PTL_FAIL); - } - - buffer = srxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - buffer += sizeof(gmnal_msghdr_t); - buffer += sizeof(ptl_hdr_t); - - /* - * Store the senders stxd address in the srxd for this message - * The gmnal_large_message_ack needs it to notify the sender - * the pull of data is complete - */ - srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr; - - /* - * Register the receivers memory - * get the data, - * tell the sender that we got the data - * then tell the receiver we got the data - * TO DO - * If the iovecs match, could interleave - * gm_registers and gm_gets for each element - */ - while (offset >= riov->iov_len) { - offset -= riov->iov_len; - riov++; - nriov--; - } - LASSERT (nriov >= 0); - LASSERT (offset >= 0); - /* - * do this so the final gm_get callback can deregister the memory - */ - PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec))); - - srxd->riov[0].iov_base = riov->iov_base + offset; - srxd->riov[0].iov_len = riov->iov_len - offset; - if (nriov > 1) - gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec))); - srxd->nriov = nriov; - - riov = srxd->riov; - nriov_dup = nriov; - riov_dup = riov; - while(nriov--) { - CDEBUG(D_INFO, "Registering memory [%p] len ["LPSZ"] \n", - riov->iov_base, riov->iov_len); - GMNAL_GM_LOCK(nal_data); - gm_status = gm_register_memory(nal_data->gm_port, - riov->iov_base, riov->iov_len); - if (gm_status != GM_SUCCESS) { - GMNAL_GM_UNLOCK(nal_data); - CERROR("gm_register_memory returns [%d][%s] " - "for memory [%p] len ["LPSZ"]\n", - gm_status, gmnal_gm_error(gm_status), - riov->iov_base, riov->iov_len); - GMNAL_GM_LOCK(nal_data); - while (riov_dup != riov) { - gm_deregister_memory(nal_data->gm_port, - riov_dup->iov_base, - riov_dup->iov_len); - riov_dup++; - } - GMNAL_GM_LOCK(nal_data); - /* - * give back srxd and buffer. Send NACK to sender - */ - PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec))); - return(PTL_FAIL); - } - GMNAL_GM_UNLOCK(nal_data); - riov++; - } - - /* - * now do gm_get to get the data - */ - srxd->cookie = cookie; - if (gmnal_remote_get(srxd, srxd->nsiov, (struct iovec*)buffer, - nriov_dup, riov_dup) != GMNAL_STATUS_OK) { - CERROR("can't get the data"); - } - - CDEBUG(D_INFO, "lgmanl_large_rx done\n"); - - return(PTL_OK); + int gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize : + gmni->gmni_small_gmsize; + int pri = rx->rx_islarge ? GMNAL_LARGE_PRIORITY : + GMNAL_SMALL_PRIORITY; + void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf); + + CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer); + + spin_lock(&gmni->gmni_gm_lock); + gm_provide_receive_buffer_with_tag(gmni->gmni_port, + buffer, gmsize, pri, 0); + spin_unlock(&gmni->gmni_gm_lock); } - -/* - * Perform a number of remote gets as part of receiving - * a large message. - * The final one to complete (i.e. the last callback to get called) - * tidies up. - * gm_get requires a send token. - */ -int -gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov, - int nriov, struct iovec *riov) +void +gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx) { + /* Future protocol version compatibility support! + * The next gmlnd-specific protocol rev will first send a message to + * check version; I reply with a stub message containing my current + * magic+version... */ + gmnal_msg_t *msg; + gmnal_tx_t *tx = gmnal_get_tx(gmni); + + if (tx == NULL) { + CERROR("Can't allocate tx to send version info to %u\n", + rx->rx_recv_gmid); + return; + } - int ncalls = 0; + LASSERT (tx->tx_lntmsg == NULL); /* no finalize */ - CDEBUG(D_TRACE, "gmnal_remote_get srxd[%p], nriov[%d], riov[%p], " - "nsiov[%d], siov[%p]\n", srxd, nriov, riov, nsiov, siov); + tx->tx_nid = LNET_NID_ANY; + tx->tx_gmlid = rx->rx_recv_gmid; + msg = GMNAL_NETBUF_MSG(&tx->tx_buf); + msg->gmm_magic = GMNAL_MSG_MAGIC; + msg->gmm_version = GMNAL_MSG_VERSION; - ncalls = gmnal_copyiov(0, srxd, nsiov, siov, nriov, riov); - if (ncalls < 0) { - CERROR("there's something wrong with the iovecs\n"); - return(GMNAL_STATUS_FAIL); - } - CDEBUG(D_INFO, "gmnal_remote_get ncalls [%d]\n", ncalls); - spin_lock_init(&srxd->callback_lock); - srxd->ncallbacks = ncalls; - srxd->callback_status = 0; - - ncalls = gmnal_copyiov(1, srxd, nsiov, siov, nriov, riov); - if (ncalls < 0) { - CERROR("there's something wrong with the iovecs\n"); - return(GMNAL_STATUS_FAIL); - } + /* just send magic + version */ + tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type); + tx->tx_large_nob = 0; - return(GMNAL_STATUS_OK); + spin_lock(&gmni->gmni_tx_lock); -} + list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq); + gmnal_check_txqueues_locked(gmni); + spin_unlock(&gmni->gmni_tx_lock); +} -/* - * pull data from source node (source iovec) to a local iovec. - * The iovecs may not match which adds the complications below. - * Count the number of gm_gets that will be required so the callbacks - * can determine who is the last one. - */ int -gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov, - struct iovec *siov, int nriov, struct iovec *riov) +gmnal_rx_thread(void *arg) { + gmnal_ni_t *gmni = arg; + gm_recv_event_t *rxevent = NULL; + gm_recv_t *recv = NULL; + gmnal_rx_t *rx; + int rc; - int ncalls = 0; - int slen = siov->iov_len, rlen = riov->iov_len; - char *sbuf = siov->iov_base, *rbuf = riov->iov_base; - unsigned long sbuf_long; - gm_remote_ptr_t remote_ptr = 0; - unsigned int source_node; - gmnal_ltxd_t *ltxd = NULL; - gmnal_data_t *nal_data = srxd->nal_data; - - CDEBUG(D_TRACE, "copy[%d] nal_data[%p]\n", do_copy, nal_data); - if (do_copy) { - if (!nal_data) { - CERROR("Bad args No nal_data\n"); - return(GMNAL_STATUS_FAIL); - } - GMNAL_GM_LOCK(nal_data); - if (gm_global_id_to_node_id(nal_data->gm_port, - srxd->gm_source_node, - &source_node) != GM_SUCCESS) { - - CERROR("cannot resolve global_id [%u] " - "to local node_id\n", srxd->gm_source_node); - GMNAL_GM_UNLOCK(nal_data); - return(GMNAL_STATUS_FAIL); - } - GMNAL_GM_UNLOCK(nal_data); - /* - * We need a send token to use gm_get - * getting an stxd gets us a send token. - * the stxd is used as the context to the - * callback function (so stxd can be returned). - * Set pointer in stxd to srxd so callback count in srxd - * can be decremented to find last callback to complete - */ - CDEBUG(D_INFO, "gmnal_copyiov source node is G[%u]L[%d]\n", - srxd->gm_source_node, source_node); + cfs_daemonize("gmnal_rxd"); + + down(&gmni->gmni_rx_mutex); + + while (!gmni->gmni_shutdown) { + + spin_lock(&gmni->gmni_gm_lock); + rxevent = gm_blocking_receive_no_spin(gmni->gmni_port); + spin_unlock(&gmni->gmni_gm_lock); + + switch (GM_RECV_EVENT_TYPE(rxevent)) { + default: + gm_unknown(gmni->gmni_port, rxevent); + continue; + + case GM_FAST_RECV_EVENT: + case GM_FAST_PEER_RECV_EVENT: + case GM_PEER_RECV_EVENT: + case GM_FAST_HIGH_RECV_EVENT: + case GM_FAST_HIGH_PEER_RECV_EVENT: + case GM_HIGH_PEER_RECV_EVENT: + case GM_RECV_EVENT: + case GM_HIGH_RECV_EVENT: + break; + } + + recv = &rxevent->recv; + rx = gm_hash_find(gmni->gmni_rx_hash, + gm_ntohp(recv->buffer)); + LASSERT (rx != NULL); + + rx->rx_recv_nob = gm_ntoh_u32(recv->length); + rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id); + rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id); + rx->rx_recv_type = gm_ntoh_u8(recv->type); + + switch (GM_RECV_EVENT_TYPE(rxevent)) { + case GM_FAST_RECV_EVENT: + case GM_FAST_PEER_RECV_EVENT: + case GM_FAST_HIGH_RECV_EVENT: + case GM_FAST_HIGH_PEER_RECV_EVENT: + LASSERT (rx->rx_recv_nob <= PAGE_SIZE); + + memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf), + gm_ntohp(recv->message), rx->rx_recv_nob); + break; + } + + up(&gmni->gmni_rx_mutex); + + CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx, + GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf), + gm_ntohp(recv->buffer), rx->rx_recv_nob); + + /* We're connectionless: simply drop packets with + * errors */ + rc = gmnal_unpack_msg(gmni, rx); + + if (rc == 0) { + gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf); + + LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE); + rc = lnet_parse(gmni->gmni_ni, + &msg->gmm_u.immediate.gmim_hdr, + msg->gmm_srcnid, + rx, 0); + } else if (rc > 0) { + gmnal_version_reply(gmni, rx); + rc = -EPROTO; /* repost rx */ + } + + if (rc < 0) /* parse failure */ + gmnal_post_rx(gmni, rx); + + down(&gmni->gmni_rx_mutex); } - do { - CDEBUG(D_INFO, "sbuf[%p] slen[%d] rbuf[%p], rlen[%d]\n", - sbuf, slen, rbuf, rlen); - if (slen > rlen) { - ncalls++; - if (do_copy) { - CDEBUG(D_INFO, "slen>rlen\n"); - ltxd = gmnal_get_ltxd(nal_data); - ltxd->srxd = srxd; - GMNAL_GM_LOCK(nal_data); - /* - * funny business to get rid - * of compiler warning - */ - sbuf_long = (unsigned long) sbuf; - remote_ptr = (gm_remote_ptr_t)sbuf_long; - gm_get(nal_data->gm_port, remote_ptr, rbuf, - rlen, GM_LOW_PRIORITY, source_node, - GMNAL_GM_PORT_ID, - gmnal_remote_get_callback, ltxd); - GMNAL_GM_UNLOCK(nal_data); - } - /* - * at the end of 1 iov element - */ - sbuf+=rlen; - slen-=rlen; - riov++; - nriov--; - rbuf = riov->iov_base; - rlen = riov->iov_len; - } else if (rlen > slen) { - ncalls++; - if (do_copy) { - CDEBUG(D_INFO, "slensrxd = srxd; - GMNAL_GM_LOCK(nal_data); - sbuf_long = (unsigned long) sbuf; - remote_ptr = (gm_remote_ptr_t)sbuf_long; - gm_get(nal_data->gm_port, remote_ptr, rbuf, - slen, GM_LOW_PRIORITY, source_node, - GMNAL_GM_PORT_ID, - gmnal_remote_get_callback, ltxd); - GMNAL_GM_UNLOCK(nal_data); - } - /* - * at end of siov element - */ - rbuf+=slen; - rlen-=slen; - siov++; - sbuf = siov->iov_base; - slen = siov->iov_len; - } else { - ncalls++; - if (do_copy) { - CDEBUG(D_INFO, "rlen=slen\n"); - ltxd = gmnal_get_ltxd(nal_data); - ltxd->srxd = srxd; - GMNAL_GM_LOCK(nal_data); - sbuf_long = (unsigned long) sbuf; - remote_ptr = (gm_remote_ptr_t)sbuf_long; - gm_get(nal_data->gm_port, remote_ptr, rbuf, - rlen, GM_LOW_PRIORITY, source_node, - GMNAL_GM_PORT_ID, - gmnal_remote_get_callback, ltxd); - GMNAL_GM_UNLOCK(nal_data); - } - /* - * at end of siov and riov element - */ - siov++; - sbuf = siov->iov_base; - slen = siov->iov_len; - riov++; - nriov--; - rbuf = riov->iov_base; - rlen = riov->iov_len; - } - - } while (nriov); - return(ncalls); -} + up(&gmni->gmni_rx_mutex); + CDEBUG(D_NET, "exiting\n"); + atomic_dec(&gmni->gmni_nthreads); + return 0; +} -/* - * The callback function that is invoked after each gm_get call completes. - * Multiple callbacks may be invoked for 1 transaction, only the final - * callback has work to do. - */ void -gmnal_remote_get_callback(gm_port_t *gm_port, void *context, - gm_status_t status) +gmnal_stop_threads(gmnal_ni_t *gmni) { - - gmnal_ltxd_t *ltxd = (gmnal_ltxd_t*)context; - gmnal_srxd_t *srxd = ltxd->srxd; - lib_nal_t *libnal = srxd->nal_data->libnal; - int lastone; - struct iovec *riov; - int nriov; - gmnal_data_t *nal_data; - - CDEBUG(D_TRACE, "called for context [%p]\n", context); - - if (status != GM_SUCCESS) { - CERROR("reports error [%d/%s]\n",status,gmnal_gm_error(status)); - } - - spin_lock(&srxd->callback_lock); - srxd->ncallbacks--; - srxd->callback_status |= status; - lastone = srxd->ncallbacks?0:1; - spin_unlock(&srxd->callback_lock); - nal_data = srxd->nal_data; - - /* - * everyone returns a send token - */ - gmnal_return_ltxd(nal_data, ltxd); - - if (!lastone) { - CDEBUG(D_ERROR, "NOT final callback context[%p]\n", srxd); - return; - } - - /* - * Let our client application proceed - */ - CERROR("final callback context[%p]\n", srxd); - lib_finalize(libnal, srxd, srxd->cookie, PTL_OK); - - /* - * send an ack to the sender to let him know we got the data - */ - gmnal_large_tx_ack(nal_data, srxd); - - /* - * Unregister the memory that was used - * This is a very slow business (slower then register) - */ - nriov = srxd->nriov; - riov = srxd->riov; - GMNAL_GM_LOCK(nal_data); - while (nriov--) { - CERROR("deregister memory [%p]\n", riov->iov_base); - if (gm_deregister_memory(srxd->nal_data->gm_port, - riov->iov_base, riov->iov_len)) { - CERROR("failed to deregister memory [%p]\n", - riov->iov_base); - } - riov++; + int count = 2; + + gmni->gmni_shutdown = 1; + mb(); + + /* wake rxthread owning gmni_rx_mutex with an alarm. */ + spin_lock(&gmni->gmni_gm_lock); + gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL); + spin_unlock(&gmni->gmni_gm_lock); + + while (atomic_read(&gmni->gmni_nthreads) != 0) { + count++; + if ((count & (count - 1)) == 0) + CWARN("Waiting for %d threads to stop\n", + atomic_read(&gmni->gmni_nthreads)); + gmnal_yield(1); } - GMNAL_GM_UNLOCK(nal_data); - PORTAL_FREE(srxd->riov, sizeof(struct iovec)*nriov); - - /* - * repost the receive buffer (return receive token) - */ - GMNAL_GM_LOCK(nal_data); - gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, - srxd->gmsize, GM_LOW_PRIORITY, 0); - GMNAL_GM_UNLOCK(nal_data); - - return; } - -/* - * Called on target node. - * After pulling data from a source node - * send an ack message to indicate the large transmit is complete. - */ -void -gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) +int +gmnal_start_threads(gmnal_ni_t *gmni) { + int i; + int pid; - gmnal_stxd_t *stxd; - gmnal_msghdr_t *msghdr; - void *buffer = NULL; - unsigned int local_nid; - gm_status_t gm_status = GM_SUCCESS; - - CDEBUG(D_TRACE, "srxd[%p] target_node [%u]\n", srxd, - srxd->gm_source_node); - - GMNAL_GM_LOCK(nal_data); - gm_status = gm_global_id_to_node_id(nal_data->gm_port, - srxd->gm_source_node, &local_nid); - GMNAL_GM_UNLOCK(nal_data); - if (gm_status != GM_SUCCESS) { - CERROR("Failed to obtain local id\n"); - return; - } - CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid); - - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_TRACE, "gmnal_large_tx_ack got stxd[%p]\n", stxd); - - stxd->nal_data = nal_data; - stxd->type = GMNAL_LARGE_MESSAGE_ACK; - - /* - * Copy gmnal_msg_hdr and portals header to the transmit buffer - * Then copy the data in - */ - buffer = stxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - - /* - * Add in the address of the original stxd from the sender node - * so it knows which thread to notify. - */ - msghdr->magic = GMNAL_MAGIC; - msghdr->type = GMNAL_LARGE_MESSAGE_ACK; - msghdr->sender_node_id = nal_data->gm_global_nid; - msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd; - CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer); - - CDEBUG(D_INFO, "sending\n"); - stxd->msg_size= sizeof(gmnal_msghdr_t); - - - CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] " - "gmsize [%lu] msize [%d] global_nid [%u] local_nid[%d] " - "stxd [%p]\n", nal_data->gm_port, stxd->buffer, stxd->gm_size, - stxd->msg_size, srxd->gm_source_node, local_nid, stxd); - GMNAL_GM_LOCK(nal_data); - stxd->gm_priority = GM_LOW_PRIORITY; - stxd->gm_target_node = local_nid; - gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, - stxd->gm_size, stxd->msg_size, - GM_LOW_PRIORITY, local_nid, - gmnal_large_tx_ack_callback, - (void*)stxd); - - GMNAL_GM_UNLOCK(nal_data); - CDEBUG(D_INFO, "gmnal_large_tx_ack :: done\n"); - - return; -} + LASSERT (!gmni->gmni_shutdown); + LASSERT (atomic_read(&gmni->gmni_nthreads) == 0); + gm_initialize_alarm(&gmni->gmni_alarm); -/* - * A callback to indicate the small transmit operation is compete - * Check for errors and try to deal with them. - * Call lib_finalise to inform the client application that the - * send is complete and the memory can be reused. - * Return the stxd when finished with it (returns a send token) - */ -void -gmnal_large_tx_ack_callback(gm_port_t *gm_port, void *context, - gm_status_t status) -{ - gmnal_stxd_t *stxd = (gmnal_stxd_t*)context; - gmnal_data_t *nal_data = (gmnal_data_t*)stxd->nal_data; + for (i = 0; i < num_online_cpus(); i++) { - if (!stxd) { - CERROR("send completion event for unknown stxd\n"); - return; - } - CDEBUG(D_TRACE, "send completion event for stxd [%p] status is [%d]\n", - stxd, status); - gmnal_return_stxd(stxd->nal_data, stxd); - - GMNAL_GM_UNLOCK(nal_data); - return; -} + pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0); + if (pid < 0) { + CERROR("rx thread failed to start: %d\n", pid); + gmnal_stop_threads(gmni); + return pid; + } -/* - * Indicates the large transmit operation is compete. - * Called on transmit side (means data has been pulled by receiver - * or failed). - * Call lib_finalise to inform the client application that the send - * is complete, deregister the memory and return the stxd. - * Finally, report the rx buffer that the ack message was delivered in. - */ -void -gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd) -{ - lib_nal_t *libnal = nal_data->libnal; - gmnal_stxd_t *stxd = NULL; - gmnal_msghdr_t *msghdr = NULL; - void *buffer = NULL; - struct iovec *iov; - - - CDEBUG(D_TRACE, "gmnal_large_tx_ack_received buffer [%p]\n", buffer); - - buffer = srxd->buffer; - msghdr = (gmnal_msghdr_t*)buffer; - stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr; - - CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd); - - lib_finalize(libnal, stxd, stxd->cookie, PTL_OK); - - /* - * extract the iovec from the stxd, deregister the memory. - * free the space used to store the iovec - */ - iov = stxd->iov; - while(stxd->niov--) { - CDEBUG(D_INFO, "deregister memory [%p] size ["LPSZ"]\n", - iov->iov_base, iov->iov_len); - GMNAL_GM_LOCK(nal_data); - gm_deregister_memory(nal_data->gm_port, iov->iov_base, - iov->iov_len); - GMNAL_GM_UNLOCK(nal_data); - iov++; + atomic_inc(&gmni->gmni_nthreads); } - /* - * return the send token - * TO DO It is bad to hold onto the send token so long? - */ - gmnal_return_stxd(nal_data, stxd); - - - /* - * requeue the receive buffer - */ - gmnal_rx_requeue_buffer(nal_data, srxd); - - - return; + return 0; }