Whamcloud - gitweb
Use the newer libsysio tags.
[fs/lustre-release.git] / lustre / portals / knals / lonal / lonal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (C) 2004 Cluster File Systems, Inc.
5  *
6  *   This file is part of Lustre, http://www.lustre.org.
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "lonal.h"
23
24 /*
25  *  LIB functions follow
26  *
27  */
28 static int
29 klonal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
30 {
31         *dist = 0;                      /* it's me */
32         return (0);
33 }
34
35 static ptl_err_t
36 klonal_send (lib_nal_t    *nal,
37              void         *private,
38              lib_msg_t    *libmsg,
39              ptl_hdr_t    *hdr,
40              int           type,
41              ptl_nid_t     nid,
42              ptl_pid_t     pid,
43              unsigned int  payload_niov,
44              struct iovec *payload_iov,
45              size_t        payload_offset,
46              size_t        payload_nob)
47 {
48         klo_desc_t klod = {
49                 .klod_type    = KLOD_IOV,
50                 .klod_niov    = payload_niov,
51                 .klod_offset  = payload_offset,
52                 .klod_nob     = payload_nob,
53                 .klod_iov     = { .iov = payload_iov } };
54         ptl_err_t rc;
55
56         LASSERT(nid == klonal_lib.libnal_ni.ni_pid.nid);
57
58         rc = lib_parse(&klonal_lib, hdr, &klod);
59         if (rc == PTL_OK)
60                 lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
61         
62         return rc;
63 }
64
65 static ptl_err_t
66 klonal_send_pages (lib_nal_t    *nal,
67                    void         *private,
68                    lib_msg_t    *libmsg,
69                    ptl_hdr_t    *hdr,
70                    int           type,
71                    ptl_nid_t     nid,
72                    ptl_pid_t     pid,
73                    unsigned int  payload_niov,
74                    ptl_kiov_t   *payload_kiov,
75                    size_t        payload_offset,
76                    size_t        payload_nob)
77 {
78         klo_desc_t klod = {
79                 .klod_type     = KLOD_KIOV,
80                 .klod_niov     = payload_niov,
81                 .klod_offset   = payload_offset,
82                 .klod_nob      = payload_nob,
83                 .klod_iov      = { .kiov = payload_kiov } };
84         ptl_err_t   rc;
85
86         LASSERT(nid == klonal_lib.libnal_ni.ni_pid.nid);
87         
88         rc = lib_parse(&klonal_lib, hdr, &klod);
89         if (rc == PTL_OK)
90                 lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
91         
92         return rc;
93 }
94
95 static ptl_err_t
96 klonal_recv(lib_nal_t    *nal,
97             void         *private,
98             lib_msg_t    *libmsg,
99             unsigned int  niov,
100             struct iovec *iov,
101             size_t        offset,
102             size_t        mlen,
103             size_t        rlen)
104 {
105         klo_desc_t *klod = (klo_desc_t *)private;
106
107         /* I only handle mapped->mapped matches */
108         LASSERT(klod->klod_type == KLOD_IOV);
109
110         if (mlen == 0)
111                 return PTL_OK;
112
113         while (offset >= iov->iov_len) {
114                 offset -= iov->iov_len;
115                 iov++;
116                 niov--;
117                 LASSERT(niov > 0);
118         }
119         
120         while (klod->klod_offset >= klod->klod_iov.iov->iov_len) {
121                 klod->klod_offset -= klod->klod_iov.iov->iov_len;
122                 klod->klod_iov.iov++;
123                 klod->klod_niov--;
124                 LASSERT(klod->klod_niov > 0);
125         }
126         
127         do {
128                 int fraglen = MIN(iov->iov_len - offset,
129                                   klod->klod_iov.iov->iov_len - klod->klod_offset);
130
131                 LASSERT(niov > 0);
132                 LASSERT(klod->klod_niov > 0);
133
134                 if (fraglen > mlen)
135                         fraglen = mlen;
136                 
137                 memcpy((void *)((unsigned long)iov->iov_base + offset),
138                        (void *)((unsigned long)klod->klod_iov.iov->iov_base +
139                                 klod->klod_offset),
140                        fraglen);
141
142                 if (offset + fraglen < iov->iov_len) {
143                         offset += fraglen;
144                 } else {
145                         offset = 0;
146                         iov++;
147                         niov--;
148                 }
149
150                 if (klod->klod_offset + fraglen < klod->klod_iov.iov->iov_len ) {
151                         klod->klod_offset += fraglen;
152                 } else {
153                         klod->klod_offset = 0;
154                         klod->klod_iov.iov++;
155                         klod->klod_niov--;
156                 }
157
158                 mlen -= fraglen;
159         } while (mlen > 0);
160         
161         lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
162         return PTL_OK;
163 }
164
165 static ptl_err_t
166 klonal_recv_pages(lib_nal_t    *nal,
167                   void         *private,
168                   lib_msg_t    *libmsg,
169                   unsigned int  niov,
170                   ptl_kiov_t   *kiov,
171                   size_t        offset,
172                   size_t        mlen,
173                   size_t        rlen)
174 {
175         void          *srcaddr = NULL;
176         void          *dstaddr = NULL;
177         unsigned long  srcfrag = 0;
178         unsigned long  dstfrag = 0;
179         unsigned long  fraglen;
180         klo_desc_t    *klod = (klo_desc_t *)private;
181
182         /* I only handle unmapped->unmapped matches */
183         LASSERT(klod->klod_type == KLOD_KIOV);
184
185         if (mlen == 0)
186                 return PTL_OK;
187
188         while (offset >= kiov->kiov_len) {
189                 offset -= kiov->kiov_len;
190                 kiov++;
191                 niov--;
192                 LASSERT(niov > 0);
193         }
194
195         while (klod->klod_offset >= klod->klod_iov.kiov->kiov_len) {
196                 klod->klod_offset -= klod->klod_iov.kiov->kiov_len;
197                 klod->klod_iov.kiov++;
198                 klod->klod_niov--;
199                 LASSERT(klod->klod_niov > 0);
200         }
201
202         do {
203         /* CAVEAT EMPTOR: I kmap 2 pages at once == slight risk of deadlock */
204                 LASSERT(niov > 0);
205                 if (dstaddr == NULL) {
206                         dstaddr = (void *)((unsigned long)kmap(kiov->kiov_page) +
207                                            kiov->kiov_offset + offset);
208                         dstfrag = kiov->kiov_len -  offset;
209                 }
210
211                 LASSERT(klod->klod_niov > 0);
212                 if (srcaddr == NULL) {
213                         srcaddr = (void *)((unsigned long)kmap(klod->klod_iov.kiov->kiov_page) +
214                                            klod->klod_iov.kiov->kiov_offset + klod->klod_offset);
215                         srcfrag = klod->klod_iov.kiov->kiov_len - klod->klod_offset;
216                 }
217                 
218                 fraglen = MIN(srcfrag, dstfrag);
219                 if (fraglen > mlen)
220                         fraglen = mlen;
221                 
222                 memcpy(dstaddr, srcaddr, fraglen);
223                 
224                 if (fraglen < dstfrag) {
225                         dstfrag -= fraglen;
226                         dstaddr = (void *)((unsigned long)dstaddr + fraglen);
227                 } else {
228                         kunmap(kiov->kiov_page);
229                         dstaddr = NULL;
230                         offset = 0;
231                         kiov++;
232                         niov--;
233                 }
234
235                 if (fraglen < srcfrag) {
236                         srcfrag -= fraglen;
237                         srcaddr = (void *)((unsigned long)srcaddr + fraglen);
238                 } else {
239                         kunmap(klod->klod_iov.kiov->kiov_page);
240                         srcaddr = NULL;
241                         klod->klod_offset = 0;
242                         klod->klod_iov.kiov++;
243                         klod->klod_niov--;
244                 }
245
246                 mlen -= fraglen;
247         } while (mlen > 0);
248
249         if (dstaddr != NULL)
250                 kunmap(kiov->kiov_page);
251
252         if (srcaddr != NULL)
253                 kunmap(klod->klod_iov.kiov->kiov_page);
254
255         lib_finalize(&klonal_lib, private, libmsg, PTL_OK);
256         return PTL_OK;
257 }
258
259 lib_nal_t klonal_lib =
260 {
261         libnal_data:       &klonal_data,         /* NAL private data */
262         libnal_send:        klonal_send,
263         libnal_send_pages:  klonal_send_pages,
264         libnal_recv:        klonal_recv,
265         libnal_recv_pages:  klonal_recv_pages,
266         libnal_dist:        klonal_dist
267 };