Whamcloud - gitweb
add macro LCONSOLE_ERROR_MSG with extra parameter and map
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22
23 /*
24  *      This file implements the nal cb functions
25  */
26
27
28 #include "gmlnd.h"
29
30 int
31 gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
32            int delayed, unsigned int niov, 
33            struct iovec *iov, lnet_kiov_t *kiov,
34            unsigned int offset, unsigned int mlen, unsigned int rlen)
35 {
36         gmnal_ni_t      *gmni = ni->ni_data;
37         gmnal_rx_t      *rx = (gmnal_rx_t*)private;
38         gmnal_msg_t     *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
39         int              npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
40         int              payload_offset = offsetof(gmnal_msg_t, 
41                                               gmm_u.immediate.gmim_payload[0]);
42         int              nob = payload_offset + mlen;
43
44         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
45         LASSERT (iov == NULL || kiov == NULL);
46         
47         if (rx->rx_recv_nob < nob) {
48                 CERROR("Short message from nid %s: got %d, need %d\n",
49                        libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
50                 gmnal_post_rx(gmni, rx);
51                 return -EIO;
52         }
53
54         if (kiov != NULL)
55                 lnet_copy_kiov2kiov(niov, kiov, offset,
56                                     npages, rx->rx_buf.nb_kiov, payload_offset, 
57                                     mlen);
58         else
59                 lnet_copy_kiov2iov(niov, iov, offset,
60                                    npages, rx->rx_buf.nb_kiov, payload_offset,
61                                    mlen);
62
63         lnet_finalize(ni, lntmsg, 0);
64         gmnal_post_rx(gmni, rx);
65         return 0;
66 }
67
68 int
69 gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
70 {
71         lnet_hdr_t       *hdr= &lntmsg->msg_hdr;
72         int               type = lntmsg->msg_type;
73         lnet_process_id_t target = lntmsg->msg_target;
74         unsigned int      niov = lntmsg->msg_niov;
75         struct iovec     *iov = lntmsg->msg_iov;
76         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
77         unsigned int      offset = lntmsg->msg_offset;
78         unsigned int      len = lntmsg->msg_len;
79         gmnal_ni_t       *gmni = ni->ni_data;
80         gm_status_t       gmrc;
81         gmnal_tx_t       *tx;
82
83         LASSERT (iov == NULL || kiov == NULL);
84
85         /* I may not block for a tx if I'm responding to an incoming message */
86         tx = gmnal_get_tx(gmni);
87         if (tx == NULL) {
88                 if (!gmni->gmni_shutdown)
89                         CERROR ("Can't get tx for msg type %d for %s\n",
90                                 type, libcfs_nid2str(target.nid));
91                 return -EIO;
92         }
93
94         tx->tx_nid = target.nid;
95
96         gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
97                                        &tx->tx_gmlid);
98         if (gmrc != GM_SUCCESS) {
99                 CERROR("Can't map Nid %s to a GM local ID: %d\n", 
100                        libcfs_nid2str(target.nid), gmrc);
101                 /* NB tx_lntmsg not set => doesn't finalize */
102                 gmnal_tx_done(tx, -EIO);
103                 return -EIO;
104         }
105
106         gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), 
107                        target.nid, GMNAL_MSG_IMMEDIATE);
108         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
109         tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
110
111         if (the_lnet.ln_testprotocompat != 0) {
112                 /* single-shot proto test */
113                 LNET_LOCK();
114                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
115                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
116                         the_lnet.ln_testprotocompat &= ~1;
117                 }
118                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
119                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
120                                 LNET_PROTO_MAGIC;
121                         the_lnet.ln_testprotocompat &= ~2;
122                 }
123                 LNET_UNLOCK();
124         }
125
126         if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
127                 /* whole message fits in tx_buf */
128                 char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
129
130                 if (iov != NULL)
131                         lnet_copy_iov2flat(len, buffer, 0,
132                                            niov, iov, offset, len);
133                 else
134                         lnet_copy_kiov2flat(len, buffer, 0,
135                                             niov, kiov, offset, len);
136                 
137                 tx->tx_msgnob += len;
138                 tx->tx_large_nob = 0;
139         } else {
140                 /* stash payload pts to copy later */
141                 tx->tx_large_nob = len;
142                 tx->tx_large_iskiov = (kiov != NULL);
143                 tx->tx_large_niov = niov;
144                 if (tx->tx_large_iskiov)
145                         tx->tx_large_frags.kiov = kiov;
146                 else
147                         tx->tx_large_frags.iov = iov;
148         }
149
150         LASSERT(tx->tx_lntmsg == NULL);
151         tx->tx_lntmsg = lntmsg;
152         
153         spin_lock(&gmni->gmni_tx_lock);
154
155         list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
156         gmnal_check_txqueues_locked(gmni);
157
158         spin_unlock(&gmni->gmni_tx_lock);
159
160         return 0;
161 }