Whamcloud - gitweb
Land b_release_1_4_3 onto HEAD (20050619_0305)
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22
23 /*
24  *      This file implements the nal cb functions
25  */
26
27
28 #include "gmnal.h"
29
30 ptl_err_t gmnal_cb_recv(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
31                    unsigned int niov, struct iovec *iov, size_t offset,
32                    size_t mlen, size_t rlen)
33 {
34         void            *buffer = NULL;
35         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
36         int             status = PTL_OK;
37
38         CDEBUG(D_TRACE, "gmnal_cb_recv libnal [%p], private[%p], cookie[%p], "
39                "niov[%d], iov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
40                libnal, private, cookie, niov, iov, offset, mlen, rlen);
41
42         switch(srxd->type) {
43         case(GMNAL_SMALL_MESSAGE):
44                 CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
45                 /* HP SFS 1380: Proactively change receives to avoid a receive
46                  *  side occurrence of filling pkmap_count[].
47                  */
48                 buffer = srxd->buffer;
49                 buffer += sizeof(gmnal_msghdr_t);
50                 buffer += sizeof(ptl_hdr_t);
51
52                 while(niov--) {
53                         if (offset >= iov->iov_len) {
54                                 offset -= iov->iov_len;
55                         } else if (offset > 0) {
56                                 CDEBUG(D_INFO, "processing [%p] base [%p] "
57                                        "len %d, offset %d, len ["LPSZ"]\n", iov,
58                                        iov->iov_base + offset, iov->iov_len,
59                                        offset, iov->iov_len - offset);
60                                 gm_bcopy(buffer, iov->iov_base + offset,
61                                          iov->iov_len - offset);
62                                 buffer += iov->iov_len - offset;
63                                 offset = 0;
64                         } else {
65                                 CDEBUG(D_INFO, "processing [%p] len ["LPSZ"]\n",
66                                        iov, iov->iov_len);
67                                 gm_bcopy(buffer, iov->iov_base, iov->iov_len);
68                                 buffer += iov->iov_len;
69                         }
70                         iov++;
71                 }
72                 status = gmnal_small_rx(libnal, private, cookie);
73         break;
74         case(GMNAL_LARGE_MESSAGE_INIT):
75                 CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
76                 status = gmnal_large_rx(libnal, private, cookie, niov, 
77                                          iov, offset, mlen, rlen);
78         }
79
80         CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
81         return(status);
82 }
83
84 ptl_err_t gmnal_cb_recv_pages(lib_nal_t *libnal, void *private,
85                               lib_msg_t *cookie, unsigned int kniov,
86                               ptl_kiov_t *kiov, size_t offset, size_t mlen,
87                               size_t rlen)
88 {
89         gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
90         int             status = PTL_OK;
91         char            *ptr = NULL;
92         void            *buffer = NULL;
93
94
95         CDEBUG(D_TRACE, "gmnal_cb_recv_pages libnal [%p],private[%p], "
96                "cookie[%p], kniov[%d], kiov [%p], offset["LPSZ"], mlen["LPSZ"], rlen["LPSZ"]\n",
97                libnal, private, cookie, kniov, kiov, offset, mlen, rlen);
98
99         if (srxd->type == GMNAL_SMALL_MESSAGE) {
100                 buffer = srxd->buffer;
101                 buffer += sizeof(gmnal_msghdr_t);
102                 buffer += sizeof(ptl_hdr_t);
103
104                 /*
105                  *      map each page and create an iovec for it
106                  */
107                 while (kniov--) {
108                         /* HP SFS 1380: Proactively change receives to avoid a
109                          *  receive side occurrence of filling pkmap_count[].
110                          */
111                         CDEBUG(D_INFO, "processing kniov [%d] [%p]\n",
112                                kniov, kiov);
113
114                         if (offset >= kiov->kiov_len) {
115                                 offset -= kiov->kiov_len;
116                         } else {
117                                 CDEBUG(D_INFO, "kniov page [%p] len [%d] "
118                                        "offset[%d]\n", kiov->kiov_page,
119                                        kiov->kiov_len, kiov->kiov_offset);
120                                 CDEBUG(D_INFO, "Calling kmap[%p]", kiov->kiov_page);
121                                 ptr = ((char *)kmap(kiov->kiov_page)) +
122                                         kiov->kiov_offset;
123
124                                 if (offset > 0) {
125                                         CDEBUG(D_INFO, "processing [%p] base "
126                                                "[%p] len %d, offset %d, len ["
127                                                LPSZ"]\n", ptr, ptr + offset,
128                                                kiov->kiov_len, offset,
129                                                kiov->kiov_len - offset);
130                                         gm_bcopy(buffer, ptr + offset,
131                                                  kiov->kiov_len - offset);
132                                         buffer += kiov->kiov_len - offset;
133                                         offset = 0;
134                                 } else {
135                                         CDEBUG(D_INFO, "processing [%p] len ["
136                                                LPSZ"]\n", ptr, kiov->kiov_len);
137                                         gm_bcopy(buffer, ptr, kiov->kiov_len);
138                                         buffer += kiov->kiov_len;
139                                 }
140                                 kunmap(kiov->kiov_page);
141                                 CDEBUG(D_INFO, "Stored in [%p]\n", ptr);
142                         }
143                         kiov++;
144                 }
145                 CDEBUG(D_INFO, "calling gmnal_small_rx\n");
146                 status = gmnal_small_rx(libnal, private, cookie);
147         }
148
149         CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
150         return(status);
151 }
152
153
154 ptl_err_t gmnal_cb_send(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
155                         ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
156                         unsigned int niov, struct iovec *iov, size_t offset,
157                         size_t len)
158 {
159
160         gmnal_data_t    *nal_data;
161         void            *buffer = NULL;
162         gmnal_stxd_t    *stxd = NULL;
163
164
165         CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] offset["LPSZ"] len["LPSZ
166                "] nid["LPU64"]\n", niov, offset, len, nid);
167         nal_data = libnal->libnal_data;
168         if (!nal_data) {
169                 CERROR("no nal_data\n");
170                 return(PTL_FAIL);
171         } else {
172                 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
173         }
174
175         if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
176                 CDEBUG(D_INFO, "This is a small message send\n");
177                 /*
178                  * HP SFS 1380: With the change to gmnal_small_tx, need to get
179                  * the stxd and do relevant setup here
180                  */
181                 stxd = gmnal_get_stxd(nal_data, 1);
182                 CDEBUG(D_INFO, "stxd [%p]\n", stxd);
183                 /* Set the offset of the data to copy into the buffer */
184                 buffer = stxd->buffer +sizeof(gmnal_msghdr_t)+sizeof(ptl_hdr_t);
185                 while(niov--) {
186                         if (offset >= iov->iov_len) {
187                                 offset -= iov->iov_len;
188                         } else if (offset > 0) {
189                                 CDEBUG(D_INFO, "processing iov [%p] base [%p] "
190                                        "len ["LPSZ"] to [%p]\n",
191                                        iov, iov->iov_base + offset,
192                                        iov->iov_len - offset, buffer);
193                                 gm_bcopy(iov->iov_base + offset, buffer,
194                                          iov->iov_len - offset);
195                                 buffer+= iov->iov_len - offset;
196                                 offset = 0;
197                         } else {
198                                 CDEBUG(D_INFO, "processing iov [%p] len ["LPSZ
199                                        "] to [%p]\n", iov, iov->iov_len,buffer);
200                                 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
201                                 buffer+= iov->iov_len;
202                         }
203                         iov++;
204                 }
205                 gmnal_small_tx(libnal, private, cookie, hdr, type, nid, pid,
206                                stxd,  len);
207         } else {
208                 CERROR("Large message send is not supported\n");
209                 lib_finalize(libnal, private, cookie, PTL_FAIL);
210                 return(PTL_FAIL);
211                 gmnal_large_tx(libnal, private, cookie, hdr, type, nid, pid,
212                                 niov, iov, offset, len);
213         }
214         return(PTL_OK);
215 }
216
217 ptl_err_t gmnal_cb_send_pages(lib_nal_t *libnal, void *private,
218                               lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
219                               ptl_nid_t nid, ptl_pid_t pid, unsigned int kniov,
220                               ptl_kiov_t *kiov, size_t offset, size_t len)
221 {
222
223         gmnal_data_t    *nal_data;
224         char            *ptr;
225         void            *buffer = NULL;
226         gmnal_stxd_t    *stxd = NULL;
227         ptl_err_t       status = PTL_OK;
228
229         CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] offset["
230                LPSZ"] len["LPSZ"]\n", nid, kniov, offset, len);
231         nal_data = libnal->libnal_data;
232         if (!nal_data) {
233                 CERROR("no nal_data\n");
234                 return(PTL_FAIL);
235         } else {
236                 CDEBUG(D_INFO, "nal_data [%p]\n", nal_data);
237         }
238
239         /* HP SFS 1380: Need to do the gm_bcopy after the kmap so we can kunmap
240          * more aggressively.  This is the fix for a livelock situation under
241          * load on ia32 that occurs when there are no more available entries in
242          * the pkmap_count array.  Just fill the buffer and let gmnal_small_tx
243          * put the headers in after we pass it the stxd pointer.
244          */
245         stxd = gmnal_get_stxd(nal_data, 1);
246         CDEBUG(D_INFO, "stxd [%p]\n", stxd);
247         /* Set the offset of the data to copy into the buffer */
248         buffer = stxd->buffer + sizeof(gmnal_msghdr_t) + sizeof(ptl_hdr_t);
249
250         if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
251                 CDEBUG(D_INFO, "This is a small message send\n");
252
253                 while(kniov--) {
254                         CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", kniov, kiov);
255                         if (offset >= kiov->kiov_len) {
256                                 offset -= kiov->kiov_len;
257                         } else {
258                                 CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
259                                        kiov->kiov_page, kiov->kiov_len, 
260                                        kiov->kiov_offset);
261
262                                 ptr = ((char *)kmap(kiov->kiov_page)) +
263                                         kiov->kiov_offset;
264
265                                 if (offset > 0) {
266                                         CDEBUG(D_INFO, "processing [%p] base "
267                                                "[%p] len ["LPSZ"] to [%p]\n",
268                                                ptr, ptr + offset,
269                                                kiov->kiov_len - offset, buffer);
270                                         gm_bcopy(ptr + offset, buffer,
271                                                  kiov->kiov_len - offset);
272                                         buffer+= kiov->kiov_len - offset;
273                                         offset = 0;
274                                 } else {
275                                         CDEBUG(D_INFO, "processing kmapped [%p]"
276                                                " len ["LPSZ"] to [%p]\n",
277                                                ptr, kiov->kiov_len, buffer);
278                                         gm_bcopy(ptr, buffer, kiov->kiov_len);
279
280                                         buffer += kiov->kiov_len;
281                                 }
282                                 kunmap(kiov->kiov_page);
283                         }
284                         kiov++;
285                 }
286                 status = gmnal_small_tx(libnal, private, cookie, hdr, type, nid,
287                                         pid, stxd, len);
288         } else {
289                 int     i = 0;
290                 struct  iovec   *iovec = NULL, *iovec_dup = NULL;
291                 ptl_kiov_t *kiov_dup = kiov;
292
293                 PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
294                 iovec_dup = iovec;
295                 CERROR("Large message send it is not supported yet\n");
296                 PORTAL_FREE(iovec, kniov*sizeof(struct iovec));
297                 return(PTL_FAIL);
298                 for (i=0; i<kniov; i++) {
299                         CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
300                         CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
301                                kiov->kiov_page, kiov->kiov_len, 
302                                kiov->kiov_offset);
303
304                         iovec->iov_base = kmap(kiov->kiov_page) 
305                                                  + kiov->kiov_offset;
306                         iovec->iov_len = kiov->kiov_len;
307                         iovec++;
308                         kiov++;
309                 }
310                 gmnal_large_tx(libnal, private, cookie, hdr, type, nid, 
311                                 pid, kniov, iovec, offset, len);
312                 for (i=0; i<kniov; i++) {
313                         kunmap(kiov_dup->kiov_page);
314                         kiov_dup++;
315                 }
316                 PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
317         }
318         return(status);
319 }
320
321 int gmnal_cb_dist(lib_nal_t *libnal, ptl_nid_t nid, unsigned long *dist)
322 {
323         CDEBUG(D_TRACE, "gmnal_cb_dist\n");
324         if (dist)
325                 *dist = 27;
326         return(PTL_OK);
327 }