1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5 * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
7 * This file is part of Lustre, http://www.lustre.org/
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 * This file implements the NAL callback (cb) functions
/*
 * gmnal_recv: receive callback for a GMNAL_MSG_IMMEDIATE message.
 *
 * Checks that the rx buffer holds at least the immediate-message header
 * plus 'mlen' payload bytes, runs an LNet copy helper between the rx
 * buffer's pages and the caller's fragments, then calls lnet_finalize()
 * with status 0 and hands the rx back through gmnal_post_rx().
 *
 *   ni      - network interface; ni->ni_data is used as the gmnal_ni_t
 *   private - cast to the gmnal_rx_t describing the received buffer
 *   lntmsg  - LNet message passed to lnet_finalize()
 *   delayed - not referenced in the visible lines
 *   niov    - fragment count passed to the copy helper
 *   iov     - iovec fragments; the LASSERT requires iov or kiov to be NULL
 *   kiov    - page (kiov) fragments; see 'iov'
 *   offset  - offset passed to the copy helper alongside iov/kiov
 *   mlen    - payload byte count added to the payload offset to form 'nob'
 *   rlen    - not referenced in the visible lines
 *
 * NOTE(review): this view of the file is missing lines (return type,
 * braces, return statements, the condition that selects between the two
 * copy calls and their final argument), so return values and branch
 * conditions are not documented here - confirm against the full source.
 */
