- gmnal_srxd_t *srxd = (gmnal_srxd_t*)private;
- int status = PTL_OK;
- struct iovec *iovec = NULL, *iovec_dup = NULL;
- int i = 0;
- ptl_kiov_t *kiov_dup = kiov;;
-
-
- CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
- "cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
- libnal, private, cookie, kniov, kiov, mlen, rlen);
-
- if (srxd->type == GMNAL_SMALL_MESSAGE) {
- PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
- if (!iovec) {
- CDEBUG(D_ERROR, "Can't malloc\n");
- return(GMNAL_STATUS_FAIL);
- }
- iovec_dup = iovec;
-
- /*
- * map each page and create an iovec for it
- */
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
- iovec->iov_len = kiov->kiov_len;
- CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
-
- iovec->iov_base = kmap(kiov->kiov_page) +
- kiov->kiov_offset;
-
- CDEBUG(D_INFO, "iov_base is [%p]\n", iovec->iov_base);
- iovec++;
- kiov++;
- }
- CDEBUG(D_INFO, "calling gmnal_small_rx\n");
- status = gmnal_small_rx(libnal, private, cookie, kniov,
- iovec_dup, mlen, rlen);
- for (i=0; i<kniov; i++) {
- kunmap(kiov_dup->kiov_page);
- kiov_dup++;
- }
- PORTAL_FREE(iovec_dup, sizeof(struct iovec)*kniov);
- }
-
-
- CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
- return(status);
-}
-
-
-int gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int niov, struct iovec *iov, size_t len)
-{
-
- gmnal_data_t *nal_data;
-
-
- CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n",
- niov, len, nid);
- nal_data = libnal->libnal_data;
-
- if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
- CDEBUG(D_INFO, "This is a small message send\n");
- gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
- niov, iov, len);
- } else {
- CDEBUG(D_ERROR, "Large message send it is not supported\n");
- lib_finalize(libnal, private, cookie, PTL_FAIL);
- return(PTL_FAIL);
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
- niov, iov, len);
- }
- return(PTL_OK);
-}
-
-int gmnal_cb_send_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
- ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
- unsigned int kniov, ptl_kiov_t *kiov, size_t len)
-{
-
- int i = 0;
- gmnal_data_t *nal_data;
- struct iovec *iovec = NULL, *iovec_dup = NULL;
- ptl_kiov_t *kiov_dup = kiov;
-
- CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] len["LPSZ"]\n", nid, kniov, len);
- nal_data = libnal->libnal_data;
- PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
- iovec_dup = iovec;
- if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
- CDEBUG(D_INFO, "This is a small message send\n");
-
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- iovec->iov_base = kmap(kiov->kiov_page)
- + kiov->kiov_offset;
-
- iovec->iov_len = kiov->kiov_len;
- iovec++;
- kiov++;
- }
- gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
- pid, kniov, iovec_dup, len);
- } else {
- CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
- return(PTL_FAIL);
- for (i=0; i<kniov; i++) {
- CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
- CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
- kiov->kiov_page, kiov->kiov_len,
- kiov->kiov_offset);
-
- iovec->iov_base = kmap(kiov->kiov_page)
- + kiov->kiov_offset;
- iovec->iov_len = kiov->kiov_len;
- iovec++;
- kiov++;
- }
- gmnal_large_tx(libnal, private, cookie, hdr, type, nid,
- pid, kniov, iovec, len);
- }
- for (i=0; i<kniov; i++) {
- kunmap(kiov_dup->kiov_page);
- kiov_dup++;
- }
- PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
- return(PTL_OK);
-}
-
/* Report the routing "distance" to a peer NID.  This NAL returns a
 * fixed metric for every peer; the value 27 appears to be an arbitrary
 * historical constant -- NOTE(review): confirm no caller depends on the
 * exact value beyond ordering. */
int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
{
	CDEBUG(D_TRACE, "gmnal_cb_dist\n");
	if (dist)
		*dist = 27;
	return(PTL_OK);
	/* NOTE(review): this is the body of the LNet send path (presumably
	 * the lnd_send callback, taking ni and lntmsg); the signature is
	 * outside this hunk -- confirm against the full file. */
	lnet_hdr_t       *hdr= &lntmsg->msg_hdr;
	int               type = lntmsg->msg_type;
	lnet_process_id_t target = lntmsg->msg_target;
	unsigned int      niov = lntmsg->msg_niov;
	struct iovec     *iov = lntmsg->msg_iov;
	lnet_kiov_t      *kiov = lntmsg->msg_kiov;
	unsigned int      offset = lntmsg->msg_offset;
	unsigned int      len = lntmsg->msg_len;
	gmnal_ni_t       *gmni = ni->ni_data;
	gm_status_t       gmrc;
	gmnal_tx_t       *tx;

	/* a message carries either an iovec or a kiov payload, never both */
	LASSERT (iov == NULL || kiov == NULL);

	/* I may not block for a tx if I'm responding to an incoming message */
	tx = gmnal_get_tx(gmni);
	if (tx == NULL) {
		/* only complain when not shutting down */
		if (!gmni->gmni_shutdown)
			CERROR ("Can't get tx for msg type %d for %s\n",
				type, libcfs_nid2str(target.nid));
		return -EIO;
	}

	tx->tx_nid = target.nid;

	/* translate the LNet NID's address part into a GM local node id */
	gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
				       &tx->tx_gmlid);
	if (gmrc != GM_SUCCESS) {
		CERROR("Can't map Nid %s to a GM local ID: %d\n",
		       libcfs_nid2str(target.nid), gmrc);
		/* NB tx_lntmsg not set => doesn't finalize */
		gmnal_tx_done(tx, -EIO);
		return -EIO;
	}

	/* build the wire message header + embedded LNet header */
	gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf),
		       target.nid, GMNAL_MSG_IMMEDIATE);
	GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
	tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);

	if (the_lnet.ln_testprotocompat != 0) {
		/* single-shot proto test: corrupt version (bit 1) or magic
		 * (bit 2) once, then clear the flag under the LNet lock */
		LNET_LOCK();
		if ((the_lnet.ln_testprotocompat & 1) != 0) {
			GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
			the_lnet.ln_testprotocompat &= ~1;
		}
		if ((the_lnet.ln_testprotocompat & 2) != 0) {
			GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
				LNET_PROTO_MAGIC;
			the_lnet.ln_testprotocompat &= ~2;
		}
		LNET_UNLOCK();
	}

	if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
		/* whole message fits in tx_buf: copy the payload inline now */
		char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);

		if (iov != NULL)
			lnet_copy_iov2flat(len, buffer, 0,
					   niov, iov, offset, len);
		else
			lnet_copy_kiov2flat(len, buffer, 0,
					    niov, kiov, offset, len);

		tx->tx_msgnob += len;
		tx->tx_large_nob = 0;
	} else {
		/* stash payload pts to copy later */
		tx->tx_large_nob = len;
		tx->tx_large_iskiov = (kiov != NULL);
		tx->tx_large_niov = niov;
		if (tx->tx_large_iskiov)
			tx->tx_large_frags.kiov = kiov;
		else
			tx->tx_large_frags.iov = iov;
	}

	/* setting tx_lntmsg makes gmnal_tx_done finalize the LNet message */
	LASSERT(tx->tx_lntmsg == NULL);
	tx->tx_lntmsg = lntmsg;

	spin_lock(&gmni->gmni_tx_lock);

	/* queue for transmission and kick the tx state machine */
	list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
	gmnal_check_txqueues_locked(gmni);

	spin_unlock(&gmni->gmni_tx_lock);

	return 0;