Whamcloud - gitweb
b=6871
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
index 4af7186..206d86b 100644 (file)
@@ -48,6 +48,8 @@ gmnal_ct_thread(void *arg)
        nal_data = (gmnal_data_t*)arg;
        CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
 
+       sprintf(current->comm, "gmnal_ct");
+
        daemonize();
 
        nal_data->ctthread_flag = GMNAL_CTTHREAD_STARTED;
@@ -113,6 +115,7 @@ int gmnal_rx_thread(void *arg)
        gmnal_data_t            *nal_data;
        void                    *buffer;
        gmnal_rxtwe_t           *we = NULL;
+       int                     rank;
 
        if (!arg) {
                CDEBUG(D_TRACE, "NO nal_data. Exiting\n");
@@ -122,6 +125,12 @@ int gmnal_rx_thread(void *arg)
        nal_data = (gmnal_data_t*)arg;
        CDEBUG(D_TRACE, "nal_data is [%p]\n", arg);
 
+       for (rank=0; rank<num_rx_threads; rank++)
+               if (nal_data->rxthread_pid[rank] == current->pid)
+                       break;
+
+       sprintf(current->comm, "gmnal_rx_%d", rank);
+
        daemonize();
        /*
         *      set 1 bit for each thread started
@@ -317,16 +326,12 @@ gmnal_rx_bad(gmnal_data_t *nal_data, gmnal_rxtwe_t *we, gmnal_srxd_t *srxd)
  *     Hang out the receive buffer again for another receive
  *     Call lib_finalize
  */
-int
-gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
+ptl_err_t
+gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie)
 {
        gmnal_srxd_t    *srxd = NULL;
-       void    *buffer = NULL;
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->nal_data;
-
+       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
 
-       CDEBUG(D_TRACE, "niov [%d] mlen["LPSZ"]\n", niov, mlen);
 
        if (!private) {
                CDEBUG(D_ERROR, "gmnal_small_rx no context\n");
@@ -335,21 +340,9 @@ gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        }
 
        srxd = (gmnal_srxd_t*)private;
-       buffer = srxd->buffer;
-       buffer += sizeof(gmnal_msghdr_t);
-       buffer += sizeof(ptl_hdr_t);
-
-       while(niov--) {
-               CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov, 
-                      iov->iov_len);
-               gm_bcopy(buffer, iov->iov_base, iov->iov_len);                  
-               buffer += iov->iov_len;
-               iov++;
-       }
-
 
        /*
-        *      let portals library know receive is complete
+        *      let portals library know receive is complete
         */
        CDEBUG(D_PORTALS, "calling lib_finalize\n");
        lib_finalize(libnal, private, cookie, PTL_OK);
@@ -358,8 +351,8 @@ gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
         */
        CDEBUG(D_NET, "calling gm_provide_receive_buffer\n");
        GMNAL_GM_LOCK(nal_data);
-       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, 
-                                          srxd->gmsize, GM_LOW_PRIORITY, 0);   
+       gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
+                                          srxd->gmsize, GM_LOW_PRIORITY, 0);
        GMNAL_GM_UNLOCK(nal_data);
 
        return(PTL_OK);
@@ -368,18 +361,16 @@ gmnal_small_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
 
 /*
  *     Start a small transmit. 
- *     Get a send token (and wired transmit buffer).
- *     Copy data from senders buffer to wired buffer and
- *     initiate gm_send from the wired buffer.
+ *     Use the given send token (and wired transmit buffer).
+ *     Copy headers to wired buffer and initiate gm_send from the wired buffer.
  *     The callback function informs when the send is complete.
  */
-int
-gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
-               unsigned int niov, struct iovec *iov, int size)
+ptl_err_t
+gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
+               ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid,
+               gmnal_stxd_t *stxd, int size)
 {
-       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->nal_data;
-       gmnal_stxd_t    *stxd = NULL;
+       gmnal_data_t    *nal_data = (gmnal_data_t*)libnal->libnal_data;
        void            *buffer = NULL;
        gmnal_msghdr_t  *msghdr = NULL;
        int             tot_size = 0;
@@ -387,16 +378,16 @@ gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        gm_status_t     gm_status = GM_SUCCESS;
 
        CDEBUG(D_TRACE, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
-              "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] niov [%d] "
-              "iov [%p] size [%d]\n", libnal, private, cookie, hdr, type, 
-              global_nid, pid, niov, iov, size);
+              "hdr [%p] type [%d] global_nid ["LPU64"] pid [%d] stxd [%p] "
+              "size [%d]\n", libnal, private, cookie, hdr, type,
+              global_nid, pid, stxd, size);
 
        CDEBUG(D_INFO, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
               hdr->dest_nid, hdr->src_nid);
 
        if (!nal_data) {
                CDEBUG(D_ERROR, "no nal_data\n");
-               return(GMNAL_STATUS_FAIL);
+               return(PTL_FAIL);
        } else {
                CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
        }
