Whamcloud - gitweb
b=3031
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
index 6ae91db..6394c37 100644 (file)
 
 #include "gmnal.h"
 
-int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                  unsigned int niov, struct iovec *iov, size_t mlen, 
-                  size_t rlen)
 /*
  * gmnal_cb_recv: receive callback for a message described by an iovec list.
  *
  * private is cast to the gmnal_srxd_t receive descriptor and srxd->type
  * selects the path:
  *  - GMNAL_SMALL_MESSAGE: the payload is read from srxd->buffer, starting
  *    sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) bytes in, and copied with
  *    gm_bcopy(src, dst, len) into the iov entries in order.  The first
  *    offset bytes of the iov list are skipped: whole entries while offset
  *    is at least iov_len, then the leading part of the next entry.  Once
  *    the copy is done gmnal_small_rx() is called with libnal, private and
  *    cookie only.
  *  - GMNAL_LARGE_MESSAGE_INIT: all arguments, including offset, are handed
  *    to gmnal_large_rx().
  * Any other type matches no case (the switch has no default) and the
  * initial PTL_OK is returned.
  *
  * Returns the status produced by the selected helper.
  *
  * NOTE(review): in the small message path mlen and rlen are only logged;
  * the loop copies the full remaining length of every iovec.  Confirm that
  * callers guarantee this never exceeds the data held in srxd->buffer.
  * NOTE(review): buffer is a void pointer and is advanced with +=, which
  * relies on the GNU C void pointer arithmetic extension.
  * NOTE(review): the CDEBUG in the offset > 0 branch prints offset, a
  * size_t, with %d; the other size_t values use LPSZ.
  */
+ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
+                  unsigned int niov, struct iovec *iov, size_t offset,
+                  size_t mlen, size_t rlen)
 {
+        void            *buffer = NULL;
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        int             status = PTL_OK;
 
-
-       CDEBUG(D_TRACE, "gmnal_cb_recv nal_cb [%p], private[%p], cookie[%p], 
-              niov[%d], iov [%p], mlen["LPSZ"], rlen["LPSZ"]\n", 
-              nal_cb, private, cookie, niov, iov, mlen, rlen);
+       CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
+              "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+              libnal, private, cookie, niov, iov, offset, mlen, rlen);
 
        switch(srxd->type) {
        case(GMNAL_SMALL_MESSAGE):
                CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, niov, 
-                                        iov, mlen, rlen);
+               /* HP SFS 1380: Proactively change receives to avoid a receive
+                *  side occurrence of filling pkmap_count[].
+                */
+               buffer = srxd->buffer;
+               buffer += sizeof(gmnal_msghdr_t);
+               buffer += sizeof(ptl_hdr_t);
+
+               while(niov--) {
+                       if (offset >= iov->iov_len) {
+                               offset -= iov->iov_len;
+                       } else if (offset > 0) {
+                               CDEBUG(D_INFO, "processing [%p] base [%p] "
+                                       "len %d, offset %d, len ["LPSZ"]\n", iov,
+                                       iov->iov_base + offset, iov->iov_len,
+                                       offset, iov->iov_len - offset);
+                               gm_bcopy(buffer, iov->iov_base + offset,
+                                        iov->iov_len - offset);
+                               buffer += iov->iov_len - offset;
+                               offset = 0;
+                       } else {
+                               CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n",
+                                       iov, iov->iov_len);
+                               gm_bcopy(buffer, iov->iov_base, iov->iov_len);
+                               buffer += iov->iov_len;
+                       }
+                       iov++;
+               }
+               status = gmnal_small_rx(libnal, private, cookie);
        break;
        case(GMNAL_LARGE_MESSAGE_INIT):
                CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
-               status = gmnal_large_rx(nal_cb, private, cookie, niov, 
-                                        iov, mlen, rlen);
+               status = gmnal_large_rx(libnal, private, cookie, niov, 
+                                        iov, offset, mlen, rlen);
        }
