Whamcloud - gitweb
* cleaned up startup/shutdown handling
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
index ac4c485..d7e7f5b 100644 (file)
 
 #include "gmnal.h"
 
-ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-                  unsigned int niov, struct iovec *iov, size_t offset,
-                  size_t mlen, size_t rlen)
+ptl_err_t 
+gmnal_cb_recv(lib_nal_t *libnal, void *private, 
+              lib_msg_t *libmsg,
+              unsigned int niov, struct iovec *iov, 
+              size_t offset, size_t mlen, size_t rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = rx->rx_msg;
         size_t           nobleft = mlen;
-        void            *buffer = NULL;
+        int              rxnob;
+        char            *buffer;
         size_t           nob;
 
-       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
+       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], libmsg[%p], "
               "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
-              libnal, private, cookie, niov, iov, offset, mlen, rlen);
+              libnal, private, libmsg, niov, iov, offset, mlen, rlen);
 
-       LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
         
-        buffer = srxd->rx_buffer;
-        buffer += sizeof(gmnal_msghdr_t);
-        buffer += sizeof(ptl_hdr_t);
-
-        while(nobleft > 0) {
+        buffer = &msg->gmm_u.immediate.gmim_payload[0];
+        rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
+        
+        if (rx->rx_recv_nob < rxnob) {
+                CERROR("Short message from nid "LPD64": got %d, need %d\n",
+                       msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+                return PTL_FAIL;
+        }
+        
+        while (nobleft > 0) {
                 LASSERT (niov > 0);
 
                 if (offset >= iov->iov_len) {
@@ -64,32 +73,40 @@ ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                 iov++;
         }
 
-        lib_finalize(libnal, private, cookie, PTL_OK);
+        lib_finalize(libnal, private, libmsg, PTL_OK);
        return PTL_OK;
 }
 
-ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
-                              lib_msg_t *cookie, unsigned int nkiov,
-                              ptl_kiov_t *kiov, size_t offset, size_t mlen,
-                              size_t rlen)
+ptl_err_t 
+gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, 
+                    lib_msg_t *libmsg, 
+                    unsigned int nkiov, ptl_kiov_t *kiov, 
+                    size_t offset, size_t mlen, size_t rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = rx->rx_msg;
         size_t           nobleft = mlen;
+        int              rxnob;
         size_t           nob;
        char            *ptr;
        void            *buffer;
 
        CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
-              "cookie[%p], kniov[%d], kiov [%p], "
+              "libmsg[%p], kniov[%d], kiov [%p], "
                "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
-              libnal, private, cookie, nkiov, kiov, offset, mlen, rlen);
+              libnal, private, libmsg, nkiov, kiov, offset, mlen, rlen);
 
-       LASSERT (srxd->rx_type == GMNAL_SMALL_MESSAGE);
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
 
-        buffer = srxd->rx_buffer;
-        buffer += sizeof(gmnal_msghdr_t);
-        buffer += sizeof(ptl_hdr_t);
+        buffer = &msg->gmm_u.immediate.gmim_payload[0];
+        rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
 
+        if (rx->rx_recv_nob < rxnob) {
+                CERROR("Short message from nid "LPD64": got %d, need %d\n",
+                       msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+                return PTL_FAIL;
+        }
+        
         while (nobleft > 0) {
                 LASSERT (nkiov > 0);
 
@@ -113,24 +130,23 @@ ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
                 nkiov--;
        }
 
-        lib_finalize(libnal, private, cookie, PTL_OK);
-
+        lib_finalize(libnal, private, libmsg, PTL_OK);
        return PTL_OK;
 }
 
-
-ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
-                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
-                        unsigned int niov, struct iovec *iov, 
-                        size_t offset, size_t len)
+ptl_err_t
+gmnal_cb_send(lib_nal_t *libnal, void *private, 
+              lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, 
+              ptl_nid_t nid, ptl_pid_t pid,
+              unsigned int niov, struct iovec *iov, 
+              size_t offset, size_t len)
 {
 
        gmnal_ni_t      *gmnalni = libnal->libnal_data;
-       void            *buffer = NULL;
-       gmnal_stxd_t    *stxd = NULL;
         size_t           nobleft = len;
+       void            *buffer;
+       gmnal_tx_t      *tx;
         size_t           nob;
-        ptl_err_t        rc;
 
        CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
                "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
@@ -140,13 +156,13 @@ ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                 return PTL_FAIL;
         }
 
-        stxd = gmnal_get_stxd(gmnalni, 1);
-        CDEBUG(D_NET, "stxd [%p]\n", stxd);
+        tx = gmnal_get_tx(gmnalni, 1);
 
-        /* Set the offset of the data to copy into the buffer */
-        buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+        gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+        gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
 
-        while(nobleft > 0) {
+        buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
+        while (nobleft > 0) {
                 LASSERT (niov > 0);
                 
                 if (offset >= iov->iov_len) {
@@ -163,27 +179,24 @@ ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                 niov--;
                 iov++;
         }
-
-        rc = gmnal_small_tx(libnal, private, cookie, hdr, type, 
-                            nid, stxd,  len);
-        if (rc != PTL_OK)
-                gmnal_return_stxd(gmnalni, stxd);
-
-       return rc;
+        
+        nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+        return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
 }
 
-ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
-                              lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
-                              ptl_nid_t nid, ptl_pid_t pid, unsigned int nkiov,
-                              ptl_kiov_t *kiov, size_t offset, size_t len)
+ptl_err_t
+gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
+                    lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
+                    ptl_nid_t nid, ptl_pid_t pid, 
+                    unsigned int nkiov, ptl_kiov_t *kiov, 
+                    size_t offset, size_t len)
 {
 
        gmnal_ni_t      *gmnalni = libnal->libnal_data;
-       void            *buffer = NULL;
-       gmnal_stxd_t    *stxd = NULL;
         size_t           nobleft = len;
+       void            *buffer;
+       gmnal_tx_t      *tx;
        char            *ptr;
-       ptl_err_t        rc;
         size_t           nob;
 
        CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
@@ -194,12 +207,12 @@ ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
                 return PTL_FAIL;
         }
 
-       stxd = gmnal_get_stxd(gmnalni, 1);
-       CDEBUG(D_NET, "stxd [%p]\n", stxd);
+       tx = gmnal_get_tx(gmnalni, 1);
 
-       /* Set the offset of the data to copy into the buffer */
-       buffer = stxd->tx_buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
+        gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+        gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
 
+       buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
         while (nobleft > 0) {
                 LASSERT (nkiov > 0);
 
@@ -223,16 +236,12 @@ ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
                 kiov++;
         }
 
-        rc = gmnal_small_tx(libnal, private, cookie, hdr, type, 
-                                nid, stxd, len);
-
-        if (rc != PTL_OK)
-                gmnal_return_stxd(gmnalni, stxd);
-        
-       return rc;
+        nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+        return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
 }
 
-int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
+int
+gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
 {
        CDEBUG(D_TRACE, "gmnal_cb_dist\n");