@@ -407,19 +398,17 @@ gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        GMNAL_GM_UNLOCK(nal_data);
        if (gm_status != GM_SUCCESS) {
                CDEBUG(D_ERROR, "Failed to obtain local id\n");
-               return(GMNAL_STATUS_FAIL);
+               return(PTL_FAIL);
        }
        CDEBUG(D_INFO, "Local Node_id is [%u][%x]\n", local_nid, local_nid);
 
-       stxd = gmnal_get_stxd(nal_data, 1);
-       CDEBUG(D_INFO, "stxd [%p]\n", stxd);
-
        stxd->type = GMNAL_SMALL_MESSAGE;
        stxd->cookie = cookie;
 
        /*
         *      Copy gmnal_msg_hdr and portals header to the transmit buffer
-        *      Then copy the data in
+        *      Then send the message, as the data has previously been copied in
+        *      (HP SFS 1380).
         */
        buffer = stxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
@@ -436,14 +425,6 @@ gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
 
        buffer += sizeof(ptl_hdr_t);
 
-       while(niov--) {
-               CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n", 
-                      iov, iov->iov_len, buffer);
-               gm_bcopy(iov->iov_base, buffer, iov->iov_len);
-               buffer+= iov->iov_len;
-               iov++;
-       }
-
        CDEBUG(D_INFO, "sending\n");
        tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
        stxd->msg_size = tot_size;
@@ -482,23 +463,34 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
        lib_msg_t       *cookie = stxd->cookie;
        gmnal_data_t    *nal_data = (gmnal_data_t*)stxd->nal_data;
        lib_nal_t       *libnal = nal_data->libnal;
+       unsigned         gnid = 0;
+       gm_status_t      gm_status = 0;
 
        if (!stxd) {
                CDEBUG(D_TRACE, "send completion event for unknown stxd\n");
                return;
        }
        if (status != GM_SUCCESS) {
-               CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s]\n", 
-                      stxd, gmnal_gm_error(status));
+               GMNAL_GM_LOCK(nal_data);
+               gm_status = gm_node_id_to_global_id(nal_data->gm_port,
+                                                   stxd->gm_target_node,&gnid);
+               GMNAL_GM_UNLOCK(nal_data);
+               if (gm_status != GM_SUCCESS) {
+                       CDEBUG(D_INFO, "gm_node_id_to_global_id failed[%d]\n",
+                              gm_status);
+                       gnid = 0;
+               }
+               CDEBUG(D_ERROR, "Result of send stxd [%p] is [%s] to [%u]\n",
+                      stxd, gmnal_gm_error(status), gnid);
        }
 
        switch(status) {
-               case(GM_SUCCESS):
+               case(GM_SUCCESS):
                break;
 
 
 
-               case(GM_SEND_DROPPED):
+               case(GM_SEND_DROPPED):
                /*
                 *      do a resend on the dropped ones
                 */
@@ -524,7 +516,7 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
                        CDEBUG(D_INFO, "calling gm_drop_sends\n");
                        GMNAL_GM_LOCK(nal_data);
                        gm_drop_sends(nal_data->gm_port, stxd->gm_priority, 
-                                     stxd->gm_target_node, GMNAL_GM_PORT, 
+                                     stxd->gm_target_node, GMNAL_GM_PORT_ID
                                      gmnal_drop_sends_callback, context);
                        GMNAL_GM_UNLOCK(nal_data);
 
@@ -577,9 +569,8 @@ gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
                case(GM_FIRMWARE_NOT_RUNNING):
                case(GM_YP_NO_MATCH):
                default:
-                       CDEBUG(D_ERROR, "Unknown send error\n");
                 gm_resume_sending(nal_data->gm_port, stxd->gm_priority,
-                                      stxd->gm_target_node, GMNAL_GM_PORT,
+                                      stxd->gm_target_node, GMNAL_GM_PORT_ID,
                                       gmnal_resume_sending_callback, context);
                 return;
 
@@ -635,7 +626,7 @@ void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
                                              stxd->gm_target_node, 
                                              gmnal_small_tx_callback, 
                                              context);
