lustre/portals/knals/gmnal/gmnal_cb.c
/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
 *
 *   This file is part of Lustre, http://www.lustre.org/
 *
 *   Lustre is free software; you can redistribute it and/or
 *   modify it under the terms of version 2 of the GNU General Public
 *   License as published by the Free Software Foundation.
 *
 *   Lustre is distributed in the hope that it will be useful,
 *   but WITHOUT ANY WARRANTY; without even the implied warranty of
 *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *   GNU General Public License for more details.
 *
 *   You should have received a copy of the GNU General Public License
 *   along with Lustre; if not, write to the Free Software
 *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */


/*
 *      This file implements the nal cb functions
 */


#include "gmnal.h"

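/*
 *      Receive callback.  The receive thread recorded the incoming
 *      message type in the receive descriptor (srxd); dispatch to the
 *      small or large message receive path accordingly.
 */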
int gmnal_cb_recv(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                  unsigned int niov, struct iovec *iov, size_t mlen,
                  size_t rlen)
{
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        int             status = PTL_OK;

        CDEBUG(D_TRACE, "gmnal_cb_recv nal_cb [%p], private[%p], cookie[%p], "
               "niov[%d], iov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
               nal_cb, private, cookie, niov, iov, mlen, rlen);

        switch(srxd->type) {
        case(GMNAL_SMALL_MESSAGE):
                CDEBUG(D_INFO, "gmnal_cb_recv got small message\n");
                status = gmnal_small_rx(nal_cb, private, cookie, niov,
                                        iov, mlen, rlen);
                break;
        case(GMNAL_LARGE_MESSAGE_INIT):
                CDEBUG(D_INFO, "gmnal_cb_recv got large message init\n");
                status = gmnal_large_rx(nal_cb, private, cookie, niov,
                                        iov, mlen, rlen);
                break;
        }

        CDEBUG(D_INFO, "gmnal_cb_recv gmnal_return status [%d]\n", status);
        return(status);
}

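/*
 *      Receive into kernel pages (kiov).  For small messages each page
 *      is kmap()ed into a temporary iovec so the small-message receive
 *      path can copy into it; the pages are kunmap()ed and the iovec
 *      freed once the copy is done.
 */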
int gmnal_cb_recv_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                        unsigned int kniov, ptl_kiov_t *kiov, size_t mlen,
                        size_t rlen)
{
        gmnal_srxd_t    *srxd = (gmnal_srxd_t*)private;
        int             status = PTL_OK;
        struct iovec    *iovec = NULL, *iovec_dup = NULL;
        int             i = 0;
        ptl_kiov_t      *kiov_dup = kiov;

        CDEBUG(D_TRACE, "gmnal_cb_recv_pages nal_cb [%p], private[%p], "
               "cookie[%p], kniov[%d], kiov [%p], mlen["LPSZ"], rlen["LPSZ"]\n",
               nal_cb, private, cookie, kniov, kiov, mlen, rlen);

        if (srxd->type == GMNAL_SMALL_MESSAGE) {
                PORTAL_ALLOC(iovec, sizeof(struct iovec)*kniov);
                if (!iovec) {
                        CDEBUG(D_ERROR, "Can't malloc\n");
                        return(PTL_FAIL);
                }
                iovec_dup = iovec;

                /*
                 *      map each page and create an iovec for it
                 */
                for (i=0; i<kniov; i++) {
                        CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
                        CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
                               kiov->kiov_page, kiov->kiov_len,
                               kiov->kiov_offset);
                        iovec->iov_len = kiov->kiov_len;
                        CDEBUG(D_INFO, "Calling kmap[%p]\n", kiov->kiov_page);

                        iovec->iov_base = kmap(kiov->kiov_page) +
                                          kiov->kiov_offset;

                        CDEBUG(D_INFO, "iov_base is [%p]\n", iovec->iov_base);
                        iovec++;
                        kiov++;
                }
                CDEBUG(D_INFO, "calling gmnal_small_rx\n");
                status = gmnal_small_rx(nal_cb, private, cookie, kniov,
                                        iovec_dup, mlen, rlen);

                /* unmap the pages and free the temporary iovec */
                for (i=0; i<kniov; i++) {
                        kunmap(kiov_dup->kiov_page);
                        kiov_dup++;
                }
                PORTAL_FREE(iovec_dup, sizeof(struct iovec)*kniov);
        }

        CDEBUG(D_INFO, "gmnal_return status [%d]\n", status);
        return(status);
}

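/*
 *      Send from a plain iovec.  Only small messages are supported:
 *      anything over the small-message threshold fails the send and
 *      finalizes the message with PTL_FAIL.
 */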
int gmnal_cb_send(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                  ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                  unsigned int niov, struct iovec *iov, size_t len)
{

        gmnal_data_t    *nal_data;

        CDEBUG(D_TRACE, "gmnal_cb_send niov[%d] len["LPSZ"] nid["LPU64"]\n",
               niov, len, nid);
        nal_data = nal_cb->nal_data;

        if (GMNAL_IS_SMALL_MESSAGE(nal_data, niov, iov, len)) {
                CDEBUG(D_INFO, "This is a small message send\n");
                gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid, pid,
                               niov, iov, len);
        } else {
                CDEBUG(D_ERROR, "Large message send is not supported\n");
                lib_finalize(nal_cb, private, cookie, PTL_FAIL);
                return(PTL_FAIL);
                /*
                 *      Unreachable until large sends are supported:
                 *      gmnal_large_tx(nal_cb, private, cookie, hdr, type,
                 *                     nid, pid, niov, iov, len);
                 */
        }
        return(PTL_OK);
}

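/*
 *      Send from kernel pages (kiov).  As with the iovec path only
 *      small messages are supported; each page is kmap()ed into a
 *      temporary iovec for gmnal_small_tx() and kunmap()ed afterwards.
 */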
int gmnal_cb_send_pages(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie,
                        ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                        unsigned int kniov, ptl_kiov_t *kiov, size_t len)
{

        int             i = 0;
        gmnal_data_t    *nal_data;
        struct iovec    *iovec = NULL, *iovec_dup = NULL;
        ptl_kiov_t      *kiov_dup = kiov;

        CDEBUG(D_TRACE, "gmnal_cb_send_pages nid ["LPU64"] niov[%d] "
               "len["LPSZ"]\n", nid, kniov, len);
        nal_data = nal_cb->nal_data;
        PORTAL_ALLOC(iovec, kniov*sizeof(struct iovec));
        if (!iovec) {
                CDEBUG(D_ERROR, "Can't malloc\n");
                return(PTL_FAIL);
        }
        iovec_dup = iovec;
        if (GMNAL_IS_SMALL_MESSAGE(nal_data, 0, NULL, len)) {
                CDEBUG(D_INFO, "This is a small message send\n");

                /*
                 *      map each page and create an iovec for it
                 */
                for (i=0; i<kniov; i++) {
                        CDEBUG(D_INFO, "processing kniov [%d] [%p]\n", i, kiov);
                        CDEBUG(D_INFO, "kniov page [%p] len [%d] offset[%d]\n",
                               kiov->kiov_page, kiov->kiov_len,
                               kiov->kiov_offset);

                        iovec->iov_base = kmap(kiov->kiov_page)
                                          + kiov->kiov_offset;

                        iovec->iov_len = kiov->kiov_len;
                        iovec++;
                        kiov++;
                }
                gmnal_small_tx(nal_cb, private, cookie, hdr, type, nid,
                               pid, kniov, iovec_dup, len);
        } else {
                CDEBUG(D_ERROR, "Large message send is not supported yet\n");
                PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
                return(PTL_FAIL);
                /*
                 *      Unreachable until large sends are supported: the
                 *      pages would be mapped as above and handed to
                 *      gmnal_large_tx(nal_cb, private, cookie, hdr, type,
                 *                     nid, pid, kniov, iovec, len);
                 */
        }
        /* unmap the pages and free the temporary iovec */
        for (i=0; i<kniov; i++) {
                kunmap(kiov_dup->kiov_page);
                kiov_dup++;
        }
        PORTAL_FREE(iovec_dup, kniov*sizeof(struct iovec));
        return(PTL_OK);
}

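/*
 *      lib_read/lib_write callbacks: the gm nal runs entirely in the
 *      kernel, so a straight gm_bcopy() between the two addresses is
 *      all that is needed.
 */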
int gmnal_cb_read(nal_cb_t *nal_cb, void *private, void *dst,
                  user_ptr src, size_t len)
{
        gm_bcopy(src, dst, len);
        return(PTL_OK);
}

int gmnal_cb_write(nal_cb_t *nal_cb, void *private, user_ptr dst,
                   void *src, size_t len)
{
        gm_bcopy(src, dst, len);
        return(PTL_OK);
}

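/*
 *      Memory allocation callbacks: thin wrappers around PORTAL_ALLOC()
 *      and PORTAL_FREE().
 */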
void *gmnal_cb_malloc(nal_cb_t *nal_cb, size_t len)
{
        void *ptr = NULL;
        CDEBUG(D_TRACE, "gmnal_cb_malloc len["LPSZ"]\n", len);
        PORTAL_ALLOC(ptr, len);
        return(ptr);
}

void gmnal_cb_free(nal_cb_t *nal_cb, void *buf, size_t len)
{
        CDEBUG(D_TRACE, "gmnal_cb_free :: buf[%p] len["LPSZ"]\n", buf, len);
        PORTAL_FREE(buf, len);
        return;
}

/*
 *      iovec map/unmap callbacks are not needed by gmnal; these are
 *      no-ops.
 */
void gmnal_cb_unmap(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov,
                    void **addrkey)
{
        return;
}

int  gmnal_cb_map(nal_cb_t *nal_cb, unsigned int niov, struct iovec *iov,
                  void**addrkey)
{
        return(PTL_OK);
}

void gmnal_cb_printf(nal_cb_t *nal_cb, const char *fmt, ...)
{
        va_list ap;
        char    msg[128];

        CDEBUG(D_TRACE, "gmnal_cb_printf\n");
        /* printk() cannot take a va_list, so format into a local buffer */
        va_start(ap, fmt);
        vsnprintf(msg, sizeof(msg), fmt, ap);
        va_end(ap);
        printk("%s", msg);
        return;
}

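/*
 *      cb_cli/cb_sti bracket library critical sections: take and
 *      release the per-nal cb_lock, saving the interrupt flags in
 *      *flags.
 */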
void gmnal_cb_cli(nal_cb_t *nal_cb, unsigned long *flags)
{
        gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;

        spin_lock_irqsave(&nal_data->cb_lock, *flags);
        return;
}

void gmnal_cb_sti(nal_cb_t *nal_cb, unsigned long *flags)
{
        gmnal_data_t    *nal_data = (gmnal_data_t*)nal_cb->nal_data;

        spin_unlock_irqrestore(&nal_data->cb_lock, *flags);
        return;
}

void gmnal_cb_callback(nal_cb_t *nal_cb, void *private, lib_eq_t *eq,
                       ptl_event_t *ev)
{
        /* holding cb_lock */

        if (eq->event_callback != NULL)
                eq->event_callback(ev);

        /* We will wake threads sleeping in yield() here, AFTER the
         * callback, when we implement blocking yield */
}

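/*
 *      gmnal reports a fixed distance for every nid.
 */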
int gmnal_cb_dist(nal_cb_t *nal_cb, ptl_nid_t nid, unsigned long *dist)
{
        CDEBUG(D_TRACE, "gmnal_cb_dist\n");
        if (dist)
                *dist = 27;
        return(PTL_OK);
}