Whamcloud - gitweb
* Added lonal (loopback NAL)
[fs/lustre-release.git] / lnet / klnds / lolnd / lolnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2002 Cluster File Systems, Inc.
5  *   Author: Eric Barton <eric@bartonsoftware.com>
6  *
7  * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8  * W. Marcus Miller - Based on ksocknal
9  *
10  * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  * Portals is free software; you can redistribute it and/or
13  * modify it under the terms of version 2 of the GNU General Public
14  * License as published by the Free Software Foundation.
15  *
16  * Portals is distributed in the hope that it will be useful,
17  * but WITHOUT ANY WARRANTY; without even the implied warranty of
18  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  * GNU General Public License for more details.
20  *
21  * You should have received a copy of the GNU General Public License
22  * along with Portals; if not, write to the Free Software
23  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  *
25  */
26
27 #include "lonal.h"
28
29 /*
30  *  LIB functions follow
31  *
32  */
33 static int
34 klonal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
35 {
36         *dist = 0;                      /* it's me */
37         return (0);
38 }
39
40 static ptl_err_t
41 klonal_send (lib_nal_t    *nal,
42              void         *private,
43              lib_msg_t    *libmsg,
44              ptl_hdr_t    *hdr,
45              int           type,
46              ptl_nid_t     nid,
47              ptl_pid_t     pid,
48              unsigned int  payload_niov,
49              struct iovec *payload_iov,
50              size_t        payload_offset,
51              size_t        payload_nob)
52 {
53         klo_desc_t klod = {
54                 .klod_type    = KLOD_IOV,
55                 .klod_niov    = payload_niov,
56                 .klod_offset  = payload_offset,
57                 .klod_nob     = payload_nob,
58                 .klod_iov.iov = payload_iov};
59         ptl_err_t rc;
60
61         LASSERT(nid == klonal_lib.libnal_ni.ni_pid.nid);
62
63         rc = lib_parse(&klonal_lib, hdr, &klod);
64         if (rc == PTL_OK)
65                 lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
66         
67         return rc;
68 }
69
70 static ptl_err_t
71 klonal_send_pages (lib_nal_t    *nal,
72                    void         *private,
73                    lib_msg_t    *libmsg,
74                    ptl_hdr_t    *hdr,
75                    int           type,
76                    ptl_nid_t     nid,
77                    ptl_pid_t     pid,
78                    unsigned int  payload_niov,
79                    ptl_kiov_t   *payload_kiov,
80                    size_t        payload_offset,
81                    size_t        payload_nob)
82 {
83         klo_desc_t klod = {
84                 .klod_type     = KLOD_KIOV,
85                 .klod_niov     = payload_niov,
86                 .klod_offset   = payload_offset,
87                 .klod_nob      = payload_nob,
88                 .klod_iov.kiov = payload_kiov};
89         ptl_err_t   rc;
90
91         LASSERT(nid == klonal_lib.libnal_ni.ni_pid.nid);
92         
93         rc = lib_parse(&klonal_lib, hdr, &klod);
94         if (rc == PTL_OK)
95                 lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
96         
97         return rc;
98 }
99
100 static ptl_err_t
101 klonal_recv(lib_nal_t    *nal,
102             void         *private,
103             lib_msg_t    *libmsg,
104             unsigned int  niov,
105             struct iovec *iov,
106             size_t        offset,
107             size_t        mlen,
108             size_t        rlen)
109 {
110         klo_desc_t *klod = (klo_desc_t *)private;
111
112         /* I only handle mapped->mapped matches */
113         LASSERT(klod->klod_type == KLOD_IOV);
114
115         if (mlen == 0)
116                 return PTL_OK;
117
118         while (offset >= iov->iov_len) {
119                 offset -= iov->iov_len;
120                 iov++;
121                 niov--;
122                 LASSERT(niov > 0);
123         }
124         
125         while (klod->klod_offset >= klod->klod_iov.iov->iov_len) {
126                 klod->klod_offset -= klod->klod_iov.iov->iov_len;
127                 klod->klod_iov.iov++;
128                 klod->klod_niov--;
129                 LASSERT(klod->klod_niov > 0);
130         }
131         
132         do {
133                 int fraglen = MIN(iov->iov_len - offset,
134                                   klod->klod_iov.iov->iov_len - klod->klod_offset);
135
136                 LASSERT(niov > 0);
137                 LASSERT(klod->klod_niov > 0);
138
139                 if (fraglen > mlen)
140                         fraglen = mlen;
141                 
142                 memcpy((void *)((unsigned long)iov->iov_base + offset),
143                        (void *)((unsigned long)klod->klod_iov.iov->iov_base +
144                                 klod->klod_offset),
145                        fraglen);
146
147                 if (offset + fraglen < iov->iov_len) {
148                         offset += fraglen;
149                 } else {
150                         offset = 0;
151                         iov++;
152                         niov--;
153                 }
154
155                 if (klod->klod_offset + fraglen < klod->klod_iov.iov->iov_len ) {
156                         klod->klod_offset += fraglen;
157                 } else {
158                         klod->klod_offset = 0;
159                         klod->klod_iov.iov++;
160                         klod->klod_niov--;
161                 }
162
163                 mlen -= fraglen;
164         } while (mlen > 0);
165         
166         lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
167         return PTL_OK;
168 }
169
170 static ptl_err_t
171 klonal_recv_pages(lib_nal_t    *nal,
172                   void         *private,
173                   lib_msg_t    *libmsg,
174                   unsigned int  niov,
175                   ptl_kiov_t   *kiov,
176                   size_t        offset,
177                   size_t        mlen,
178                   size_t        rlen)
179 {
180         void          *srcaddr = NULL;
181         void          *dstaddr = NULL;
182         unsigned long  srcfrag = 0;
183         unsigned long  dstfrag = 0;
184         unsigned long  fraglen;
185         klo_desc_t    *klod = (klo_desc_t *)private;
186
187         /* I only handle unmapped->unmapped matches */
188         LASSERT(klod->klod_type == KLOD_KIOV);
189
190         if (mlen == 0)
191                 return PTL_OK;
192
193         while (offset >= kiov->kiov_len) {
194                 offset -= kiov->kiov_len;
195                 kiov++;
196                 niov--;
197                 LASSERT(niov > 0);
198         }
199
200         while (klod->klod_offset >= klod->klod_iov.kiov->kiov_len) {
201                 klod->klod_offset -= klod->klod_iov.kiov->kiov_len;
202                 klod->klod_iov.kiov++;
203                 klod->klod_niov--;
204                 LASSERT(klod->klod_niov > 0);
205         }
206
207         do {
208         /* CAVEAT EMPTOR: I kmap 2 pages at once == slight risk of deadlock */
209                 LASSERT(niov > 0);
210                 if (dstaddr == NULL) {
211                         dstaddr = (void *)((unsigned long)kmap(kiov->kiov_page) +
212                                            kiov->kiov_offset + offset);
213                         dstfrag = kiov->kiov_len -  offset;
214                 }
215
216                 LASSERT(klod->klod_niov > 0);
217                 if (srcaddr == NULL) {
218                         srcaddr = (void *)((unsigned long)kmap(klod->klod_iov.kiov->kiov_page) +
219                                            klod->klod_iov.kiov->kiov_offset + klod->klod_offset);
220                         srcfrag = klod->klod_iov.kiov->kiov_len - klod->klod_offset;
221                 }
222                 
223                 fraglen = MIN(srcfrag, dstfrag);
224                 if (fraglen > mlen)
225                         fraglen = mlen;
226                 
227                 memcpy(dstaddr, srcaddr, fraglen);
228                 
229                 if (fraglen < dstfrag) {
230                         dstfrag -= fraglen;
231                         dstaddr = (void *)((unsigned long)dstaddr + fraglen);
232                 } else {
233                         kunmap(kiov->kiov_page);
234                         dstaddr = NULL;
235                         offset = 0;
236                         kiov++;
237                         niov--;
238                 }
239
240                 if (fraglen < srcfrag) {
241                         srcfrag -= fraglen;
242                         srcaddr = (void *)((unsigned long)srcaddr + fraglen);
243                 } else {
244                         kunmap(klod->klod_iov.kiov->kiov_page);
245                         srcaddr = NULL;
246                         klod->klod_offset = 0;
247                         klod->klod_iov.kiov++;
248                         klod->klod_niov--;
249                 }
250
251                 mlen -= fraglen;
252         } while (mlen > 0);
253
254         if (dstaddr != NULL)
255                 kunmap(kiov->kiov_page);
256
257         if (srcaddr != NULL)
258                 kunmap(klod->klod_iov.kiov->kiov_page);
259
260         lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
261         return PTL_OK;
262 }
263
264 lib_nal_t klonal_lib =
265 {
266         libnal_data:       &klonal_data,         /* NAL private data */
267         libnal_send:        klonal_send,
268         libnal_send_pages:  klonal_send_pages,
269         libnal_recv:        klonal_recv,
270         libnal_recv_pages:  klonal_recv_pages,
271         libnal_dist:        klonal_dist
272 };