Whamcloud - gitweb
* simplified gmnal thread startup/shutdown
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22
23 /*
24  *      This file implements the nal cb functions
25  */
26
27
28 #include "gmnal.h"
29
30 ptl_err_t 
31 gmnal_cb_recv(lib_nal_t *libnal, void *private, 
32               lib_msg_t *libmsg,
33               unsigned int niov, struct iovec *iov, 
34               size_t offset, size_t mlen, size_t rlen)
35 {
36         gmnal_rx_t      *rx = (gmnal_rx_t*)private;
37         gmnal_msg_t     *msg = rx->rx_msg;
38         size_t           nobleft = mlen;
39         int              rxnob;
40         char            *buffer;
41         size_t           nob;
42
43         CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], libmsg[%p], "
44                "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
45                libnal, private, libmsg, niov, iov, offset, mlen, rlen);
46
47         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
48         
49         buffer = &msg->gmm_u.immediate.gmim_payload[0];
50         rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
51         
52         if (rx->rx_recv_nob < rxnob) {
53                 CERROR("Short message from nid "LPD64": got %d, need %d\n",
54                        msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
55                 return PTL_FAIL;
56         }
57         
58         while (nobleft > 0) {
59                 LASSERT (niov > 0);
60
61                 if (offset >= iov->iov_len) {
62                         offset -= iov->iov_len;
63                 } else {
64                         nob = MIN (iov->iov_len - offset, nobleft);
65
66                         gm_bcopy(buffer, iov->iov_base + offset, nob);
67
68                         buffer += nob;
69                         nobleft -= nob;
70                         offset = 0;
71                 }
72                 niov--;
73                 iov++;
74         }
75
76         lib_finalize(libnal, private, libmsg, PTL_OK);
77         return PTL_OK;
78 }
79
80 ptl_err_t 
81 gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, 
82                     lib_msg_t *libmsg, 
83                     unsigned int nkiov, ptl_kiov_t *kiov, 
84                     size_t offset, size_t mlen, size_t rlen)
85 {
86         gmnal_rx_t      *rx = (gmnal_rx_t*)private;
87         gmnal_msg_t     *msg = rx->rx_msg;
88         size_t           nobleft = mlen;
89         int              rxnob;
90         size_t           nob;
91         char            *ptr;
92         void            *buffer;
93
94         CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
95                "libmsg[%p], kniov[%d], kiov [%p], "
96                "offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
97                libnal, private, libmsg, nkiov, kiov, offset, mlen, rlen);
98
99         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
100
101         buffer = &msg->gmm_u.immediate.gmim_payload[0];
102         rxnob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[nobleft]);
103
104         if (rx->rx_recv_nob < rxnob) {
105                 CERROR("Short message from nid "LPD64": got %d, need %d\n",
106                        msg->gmm_srcnid, rx->rx_recv_nob, rxnob);
107                 return PTL_FAIL;
108         }
109         
110         while (nobleft > 0) {
111                 LASSERT (nkiov > 0);
112
113                 if (offset >= kiov->kiov_len) {
114                         offset -= kiov->kiov_len;
115                 } else {
116                         nob = MIN (kiov->kiov_len - offset, nobleft);
117
118                         ptr = ((char *)kmap(kiov->kiov_page)) +
119                               kiov->kiov_offset;
120
121                         gm_bcopy(buffer, ptr + offset, nob);
122
123                         kunmap(kiov->kiov_page);
124
125                         buffer += nob;
126                         nobleft -= nob;
127                         offset = 0;
128                 }
129                 kiov++;
130                 nkiov--;
131         }
132
133         lib_finalize(libnal, private, libmsg, PTL_OK);
134         return PTL_OK;
135 }
136
137 ptl_err_t
138 gmnal_cb_send(lib_nal_t *libnal, void *private, 
139               lib_msg_t *libmsg, ptl_hdr_t *hdr, int type, 
140               ptl_nid_t nid, ptl_pid_t pid,
141               unsigned int niov, struct iovec *iov, 
142               size_t offset, size_t len)
143 {
144
145         gmnal_ni_t      *gmnalni = libnal->libnal_data;
146         size_t           nobleft = len;
147         void            *buffer;
148         gmnal_tx_t      *tx;
149         size_t           nob;
150
151         CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] "
152                "len["LPSZ"] nid["LPU64"]\n", niov, offset, len, nid);
153
154         if ((nid >> 32) != 0) {
155                 CERROR("Illegal nid: "LPU64"\n", nid);
156                 return PTL_FAIL;
157         }
158
159         tx = gmnal_get_tx(gmnalni, 1);
160
161         gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
162         gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
163
164         buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
165         while (nobleft > 0) {
166                 LASSERT (niov > 0);
167                 
168                 if (offset >= iov->iov_len) {
169                         offset -= iov->iov_len;
170                 } else {
171                         nob = MIN (iov->iov_len - offset, nobleft);
172
173                         gm_bcopy(iov->iov_base + offset, buffer, nob);
174
175                         buffer += nob;
176                         nobleft -= nob;
177                         offset = 0;
178                 }
179                 niov--;
180                 iov++;
181         }
182         
183         nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
184         return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
185 }
186
187 ptl_err_t
188 gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
189                     lib_msg_t *libmsg, ptl_hdr_t *hdr, int type,
190                     ptl_nid_t nid, ptl_pid_t pid, 
191                     unsigned int nkiov, ptl_kiov_t *kiov, 
192                     size_t offset, size_t len)
193 {
194
195         gmnal_ni_t      *gmnalni = libnal->libnal_data;
196         size_t           nobleft = len;
197         void            *buffer;
198         gmnal_tx_t      *tx;
199         char            *ptr;
200         size_t           nob;
201
202         CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
203                LPSZ"] len["LPSZ"]\n", nid, nkiov, offset, len);
204
205         if ((nid >> 32) != 0) {
206                 CERROR("Illegal nid: "LPU64"\n", nid);
207                 return PTL_FAIL;
208         }
209
210         tx = gmnal_get_tx(gmnalni, 1);
211
212         gmnal_pack_msg(gmnalni, tx, nid, GMNAL_MSG_IMMEDIATE);
213         gm_bcopy(hdr, &tx->tx_msg->gmm_u.immediate.gmim_hdr, sizeof(*hdr));
214
215         buffer = &tx->tx_msg->gmm_u.immediate.gmim_payload[0];
216         while (nobleft > 0) {
217                 LASSERT (nkiov > 0);
218
219                 if (offset >= kiov->kiov_len) {
220                         offset -= kiov->kiov_len;
221                 } else {
222                         nob = MIN (kiov->kiov_len - offset, nobleft);
223
224                         ptr = ((char *)kmap(kiov->kiov_page)) +
225                               kiov->kiov_offset;
226
227                         gm_bcopy(ptr + offset, buffer, nob);
228
229                         kunmap(kiov->kiov_page);
230
231                         buffer += nob;
232                         nobleft -= nob;
233                         offset = 0;
234                 }
235                 nkiov--;
236                 kiov++;
237         }
238
239         nob = offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[len]);
240         return gmnal_post_tx(gmnalni, tx, libmsg, nid, nob);
241 }
242
243 int
244 gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
245 {
246         CDEBUG(D_TRACE, "gmnal_cb_dist\n");
247
248         if (dist != NULL)
249                 *dist = 1;
250
251         return PTL_OK;
252 }