Whamcloud - gitweb
* simplified gmnal thread startup/shutdown
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
index 6ae91db..d7e7f5b 100644 (file)
 
 #include "gmnal.h"
 
-int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                  unsigned int niov, struct iovec *iov, size_t mlen, 
-                  size_t rlen)
+ptl_err_t 
+gmnal_cb_recv(lib_nal_t *libnal, void *private, 
+              lib_msg_t *libmsg,
+              unsigned int niov, struct iovec *iov, 
+              size_t offset, size_t mlen, size_t rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_recv nal_cb [%p], private[%p], cookie[%p], 
-              niov[%d], iov [%p], mlen["LPSZ"], rlen["LPSZ"]\n", 
-              nal_cb, private, cookie, niov, iov, mlen, rlen);
-
-       switch(srxd->type) {
-       case(GMNAL_SMALL_MESSAGE):
-               CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, niov, 
-                                        iov, mlen, rlen);
-       break;
-       case(GMNAL_LARGE_MESSAGE_INIT):
-               CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
-               status = gmnal_large_rx(nal_cb, private, cookie, niov, 
-                                        iov, mlen, rlen);
-       }
-               
-
-       CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
-       return(status);
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = rx->rx_msg;
+        size_t           nobleft = mlen;
+        int              rxnob;
+        char            *buffer;
+        size_t           nob;
+
+       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], libmsg[%p], "
+              "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+              libnal, private, libmsg, niov, iov, offset, mlen, rlen);
+
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+        
+        buffer = &msg->gmm_u.immediate.gmim_payload[0];
+        rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
+        
+        if (rx->rx_recv_nob < rxnob) {
+                CERROR("Short message from nid "LPD64": got %d, need %d\n",
+                       msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+                return PTL_FAIL;
+        }
+        
+        while (nobleft > 0) {
+                LASSERT (niov > 0);
+
+                if (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                } else {
+                        nob = MIN (iov->iov_len - offset, nobleft);
+
+                        gm_bcopy(buffer, iov->iov_base + offset, nob);
+
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
+                }
+                niov--;
+                iov++;
+        }
+
+        lib_finalize(libnal, private, libmsg, PTL_OK);
+       return PTL_OK;
 }
 
-int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                        unsigned int kniov, ptl_kiov_t *kiov, size_t mlen, 
-                        size_t rlen)
+ptl_err_t 
+gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, 
+                    lib_msg_t *libmsg, 
+                    unsigned int nkiov, ptl_kiov_t *kiov, 
+                    size_t offset, size_t mlen, size_t rlen)
 {
-       gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
-       int             status = PTL_OK;
-       struct iovec    *iovec = NULL, *iovec_dup = NULL;
-       int             i = 0;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_recv_pages nal_cb [%p],private[%p], 
-              cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
-              nal_cb, private, cookie, kniov, kiov, mlen, rlen);
-
-       if (srxd->type == GMNAL_SMALL_MESSAGE) {
-               PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
-               if (!iovec) {
-                       CDEBUG(D_ERROR, "Can't malloc\n");
-                       return(GMNAL_STATUS_FAIL);
+       gmnal_rx_t      *rx = (gmnal_rx_t*)private;
+        gmnal_msg_t     *msg = rx->rx_msg;
+        size_t           nobleft = mlen;
+        int              rxnob;
+        size_t           nob;
+       char            *ptr;
+       void            *buffer;
+
+       CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
+              "libmsg[%p], kniov[%d], kiov [%p], "
+               "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+              libnal, private, libmsg, nkiov, kiov, offset, mlen, rlen);
+
+       LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
+
+        buffer = &msg->gmm_u.immediate.gmim_payload[0];
+        rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
+
+        if (rx->rx_recv_nob < rxnob) {
+                CERROR("Short message from nid "LPD64": got %d, need %d\n",
+                       msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
+                return PTL_FAIL;
+        }
+        
+        while (nobleft > 0) {
+                LASSERT (nkiov > 0);
+
+                if (offset >= kiov->kiov_len) {
+                        offset -= kiov->kiov_len;
+                } else {
+                        nob = MIN (kiov->kiov_len - offset, nobleft);
+
+                        ptr = ((char *)kmap(kiov->kiov_page)) +
+                              kiov->kiov_offset;
+
+                        gm_bcopy(buffer, ptr + offset, nob);
+
+                        kunmap(kiov->kiov_page);
+
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
                }
-                iovec_dup = iovec;
-
-               /*
-                *      map each page and create an iovec for it
-                */
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-                       iovec->iov_len = kiov->kiov_len;
-                       CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) + 
-                                                 kiov->kiov_offset;
-
-                       CDEBUG(D_INFO, "iov_base is [%p]\n", iovec->iov_base);
-                        iovec++;
-                        kiov++;
-               }
-               CDEBUG(D_INFO, "calling gmnal_small_rx\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, kniov, 
-                                        iovec_dup, mlen, rlen);
-               PORTAL_FREE(iovec_dup, sizeof(struct iovec)*kniov);
+                kiov++;
+                nkiov--;
        }
-               
 
-       CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
-       return(status);
+        lib_finalize(libnal, private, libmsg, PTL_OK);
+       return PTL_OK;
 }
 
