Whamcloud - gitweb
Mass conversion of all copyright messages to Oracle.
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
index 6394c37..8addb11 100644 (file)
 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
  * vim:expandtab:shiftwidth=8:tabstop=8:
  *
- *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ * GPL HEADER START
  *
- *   This file is part of Lustre, http://www.lustre.org/
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
  *
- *   Lustre is free software; you can redistribute it and/or
- *   modify it under the terms of version 2 of the GNU General Public
- *   License as published by the Free Software Foundation.
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
  *
- *   Lustre is distributed in the hope that it will be useful,
- *   but WITHOUT ANY WARRANTY; without even the implied warranty of
- *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
- *   GNU General Public License for more details.
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
  *
- *   You should have received a copy of the GNU General Public License
- *   along with Lustre; if not, write to the Free Software
- *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
+ *
+ * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
+ * CA 95054 USA or visit www.sun.com if you need additional information or
+ * have any questions.
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
+ * Use is subject to license terms.
+ *
+ * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ * Lustre is a trademark of Sun Microsystems, Inc.
  */
-
 
 /*
  *     This file implements the nal cb functions
  */
 
 
-#include "gmnal.h"
-
-ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-                  unsigned int niov, struct iovec *iov, size_t offset,
-                  size_t mlen, size_t rlen)
-{
-        void            *buffer = NULL;
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-
-       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
-              "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
-              libnal, private, cookie, niov, iov, offset, mlen, rlen);
-
-       switch(srxd->type) {
-       case(GMNAL_SMALL_MESSAGE):
-               CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
-               /* HP SFS 1380: Proactively change receives to avoid a receive
-                *  side occurrence of filling pkmap_count[].
-                */
-               buffer = srxd->buffer;
-               buffer += sizeof(gmnal_msghdr_t);
-               buffer += sizeof(ptl_hdr_t);
-
-               while(niov--) {
-                       if (offset >= iov->iov_len) {
-                               offset -= iov->iov_len;
-                       } else if (offset > 0) {
-                               CDEBUG(D_INFO, "processing [%p] base [%p] "
-                                       "len %d, offset %d, len ["LPSZ"]\n", iov,
-                                       iov->iov_base + offset, iov->iov_len,
-                                       offset, iov->iov_len - offset);
-                               gm_bcopy(buffer, iov->iov_base + offset,
-                                        iov->iov_len - offset);
-                               buffer += iov->iov_len - offset;
-                               offset = 0;
-                       } else {
-                               CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n",
-                                       iov, iov->iov_len);
-                               gm_bcopy(buffer, iov->iov_base, iov->iov_len);
-                               buffer += iov->iov_len;
-                       }
-                       iov++;
-               }
-               status = gmnal_small_rx(libnal, private, cookie);
-       break;
-       case(GMNAL_LARGE_MESSAGE_INIT):
-               CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
-               status = gmnal_large_rx(libnal, private, cookie, niov, 
-                                        iov, offset, mlen, rlen);
-       }
-
-       CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
-       return(status);
-}
+#include "gmlnd.h"
 
-ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
-                              lib_msg_t *cookie, unsigned int kniov,
-                              ptl_kiov_t *kiov, size_t offset, size_t mlen,
-                              size_t rlen)
+int
+gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+           int delayed, unsigned int niov, 
+           struct iovec *iov, lnet_kiov_t *kiov,
+           unsigned int offset, unsigned int mlen, unsigned int rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-       char            *ptr = NULL;
-       void            *buffer = NULL;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
-              "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
-              libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
-
-       if (srxd->type == GMNAL_SMALL_MESSAGE) {
-               buffer = srxd->buffer;
-               buffer += sizeof(gmnal_msghdr_t);
-               buffer += sizeof(ptl_hdr_t);
-
-               /*
-                *      map each page and create an iovec for it
-                */
-               while (kniov--) {
-                       /* HP SFS 1380: Proactively change receives to avoid a
-                        *  receive side occurrence of filling pkmap_count[].
-                        */
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
-                               kniov, kiov);
-
-                       if (offset >= kiov->kiov_len) {
-                               offset -= kiov->kiov_len;
-                       } else {
-                               CDEBUG(D_INFO, "kniov page [%p] len [%d] "
-                                       "offset[%d]\n", kiov->kiov_page,
-                                       kiov->kiov_len, kiov->kiov_offset);
-                               CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
-                               ptr = ((char *)kmap(kiov->kiov_page)) +
-                                        kiov->kiov_offset;
-
-                               if (offset > 0) {
-                                       CDEBUG(D_INFO, "processing [%p] base "
-                                               "[%p] len %d, offset %d, len ["
-                                               LPSZ"]\n", ptr, ptr + offset,
-                                               kiov->kiov_len, offset,
-                                              kiov->kiov_len - offset);
-                                       gm_bcopy(buffer, ptr + offset,
-                                                 kiov->kiov_len - offset);
-                                       buffer += kiov->kiov_len - offset;
-                                       offset = 0;
-                               } else {
-                                       CDEBUG(D_INFO, "processing [%p] len ["
-                                               LPSZ"]\n", ptr, kiov->kiov_len);
-                                       gm_bcopy(buffer, ptr, kiov->kiov_len);
-                                       buffer += kiov->kiov_len;
-                               }
-                               kunmap(kiov->kiov_page);
-                               CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
-                        }
-                        kiov++;
-               }
-               CDEBUG(D_INFO, "calling gmnal_small_rx\n");
-               status = gmnal_small_rx(libnal, private, cookie);
-       }
-
-       CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
-       return(status);
-}
-
-
-ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                        unsigned int niov, struct iovec *iov, size_t offset,
-                        size_t len)
-{
-
-       gmnal_data_t    *nal_data;
-       void            *buffer = NULL;
-       gmnal_stxd_t    *stxd = NULL;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
-               "] nid["LPU64"]\n", niov, offset, len, nid);
-       nal_data = libnal->libnal_data;
-       if (!nal_data) {
-               CDEBUG(D_ERROR, "no nal_data\n");
-               return(PTL_FAIL);
-       } else {
-               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
-       }
-
-       if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
-               CDEBUG(D_INFO, "This is a small message send\n");
-               /*
-                * HP SFS 1380: With the change to gmnal_small_tx, need to get
-                * the stxd and do relevant setup here
-                */
-               stxd = gmnal_get_stxd(nal_data, 1);
-               CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-               /* Set the offset of the data to copy into the buffer */
-               buffer = stxd->buffer +sizeof(gmnal_msghdr_t)+sizeof(ptl_hdr_t);
-               while(niov--) {
-                       if (offset >= iov->iov_len) {
-                               offset -= iov->iov_len;
-                       } else if (offset > 0) {
-                               CDEBUG(D_INFO, "processing iov [%p] base [%p] "
-                                       "len ["LPSZ"] to [%p]\n",
-                                       iov, iov->iov_base + offset,
-                                       iov->iov_len - offset, buffer);
-                               gm_bcopy(iov->iov_base + offset, buffer,
-                                         iov->iov_len - offset);
-                               buffer+= iov->iov_len - offset;
-                               offset = 0;
-                       } else {
-                               CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ
-                                       "] to [%p]\n", iov, iov->iov_len,buffer);
-                               gm_bcopy(iov->iov_base, buffer, iov->iov_len);
-                               buffer+= iov->iov_len;
-                       }
-                       iov++;
-               }
-               gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
-                              stxd,  len);
-       } else {
-               CDEBUG(D_ERROR, "Large message send is not supported\n");
-               lib_finalize(libnal, private, cookie, PTL_FAIL);
-               return(PTL_FAIL);
-               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
-                               niov, iov, offset, len);
-       }
-       return(PTL_OK);
-}
-
-ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
-                              lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
-                              ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
-                              ptl_kiov_t *kiov, size_t offset, size_t len)
-{
-
-       gmnal_data_t    *nal_data;
-       char            *ptr;
-       void            *buffer = NULL;
-       gmnal_stxd_t    *stxd = NULL;
-       ptl_err_t       status = PTL_OK;
-
-       CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
-               LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
-       nal_data = libnal->libnal_data;
-       if (!nal_data) {
-               CDEBUG(D_ERROR, "no nal_data\n");
-               return(PTL_FAIL);
-       } else {
-               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
-       }
-
-       /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
-        * more aggressively.  This is the fix for a livelock situation under
-        * load on ia32 that occurs when there are no more available entries in
-        * the pkmap_count array.  Just fill the buffer and let gmnal_small_tx
-        * put the headers in after we pass it the stxd pointer.
-        */
-       stxd = gmnal_get_stxd(nal_data, 1);
-       CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-       /* Set the offset of the data to copy into the buffer */
-       buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
-
-       if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
-               CDEBUG(D_INFO, "This is a small message send\n");
-
-               while(kniov--) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
-                       if (offset >= kiov->kiov_len) {
-                               offset -= kiov->kiov_len;
-                       } else {
-                               CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                                      kiov->kiov_page, kiov->kiov_len, 
-                                      kiov->kiov_offset);
-
-                               ptr = ((char *)kmap(kiov->kiov_page)) +
-                                        kiov->kiov_offset;
-
-                               if (offset > 0) {
-                                       CDEBUG(D_INFO, "processing [%p] base "
-                                               "[%p] len ["LPSZ"] to [%p]\n",
-                                              ptr, ptr + offset,
-                                               kiov->kiov_len - offset, buffer);
-                                       gm_bcopy(ptr + offset, buffer,
-                                                 kiov->kiov_len - offset);
-                                       buffer+= kiov->kiov_len - offset;
-                                       offset = 0;
-                               } else {
-                                       CDEBUG(D_INFO, "processing kmapped [%p]"
-                                               " len ["LPSZ"] to [%p]\n",
-                                              ptr, kiov->kiov_len, buffer);
-                                       gm_bcopy(ptr, buffer, kiov->kiov_len);
-
-                                       buffer += kiov->kiov_len;
-                               }
-                               kunmap(kiov->kiov_page);
-                       }
-                        kiov++;
-               }
-               status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
-                                       pid, stxd, len);
-       } else {
-               int     i = 0;
-               struct  iovec   *iovec = NULL, *iovec_dup = NULL;
-               ptl_kiov_t *kiov_dup = kiov;
-
-               PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
-               iovec_dup = iovec;
-               CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
-               PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
-               return(PTL_FAIL);
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) 
-                                                + kiov->kiov_offset;
-                       iovec->iov_len = kiov->kiov_len;
-                        iovec++;
-                        kiov++;
-               }
-               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec, offset, len);
-               for (i=0; i<kniov; i++) {
-                       kunmap(kiov_dup->kiov_page);
-                       kiov_dup++;
-               }
-               PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
-       }
-       return(status);
+        gmnal_ni_t      *gmni = ni->ni_data;
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+        int              npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
+        int              payload_offset = offsetof(gmnal_msg_t, 
+                                              gmm_u.immediate.gmim_payload[0]);
+        int              nob = payload_offset + mlen;
+
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+        LASSERT (iov == NULL || kiov == NULL);
+
+        if (rx->rx_recv_nob < nob) {
+                CERROR("Short message from nid %s: got %d, need %d\n",
+                       libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
+                gmnal_post_rx(gmni, rx);
+                return -EIO;
+        }
+
+        if (kiov != NULL)
+                lnet_copy_kiov2kiov(niov, kiov, offset,
+                                    npages, rx->rx_buf.nb_kiov, payload_offset, 
+                                    mlen);
+        else
+                lnet_copy_kiov2iov(niov, iov, offset,
+                                   npages, rx->rx_buf.nb_kiov, payload_offset,
+                                   mlen);
+
+        lnet_finalize(ni, lntmsg, 0);
+        gmnal_post_rx(gmni, rx);
+       return 0;
 }
 
