Whamcloud - gitweb
e19995c538fda845cff964770c096adc3d60b564
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22
23 /*
24  *      This file implements the nal cb functions
25  */
26
27
28 #include "gmnal.h"
29
30 ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
31                    unsigned int niov, struct iovec *iov, size_t offset, 
32                    size_t mlen, size_t rlen)
33 {
34    void            *buffer = NULL;
35         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
36         int             status = PTL_OK;
37
38         CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
39                "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n", 
40                libnal, private, cookie, niov, iov, offset, mlen, rlen);
41
42         switch(srxd->type) {
43         case(GMNAL_SMALL_MESSAGE):
44                 CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
45                 /* HP SFS 1380: Proactively change receives to avoid a receive
46                  *  side occurrence of filling pkmap_count[].
47                  */
48                 buffer = srxd->buffer; 
49                 buffer += sizeof(gmnal_msghdr_t);
50                 buffer += sizeof(ptl_hdr_t);
51
52                 while(niov--) { 
53                         if (offset >= iov->iov_len) {
54                                 offset -= iov->iov_len;
55                         } else if (offset > 0) {
56                                 CDEBUG(D_INFO, "processing [%p] base [%p] len %d, "
57                                        "offset %d, len ["LPSZ"]\n", iov,
58                                 iov->iov_base + offset, iov->iov_len, offset,
59                                 iov->iov_len - offset);
60                                 gm_bcopy(buffer, iov->iov_base + offset,
61                                          iov->iov_len - offset);
62                                 buffer += iov->iov_len - offset;
63                                 offset = 0;
64                         } else {
65                                 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", iov,
66                                        iov->iov_len);
67                                 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
68                                 buffer += iov->iov_len;
69                         }
70                         iov++;
71                 }
72                 status = gmnal_small_rx(libnal, private, cookie);
73         break;
74         case(GMNAL_LARGE_MESSAGE_INIT):
75                 CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
76                 status = gmnal_large_rx(libnal, private, cookie, niov, 
77                                          iov, offset, mlen, rlen);
78         }
79                 
80
81         CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
82         return(status);
83 }
84
85 ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
86                          unsigned int kniov, ptl_kiov_t *kiov, size_t offset, 
87                          size_t mlen, size_t rlen)
88 {
89         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
90         int             status = PTL_OK;
91         char            *ptr = NULL;
92         void            *buffer = NULL;
93
94
95         CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
96                "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
97                libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
98
99         if (srxd->type == GMNAL_SMALL_MESSAGE) {
100                 buffer = srxd->buffer; 
101                 buffer += sizeof(gmnal_msghdr_t);
102                 buffer += sizeof(ptl_hdr_t);
103
104                 /*
105                  *      map each page and create an iovec for it
106                  */
107                 while (kniov--) {
108                         /* HP SFS 1380: Proactively change receives to avoid a receive
109                          *  side occurrence of filling pkmap_count[].
110                          */
111                         CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
112
113                         if (offset >= kiov->kiov_len) {
114                                 offset -= kiov->kiov_len;
115                         } else {
116                                 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
117                                        kiov->kiov_page, kiov->kiov_len, 
118                                        kiov->kiov_offset);
119                                 CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
120                                 ptr = ((char *)kmap(kiov->kiov_page)) + kiov->kiov_offset;
121
122                                 if (offset > 0) {
123                                         CDEBUG(D_INFO, "processing [%p] base [%p] len %d, "
124                                                "offset %d, len ["LPSZ"]\n", ptr,
125                                                ptr + offset, kiov->kiov_len, offset,
126                                                kiov->kiov_len - offset);
127                                         gm_bcopy(buffer, ptr + offset,
128                                                kiov->kiov_len - offset);
129                                         buffer += kiov->kiov_len - offset;
130                                         offset = 0;
131                                 } else {
132                                         CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n", ptr,
133                                                kiov->kiov_len);
134                                         gm_bcopy(buffer, ptr, kiov->kiov_len);
135                                         buffer += kiov->kiov_len;
136                                 }
137                                 kunmap(kiov->kiov_page);
138                                 CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
139                          }
140                         kiov++;
141                 }
142                 CDEBUG(D_INFO, "calling gmnal_small_rx\n");
143                 status = gmnal_small_rx(libnal, private, cookie);
144         }
145                 
146
147         CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
148         return(status);
149 }
150
151
152 ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
153                    ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid, 
154                    unsigned int niov, struct iovec *iov, size_t offset, size_t len)
155 {
156
157         gmnal_data_t    *nal_data;
158         void            *buffer = NULL;
159         gmnal_stxd_t    *stxd = NULL;
160
161
162         CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ"] nid["LPU64"]\n", 
163                niov, offset, len, nid);
164         nal_data = libnal->libnal_data;
165         if (!nal_data) {
166                 CDEBUG(D_ERROR, "no nal_data\n");
167                 return(PTL_FAIL);
168         } else {
169                 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
170         }
171         
172         if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
173                 CDEBUG(D_INFO, "This is a small message send\n");
174                 /*
175                  * HP SFS 1380: With the change to gmnal_small_tx, need to get the stxd
176                  * and do relevant setup here
177                  */
178                 stxd = gmnal_get_stxd(nal_data, 1);
179                 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
180                 /* Set the offset of the data to copy into the buffer */
181                 buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
182                 while(niov--) {
183                         if (offset >= iov->iov_len) {
184                                 offset -= iov->iov_len;
185                         } else if (offset > 0) {
186                                 CDEBUG(D_INFO, "processing iov [%p] base [%p] len ["LPSZ"] to [%p]\n",
187                                        iov, iov->iov_base + offset, iov->iov_len - offset, buffer);
188                                 gm_bcopy(iov->iov_base + offset, buffer, iov->iov_len - offset);
189                                 buffer+= iov->iov_len - offset;
190                                 offset = 0;
191                         } else {
192                                 CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ"] to [%p]\n",
193                                        iov, iov->iov_len, buffer);
194                                 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
195                                 buffer+= iov->iov_len;
196                         }
197                         iov++;
198                 }
199                 gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid, 
200                                stxd,  len);
201         } else {
202                 CDEBUG(D_ERROR, "Large message send is not supported\n");
203                 lib_finalize(libnal, private, cookie, PTL_FAIL);
204                 return(PTL_FAIL);
205                 gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid, 
206                                 niov, iov, offset, len);
207         }
208         return(PTL_OK);
209 }
210
211 ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private, lib_msg_t *cookie, 
212                          ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
213                          unsigned int kniov, ptl_kiov_t *kiov, size_t offset, size_t len)
214 {
215
216         gmnal_data_t    *nal_data;
217         char            *ptr;
218         void            *buffer = NULL;
219         gmnal_stxd_t    *stxd = NULL;
220         ptl_err_t       status = PTL_OK;
221
222         CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["LPSZ"] len["LPSZ"]\n", 
223                nid, kniov, offset, len);
224         nal_data = libnal->libnal_data;
225         if (!nal_data) {
226                 CDEBUG(D_ERROR, "no nal_data\n");
227                 return(PTL_FAIL);
228         } else {
229                 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
230         }
231
232         /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
233          * more aggressively.  This is the fix for a livelock situation under load
234          * on ia32 that occurs when there are no more available entries in the
235          * pkmap_count array.  Just fill the buffer and let gmnal_small_tx
236          * put the headers in after we pass it the stxd pointer.
237          */
238         stxd = gmnal_get_stxd(nal_data, 1);
239         CDEBUG(D_INFO, "stxd [%p]\n", stxd);
240         /* Set the offset of the data to copy into the buffer */
241         buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
242
243         if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
244                 CDEBUG(D_INFO, "This is a small message send\n");
245                 
246                 while(kniov--) {
247                         CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
248                         if (offset >= kiov->kiov_len) {
249                                 offset -= kiov->kiov_len;
250                         } else {
251                                 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
252                                        kiov->kiov_page, kiov->kiov_len, 
253                                        kiov->kiov_offset);
254
255                                 ptr = ((char *)kmap(kiov->kiov_page)) + kiov->kiov_offset;
256
257                                 if (offset > 0) {
258                                         CDEBUG(D_INFO, "processing [%p] base [%p] len ["LPSZ"] to [%p]\n",
259                                                ptr, ptr + offset, kiov->kiov_len - offset, buffer);
260                                         gm_bcopy(ptr + offset, buffer, kiov->kiov_len - offset);
261                                         buffer+= kiov->kiov_len - offset;
262                                         offset = 0;
263                                 } else {
264                                         CDEBUG(D_INFO, "processing kmapped [%p] len ["LPSZ"] to [%p]\n",
265                                                ptr, kiov->kiov_len, buffer);
266                                         gm_bcopy(ptr, buffer, kiov->kiov_len);
267
268                                         buffer += kiov->kiov_len;
269                                 }
270                                 kunmap(kiov->kiov_page);
271                         }
272                         kiov++;
273                 }
274                 status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid, 
275                                         pid, stxd, len);
276         } else {
277                 int     i = 0;
278                 struct  iovec   *iovec = NULL, *iovec_dup = NULL;
279                 ptl_kiov_t *kiov_dup = kiov;
280
281                 PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
282                 iovec_dup = iovec;
283                 CDEBUG(D_ERROR, "Large message send it is not supported yet\n");
284                 PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
285                 return(PTL_FAIL);
286                 for (i=0; i<kniov; i++) {
287                         CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
288                         CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
289                                kiov->kiov_page, kiov->kiov_len, 
290                                kiov->kiov_offset);
291
292                         iovec->iov_base = kmap(kiov->kiov_page) 
293                                                  + kiov->kiov_offset;
294                         iovec->iov_len = kiov->kiov_len;
295                         iovec++;
296                         kiov++;
297                 }
298                 gmnal_large_tx(libnal, private, cookie, hdr, type, nid, 
299                                 pid, kniov, iovec, offset, len);
300                 for (i=0; i<kniov; i++) {
301                         kunmap(kiov_dup->kiov_page);
302                         kiov_dup++;
303                 }
304                 PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
305         }
306         return(status);
307 }
308
309 int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
310 {
311         CDEBUG(D_TRACE, "gmnal_cb_dist\n");
312         if (dist)
313                 *dist = 27;
314         return(PTL_OK);
315 }