/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
* vim:expandtab:shiftwidth=8:tabstop=8:
*
- * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ * GPL HEADER START
*
- * This file is part of Lustre, http://www.lustre.org/
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
*
- * Lustre is free software; you can redistribute it and/or
- * modify it under the terms of version 2 of the GNU General Public
- * License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
*
- * Lustre is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
*
- * You should have received a copy of the GNU General Public License
- * along with Lustre; if not, write to the Free Software
- * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
*/
-
/*
 * This file implements the LNET LND callbacks (gmnal_send / gmnal_recv)
 */
-#include "gmnal.h"
-
-ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- unsigned int niov, struct iovec *iov, size_t offset,
- size_t mlen, size_t rlen)
-{
- void *buffer = NULL;
- gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- int status = PTL_OK;
-
- CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
- "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, niov, iov, offset, mlen, rlen);
-
- switch(srxd->type) {
- case(GMNAL_SMALL_MESSAGE):
- CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
- /* HP SFS 1380: Proactively change receives to avoid a receive
- * side occurrence of filling pkmap_count[].
- */
- buffer = srxd->buffer;
- buffer += sizeof(gmnal_msghdr_t);
- buffer += sizeof(ptl_hdr_t);
-
- while(niov--) {
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else if (offset > 0) {
- CDEBUG(D_INFO, "processing [%p] base [%p] "
- "len %d, offset %d, len ["LPSZ"]\n", iov,
- iov->iov_base + offset, iov->iov_len,
- offset, iov->iov_len - offset);
- gm_bcopy(buffer, iov->iov_base + offset,
- iov->iov_len - offset);
- buffer += iov->iov_len - offset;
- offset = 0;
- } else {
- CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n",
- iov, iov->iov_len);
- gm_bcopy(buffer, iov->iov_base, iov->iov_len);
- buffer += iov->iov_len;
- }
- iov++;
- }
- status = gmnal_small_rx(libnal, private, cookie);
- break;
- case(GMNAL_LARGE_MESSAGE_INIT):
- CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
- status = gmnal_large_rx(libnal, private, cookie, niov,
- iov, offset, mlen, rlen);
- }
-
- CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
- return(status);
-}
+#include "gmlnd.h"
-ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
- lib_msg_t *cookie, unsigned int kniov,
- ptl_kiov_t *kiov, size_t offset, size_t mlen,
- size_t rlen)
+int
+gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+ int delayed, unsigned int niov,
+ struct iovec *iov, lnet_kiov_t *kiov,
+ unsigned int offset, unsigned int mlen, unsigned int rlen)
{
- gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- int status = PTL_OK;
- char *ptr = NULL;
- void *buffer = NULL;
-
-
- CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
- "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
-
- if (srxd->type == GMNAL_SMALL_MESSAGE) {
- buffer = srxd->buffer;
- buffer += sizeof(gmnal_msghdr_t);
- buffer += sizeof(ptl_hdr_t);
-
- /*
- * map each page and create an iovec for it
- */
- while (kniov--) {
- /* HP SFS 1380: Proactively change receives to avoid a
- * receive side occurrence of filling pkmap_count[].
- */
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
- kniov, kiov);
-
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- CDEBUG(D_INFO, "kniov page [%p] len [%d] "
- "offset[%d]\n", kiov->kiov_page,
- kiov->kiov_len, kiov->kiov_offset);
- CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- if (offset > 0) {
- CDEBUG(D_INFO, "processing [%p] base "
- "[%p] len %d, offset %d, len ["
- LPSZ"]\n", ptr, ptr + offset,
- kiov->kiov_len, offset,
- kiov->kiov_len - offset);
- gm_bcopy(buffer, ptr + offset,
- kiov->kiov_len - offset);
- buffer += kiov->kiov_len - offset;
- offset = 0;
- } else {
- CDEBUG(D_INFO, "processing [%p] len ["
- LPSZ"]\n", ptr, kiov->kiov_len);
- gm_bcopy(buffer, ptr, kiov->kiov_len);
- buffer += kiov->kiov_len;
- }
- kunmap(kiov->kiov_page);
- CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
- }
- kiov++;
- }
- CDEBUG(D_INFO, "calling gmnal_small_rx\n");
- status = gmnal_small_rx(libnal, private, cookie);
- }
-
- CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
- return(status);
-}
-
-
-ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov, size_t offset,
- size_t len)
-{
-
- gmnal_data_t *nal_data;
- void *buffer = NULL;
- gmnal_stxd_t *stxd = NULL;
-
-
- CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
- "] nid["LPU64"]\n", niov, offset, len, nid);
- nal_data = libnal->libnal_data;
- if (!nal_data) {
- CDEBUG(D_ERROR, "no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
-
- if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
- CDEBUG(D_INFO, "This is a small message send\n");
- /*
- * HP SFS 1380: With the change to gmnal_small_tx, need to get
- * the stxd and do relevant setup here
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
- /* Set the offset of the data to copy into the buffer */
- buffer = stxd->buffer +sizeof(gmnal_msghdr_t)+sizeof(ptl_hdr_t);
- while(niov--) {
- if (offset >= iov->iov_len) {
- offset -= iov->iov_len;
- } else if (offset > 0) {
- CDEBUG(D_INFO, "processing iov [%p] base [%p] "
- "len ["LPSZ"] to [%p]\n",
- iov, iov->iov_base + offset,
- iov->iov_len - offset, buffer);
- gm_bcopy(iov->iov_base + offset, buffer,
- iov->iov_len - offset);
- buffer+= iov->iov_len - offset;
- offset = 0;
- } else {
- CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ
- "] to [%p]\n", iov, iov->iov_len,buffer);
- gm_bcopy(iov->iov_base, buffer, iov->iov_len);
- buffer+= iov->iov_len;
- }
- iov++;
- }
- gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
- stxd, len);
- } else {
- CDEBUG(D_ERROR, "Large message send is not supported\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
- niov, iov, offset, len);
- }
- return(PTL_OK);
-}
-
-ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
- lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
- ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
- ptl_kiov_t *kiov, size_t offset, size_t len)
-{
-
- gmnal_data_t *nal_data;
- char *ptr;
- void *buffer = NULL;
- gmnal_stxd_t *stxd = NULL;
- ptl_err_t status = PTL_OK;
-
- CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
- LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
- nal_data = libnal->libnal_data;
- if (!nal_data) {
- CDEBUG(D_ERROR, "no nal_data\n");
- return(PTL_FAIL);
- } else {
- CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
- }
-
- /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
- * more aggressively. This is the fix for a livelock situation under
- * load on ia32 that occurs when there are no more available entries in
- * the pkmap_count array. Just fill the buffer and let gmnal_small_tx
- * put the headers in after we pass it the stxd pointer.
- */
- stxd = gmnal_get_stxd(nal_data, 1);
- CDEBUG(D_INFO, "stxd [%p]\n", stxd);
- /* Set the offset of the data to copy into the buffer */
- buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
-
- if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
- CDEBUG(D_INFO, "This is a small message send\n");
-
- while(kniov--) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
- if (offset >= kiov->kiov_len) {
- offset -= kiov->kiov_len;
- } else {
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- ptr = ((char *)kmap(kiov->kiov_page)) +
- kiov->kiov_offset;
-
- if (offset > 0) {
- CDEBUG(D_INFO, "processing [%p] base "
- "[%p] len ["LPSZ"] to [%p]\n",
- ptr, ptr + offset,
- kiov->kiov_len - offset, buffer);
- gm_bcopy(ptr + offset, buffer,
- kiov->kiov_len - offset);
- buffer+= kiov->kiov_len - offset;
- offset = 0;
- } else {
- CDEBUG(D_INFO, "processing kmapped [%p]"
- " len ["LPSZ"] to [%p]\n",
- ptr, kiov->kiov_len, buffer);
- gm_bcopy(ptr, buffer, kiov->kiov_len);
-
- buffer += kiov->kiov_len;
- }
- kunmap(kiov->kiov_page);
- }
- kiov++;
- }
- status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
- pid, stxd, len);
- } else {
- int i = 0;
- struct iovec *iovec = NULL, *iovec_dup = NULL;
- ptl_kiov_t *kiov_dup = kiov;
-
- PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
- iovec_dup = iovec;
- CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
- PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
- return(PTL_FAIL);
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- iovec->iov_base = kmap(kiov->kiov_page)
- + kiov->kiov_offset;
- iovec->iov_len = kiov->kiov_len;
- iovec++;
- kiov++;
- }
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
- pid, kniov, iovec, offset, len);
- for (i=0; i<kniov; i++) {
- kunmap(kiov_dup->kiov_page);
- kiov_dup++;
- }
- PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
- }
- return(status);
+ gmnal_ni_t *gmni = ni->ni_data;
+ gmnal_rx_t *rx = (gmnal_rx_t*)private;
+ gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+ int npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
+ int payload_offset = offsetof(gmnal_msg_t,
+ gmm_u.immediate.gmim_payload[0]);
+ int nob = payload_offset + mlen;
+
+ LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+ LASSERT (iov == NULL || kiov == NULL);
+
+ if (rx->rx_recv_nob < nob) {
+ CERROR("Short message from nid %s: got %d, need %d\n",
+ libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
+ gmnal_post_rx(gmni, rx);
+ return -EIO;
+ }
+
+ if (kiov != NULL)
+ lnet_copy_kiov2kiov(niov, kiov, offset,
+ npages, rx->rx_buf.nb_kiov, payload_offset,
+ mlen);
+ else
+ lnet_copy_kiov2iov(niov, iov, offset,
+ npages, rx->rx_buf.nb_kiov, payload_offset,
+ mlen);
+
+ lnet_finalize(ni, lntmsg, 0);
+ gmnal_post_rx(gmni, rx);
+ return 0;
}
-int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
+int
+gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
- CDEBUG(D_TRACE, "gmnal_cb_dist\n");
- if (dist)
- *dist = 27;
- return(PTL_OK);
+ lnet_hdr_t *hdr= &lntmsg->msg_hdr;
+ int type = lntmsg->msg_type;
+ lnet_process_id_t target = lntmsg->msg_target;
+ unsigned int niov = lntmsg->msg_niov;
+ struct iovec *iov = lntmsg->msg_iov;
+ lnet_kiov_t *kiov = lntmsg->msg_kiov;
+ unsigned int offset = lntmsg->msg_offset;
+ unsigned int len = lntmsg->msg_len;
+ gmnal_ni_t *gmni = ni->ni_data;
+ gm_status_t gmrc;
+ gmnal_tx_t *tx;
+
+ LASSERT (iov == NULL || kiov == NULL);
+
+ /* I may not block for a tx if I'm responding to an incoming message */
+ tx = gmnal_get_tx(gmni);
+ if (tx == NULL) {
+ if (!gmni->gmni_shutdown)
+ CERROR ("Can't get tx for msg type %d for %s\n",
+ type, libcfs_nid2str(target.nid));
+ return -EIO;
+ }
+
+ tx->tx_nid = target.nid;
+
+ gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
+ &tx->tx_gmlid);
+ if (gmrc != GM_SUCCESS) {
+ CERROR("Can't map Nid %s to a GM local ID: %d\n",
+ libcfs_nid2str(target.nid), gmrc);
+ /* NB tx_lntmsg not set => doesn't finalize */
+ gmnal_tx_done(tx, -EIO);
+ return -EIO;
+ }
+
+ gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf),
+ target.nid, GMNAL_MSG_IMMEDIATE);
+ GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
+ tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
+
+ if (the_lnet.ln_testprotocompat != 0) {
+ /* single-shot proto test */
+ LNET_LOCK();
+ if ((the_lnet.ln_testprotocompat & 1) != 0) {
+ GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
+ the_lnet.ln_testprotocompat &= ~1;
+ }
+ if ((the_lnet.ln_testprotocompat & 2) != 0) {
+ GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
+ LNET_PROTO_MAGIC;
+ the_lnet.ln_testprotocompat &= ~2;
+ }
+ LNET_UNLOCK();
+ }
+
+ if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
+ /* whole message fits in tx_buf */
+ char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
+
+ if (iov != NULL)
+ lnet_copy_iov2flat(len, buffer, 0,
+ niov, iov, offset, len);
+ else
+ lnet_copy_kiov2flat(len, buffer, 0,
+ niov, kiov, offset, len);
+
+ tx->tx_msgnob += len;
+ tx->tx_large_nob = 0;
+ } else {
+ /* stash payload pts to copy later */
+ tx->tx_large_nob = len;
+ tx->tx_large_iskiov = (kiov != NULL);
+ tx->tx_large_niov = niov;
+ if (tx->tx_large_iskiov)
+ tx->tx_large_frags.kiov = kiov;
+ else
+ tx->tx_large_frags.iov = iov;
+ }
+
+ LASSERT(tx->tx_lntmsg == NULL);
+ tx->tx_lntmsg = lntmsg;
+
+ cfs_spin_lock(&gmni->gmni_tx_lock);
+
+ cfs_list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+ gmnal_check_txqueues_locked(gmni);
+
+ cfs_spin_unlock(&gmni->gmni_tx_lock);
+
+ return 0;
}