1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
6 * This file is part of Lustre, http://www.lustre.org/
8 * Lustre is free software; you can redistribute it and/or
9 * modify it under the terms of version 2 of the GNU General Public
10 * License as published by the Free Software Foundation.
12 * Lustre is distributed in the hope that it will be useful,
13 * but WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15 * GNU General Public License for more details.
17 * You should have received a copy of the GNU General Public License
18 * along with Lustre; if not, write to the Free Software
19 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24 * This file implements the NAL callback functions, such as gmnal_recv() and gmnal_send()
/*
 * gmnal_recv: receive callback for an immediate message.
 *
 * Copies the payload of the received message out of the rx buffer's pages
 * into the caller's fragments, finalizes the LNET message with status 0 and
 * reposts the rx buffer.
 *
 *   ni      - network interface; ni->ni_data is used as the gmnal_ni_t.
 *   private - the receive descriptor; cast to gmnal_rx_t * below.
 *   lntmsg  - LNET message passed to lnet_finalize() once the copy is done.
 *   niov    - number of fragments in iov/kiov.
 *   iov     - fragment array; the LASSERT below requires that iov and kiov
 *   kiov      are not both non-NULL.
 *   offset  - offset into the iov/kiov fragments passed to the copy helpers.
 *   mlen    - payload byte count used to compute the required message size.
 *   delayed, rlen - not referenced by any line visible here.
 *
 * NOTE(review): the return type, the return statements and the conditions
 * selecting between the two copy calls are not visible in this view --
 * confirm against the complete file.
 */
31 gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
32 int delayed, unsigned int niov,
33 struct iovec *iov, lnet_kiov_t *kiov,
34 unsigned int offset, unsigned int mlen, unsigned int rlen)
36 gmnal_ni_t *gmni = ni->ni_data;
37 gmnal_rx_t *rx = (gmnal_rx_t*)private;
/* The message header lives at the start of the rx network buffer */
38 gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
/* A large rx spans gmni_large_pages pages; any other rx spans exactly one */
39 int npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
/* Byte offset of the immediate payload within gmnal_msg_t */
40 int payload_offset = offsetof(gmnal_msg_t,
41 gmm_u.immediate.gmim_payload[0]);
/* Minimum number of bytes that must have arrived: header part + mlen */
42 int nob = payload_offset + mlen;
/* Only immediate messages are handled here, and at most one of
 * iov/kiov may be supplied */
44 LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
45 LASSERT (iov == NULL || kiov == NULL);
/* Fewer bytes received than header + mlen: log it and repost the rx.
 * NOTE(review): the rest of this error branch is not visible here;
 * presumably it returns an error without finalizing -- confirm. */