32 gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
33 int delayed, unsigned int niov,
34 struct iovec *iov, lnet_kiov_t *kiov,
35 unsigned int offset, unsigned int mlen, unsigned int rlen)
37 gmnal_ni_t *gmni = ni->ni_data;
38 gmnal_rx_t *rx = (gmnal_rx_t*)private;
39 gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
/* A large rx spans gmni_large_pages pages; any other rx spans one page */
40 int npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
/* Byte offset of the immediate payload within the GM message */
41 int payload_offset = offsetof(gmnal_msg_t,
42 gmm_u.immediate.gmim_payload[0]);
/* Bytes that must have been received: everything before the payload + mlen */
43 int nob = payload_offset + mlen;
/* Only immediate messages are handled, and at most one of iov/kiov is set */
45 LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
46 LASSERT (iov == NULL || kiov == NULL);
/* Fewer bytes arrived than needed: log it and hand the rx back */
48 if (rx->rx_recv_nob < nob) {
49 CERROR("Short message from nid %s: got %d, need %d\n",
50 libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
51 gmnal_post_rx(gmni, rx);
/*
 * Two alternative copies, one using the caller's kiov and one using the
 * caller's iov; both use the rx buffer's nb_kiov pages at payload_offset.
 * Presumably the data moves from the rx buffer into the caller's
 * fragments and the choice depends on whether kiov is set - TODO confirm,
 * the selecting condition is not visible here.
 */
56 lnet_copy_kiov2kiov(niov, kiov, offset,
57 npages, rx->rx_buf.nb_kiov, payload_offset,
60 lnet_copy_kiov2iov(niov, iov, offset,
61 npages, rx->rx_buf.nb_kiov, payload_offset,
/* Finalize the LNet message with status 0, then hand the rx back */
64 lnet_finalize(ni, lntmsg, 0);
65 gmnal_post_rx(gmni, rx);
/*
 * gmnal_send: send callback; builds a GMNAL_MSG_IMMEDIATE message for
 * lntmsg and queues it for transmission.
 *
 * Obtains a tx descriptor, maps the target NID to a GM node id, packs the
 * message header into the tx buffer, then either copies the payload into
 * the tx buffer (when header + payload fit within gmni_small_msgsize) or
 * records the payload fragments on the tx for later copying. Finally the
 * tx is appended to gmni_buf_txq and gmnal_check_txqueues_locked() is
 * called, both under gmni_tx_lock.
 *
 *   ni      - network interface; ni->ni_data is used as the gmnal_ni_t
 *   private - not referenced in the visible lines
 *   lntmsg  - LNet message supplying header, type, target and payload
 *
 * NOTE(review): this view of the file is missing lines (return type,
 * braces, declarations of 'tx' and 'gmrc', return statements, else
 * branches and some call arguments), so return values and some branch
 * conditions are not documented here - confirm against the full source.
 */
70 gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
/* Unpack the fields of lntmsg used below */
72 lnet_hdr_t *hdr= &lntmsg->msg_hdr;
73 int type = lntmsg->msg_type;
74 lnet_process_id_t target = lntmsg->msg_target;
75 unsigned int niov = lntmsg->msg_niov;
76 struct iovec *iov = lntmsg->msg_iov;
77 lnet_kiov_t *kiov = lntmsg->msg_kiov;
78 unsigned int offset = lntmsg->msg_offset;
79 unsigned int len = lntmsg->msg_len;
80 gmnal_ni_t *gmni = ni->ni_data;
/* At most one of iov/kiov may describe the payload */
84 LASSERT (iov == NULL || kiov == NULL);
86 /* I may not block for a tx if I'm responding to an incoming message */
87 tx = gmnal_get_tx(gmni);
/*
 * Error report for failing to get a tx, suppressed while shutting down.
 * NOTE(review): the enclosing check on 'tx' is not visible in this view.
 */
89 if (!gmni->gmni_shutdown)
90 CERROR ("Can't get tx for msg type %d for %s\n",
91 type, libcfs_nid2str(target.nid));
95 tx->tx_nid = target.nid;
/*
 * Ask GM to map the address part of the target NID to a node id.
 * NOTE(review): the output argument of this call is not visible here.
 */
97 gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
99 if (gmrc != GM_SUCCESS) {
100 CERROR("Can't map Nid %s to a GM local ID: %d\n",
101 libcfs_nid2str(target.nid), gmrc);
102 /* NB tx_lntmsg not set => doesn't finalize */
103 gmnal_tx_done(tx, -EIO);
/* Pack an immediate message into the tx buffer and copy in the LNet header */
107 gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf),
108 target.nid, GMNAL_MSG_IMMEDIATE);
109 GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
/* Message size so far: everything up to the start of the payload */
110 tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
/*
 * Protocol-compatibility test hooks: bit 1 bumps the message version and
 * bit 2 overwrites the message magic; each bit is cleared once used.
 */
112 if (the_lnet.ln_testprotocompat != 0) {
113 /* single-shot proto test */
115 if ((the_lnet.ln_testprotocompat & 1) != 0) {
116 GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
117 the_lnet.ln_testprotocompat &= ~1;
119 if ((the_lnet.ln_testprotocompat & 2) != 0) {
120 GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
122 the_lnet.ln_testprotocompat &= ~2;
127 if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
128 /* whole message fits in tx_buf */
129 char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
/*
 * Flatten the payload fragments into the tx buffer, from iov or kiov.
 * NOTE(review): the condition selecting between the two calls is not
 * visible in this view.
 */
132 lnet_copy_iov2flat(len, buffer, 0,
133 niov, iov, offset, len);
135 lnet_copy_kiov2flat(len, buffer, 0,
136 niov, kiov, offset, len);
/* Payload now lives in the tx buffer; nothing is left for a large send */
138 tx->tx_msgnob += len;
139 tx->tx_large_nob = 0;
141 /* stash payload pts to copy later */
142 tx->tx_large_nob = len;
143 tx->tx_large_iskiov = (kiov != NULL);
144 tx->tx_large_niov = niov;
/* Remember whichever fragment array describes the payload */
145 if (tx->tx_large_iskiov)
146 tx->tx_large_frags.kiov = kiov;
148 tx->tx_large_frags.iov = iov;
/* Attach the LNet message to the tx; the tx must not already carry one */
151 LASSERT(tx->tx_lntmsg == NULL);
152 tx->tx_lntmsg = lntmsg;
/* Append the tx to gmni_buf_txq and check the tx queues under gmni_tx_lock */
154 spin_lock(&gmni->gmni_tx_lock);
156 list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
157 gmnal_check_txqueues_locked(gmni);
159 spin_unlock(&gmni->gmni_tx_lock);