-
-int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                  ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
-                  unsigned int niov, struct iovec *iov, size_t len)
+ptl_err_t
+gmnal_cb_send(lib_nal_t *libnal, void *private, 
+              lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, 
+              ptl_nid_t nid, ptl_pid_t pid,
+              unsigned int niov, struct iovec *iov, 
+              size_t offset, size_t len)
 {
 
-       gmnal_data_t    *nal_data;
-
-
-       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n", 
-              niov, len, nid);
-       nal_data = nal_cb->nal_data;
-       
-       if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
-               CDEBUG(D_INFO, "This is a small message send\n");
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
-                               niov, iov, len);
-       } else {
-               CDEBUG(D_ERROR, "Large message send it is not supported\n");
-               lib_finalize(nal_cb, private, cookie);
-               return(PTL_FAIL);
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
-                               niov, iov, len);
-       }
-       return(PTL_OK);
+       gmnal_ni_t      *gmnalni = libnal->libnal_data;
+        size_t           nobleft = len;
+       void            *buffer;
+       gmnal_tx_t      *tx;
+        size_t           nob;
+
+       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
+               "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
+
+        if ((nid >> 32) != 0) {
+                CERROR("Illegal nid: "LPU64"\n", nid);
+                return PTL_FAIL;
+        }
+
+        tx = gmnal_get_tx(gmnalni, 1);
+
+        gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+        gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
+
+        buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
+        while (nobleft > 0) {
+                LASSERT (niov > 0);
+                
+                if (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                } else {
+                        nob = MIN (iov->iov_len - offset, nobleft);
+
+                        gm_bcopy(iov->iov_base + offset, buffer, nob);
+
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
+                }
+                niov--;
+                iov++;
+        }
+        
+        nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+        return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
 }
 
-int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,                         unsigned int kniov, ptl_kiov_t *kiov, size_t len)
+ptl_err_t
+gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
+                    lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
+                    ptl_nid_t nid, ptl_pid_t pid, 
+                    unsigned int nkiov, ptl_kiov_t *kiov, 
+                    size_t offset, size_t len)
 {
 
-       int     i = 0;
-       gmnal_data_t    *nal_data;
-       struct  iovec   *iovec = NULL, *iovec_dup = NULL;
-
-       CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] len["LPSZ"]\n", nid, kniov, len);
-       nal_data = nal_cb->nal_data;
-       PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
-        iovec_dup = iovec;
-       if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
-               CDEBUG(D_INFO, "This is a small message send\n");
-               
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) 
-                                               + kiov->kiov_offset;
-
-                       iovec->iov_len = kiov->kiov_len;
-                        iovec++;
-                        kiov++;
-               }
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec_dup, len);
-       } else {
-               CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
-               return(PTL_FAIL);
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) 
-                                                + kiov->kiov_offset;
-                       iovec->iov_len = kiov->kiov_len;
-                        iovec++;
-                        kiov++;
-               }
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec, len);
-       }
-       PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
-       return(PTL_OK);
-}
+       gmnal_ni_t      *gmnalni = libnal->libnal_data;
+        size_t           nobleft = len;
+       void            *buffer;
+       gmnal_tx_t      *tx;
+       char            *ptr;
+        size_t           nob;
 
-int gmnal_cb_read(nal_cb_t *nal_cb, void *private, void *dst, 
-                  user_ptr src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
+       CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
+               LPSZ"] len["LPSZ"]\n", nid, nkiov, offset, len);
 
-int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst, 
-                   void *src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
+        if ((nid >> 32) != 0) {
+                CERROR("Illegal nid: "LPU64"\n", nid);
+                return PTL_FAIL;
+        }
 
-int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, 
-                      ptl_event_t *ev)
-{
+       tx = gmnal_get_tx(gmnalni, 1);
 
-       if (eq->event_callback != NULL) {
-               CDEBUG(D_INFO, "found callback\n");
-               eq->event_callback(ev);
-       }
-       
-       return(PTL_OK);
-}
+        gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
+        gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
 
-void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
-{
-       void *ptr = NULL;
-       CDEBUG(D_TRACE, "gmnal_cb_malloc len["LPSZ"]\n", len);
-       PORTAL_ALLOC(ptr, len);
-       return(ptr);
-}
+       buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
+        while (nobleft > 0) {
+                LASSERT (nkiov > 0);
 
-void gmnal_cb_free(nal_cb_t *nal_cb, void *buf, size_t len)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_free :: buf[%p] len["LPSZ"]\n", buf, len);
-       PORTAL_FREE(buf, len);
-       return;
-}
+                if (offset >= kiov->kiov_len) {
+                        offset -= kiov->kiov_len;
+                } else {
+                        nob = MIN (kiov->kiov_len - offset, nobleft);
 
-void gmnal_cb_unmap(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                    void **addrkey)
-{
-       return;
-}
+                        ptr = ((char *)kmap(kiov->kiov_page)) +
+                              kiov->kiov_offset;
 
-int  gmnal_cb_map(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                  void**addrkey)
-{
-       return(PTL_OK);
-}
+                        gm_bcopy(ptr + offset, buffer, nob);
 
-void gmnal_cb_printf(nal_cb_t *nal_cb, const char *fmt, ...)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_printf\n");
-       printk(fmt);
-       return;
-}
+                        kunmap(kiov->kiov_page);
 
-void gmnal_cb_cli(nal_cb_t *nal_cb, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
+                        buffer += nob;
+                        nobleft -= nob;
+                        offset = 0;
+                }
+                nkiov--;
+                kiov++;
+        }
 
-       spin_lock_irqsave(&nal_data->cb_lock, *flags);
-       return;
+        nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
+        return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
 }
 
-void gmnal_cb_sti(nal_cb_t *nal_cb, unsigned long *flags)
+int
+gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
 {
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
+       CDEBUG(D_TRACE, "gmnal_cb_dist\n");
 
-       spin_unlock_irqrestore(&nal_data->cb_lock, *flags);
-       return;
-}
+       if (dist != NULL)
+               *dist = 1;
 
-int gmnal_cb_dist(nal_cb_t *nal_cb, ptl_nid_t nid, unsigned long *dist)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_dist\n");
-       if (dist)
-               *dist = 27;
-       return(PTL_OK);
+       return PTL_OK;
 }