-               
 
        CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
        return(status);
 }
 
-int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                        unsigned int kniov, ptl_kiov_t *kiov, size_t mlen, 
-                        size_t rlen)
 /*
  * gmnal_cb_recv_pages: receive callback for a message described by a list
  * of page fragments (ptl_kiov_t).
  *
  * Only srxd->type == GMNAL_SMALL_MESSAGE is handled.  The payload starts
  * sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) bytes into srxd->buffer.  For
  * each fragment that is not skipped entirely by offset, the page is mapped
  * with kmap(), the data is copied with gm_bcopy(src, dst, len) to
  * kiov_offset (plus any remaining offset) inside the page, and the page is
  * unmapped with kunmap() before the next fragment is touched.  After the
  * loop gmnal_small_rx() is called with libnal, private and cookie only.
  * The removed code instead kept every page mapped while building a
  * temporary iovec array that was passed to gmnal_small_rx().
  *
  * Returns the gmnal_small_rx() status, or the initial PTL_OK when the
  * descriptor is not a small message (nothing is done in that case).
  *
  * NOTE(review): the comment inside the body that mentions creating an
  * iovec describes the removed lines; the added lines copy directly.
  * NOTE(review): kniov is logged after the post-decrement in the loop
  * condition, so it is the count of fragments remaining, not an index.
  * NOTE(review): the CDEBUG before kmap() has no trailing newline.
  * NOTE(review): kiov->kiov_len is printed with %d in one CDEBUG and with
  * LPSZ in another, and offset (a size_t) is printed with %d; these cannot
  * all match the argument types.
  * NOTE(review): mlen and rlen are only logged; every fragment is copied in
  * full.  Confirm that callers bound the kiov list to the received data.
  * NOTE(review): buffer is a void pointer advanced with +=, which relies on
  * the GNU C void pointer arithmetic extension.
  */
+ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
+                              lib_msg_t *cookie, unsigned int kniov,
+                              ptl_kiov_t *kiov, size_t offset, size_t mlen,
+                              size_t rlen)
 {
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        int             status = PTL_OK;
-       struct iovec    *iovec = NULL, *iovec_dup = NULL;
-       int             i = 0;
+       char            *ptr = NULL;
+       void            *buffer = NULL;
 
 
-       CDEBUG(D_TRACE, "gmnal_cb_recv_pages nal_cb [%p],private[%p], 
-              cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
-              nal_cb, private, cookie, kniov, kiov, mlen, rlen);
+       CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
+              "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
+              libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
 
        if (srxd->type == GMNAL_SMALL_MESSAGE) {
-               PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
-               if (!iovec) {
-                       CDEBUG(D_ERROR, "Can't malloc\n");
-                       return(GMNAL_STATUS_FAIL);
-               }
-                iovec_dup = iovec;
+               buffer = srxd->buffer;
+               buffer += sizeof(gmnal_msghdr_t);
+               buffer += sizeof(ptl_hdr_t);
 
                /*
                 *      map each page and create an iovec for it
                 */
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
-                       iovec->iov_len = kiov->kiov_len;
-                       CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
-
-                       iovec->iov_base = kmap(kiov->kiov_page) + 
-                                                 kiov->kiov_offset;
-
-                       CDEBUG(D_INFO, "iov_base is [%p]\n", iovec->iov_base);
-                        iovec++;
+               while (kniov--) {
+                       /* HP SFS 1380: Proactively change receives to avoid a
+                        *  receive side occurrence of filling pkmap_count[].
+                        */
+                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
+                               kniov, kiov);
+
+                       if (offset >= kiov->kiov_len) {
+                               offset -= kiov->kiov_len;
+                       } else {
+                               CDEBUG(D_INFO, "kniov page [%p] len [%d] "
+                                       "offset[%d]\n", kiov->kiov_page,
+                                       kiov->kiov_len, kiov->kiov_offset);
+                               CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
+                               ptr = ((char *)kmap(kiov->kiov_page)) +
+                                        kiov->kiov_offset;
+
+                               if (offset > 0) {
+                                       CDEBUG(D_INFO, "processing [%p] base "
+                                               "[%p] len %d, offset %d, len ["
+                                               LPSZ"]\n", ptr, ptr + offset,
+                                               kiov->kiov_len, offset,
+                                              kiov->kiov_len - offset);
+                                       gm_bcopy(buffer, ptr + offset,
+                                                 kiov->kiov_len - offset);
+                                       buffer += kiov->kiov_len - offset;
+                                       offset = 0;
+                               } else {
+                                       CDEBUG(D_INFO, "processing [%p] len ["
+                                               LPSZ"]\n", ptr, kiov->kiov_len);
+                                       gm_bcopy(buffer, ptr, kiov->kiov_len);
+                                       buffer += kiov->kiov_len;
+                               }
+                               kunmap(kiov->kiov_page);
+                               CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
+                        }
                         kiov++;
                }
                CDEBUG(D_INFO, "calling gmnal_small_rx\n");
-               status = gmnal_small_rx(nal_cb, private, cookie, kniov, 
-                                        iovec_dup, mlen, rlen);
-               PORTAL_FREE(iovec_dup, sizeof(struct iovec)*kniov);
+               status = gmnal_small_rx(libnal, private, cookie);
        }
