Whamcloud - gitweb
b=14300
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
6  *
7  *   This file is part of Lustre, http://www.lustre.org/
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23
24 /*
25  *      This file implements the nal cb functions
26  */
27
28
29 #include "gmlnd.h"
30
31 int
32 gmnal_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
33            int delayed, unsigned int niov, 
34            struct iovec *iov, lnet_kiov_t *kiov,
35            unsigned int offset, unsigned int mlen, unsigned int rlen)
36 {
37         gmnal_ni_t      *gmni = ni->ni_data;
38         gmnal_rx_t      *rx = (gmnal_rx_t*)private;
39         gmnal_msg_t     *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
40         int              npages = rx->rx_islarge ? gmni->gmni_large_pages : 1;
41         int              payload_offset = offsetof(gmnal_msg_t, 
42                                               gmm_u.immediate.gmim_payload[0]);
43         int              nob = payload_offset + mlen;
44
45         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
46         LASSERT (iov == NULL || kiov == NULL);
47         
48         if (rx->rx_recv_nob < nob) {
49                 CERROR("Short message from nid %s: got %d, need %d\n",
50                        libcfs_nid2str(msg->gmm_srcnid), rx->rx_recv_nob, nob);
51                 gmnal_post_rx(gmni, rx);
52                 return -EIO;
53         }
54
55         if (kiov != NULL)
56                 lnet_copy_kiov2kiov(niov, kiov, offset,
57                                     npages, rx->rx_buf.nb_kiov, payload_offset, 
58                                     mlen);
59         else
60                 lnet_copy_kiov2iov(niov, iov, offset,
61                                    npages, rx->rx_buf.nb_kiov, payload_offset,
62                                    mlen);
63
64         lnet_finalize(ni, lntmsg, 0);
65         gmnal_post_rx(gmni, rx);
66         return 0;
67 }
68
69 int
70 gmnal_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
71 {
72         lnet_hdr_t       *hdr= &lntmsg->msg_hdr;
73         int               type = lntmsg->msg_type;
74         lnet_process_id_t target = lntmsg->msg_target;
75         unsigned int      niov = lntmsg->msg_niov;
76         struct iovec     *iov = lntmsg->msg_iov;
77         lnet_kiov_t      *kiov = lntmsg->msg_kiov;
78         unsigned int      offset = lntmsg->msg_offset;
79         unsigned int      len = lntmsg->msg_len;
80         gmnal_ni_t       *gmni = ni->ni_data;
81         gm_status_t       gmrc;
82         gmnal_tx_t       *tx;
83
84         LASSERT (iov == NULL || kiov == NULL);
85
86         /* I may not block for a tx if I'm responding to an incoming message */
87         tx = gmnal_get_tx(gmni);
88         if (tx == NULL) {
89                 if (!gmni->gmni_shutdown)
90                         CERROR ("Can't get tx for msg type %d for %s\n",
91                                 type, libcfs_nid2str(target.nid));
92                 return -EIO;
93         }
94
95         tx->tx_nid = target.nid;
96
97         gmrc = gm_global_id_to_node_id(gmni->gmni_port, LNET_NIDADDR(target.nid),
98                                        &tx->tx_gmlid);
99         if (gmrc != GM_SUCCESS) {
100                 CERROR("Can't map Nid %s to a GM local ID: %d\n", 
101                        libcfs_nid2str(target.nid), gmrc);
102                 /* NB tx_lntmsg not set => doesn't finalize */
103                 gmnal_tx_done(tx, -EIO);
104                 return -EIO;
105         }
106
107         gmnal_pack_msg(gmni, GMNAL_NETBUF_MSG(&tx->tx_buf), 
108                        target.nid, GMNAL_MSG_IMMEDIATE);
109         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_hdr = *hdr;
110         tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]);
111
112         if (the_lnet.ln_testprotocompat != 0) {
113                 /* single-shot proto test */
114                 LNET_LOCK();
115                 if ((the_lnet.ln_testprotocompat & 1) != 0) {
116                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_version++;
117                         the_lnet.ln_testprotocompat &= ~1;
118                 }
119                 if ((the_lnet.ln_testprotocompat & 2) != 0) {
120                         GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_magic =
121                                 LNET_PROTO_MAGIC;
122                         the_lnet.ln_testprotocompat &= ~2;
123                 }
124                 LNET_UNLOCK();
125         }
126
127         if (tx->tx_msgnob + len <= gmni->gmni_small_msgsize) {
128                 /* whole message fits in tx_buf */
129                 char *buffer = &(GMNAL_NETBUF_MSG(&tx->tx_buf)->gmm_u.immediate.gmim_payload[0]);
130
131                 if (iov != NULL)
132                         lnet_copy_iov2flat(len, buffer, 0,
133                                            niov, iov, offset, len);
134                 else
135                         lnet_copy_kiov2flat(len, buffer, 0,
136                                             niov, kiov, offset, len);
137                 
138                 tx->tx_msgnob += len;
139                 tx->tx_large_nob = 0;
140         } else {
141                 /* stash payload pts to copy later */
142                 tx->tx_large_nob = len;
143                 tx->tx_large_iskiov = (kiov != NULL);
144                 tx->tx_large_niov = niov;
145                 if (tx->tx_large_iskiov)
146                         tx->tx_large_frags.kiov = kiov;
147                 else
148                         tx->tx_large_frags.iov = iov;
149         }
150
151         LASSERT(tx->tx_lntmsg == NULL);
152         tx->tx_lntmsg = lntmsg;
153         
154         spin_lock(&gmni->gmni_tx_lock);
155
156         list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
157         gmnal_check_txqueues_locked(gmni);
158
159         spin_unlock(&gmni->gmni_tx_lock);
160
161         return 0;
162 }