Whamcloud - gitweb
Branch b1_6
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
index e055242..503bedf 100644 (file)
  */
 
 
-#include "gmnal.h"
+#include "gmlnd.h"
 
-int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                  unsigned int niov, struct iovec *iov, size_t mlen, 
-                  size_t rlen)
+int
+gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
+           int delayed, unsigned int niov, 
+           struct iovec *iov, lnet_kiov_t *kiov,
+           unsigned int offset, unsigned int mlen, unsigned int rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_recv nal_cb [%p], private[%p], cookie[%p], 
-              niov[%d], iov [%p], mlen["LPSZ"], rlen["LPSZ"]\n", 
-              nal_cb, private, cookie, niov, iov, mlen, rlen);
-
-       switch(srxd->type) {
-       case(GMNAL_SMALL_MESSAGE):
-               CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, niov, 
-                                        iov, mlen, rlen);
-       break;
-       case(GMNAL_LARGE_MESSAGE_INIT):
-               CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
-               status = gmnal_large_rx(nal_cb, private, cookie, niov, 
-                                        iov, mlen, rlen);
-       }
-               
-
-       CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
-       return(status);
-}
-
-int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                        unsigned int kniov, ptl_kiov_t *kiov, size_t mlen, 
-                        size_t rlen)
-{
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-       struct iovec    *iovec = NULL, *iovec_dup = NULL;
-       int             i = 0;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_recv_pages nal_cb [%p],private[%p], 
-              cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
-              nal_cb, private, cookie, kniov, kiov, mlen, rlen);
-
-       if (srxd->type == GMNAL_SMALL_MESSAGE) {
-               PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
-               if (!iovec) {
-                       CDEBUG(D_ERROR, "Can't malloc\n");
-                       return(GMNAL_STATUS_FAIL);
-               }
-                iovec_dup = iovec;
-
-               /*
-                *      map each page and create an iovec for it
-                */
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-                       iovec->iov_len = kiov->kiov_len;
-                       CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) + 
-                                                 kiov->kiov_offset;
-
-                       CDEBUG(D_INFO, "iov_base is [%p]\n", iovec->iov_base);
-                        iovec++;
-                        kiov++;
-               }
-               CDEBUG(D_INFO, "calling gmnal_small_rx\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, kniov, 
-                                        iovec_dup, mlen, rlen);
-               PORTAL_FREE(iovec_dup, sizeof(struct iovec)*kniov);
-       }
-               
-
-       CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
-       return(status);
-}
-
-
-int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                  ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
-                  unsigned int niov, struct iovec *iov, size_t len)
-{
-
-       gmnal_data_t    *nal_data;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n", 
-              niov, len, nid);
-       nal_data = nal_cb->nal_data;
-       
-       if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
-               CDEBUG(D_INFO, "This is a small message send\n");
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
-                               niov, iov, len);
-       } else {
-               CDEBUG(D_ERROR, "Large message send it is not supported\n");
-               return(PTL_FAIL);
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
-                               niov, iov, len);
-       }
-       return(PTL_OK);
-}
-
-int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,                         unsigned int kniov, ptl_kiov_t *kiov, size_t len)
-{
-
-       int     i = 0;
-       gmnal_data_t    *nal_data;
-       struct  iovec   *iovec = NULL, *iovec_dup = NULL;
-
-       CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] len["LPSZ"]\n", nid, kniov, len);
-       nal_data = nal_cb->nal_data;
-       PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
-        iovec_dup = iovec;
-       if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
-               CDEBUG(D_INFO, "This is a small message send\n");
-               
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) 
-                                               + kiov->kiov_offset;
-
-                       iovec->iov_len = kiov->kiov_len;
-                        iovec++;
-                        kiov++;
-               }
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec_dup, len);
-       } else {
-               CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
-               return(PTL_FAIL);
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) 
-                                                + kiov->kiov_offset;
-                       iovec->iov_len = kiov->kiov_len;
-                        iovec++;
-                        kiov++;
-               }
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec, len);
-       }
-       PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
-       return(PTL_OK);
-}
-
-int gmnal_cb_read(nal_cb_t *nal_cb, void *private, void *dst, 
-                  user_ptr src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
-
-int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst, 
-                   void *src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
-
-void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
-{
-       void *ptr = NULL;
-       CDEBUG(D_TRACE, "gmnal_cb_malloc len["LPSZ"]\n", len);
-       PORTAL_ALLOC(ptr, len);
-       return(ptr);
-}
-
-void gmnal_cb_free(nal_cb_t *nal_cb, void *buf, size_t len)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_free :: buf[%p] len["LPSZ"]\n", buf, len);
-       PORTAL_FREE(buf, len);
-       return;
-}
-
-void gmnal_cb_unmap(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                    void **addrkey)
-{
-       return;
-}
-
-int  gmnal_cb_map(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                  void**addrkey)
-{
-       return(PTL_OK);
-}
-
-void gmnal_cb_printf(nal_cb_t *nal_cb, const char *fmt, ...)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_printf\n");
-       printk(fmt);
-       return;
-}
-
-void gmnal_cb_cli(nal_cb_t *nal_cb, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
-       spin_lock_irqsave(&nal_data->cb_lock, *flags);
-       return;
-}
-
-void gmnal_cb_sti(nal_cb_t *nal_cb, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
-       spin_unlock_irqrestore(&nal_data->cb_lock, *flags);
-       return;
+        gmnal_ni_t      *gmni = ni->ni_data;
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
+        int              npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
+        int              payload_offset = offsetof(gmnal_msg_t, 
+                                              gmm_u.immediate.gmim_payload[0]);
+        int              nob = payload_offset + mlen;
+
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+        LASSERT (iov == NULL || kiov == NULL);
+        
+        if (rx->rx_recv_nob < nob) {
+                CERROR("Short message from nid %s: got %d, need %d\n",
+                       libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
+                gmnal_post_rx(gmni, rx);
+                return -EIO;
+        }
+
+        if (kiov != NULL)
+                lnet_copy_kiov2kiov(niov, kiov, offset,
+                                    npages, rx->rx_buf.nb_kiov, payload_offset, 
+                                    mlen);
+        else
+                lnet_copy_kiov2iov(niov, iov, offset,
+                                   npages, rx->rx_buf.nb_kiov, payload_offset,
+                                   mlen);
+
+        lnet_finalize(ni, lntmsg, 0);
+        gmnal_post_rx(gmni, rx);
+       return 0;
 }
 
-int gmnal_cb_dist(nal_cb_t *nal_cb, ptl_nid_t nid, unsigned long *dist)
+int
+gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
 {
-       CDEBUG(D_TRACE, "gmnal_cb_dist\n");
-       if (dist)
-               *dist = 27;
-       return(PTL_OK);
+        lnet_hdr_t       *hdr= &lntmsg->msg_hdr;
+        int               type = lntmsg->msg_type;
+        lnet_process_id_t target = lntmsg->msg_target;
+        unsigned int      niov = lntmsg->msg_niov;
+        struct iovec     *iov = lntmsg->msg_iov;
+        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
+        unsigned int      offset = lntmsg->msg_offset;
+        unsigned int      len = lntmsg->msg_len;
+       gmnal_ni_t       *gmni = ni->ni_data;
+        gm_status_t       gmrc;
+       gmnal_tx_t       *tx;
+
+        LASSERT (iov == NULL || kiov == NULL);
+
+        /* I may not block for a tx if I'm responding to an incoming message */
+        tx = gmnal_get_tx(gmni);
+        if (tx == NULL) {
+                if (!gmni->gmni_shutdown)
+                        CERROR ("Can't get tx for msg type %d for %s\n",
+                                type, libcfs_nid2str(target.nid));
+                return -EIO;
+        }
+
+        tx->tx_nid = target.nid;
+
+        gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
+                                       &tx->tx_gmlid);
+        if (gmrc != GM_SUCCESS) {
+                CERROR("Can't map Nid %s to a GM local ID: %d\n", 
+                       libcfs_nid2str(target.nid), gmrc);
+                /* NB tx_lntmsg not set => doesn't finalize */
+                gmnal_tx_done(tx, -EIO);
+                return -EIO;
+        }
+
+        gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), 
+                       target.nid, GMNAL_MSG_IMMEDIATE);
+        GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
+        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
+
+        if (the_lnet.ln_testprotocompat != 0) {
+                /* single-shot proto test */
+                LNET_LOCK();
+                if ((the_lnet.ln_testprotocompat & 1) != 0) {
+                        GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
+                        the_lnet.ln_testprotocompat &= ~1;
+                }
+                if ((the_lnet.ln_testprotocompat & 2) != 0) {
+                        GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
+                                LNET_PROTO_MAGIC;
+                        the_lnet.ln_testprotocompat &= ~2;
+                }
+                LNET_UNLOCK();
+        }
+
+        if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
+                /* whole message fits in tx_buf */
+                char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
+
+                if (iov != NULL)
+                        lnet_copy_iov2flat(len, buffer, 0,
+                                           niov, iov, offset, len);
+                else
+                        lnet_copy_kiov2flat(len, buffer, 0,
+                                            niov, kiov, offset, len);
+                
+                tx->tx_msgnob += len;
+                tx->tx_large_nob = 0;
+        } else {
+                /* stash payload pts to copy later */
+                tx->tx_large_nob = len;
+                tx->tx_large_iskiov = (kiov != NULL);
+                tx->tx_large_niov = niov;
+                if (tx->tx_large_iskiov)
+                        tx->tx_large_frags.kiov = kiov;
+                else
+                        tx->tx_large_frags.iov = iov;
+        }
+
+        LASSERT(tx->tx_lntmsg == NULL);
+        tx->tx_lntmsg = lntmsg;
+        
+        spin_lock(&gmni->gmni_tx_lock);
+
+        list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
+        gmnal_check_txqueues_locked(gmni);
+
+        spin_unlock(&gmni->gmni_tx_lock);
+
+        return 0;
 }