-               
 
        CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
        return(status);
 }
 
 
-int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                  ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
-                  unsigned int niov, struct iovec *iov, size_t len)
 /*
  * gmnal_cb_send: send callback for a message described by an iovec list.
  *
  * nal_data is read from libnal->libnal_data; a NULL value is logged and
  * PTL_FAIL is returned.
  *
  * When GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len) holds, a transmit
  * descriptor is taken with gmnal_get_stxd(nal_data, 1) and the iov data is
  * packed with gm_bcopy(src, dst, len) into stxd->buffer, starting
  * sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t) bytes in.  The first offset
  * bytes of the iov list are skipped.  gmnal_small_tx() is then called with
  * the filled stxd and len in place of the iov list, and PTL_OK is
  * returned.
  *
  * Otherwise the send is rejected: an error is logged, lib_finalize() is
  * called with PTL_FAIL and PTL_FAIL is returned.  The gmnal_large_tx()
  * call that follows the return statement is unreachable.
  *
  * NOTE(review): stxd is dereferenced without a NULL check.  Confirm that
  * gmnal_get_stxd() cannot return NULL when its second argument is 1.
  * NOTE(review): the gmnal_small_tx() result is discarded here and PTL_OK
  * is returned regardless, whereas gmnal_cb_send_pages() stores it in
  * status and returns it.
  * NOTE(review): len is not used to bound the copy; every remaining iovec
  * is copied in full.  Confirm that the iov list never describes more data
  * than fits in stxd->buffer after the two headers.
  */
+ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
+                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
+                        unsigned int niov, struct iovec *iov, size_t offset,
+                        size_t len)
 {
 
        gmnal_data_t    *nal_data;
+       void            *buffer = NULL;
+       gmnal_stxd_t    *stxd = NULL;
 
 
-       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n", 
-              niov, len, nid);
-       nal_data = nal_cb->nal_data;
-       
+       CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
+               "] nid["LPU64"]\n", niov, offset, len, nid);
+       nal_data = libnal->libnal_data;
+       if (!nal_data) {
+               CDEBUG(D_ERROR, "no nal_data\n");
+               return(PTL_FAIL);
+       } else {
+               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
+       }
+
        if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
                CDEBUG(D_INFO, "This is a small message send\n");
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
-                               niov, iov, len);
+               /*
+                * HP SFS 1380: With the change to gmnal_small_tx, need to get
+                * the stxd and do relevant setup here
+                */
+               stxd = gmnal_get_stxd(nal_data, 1);
+               CDEBUG(D_INFO, "stxd [%p]\n", stxd);
+               /* Set the offset of the data to copy into the buffer */
+               buffer = stxd->buffer +sizeof(gmnal_msghdr_t)+sizeof(ptl_hdr_t);
+               while(niov--) {
+                       if (offset >= iov->iov_len) {
+                               offset -= iov->iov_len;
+                       } else if (offset > 0) {
+                               CDEBUG(D_INFO, "processing iov [%p] base [%p] "
+                                       "len ["LPSZ"] to [%p]\n",
+                                       iov, iov->iov_base + offset,
+                                       iov->iov_len - offset, buffer);
+                               gm_bcopy(iov->iov_base + offset, buffer,
+                                         iov->iov_len - offset);
+                               buffer+= iov->iov_len - offset;
+                               offset = 0;
+                       } else {
+                               CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ
+                                       "] to [%p]\n", iov, iov->iov_len,buffer);
+                               gm_bcopy(iov->iov_base, buffer, iov->iov_len);
+                               buffer+= iov->iov_len;
+                       }
+                       iov++;
+               }
+               gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
+                              stxd,  len);
        } else {
-               CDEBUG(D_ERROR, "Large message send it is not supported\n");
-               lib_finalize(nal_cb, private, cookie);
+               CDEBUG(D_ERROR, "Large message send is not supported\n");
+               lib_finalize(libnal, private, cookie, PTL_FAIL);
                return(PTL_FAIL);
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, pid, 
-                               niov, iov, len);
+               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
+                               niov, iov, offset, len);
        }
        return(PTL_OK);
 }
 
-int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, 
-                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,                         unsigned int kniov, ptl_kiov_t *kiov, size_t len)
+ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
+                              lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
+                              ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
+                              ptl_kiov_t *kiov, size_t offset, size_t len)
 {
 
-       int     i = 0;
        gmnal_data_t    *nal_data;
-       struct  iovec   *iovec = NULL, *iovec_dup = NULL;
+       char            *ptr;
+       void            *buffer = NULL;
+       gmnal_stxd_t    *stxd = NULL;
+       ptl_err_t       status = PTL_OK;
+
+       CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
+               LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
+       nal_data = libnal->libnal_data;
+       if (!nal_data) {
+               CDEBUG(D_ERROR, "no nal_data\n");
+               return(PTL_FAIL);
+       } else {
+               CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
+       }
+
+       /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
+        * more aggressively.  This is the fix for a livelock situation under
+        * load on ia32 that occurs when there are no more available entries in
+        * the pkmap_count array.  Just fill the buffer and let gmnal_small_tx
+        * put the headers in after we pass it the stxd pointer.
+        */
+       stxd = gmnal_get_stxd(nal_data, 1);
+       CDEBUG(D_INFO, "stxd [%p]\n", stxd);
+       /* Set the offset of the data to copy into the buffer */
+       buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
 
-       CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] len["LPSZ"]\n", nid, kniov, len);
-       nal_data = nal_cb->nal_data;
-       PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
-        iovec_dup = iovec;
        if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
                CDEBUG(D_INFO, "This is a small message send\n");
-               
-               for (i=0; i<kniov; i++) {
-                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
-                       CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
-                              kiov->kiov_page, kiov->kiov_len, 
-                              kiov->kiov_offset);
 
