Whamcloud - gitweb
- landing of b_hd_cleanup_merge to HEAD.
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
index 4af7186..6a8fcbc 100644 (file)
@@ -319,11 +319,11 @@ gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
  */
 int
 gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
+               unsigned int niov, struct iovec *iov, size_t offset, size_t mlen, size_t rlen)
 {
        gmnal_srxd_t    *srxd = NULL;
        void    *buffer = NULL;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->nal_data;
+       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
 
 
        CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
@@ -340,11 +340,24 @@ gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        buffer += sizeof(ptl_hdr_t);
 
        while(niov--) {
-               CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov, 
-                      iov->iov_len);
-               gm_bcopy(buffer, iov->iov_base, iov->iov_len);                  
-               buffer += iov->iov_len;
-               iov++;
+                if (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                } else if (offset > 0) {
+                       CDEBUG(D_INFO, "processing [%p] base [%p] len %d, "
+                               "offset %d, len ["LPSZ"]\n", iov,
+                              iov->iov_base + offset, iov->iov_len, offset,
+                               iov->iov_len - offset);
+                       gm_bcopy(buffer, iov->iov_base + offset,
+                                 iov->iov_len - offset);
+                        offset = 0;
+                        buffer += iov->iov_len - offset;
+                } else {
+                       CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov,
+                              iov->iov_len);
+                       gm_bcopy(buffer, iov->iov_base, iov->iov_len);
+                       buffer += iov->iov_len;
+                }
+                iov++;
        }
 
 
@@ -376,9 +389,9 @@ gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
 int
 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
-               unsigned int niov, struct iovec *iov, int size)
+               unsigned int niov, struct iovec *iov, size_t offset, int size)
 {
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->nal_data;
+       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
        gmnal_stxd_t    *stxd = NULL;
        void            *buffer = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
@@ -437,11 +450,21 @@ gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        buffer += sizeof(ptl_hdr_t);
 
        while(niov--) {
-               CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n", 
-                      iov, iov->iov_len, buffer);
-               gm_bcopy(iov->iov_base, buffer, iov->iov_len);
-               buffer+= iov->iov_len;
-               iov++;
+                if (offset >= iov->iov_len) {
+                        offset -= iov->iov_len;
+                } else if (offset > 0) {
+                       CDEBUG(D_INFO, "processing iov [%p] base [%p] len ["LPSZ"] to [%p]\n", 
+                               iov, iov->iov_base + offset, iov->iov_len - offset, buffer);
+                       gm_bcopy(iov->iov_base + offset, buffer, iov->iov_len - offset);
+                       buffer+= iov->iov_len - offset;
+                        offset = 0;
+                } else {
+                       CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n", 
+                               iov, iov->iov_len, buffer);
+                       gm_bcopy(iov->iov_base, buffer, iov->iov_len);
+                       buffer+= iov->iov_len;
+                } 
+                iov++;
        }
 
        CDEBUG(D_INFO, "sending\n");
@@ -656,7 +679,7 @@ void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
 int
 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
-               unsigned int niov, struct iovec *iov, int size)
+               unsigned int niov, struct iovec *iov, size_t offset, int size)
 {
 
        gmnal_data_t    *nal_data;
@@ -676,7 +699,7 @@ gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
               global_nid, pid, niov, iov, size);
 
        if (libnal)
-               nal_data = (gmnal_data_t*)libnal->nal_data;
+               nal_data = (gmnal_data_t*)libnal->libnal_data;
        else  {
                CDEBUG(D_ERROR, "no libnal.\n");
                return(GMNAL_STATUS_FAIL);
@@ -721,30 +744,39 @@ gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        mlen += sizeof(ptl_hdr_t); 
        CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
 
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+        } 
+
+        LASSERT(offset >= 0);
+        /*
+        *      Store the iovs in the stxd for we can get 
+        *      them later if we need them
+        */
+        stxd->iov[0].iov_base = iov->iov_base + offset; 
+        stxd->iov[0].iov_len = iov->iov_len - offset; 
+       CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
+        if (niov > 1)
+               gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
+       stxd->niov = niov;
+
        /*
         *      copy the iov to the buffer so target knows 
         *      where to get the data from
         */
        CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
-       gm_bcopy(iov, buffer, niov*sizeof(struct iovec));
-       mlen += niov*(sizeof(struct iovec));
+       gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
+       mlen += stxd->niov*(sizeof(struct iovec));
        CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-
-       /*
-        *      Store the iovs in the stxd for we can get 
-        *      them later if we need them
-        */
-       CDEBUG(D_NET, "Copying iov [%p] to [%p]\n", iov, stxd->iov);
-       gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
-       stxd->niov = niov;
        
-
        /*
         *      register the memory so the NIC can get hold of the data
         *      This is a slow process. it'd be good to overlap it 
         *      with something else.
         */
+        iov = stxd->iov;
        iov_dup = iov;
        niov_dup = niov;
        while(niov--) {
@@ -821,10 +853,10 @@ gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
  */
 int
 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               unsigned int nriov, struct iovec *riov, size_t mlen
-               size_t rlen)
+               unsigned int nriov, struct iovec *riov, size_t offset
+               size_t mlen, size_t rlen)
 {
-       gmnal_data_t    *nal_data = libnal->nal_data;
+       gmnal_data_t    *nal_data = libnal->libnal_data;
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        void            *buffer = NULL;
        struct  iovec   *riov_dup;
@@ -863,6 +895,25 @@ gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
         *      If the iovecs match, could interleave 
         *      gm_registers and gm_gets for each element
         */
+        while (offset >= riov->iov_len) {
+                offset -= riov->iov_len;
+                riov++;
+                nriov--;
+        } 
+        LASSERT (nriov >= 0);
+        LASSERT (offset >= 0);
+       /*
+        *      do this so the final gm_get callback can deregister the memory
+        */
+       PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
+
+        srxd->riov[0].iov_base = riov->iov_base + offset;
+        srxd->riov[0].iov_len = riov->iov_len - offset;
+        if (nriov > 1)
+               gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
+       srxd->nriov = nriov;
+        
+        riov = srxd->riov;
        nriov_dup = nriov;
        riov_dup = riov;
        while(nriov--) {
@@ -888,17 +939,12 @@ gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                        /*
                         *      give back srxd and buffer. Send NACK to sender
                         */
+                        PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
                        return(PTL_FAIL);
                }
                GMNAL_GM_UNLOCK(nal_data);
                riov++;
        }
-       /*
-        *      do this so the final gm_get callback can deregister the memory
-        */
-       PORTAL_ALLOC(srxd->riov, nriov_dup*(sizeof(struct iovec)));
-       gm_bcopy(riov_dup, srxd->riov, nriov_dup*(sizeof(struct iovec)));
-       srxd->nriov = nriov_dup;
 
        /*
         *      now do gm_get to get the data