47 if (rx->rx_recv_nob < nob) {
48 CERROR("Short message from nid %s: got %d, need %d\n",
49 libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
50 gmnal_post_rx(gmni, rx);
/* Copy from the rx buffer pages (rx->rx_buf.nb_kiov, starting at
 * payload_offset) into the caller's kiov fragments.
 * NOTE(review): the final argument(s) of this call are not visible. */
55 lnet_copy_kiov2kiov(niov, kiov, offset,
56 npages, rx->rx_buf.nb_kiov, payload_offset,
/* Same copy, but into the caller's iov fragments.
 * NOTE(review): the final argument(s) of this call are not visible. */
59 lnet_copy_kiov2iov(niov, iov, offset,
60 npages, rx->rx_buf.nb_kiov, payload_offset,
/* Complete the LNET message with status 0, then hand the rx back */
63 lnet_finalize(ni, lntmsg, 0);
64 gmnal_post_rx(gmni, rx);
/*
 * gmnal_send: send callback; queues an LNET message for transmission.
 *
 * Steps visible in this function:
 *   1. obtain a tx descriptor with gmnal_get_tx();
 *   2. map the target NID to a GM node id with gm_global_id_to_node_id();
 *   3. pack an immediate message header into the tx buffer and copy in
 *      the LNET header;
 *   4. if header + payload fit in gmni_small_msgsize, copy the payload
 *      into the tx buffer; otherwise remember the payload fragments so
 *      they can be copied later;
 *   5. attach lntmsg to the tx, append the tx to gmni_buf_txq under
 *      gmni_tx_lock and call gmnal_check_txqueues_locked().
 *
 *   ni      - network interface; ni->ni_data is used as the gmnal_ni_t.
 *   private - not referenced by any line visible here.
 *   lntmsg  - LNET message supplying header, type, target and payload
 *             description (msg_niov, msg_iov, msg_kiov, msg_offset,
 *             msg_len).
 *
 * NOTE(review): the return type, the return statements, the declarations
 * of `tx` and `gmrc`, and several branch conditions are not visible in
 * this view -- confirm against the complete file.
 */
69 gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
71 lnet_hdr_t *hdr= &lntmsg->msg_hdr;
72 int type = lntmsg->msg_type;
73 lnet_process_id_t target = lntmsg->msg_target;
74 unsigned int niov = lntmsg->msg_niov;
75 struct iovec *iov = lntmsg->msg_iov;
76 lnet_kiov_t *kiov = lntmsg->msg_kiov;
77 unsigned int offset = lntmsg->msg_offset;
78 unsigned int len = lntmsg->msg_len;
79 gmnal_ni_t *gmni = ni->ni_data;
/* At most one of iov/kiov may describe the payload */
83 LASSERT (iov == NULL || kiov == NULL);
85 /* I may not block for a tx if I'm responding to an incoming message */
86 tx = gmnal_get_tx(gmni);
/* The failure is only logged when the interface is not shutting down.
 * NOTE(review): the test that detects the failed gmnal_get_tx() and the
 * value returned in that case are not visible here. */
88 if (!gmni->gmni_shutdown)
89 CERROR ("Can't get tx for msg type %d for %s\n",
90 type, libcfs_nid2str(target.nid));
94 tx->tx_nid = target.nid;
/* Translate the address part of the target NID into a GM node id.
 * NOTE(review): the output argument of this call is not visible here. */
96 gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
98 if (gmrc != GM_SUCCESS) {
99 CERROR("Can't map Nid %s to a GM local ID: %d\n",
100 libcfs_nid2str(target.nid), gmrc);
101 /* NB tx_lntmsg not set => doesn't finalize */
102 gmnal_tx_done(tx, -EIO);
/* Build the immediate message: pack the GM NAL header, copy in the LNET
 * header, and start the message size at the offset of the payload */
106 gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf),
107 target.nid, GMNAL_MSG_IMMEDIATE);
108 GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
109 tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
111 if (the_lnet.ln_testprotocompat != 0) {
112 /* single-shot proto test: each flag bit is cleared once it has been applied */
/* Bit 1: bump the version field of this one message */
114 if ((the_lnet.ln_testprotocompat & 1) != 0) {
115 GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
116 the_lnet.ln_testprotocompat &= ~1;
/* Bit 2: overwrite the magic field of this one message.
 * NOTE(review): the value assigned is not visible here. */
118 if ((the_lnet.ln_testprotocompat & 2) != 0) {
119 GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
121 the_lnet.ln_testprotocompat &= ~2;
126 if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
127 /* whole message fits in tx_buf */
128 char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
/* Flatten the payload fragments into the tx buffer, from iov or from
 * kiov. NOTE(review): the condition choosing between the two copies is
 * not visible here. */
131 lnet_copy_iov2flat(len, buffer, 0,
132 niov, iov, offset, len);
134 lnet_copy_kiov2flat(len, buffer, 0,
135 niov, kiov, offset, len);
/* Payload now lives in tx_buf: account for it and mark that there is
 * no large payload pending */
137 tx->tx_msgnob += len;
138 tx->tx_large_nob = 0;
140 /* stash payload pointers to copy later */
141 tx->tx_large_nob = len;
142 tx->tx_large_iskiov = (kiov != NULL);
143 tx->tx_large_niov = niov;
144 if (tx->tx_large_iskiov)
145 tx->tx_large_frags.kiov = kiov;
147 tx->tx_large_frags.iov = iov;
/* Attach the LNET message to the tx; it must not already carry one */
150 LASSERT(tx->tx_lntmsg == NULL);
151 tx->tx_lntmsg = lntmsg;
/* Append the tx to gmni_buf_txq and check the tx queues, all while
 * holding gmni_tx_lock */
153 spin_lock(&gmni->gmni_tx_lock);
155 list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
156 gmnal_check_txqueues_locked(gmni);
158 spin_unlock(&gmni->gmni_tx_lock);