-int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
+int
+gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 {
-       CDEBUG(D_TRACE, "gmnal_cb_dist\n");
-       if (dist)
-               *dist = 27;
-       return(PTL_OK);
+        lnet_hdr_t       *hdr= &lntmsg->msg_hdr;
+        int               type = lntmsg->msg_type;
+        lnet_process_id_t target = lntmsg->msg_target;
+        unsigned int      niov = lntmsg->msg_niov;
+        struct iovec     *iov = lntmsg->msg_iov;
+        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
+        unsigned int      offset = lntmsg->msg_offset;
+        unsigned int      len = lntmsg->msg_len;
+       gmnal_ni_t       *gmni = ni->ni_data;
+        gm_status_t       gmrc;
+       gmnal_tx_t       *tx;
+
+        LASSERT (iov == NULL || kiov == NULL);
+
+        /* I may not block for a tx if I'm responding to an incoming message */
+        tx = gmnal_get_tx(gmni);
+        if (tx == NULL) {
+                if (!gmni->gmni_shutdown)
+                        CERROR ("Can't get tx for msg type %d for %s\n",
+                                type, libcfs_nid2str(target.nid));
+                return -EIO;
+        }
+
+        tx->tx_nid = target.nid;
+
+        gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
+                                       &tx->tx_gmlid);
+        if (gmrc != GM_SUCCESS) {
+                CERROR("Can't map Nid %s to a GM local ID: %d\n", 
+                       libcfs_nid2str(target.nid), gmrc);
+                /* NB tx_lntmsg not set => doesn't finalize */
+                gmnal_tx_done(tx, -EIO);
+                return -EIO;
+        }
+
+        gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), 
+                       target.nid, GMNAL_MSG_IMMEDIATE);
+        GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
+        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
+
+        if (the_lnet.ln_testprotocompat != 0) {
+                /* single-shot proto test */
+                LNET_LOCK();
+                if ((the_lnet.ln_testprotocompat & 1) != 0) {
+                        GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
+                        the_lnet.ln_testprotocompat &= ~1;
+                }
+                if ((the_lnet.ln_testprotocompat & 2) != 0) {
+                        GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
+                                LNET_PROTO_MAGIC;
+                        the_lnet.ln_testprotocompat &= ~2;
+                }
+                LNET_UNLOCK();
+        }
+
+        if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
+                /* whole message fits in tx_buf */
+                char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
+
+                if (iov != NULL)
+                        lnet_copy_iov2flat(len, buffer, 0,
+                                           niov, iov, offset, len);
+                else
+                        lnet_copy_kiov2flat(len, buffer, 0,
+                                            niov, kiov, offset, len);
+
+                tx->tx_msgnob += len;
+                tx->tx_large_nob = 0;
+        } else {
+                /* stash payload pointers to copy later */
+                tx->tx_large_nob = len;
+                tx->tx_large_iskiov = (kiov != NULL);
+                tx->tx_large_niov = niov;
+                if (tx->tx_large_iskiov)
+                        tx->tx_large_frags.kiov = kiov;
+                else
+                        tx->tx_large_frags.iov = iov;
+        }
+
+        LASSERT(tx->tx_lntmsg == NULL);
+        tx->tx_lntmsg = lntmsg;
+
+        cfs_spin_lock(&gmni->gmni_tx_lock);
+
+        cfs_list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+        gmnal_check_txqueues_locked(gmni);
+
+        cfs_spin_unlock(&gmni->gmni_tx_lock);
+
+        return 0;
 }