-               GMNAL_GM_LOCK(nal_data);
+               GMNAL_GM_UNLOCK(nal_data);
        } else {
                CDEBUG(D_ERROR, "send_to_peer status for stxd [%p] is "
                       "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
@@ -656,7 +647,7 @@ void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
 int
 gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
                ptl_hdr_t *hdr, int type, ptl_nid_t global_nid, ptl_pid_t pid, 
-               unsigned int niov, struct iovec *iov, int size)
+               unsigned int niov, struct iovec *iov, size_t offset, int size)
 {
 
        gmnal_data_t    *nal_data;
@@ -676,7 +667,7 @@ gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
               global_nid, pid, niov, iov, size);
 
        if (libnal)
-               nal_data = (gmnal_data_t*)libnal->nal_data;
+               nal_data = (gmnal_data_t*)libnal->libnal_data;
        else  {
                CDEBUG(D_ERROR, "no libnal.\n");
                return(GMNAL_STATUS_FAIL);
@@ -707,7 +698,7 @@ gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        msghdr->magic = GMNAL_MAGIC;
        msghdr->type = GMNAL_LARGE_MESSAGE_INIT;
        msghdr->sender_node_id = nal_data->gm_global_nid;
-       msghdr->stxd = stxd;
+       msghdr->stxd_remote_ptr = (gm_remote_ptr_t)stxd;
        msghdr->niov = niov ;
        buffer += sizeof(gmnal_msghdr_t);
        mlen = sizeof(gmnal_msghdr_t);
@@ -721,30 +712,39 @@ gmnal_large_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
        mlen += sizeof(ptl_hdr_t); 
        CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
 
+        while (offset >= iov->iov_len) {
+                offset -= iov->iov_len;
+                niov--;
+                iov++;
+        } 
+
+        LASSERT(offset >= 0);
+        /*
+        *      Store the iovs in the stxd so we can get 
+        *      them later if we need them
+        */
+        stxd->iov[0].iov_base = iov->iov_base + offset; 
+        stxd->iov[0].iov_len = iov->iov_len - offset; 
+       CDEBUG(D_NET, "Copying iov [%p] to [%p], niov=%d\n", iov, stxd->iov, niov);
+        if (niov > 1)
+               gm_bcopy(&iov[1], &stxd->iov[1], (niov-1)*sizeof(struct iovec));
+       stxd->niov = niov;
+
        /*
         *      copy the iov to the buffer so target knows 
         *      where to get the data from
         */
        CDEBUG(D_INFO, "processing iov to [%p]\n", buffer);
-       gm_bcopy(iov, buffer, niov*sizeof(struct iovec));
-       mlen += niov*(sizeof(struct iovec));
+       gm_bcopy(stxd->iov, buffer, stxd->niov*sizeof(struct iovec));
+       mlen += stxd->niov*(sizeof(struct iovec));
        CDEBUG(D_INFO, "mlen is [%d]\n", mlen);
-
-
-       /*
-        *      Store the iovs in the stxd for we can get 
-        *      them later if we need them
-        */
-       CDEBUG(D_NET, "Copying iov [%p] to [%p]\n", iov, stxd->iov);
-       gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
-       stxd->niov = niov;
        
-
        /*
         *      register the memory so the NIC can get hold of the data
         *      This is a slow process. it'd be good to overlap it 
         *      with something else.
         */
+        iov = stxd->iov;
        iov_dup = iov;
        niov_dup = niov;
        while(niov--) {
@@ -821,10 +821,10 @@ gmnal_large_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
  */
 int
 gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
-               unsigned int nriov, struct iovec *riov, size_t mlen
-               size_t rlen)
+               unsigned int nriov, struct iovec *riov, size_t offset
+               size_t mlen, size_t rlen)
 {
-       gmnal_data_t    *nal_data = libnal->nal_data;
+       gmnal_data_t    *nal_data = libnal->libnal_data;
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        void            *buffer = NULL;
        struct  iovec   *riov_dup;
@@ -852,7 +852,7 @@ gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
         *      The gmnal_large_message_ack needs it to notify the sender
         *      the pull of data is complete
         */
-       srxd->source_stxd = msghdr->stxd;
+       srxd->source_stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
 
        /*
         *      Register the receivers memory
@@ -863,6 +863,25 @@ gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
         *      If the iovecs match, could interleave 
         *      gm_registers and gm_gets for each element
         */
+        while (offset >= riov->iov_len) {
+                offset -= riov->iov_len;
+                riov++;
+                nriov--;
+        } 
+        LASSERT (nriov >= 0);
+        LASSERT (offset >= 0);
+       /*
+        *      do this so the final gm_get callback can deregister the memory
+        */
+       PORTAL_ALLOC(srxd->riov, nriov*(sizeof(struct iovec)));
+
+        srxd->riov[0].iov_base = riov->iov_base + offset;
+        srxd->riov[0].iov_len = riov->iov_len - offset;
+        if (nriov > 1)
+               gm_bcopy(&riov[1], &srxd->riov[1], (nriov-1)*(sizeof(struct iovec)));
+       srxd->nriov = nriov;
+        
+        riov = srxd->riov;
        nriov_dup = nriov;
        riov_dup = riov;
        while(nriov--) {
@@ -888,17 +907,12 @@ gmnal_large_rx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
                        /*
                         *      give back srxd and buffer. Send NACK to sender
                         */
+                        PORTAL_FREE(srxd->riov, nriov_dup*(sizeof(struct iovec)));
                        return(PTL_FAIL);
                }
                GMNAL_GM_UNLOCK(nal_data);
                riov++;
        }
