X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lnet%2Fklnds%2Fgmlnd%2Fgmlnd_cb.c;h=503bedff16430e7b81a86d8fd71a1f539b669d5f;hb=b48ab0632ba0c88326c8d9466760bf56301b3676;hp=a96f6e6aa0bc09b35fce88c41f407ffdd8bf392f;hpb=5e285d95f67dd419778d68ad7e5ad5571c5dda9e;p=fs%2Flustre-release.git diff --git a/lnet/klnds/gmlnd/gmlnd_cb.c b/lnet/klnds/gmlnd/gmlnd_cb.c index a96f6e6..503bedf 100644 --- a/lnet/klnds/gmlnd/gmlnd_cb.c +++ b/lnet/klnds/gmlnd/gmlnd_cb.c @@ -25,287 +25,137 @@ */ -#include "gmnal.h" +#include "gmlnd.h" -ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - unsigned int niov, struct iovec *iov, size_t offset, - size_t mlen, size_t rlen) +int +gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, + int delayed, unsigned int niov, + struct iovec *iov, lnet_kiov_t *kiov, + unsigned int offset, unsigned int mlen, unsigned int rlen) { - void *buffer = NULL; - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - int status = PTL_OK; - size_t msglen = mlen; - size_t nob; - - CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], " - "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", - libnal, private, cookie, niov, iov, offset, mlen, rlen); - - switch(srxd->type) { - case(GMNAL_SMALL_MESSAGE): - CDEBUG(D_INFO, "gmnal_cb_recv got small message\n"); - /* HP SFS 1380: Proactively change receives to avoid a receive - * side occurrence of filling pkmap_count[]. 
- */ - buffer = srxd->buffer; - buffer += GMNAL_MSGHDR_SIZE; - buffer += sizeof(ptl_hdr_t); - - while(niov--) { - if (offset >= iov->iov_len) { - offset -= iov->iov_len; - } else { - nob = MIN (iov->iov_len - offset, msglen); - CDEBUG(D_INFO, "processing iov [%p] base [%p] " - "offset [%d] len ["LPSZ"] to [%p] left " - "["LPSZ"]\n", iov, iov->iov_base, - offset, nob, buffer, msglen); - gm_bcopy(buffer, iov->iov_base + offset, nob); - buffer += nob; - msglen -= nob; - offset = 0; - } - iov++; - } - status = gmnal_small_rx(libnal, private, cookie); - break; - case(GMNAL_LARGE_MESSAGE_INIT): - CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n"); - status = gmnal_large_rx(libnal, private, cookie, niov, - iov, offset, mlen, rlen); - } - - CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status); - return(status); -} - -ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, - lib_msg_t *cookie, unsigned int kniov, - ptl_kiov_t *kiov, size_t offset, size_t mlen, - size_t rlen) -{ - gmnal_srxd_t *srxd = (gmnal_srxd_t*)private; - int status = PTL_OK; - char *ptr = NULL; - void *buffer = NULL; - - - CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], " - "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", - libnal, private, cookie, kniov, kiov, offset, mlen, rlen); - - if (srxd->type == GMNAL_SMALL_MESSAGE) { - size_t msglen = mlen; - size_t nob; - - buffer = srxd->buffer; - buffer += GMNAL_MSGHDR_SIZE; - buffer += sizeof(ptl_hdr_t); - - /* - * map each page and create an iovec for it - */ - while (kniov--) { - /* HP SFS 1380: Proactively change receives to avoid a - * receive side occurrence of filling pkmap_count[]. 
- */ - CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", - kniov, kiov); - - if (offset >= kiov->kiov_len) { - offset -= kiov->kiov_len; - } else { - nob = MIN (kiov->kiov_len - offset, msglen); - CDEBUG(D_INFO, "kniov page [%p] len [%d] " - "offset[%d]\n", kiov->kiov_page, - kiov->kiov_len, kiov->kiov_offset); - ptr = ((char *)kmap(kiov->kiov_page)) + - kiov->kiov_offset; - - CDEBUG(D_INFO, "processing ptr [%p] offset [%d] " - "len ["LPSZ"] from [%p] left ["LPSZ"]\n", - ptr, offset, nob, buffer, msglen); - gm_bcopy(buffer, ptr + offset, nob); - kunmap(kiov->kiov_page); - buffer += nob; - msglen -= nob; - offset = 0; - } - kiov++; - } - CDEBUG(D_INFO, "calling gmnal_small_rx\n"); - status = gmnal_small_rx(libnal, private, cookie); - } - - CDEBUG(D_INFO, "gmnal_return status [%d]\n", status); - return(status); -} - - -ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie, - ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, - unsigned int niov, struct iovec *iov, size_t offset, - size_t len) -{ - - gmnal_data_t *nal_data; - void *buffer = NULL; - gmnal_stxd_t *stxd = NULL; - - - CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ - "] nid["LPU64"]\n", niov, offset, len, nid); - nal_data = libnal->libnal_data; - if (!nal_data) { - CERROR("no nal_data\n"); - return(PTL_FAIL); - } else { - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - } - - if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) { - size_t msglen = len; - size_t nob; - - CDEBUG(D_INFO, "This is a small message send\n"); - /* - * HP SFS 1380: With the change to gmnal_small_tx, need to get - * the stxd and do relevant setup here - */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - /* Set the offset of the data to copy into the buffer */ - buffer = stxd->buffer + GMNAL_MSGHDR_SIZE + sizeof(ptl_hdr_t); - while(niov--) { - if (offset >= iov->iov_len) { - offset -= iov->iov_len; - } else { - nob = MIN (iov->iov_len - offset, msglen); - 
CDEBUG(D_INFO, "processing iov [%p] base [%p]" - " offset [%d] len ["LPSZ"] to [%p] left" - " ["LPSZ"]\n", iov, iov->iov_base, - offset, nob, buffer, msglen); - gm_bcopy(iov->iov_base + offset, buffer, nob); - buffer += nob; - msglen -= nob; - offset = 0; - } - iov++; - } - gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid, - stxd, len); - } else { - CERROR("Large message send is not supported\n"); - lib_finalize(libnal, private, cookie, PTL_FAIL); - return(PTL_FAIL); - gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid, - niov, iov, offset, len); - } - return(PTL_OK); -} - -ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private, - lib_msg_t *cookie, ptl_hdr_t *hdr, int type, - ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov, - ptl_kiov_t *kiov, size_t offset, size_t len) -{ - - gmnal_data_t *nal_data; - char *ptr; - void *buffer = NULL; - gmnal_stxd_t *stxd = NULL; - ptl_err_t status = PTL_OK; - - CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset[" - LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len); - nal_data = libnal->libnal_data; - if (!nal_data) { - CERROR("no nal_data\n"); - return(PTL_FAIL); - } else { - CDEBUG(D_INFO, "nal_data [%p]\n", nal_data); - } - - /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap - * more aggressively. This is the fix for a livelock situation under - * load on ia32 that occurs when there are no more available entries in - * the pkmap_count array. Just fill the buffer and let gmnal_small_tx - * put the headers in after we pass it the stxd pointer. 
- */ - stxd = gmnal_get_stxd(nal_data, 1); - CDEBUG(D_INFO, "stxd [%p]\n", stxd); - /* Set the offset of the data to copy into the buffer */ - buffer = stxd->buffer + GMNAL_MSGHDR_SIZE + sizeof(ptl_hdr_t); - - if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) { - size_t msglen = len; - size_t nob; - - CDEBUG(D_INFO, "This is a small message send\n"); - - while(kniov--) { - CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov); - if (offset >= kiov->kiov_len) { - offset -= kiov->kiov_len; - } else { - nob = MIN (kiov->kiov_len - offset, msglen); - CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n", - kiov->kiov_page, kiov->kiov_len, - kiov->kiov_offset); - - ptr = ((char *)kmap(kiov->kiov_page)) + - kiov->kiov_offset; - - CDEBUG(D_INFO, "processing ptr [%p] offset [%d]" - " len ["LPSZ"] to [%p] left ["LPSZ"]\n", - ptr, offset, nob, buffer, msglen); - gm_bcopy(ptr + offset, buffer, nob); - kunmap(kiov->kiov_page); - buffer += nob; - msglen -= nob; - offset = 0; - } - kiov++; - } - status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid, - pid, stxd, len); - } else { - int i = 0; - struct iovec *iovec = NULL, *iovec_dup = NULL; - ptl_kiov_t *kiov_dup = kiov; - - PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec)); - iovec_dup = iovec; - CERROR("Large message send it is not supported yet\n"); - PORTAL_FREE(iovec, kniov*sizeof(struct iovec)); - return(PTL_FAIL); - for (i=0; i<kniov; i++) { - CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n", - kiov->kiov_page, kiov->kiov_len, - kiov->kiov_offset); - - iovec->iov_base = kmap(kiov->kiov_page) - + kiov->kiov_offset; - iovec->iov_len = kiov->kiov_len; - iovec++; - kiov++; - } - gmnal_large_tx(libnal, private, cookie, hdr, type, nid, - pid, kniov, iovec, offset, len); - for (i=0; i<kniov; i++) { - kunmap(kiov_dup->kiov_page); - kiov_dup++; - } - PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec)); - } - return(status); + gmnal_ni_t *gmni = ni->ni_data; + gmnal_rx_t *rx = (gmnal_rx_t*)private; + gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf); + int npages = rx->rx_islarge ? 
gmni->gmni_large_pages : 1; + int payload_offset = offsetof(gmnal_msg_t, + gmm_u.immediate.gmim_payload[0]); + int nob = payload_offset + mlen; + + LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE); + LASSERT (iov == NULL || kiov == NULL); + + if (rx->rx_recv_nob < nob) { + CERROR("Short message from nid %s: got %d, need %d\n", + libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob); + gmnal_post_rx(gmni, rx); + return -EIO; + } + + if (kiov != NULL) + lnet_copy_kiov2kiov(niov, kiov, offset, + npages, rx->rx_buf.nb_kiov, payload_offset, + mlen); + else + lnet_copy_kiov2iov(niov, iov, offset, + npages, rx->rx_buf.nb_kiov, payload_offset, + mlen); + + lnet_finalize(ni, lntmsg, 0); + gmnal_post_rx(gmni, rx); + return 0; } -int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist) +int +gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg) { - CDEBUG(D_TRACE, "gmnal_cb_dist\n"); - if (dist) - *dist = 27; - return(PTL_OK); + lnet_hdr_t *hdr= &lntmsg->msg_hdr; + int type = lntmsg->msg_type; + lnet_process_id_t target = lntmsg->msg_target; + unsigned int niov = lntmsg->msg_niov; + struct iovec *iov = lntmsg->msg_iov; + lnet_kiov_t *kiov = lntmsg->msg_kiov; + unsigned int offset = lntmsg->msg_offset; + unsigned int len = lntmsg->msg_len; + gmnal_ni_t *gmni = ni->ni_data; + gm_status_t gmrc; + gmnal_tx_t *tx; + + LASSERT (iov == NULL || kiov == NULL); + + /* I may not block for a tx if I'm responding to an incoming message */ + tx = gmnal_get_tx(gmni); + if (tx == NULL) { + if (!gmni->gmni_shutdown) + CERROR ("Can't get tx for msg type %d for %s\n", + type, libcfs_nid2str(target.nid)); + return -EIO; + } + + tx->tx_nid = target.nid; + + gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid), + &tx->tx_gmlid); + if (gmrc != GM_SUCCESS) { + CERROR("Can't map Nid %s to a GM local ID: %d\n", + libcfs_nid2str(target.nid), gmrc); + /* NB tx_lntmsg not set => doesn't finalize */ + gmnal_tx_done(tx, -EIO); + return -EIO; + } + + 
gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), + target.nid, GMNAL_MSG_IMMEDIATE); + GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr; + tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]); + + if (the_lnet.ln_testprotocompat != 0) { + /* single-shot proto test */ + LNET_LOCK(); + if ((the_lnet.ln_testprotocompat & 1) != 0) { + GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++; + the_lnet.ln_testprotocompat &= ~1; + } + if ((the_lnet.ln_testprotocompat & 2) != 0) { + GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic = + LNET_PROTO_MAGIC; + the_lnet.ln_testprotocompat &= ~2; + } + LNET_UNLOCK(); + } + + if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) { + /* whole message fits in tx_buf */ + char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]); + + if (iov != NULL) + lnet_copy_iov2flat(len, buffer, 0, + niov, iov, offset, len); + else + lnet_copy_kiov2flat(len, buffer, 0, + niov, kiov, offset, len); + + tx->tx_msgnob += len; + tx->tx_large_nob = 0; + } else { + /* stash payload pts to copy later */ + tx->tx_large_nob = len; + tx->tx_large_iskiov = (kiov != NULL); + tx->tx_large_niov = niov; + if (tx->tx_large_iskiov) + tx->tx_large_frags.kiov = kiov; + else + tx->tx_large_frags.iov = iov; + } + + LASSERT(tx->tx_lntmsg == NULL); + tx->tx_lntmsg = lntmsg; + + spin_lock(&gmni->gmni_tx_lock); + + list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq); + gmnal_check_txqueues_locked(gmni); + + spin_unlock(&gmni->gmni_tx_lock); + + return 0; }