-                       iovec->iov_base = kmap(kiov->kiov_page) 
-                                               + kiov->kiov_offset;
-
-                       iovec->iov_len = kiov->kiov_len;
-                        iovec++;
+               while(kniov--) {
+                       CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
+                       if (offset >= kiov->kiov_len) {
+                               offset -= kiov->kiov_len;
+                       } else {
+                               CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
+                                      kiov->kiov_page, kiov->kiov_len, 
+                                      kiov->kiov_offset);
+
+                               ptr = ((char *)kmap(kiov->kiov_page)) +
+                                        kiov->kiov_offset;
+
+                               if (offset > 0) {
+                                       CDEBUG(D_INFO, "processing [%p] base "
+                                               "[%p] len ["LPSZ"] to [%p]\n",
+                                              ptr, ptr + offset,
+                                               kiov->kiov_len - offset, buffer);
+                                       gm_bcopy(ptr + offset, buffer,
+                                                 kiov->kiov_len - offset);
+                                       buffer+= kiov->kiov_len - offset;
+                                       offset = 0;
+                               } else {
+                                       CDEBUG(D_INFO, "processing kmapped [%p]"
+                                               " len ["LPSZ"] to [%p]\n",
+                                              ptr, kiov->kiov_len, buffer);
+                                       gm_bcopy(ptr, buffer, kiov->kiov_len);
+
+                                       buffer += kiov->kiov_len;
+                               }
+                               kunmap(kiov->kiov_page);
+                       }
                         kiov++;
                }
-               gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec_dup, len);
+               status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
+                                       pid, stxd, len);
        } else {
+               int     i = 0;
+               struct  iovec   *iovec = NULL, *iovec_dup = NULL;
+               ptl_kiov_t *kiov_dup = kiov;
+
+               PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
+               iovec_dup = iovec;
                CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
+               PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
                return(PTL_FAIL);
                for (i=0; i<kniov; i++) {
                        CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
@@ -179,90 +307,18 @@ int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                         iovec++;
                         kiov++;
                }
-               gmnal_large_tx(nal_cb, private, cookie, hdr, type, nid, 
-                               pid, kniov, iovec, len);
-       }
-       PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
-       return(PTL_OK);
-}
-
-int gmnal_cb_read(nal_cb_t *nal_cb, void *private, void *dst, 
-                  user_ptr src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
-
-int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst, 
-                   void *src, size_t len)
-{
-       gm_bcopy(src, dst, len);
-       return(PTL_OK);
-}
-
-int gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq, 
-                      ptl_event_t *ev)
-{
-
-       if (eq->event_callback != NULL) {
-               CDEBUG(D_INFO, "found callback\n");
-               eq->event_callback(ev);
+               gmnal_large_tx(libnal, private, cookie, hdr, type, nid, 
+                               pid, kniov, iovec, offset, len);
+               for (i=0; i<kniov; i++) {
+                       kunmap(kiov_dup->kiov_page);
+                       kiov_dup++;
+               }
+               PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
        }
-       
-       return(PTL_OK);
-}
-
-void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
-{
-       void *ptr = NULL;
-       CDEBUG(D_TRACE, "gmnal_cb_malloc len["LPSZ"]\n", len);
-       PORTAL_ALLOC(ptr, len);
-       return(ptr);
-}
-
-void gmnal_cb_free(nal_cb_t *nal_cb, void *buf, size_t len)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_free :: buf[%p] len["LPSZ"]\n", buf, len);
-       PORTAL_FREE(buf, len);
-       return;
-}
-
-void gmnal_cb_unmap(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                    void **addrkey)
-{
-       return;
-}
-
-int  gmnal_cb_map(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov, 
-                  void**addrkey)
-{
-       return(PTL_OK);
-}
-
-void gmnal_cb_printf(nal_cb_t *nal_cb, const char *fmt, ...)
-{
-       CDEBUG(D_TRACE, "gmnal_cb_printf\n");
-       printk(fmt);
-       return;
-}
-
-void gmnal_cb_cli(nal_cb_t *nal_cb, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
-       spin_lock_irqsave(&nal_data->cb_lock, *flags);
-       return;
-}
-
-void gmnal_cb_sti(nal_cb_t *nal_cb, unsigned long *flags)
-{
-       gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;
-
-       spin_unlock_irqrestore(&nal_data->cb_lock, *flags);
-       return;
+       return(status);
 }
 
-int gmnal_cb_dist(nal_cb_t *nal_cb, ptl_nid_t nid, unsigned long *dist)
+int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
 {
        CDEBUG(D_TRACE, "gmnal_cb_dist\n");
        if (dist)