-       /*
-        *      do this so the final gm_get callback can deregister the memory
-        */
-       PORTAL_ALLOC(srxd->riov, nriov_dup*(sizeof(struct iovec)));
-       gm_bcopy(riov_dup, srxd->riov, nriov_dup*(sizeof(struct iovec)));
-       srxd->nriov = nriov_dup;
 
        /*
         *      now do gm_get to get the data
@@ -957,7 +971,7 @@ gmnal_remote_get(gmnal_srxd_t *srxd, int nsiov, struct iovec *siov,
 /*
  *     pull data from source node (source iovec) to a local iovec.
  *     The iovecs may not match which adds the complications below.
- *     Count the number of gm_gets that will be required to the callbacks
+ *     Count the number of gm_gets that will be required so the callbacks
  *     can determine who is the last one.
  */    
 int
@@ -1019,15 +1033,15 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                                 */
                                sbuf_long = (unsigned long) sbuf;
                                remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf, 
-                                      rlen, GM_LOW_PRIORITY, source_node, 
-                                      GMNAL_GM_PORT
+                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
+                                      rlen, GM_LOW_PRIORITY, source_node,
+                                      GMNAL_GM_PORT_ID,
                                       gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
                        /*
                         *      at the end of 1 iov element
-                        */
+                        */
                        sbuf+=rlen;
                        slen-=rlen;
                        riov++;
@@ -1043,9 +1057,9 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                                GMNAL_GM_LOCK(nal_data);
                                sbuf_long = (unsigned long) sbuf;
                                remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf, 
-                                      slen, GM_LOW_PRIORITY, source_node, 
-                                      GMNAL_GM_PORT
+                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
+                                      slen, GM_LOW_PRIORITY, source_node,
+                                      GMNAL_GM_PORT_ID,
                                       gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
@@ -1066,9 +1080,9 @@ gmnal_copyiov(int do_copy, gmnal_srxd_t *srxd, int nsiov,
                                GMNAL_GM_LOCK(nal_data);
                                sbuf_long = (unsigned long) sbuf;
                                remote_ptr = (gm_remote_ptr_t)sbuf_long;
-                               gm_get(nal_data->gm_port, remote_ptr, rbuf, 
-                                      rlen, GM_LOW_PRIORITY, source_node, 
-                                      GMNAL_GM_PORT
+                               gm_get(nal_data->gm_port, remote_ptr, rbuf,
+                                      rlen, GM_LOW_PRIORITY, source_node,
+                                      GMNAL_GM_PORT_ID,
                                       gmnal_remote_get_callback, ltxd);
                                GMNAL_GM_UNLOCK(nal_data);
                        }
@@ -1221,7 +1235,7 @@ gmnal_large_tx_ack(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
        msghdr->magic = GMNAL_MAGIC;
        msghdr->type = GMNAL_LARGE_MESSAGE_ACK;
        msghdr->sender_node_id = nal_data->gm_global_nid;
-       msghdr->stxd = srxd->source_stxd;
+       msghdr->stxd_remote_ptr = (gm_remote_ptr_t)srxd->source_stxd;
        CDEBUG(D_INFO, "processing msghdr at [%p]\n", buffer);
 
        CDEBUG(D_INFO, "sending\n");
@@ -1296,7 +1310,7 @@ gmnal_large_tx_ack_received(gmnal_data_t *nal_data, gmnal_srxd_t *srxd)
 
        buffer = srxd->buffer;
        msghdr = (gmnal_msghdr_t*)buffer;
-       stxd = msghdr->stxd;
+       stxd = (gmnal_stxd_t*)msghdr->stxd_remote_ptr;
 
        CDEBUG(D_INFO, "gmnal_large_tx_ack_received stxd [%p]\n", stxd);