Whamcloud - gitweb
invoking section 3 of the GNU LGPL, to instead apply the terms of the GPL
[fs/lustre-release.git] / lnet / klnds / lgmlnd / lgmnal_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22
23 /*
24  *      This file contains all lgmnal send and receive functions
25  */
26
27 #include "lgmnal.h"
28
29 int
30 lgmnal_requeue_rxbuffer(lgmnal_data_t *nal_data, lgmnal_srxd_t *srxd)
31 {
32         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_requeue_rxbuffer\n"));
33
34         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data));
35
36         LGMNAL_GM_LOCK(nal_data);
37         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
38                                         srxd->gmsize, GM_LOW_PRIORITY, 0 );
39         LGMNAL_GM_UNLOCK(nal_data);
40
41         return(LGMNAL_STATUS_OK);
42 }
43
44
45 /*
46  *      Handle a bad message
47  *      A bad message is one we don't expect or can't interpret
48  */
49 int
50 lgmnal_badrx_message(lgmnal_data_t *nal_data, gm_recv_t *recv, lgmnal_srxd_t *srxd)
51 {
52         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("Can't handle message\n"));
53
54         if (!srxd)
55                 srxd = lgmnal_rxbuffer_to_srxd(nal_data, gm_ntohp(recv->buffer));
56         if (srxd) {
57                 lgmnal_requeue_rxbuffer(nal_data, srxd);
58         } else {
59                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Can't find a descriptor for this buffer\n"));
60                 /*
61                  *      get rid of it ?
62                  */
63                 return(LGMNAL_STATUS_FAIL);
64         }
65
66         return(LGMNAL_STATUS_OK);
67 }
68
69
70 /*
71  *      Start processing a small message receive
72  *      Get here from lgmnal_receive_thread
73  *      Hand off to lib_parse, which calls cb_recv
74  *      which hands back to lgmnal_small_receive2
75  *      Deal with all endian stuff here (if we can!)
76  */
77 int
78 lgmnal_small_receive1(lgmnal_data_t *nal_data, gm_recv_t *recv)
79 {
80         lgmnal_srxd_t   *srxd = NULL;
81         void            *buffer = NULL;
82         unsigned int snode, sport, type, length;
83         lgmnal_msghdr_t *lgmnal_msghdr;
84         ptl_hdr_t       *portals_hdr;
85
86         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive1 nal_data [%p], recv [%p]\n", nal_data, recv));
87
88         buffer = gm_ntohp(recv->buffer);;
89         snode = (int)gm_ntoh_u16(recv->sender_node_id);
90         sport = (int)gm_ntoh_u8(recv->sender_port_id);
91         type = (int)gm_ntoh_u8(recv->type);
92         buffer = gm_ntohp(recv->buffer);
93         length = (int) gm_ntohl(recv->length);
94
95         lgmnal_msghdr = (lgmnal_msghdr_t*)buffer;
96         portals_hdr = (ptl_hdr_t*)(buffer+LGMNAL_MSGHDR_SIZE);
97
98         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("rx_event:: Sender node [%d], Sender Port [%d], type [%d], length [%d], buffer [%p]\n",
99                                 snode, sport, type, length, buffer));
100         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lgmnal_msghdr:: Sender node [%u], magic [%lx], type [%d]\n",
101                                 lgmnal_msghdr->sender_node_id, lgmnal_msghdr->magic, lgmnal_msghdr->type));
102         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: Sender node [%ul], dest_node [%ul]\n",
103                                 portals_hdr->src_nid, portals_hdr->dest_nid));
104
105
106         /*
107          *      Get a transmit descriptor for this message
108          */
109         srxd = lgmnal_rxbuffer_to_srxd(nal_data, buffer);
110         LGMNAL_PRINT(LGMNAL_DEBUG, ("Back from lgmnal_rxbuffer_to_srxd\n"));
111         if (!srxd) {
112                 LGMNAL_PRINT(LGMNAL_DEBUG, ("Failed to get receive descriptor for this buffer\n"));
113                 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
114                 return(LGMNAL_STATUS_FAIL);
115         }
116         srxd->type = LGMNAL_SMALL_MESSAGE;
117         
118         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling lib_parse buffer is [%p]\n", buffer+LGMNAL_MSGHDR_SIZE));
119         /*
120          *      control passes to lib, which calls cb_recv 
121          *      cb_recv is responsible for returning the buffer 
122          *      for future receive
123          */
124         lib_parse(nal_data->nal_cb, portals_hdr, srxd);
125
126         return(LGMNAL_STATUS_OK);
127 }
128
129 /*
130  *      Get here from lgmnal_receive_thread, lgmnal_small_receive1
131  *      lib_parse, cb_recv
132  *      Put data from prewired receive buffer into users buffer(s)
133  *      Hang out the receive buffer again for another receive
134  *      Call lib_finalize
135  */
136 int
137 lgmnal_small_receive2(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, unsigned int niov, 
138                                                         struct iovec *iov, size_t mlen, size_t rlen)
139 {
140         lgmnal_srxd_t   *srxd = NULL;
141         void    *buffer = NULL;
142         lgmnal_data_t   *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
143
144
145         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive2 niov [%d] mlen[%d]\n", niov, mlen));
146
147         if (!private) {
148                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("lgmnal_small_receive2 no context\n"));
149                 lib_finalize(nal_cb, private, cookie);
150                 return(PTL_FAIL);
151         }
152
153         srxd = (lgmnal_srxd_t*)private;
154         buffer = srxd->buffer;
155         buffer += sizeof(lgmnal_msghdr_t);
156         buffer += sizeof(ptl_hdr_t);
157
158         while(niov--) {
159                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing [%p] len [%d]\n", iov, iov->iov_len));
160                 gm_bcopy(buffer, iov->iov_base, iov->iov_len);                  
161                 buffer += iov->iov_len;
162                 iov++;
163         }
164
165
166         /*
167          *      let portals library know receive is complete
168          */
169         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling lib_finalize\n"));
170         if (lib_finalize(nal_cb, private, cookie) != PTL_OK) {
171                 /* TO DO what to do with failed lib_finalise? */
172                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lib_finalize failed\n"));
173         }
174         /*
175          *      return buffer so it can be used again
176          */
177         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling gm_provide_receive_buffer\n"));
178         LGMNAL_GM_LOCK(nal_data);
179         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, srxd->gmsize, GM_LOW_PRIORITY, 0);  
180         LGMNAL_GM_UNLOCK(nal_data);
181
182         return(PTL_OK);
183 }
184
185
186
187 /*
188  *      The recevive thread
189  *      This guy wait in gm_blocking_recvive and gets
190  *      woken up when the myrinet adaptor gets an interrupt.
191  *      Hands off processing of small messages and blocks again
192  */
193 int
194 lgmnal_receive_thread(void *arg)
195 {
196         lgmnal_data_t           *nal_data;
197         gm_recv_event_t         *rxevent = NULL;
198         gm_recv_t               *recv = NULL;
199         void                    *buffer;
200
201         if (!arg) {
202                 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread. NO nal_data. Exiting\n", arg));
203                 return(-1);
204         }
205
206         nal_data = (lgmnal_data_t*)arg;
207         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread nal_data is [%p]\n", arg));
208
209         nal_data->rxthread_flag = LGMNAL_THREAD_STARTED;
210         while (nal_data->rxthread_flag == LGMNAL_THREAD_STARTED) {
211                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: lgmnal_receive_threads waiting for LGMNAL_CONTINUE flag\n"));
212                 set_current_state(TASK_INTERRUPTIBLE);
213                 schedule_timeout(1024);
214                 
215         }
216
217         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: calling daemonize\n"));
218         daemonize();
219         LGMNAL_GM_LOCK(nal_data);
220         while(nal_data->rxthread_flag == LGMNAL_THREAD_CONTINUE) {
221                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Receive thread waiting\n"));
222                 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
223                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: receive thread got [%s]\n", lgmnal_rxevent(rxevent)));
224                 if (nal_data->rxthread_flag != LGMNAL_THREAD_CONTINUE) {
225                         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Receive thread time to exit\n"));
226                         break;
227                 }
228                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
229
230                         case(GM_RECV_EVENT):
231                                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: GM_RECV_EVENT\n"));
232                                 recv = (gm_recv_t*)&(rxevent->recv);
233                                 buffer = gm_ntohp(recv->buffer);
234                                 if (((lgmnal_msghdr_t*)buffer)->type == LGMNAL_SMALL_MESSAGE) {
235                                         LGMNAL_GM_UNLOCK(nal_data);
236                                         lgmnal_small_receive1(nal_data, recv);
237                                         LGMNAL_GM_LOCK(nal_data);
238                                 } else {
239                                         LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: Unsupported message type\n"));
240                                         lgmnal_badrx_message(nal_data, recv, NULL);
241                                 }
242                         break;
243                         case(_GM_SLEEP_EVENT):
244                                 /*
245                                  *      Blocking receive above just returns
246                                  *      immediatly with _GM_SLEEP_EVENT
247                                  *      Don't know what this is
248                                  */
249                                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Sleeping in gm_unknown\n"));
250                                 LGMNAL_GM_UNLOCK(nal_data);
251                                 gm_unknown(nal_data->gm_port, rxevent);
252                                 LGMNAL_GM_LOCK(nal_data);
253                                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Awake from gm_unknown\n"));
254                                 break;
255                                 
256                         default:
257                                 /*
258                                  *      Don't know what this is
259                                  *      gm_unknown will make sense of it
260                                  */
261                                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Passing event to gm_unknown\n"));
262                                 gm_unknown(nal_data->gm_port, rxevent);
263                                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Processed unknown event\n"));
264                                 
265                 }
266
267                 
268         }
269         LGMNAL_GM_UNLOCK(nal_data);
270         nal_data->rxthread_flag = LGMNAL_THREAD_STOPPED;
271         LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: The lgmnal_receive_thread nal_data [%p] is exiting\n", nal_data));
272         return(LGMNAL_STATUS_OK);
273 }
274
275
276 int
277 lgmnal_small_transmit(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
278         ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
279 {
280         lgmnal_data_t   *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
281         lgmnal_stxd_t   *stxd = NULL;
282         void            *buffer = NULL;
283         lgmnal_msghdr_t *msghdr = NULL;
284         int             tot_size = 0;
285         unsigned int    local_nid;
286         gm_status_t     gm_status = GM_SUCCESS;
287
288         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_transmit nal_cb [%p] private [%p] cookie [%p] hdr [%p] type [%d] global_nid [%u][%x] pid [%d] niov [%d] iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type, global_nid, global_nid, pid, niov, iov, size));
289
290         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: dest_nid [%lu], src_nid [%lu]\n", hdr->dest_nid, hdr->src_nid));
291
292         if (!nal_data) {
293                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_data\n"));
294                 return(LGMNAL_STATUS_FAIL);
295         } else {
296                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("nal_data [%p]\n", nal_data));
297         }
298
299         LGMNAL_GM_LOCK(nal_data);
300         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, &local_nid);
301         LGMNAL_GM_UNLOCK(nal_data);
302         if (gm_status != GM_SUCCESS) {
303                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Failed to obtain local id\n"));
304                 return(LGMNAL_STATUS_FAIL);
305         }
306         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Local Node_id is [%u][%x]\n", local_nid, local_nid));
307
308         stxd = lgmnal_get_stxd(nal_data, 1);
309         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
310
311         stxd->type = LGMNAL_SMALL_MESSAGE;
312         stxd->cookie = cookie;
313
314         /*
315          *      Copy lgmnal_msg_hdr and portals header to the transmit buffer
316          *      Then copy the data in
317          */
318         buffer = stxd->buffer;
319         msghdr = (lgmnal_msghdr_t*)buffer;
320
321         msghdr->magic = LGMNAL_MAGIC;
322         msghdr->type = LGMNAL_SMALL_MESSAGE;
323         msghdr->sender_node_id = nal_data->gm_global_nid;
324         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
325
326         buffer += sizeof(lgmnal_msghdr_t);
327         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Advancing buffer pointer by [%x] to [%p]\n", sizeof(lgmnal_msghdr_t), buffer));
328
329         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing  portals hdr at [%p]\n", buffer));
330         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
331
332         buffer += sizeof(ptl_hdr_t);
333
334         while(niov--) {
335                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing iov [%p] len [%d] to [%p]\n", iov, iov->iov_len, buffer));
336                 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
337                 buffer+= iov->iov_len;
338                 iov++;
339         }
340
341         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending\n"));
342         tot_size = size+sizeof(ptl_hdr_t)+sizeof(lgmnal_msghdr_t);
343
344
345         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling gm_send_to_peer port [%p] buffer [%p] gmsize [%d] msize [%d] global_nid [%u][%x] local_nid[%d] stxd [%p]\n",
346                         nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, global_nid, global_nid, local_nid, stxd));
347         LGMNAL_GM_LOCK(nal_data);
348         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, GM_LOW_PRIORITY, local_nid, lgmnal_small_tx_done, (void*)stxd);
349         
350         LGMNAL_GM_UNLOCK(nal_data);
351         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
352                 
353         return(PTL_OK);
354 }
355
356
357 void 
358 lgmnal_small_tx_done(gm_port_t *gm_port, void *context, gm_status_t status)
359 {
360         lgmnal_stxd_t   *stxd = (lgmnal_stxd_t*)context;
361         lib_msg_t       *cookie = stxd->cookie;
362         lgmnal_data_t   *nal_data = (lgmnal_data_t*)stxd->nal_data;
363         nal_cb_t        *nal_cb = nal_data->nal_cb;
364
365         if (!stxd) {
366                 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("send completion event for unknown stxd\n"));
367                 return;
368         }
369         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Result of send stxd [%p] is [%s]\n", stxd, lgmnal_gm_error(status)));
370         /* TO DO figure out which sends are worth retrying and get a send token to retry */
371         if (lib_finalize(nal_cb, stxd, cookie) != PTL_OK) {
372                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Call to lib_finalize failed for stxd [%p]\n", stxd));
373         }
374         lgmnal_return_stxd(nal_data, stxd);
375         return;
376 }
377
378
379 void 
380 lgmnal_large_tx1_done(gm_port_t *gm_port, void *context, gm_status_t status)
381 {
382
383 }
384
385 /*
386  *      Begin a large transmit
387  */
388 int
389 lgmnal_large_transmit1(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
390         ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
391 {
392
393         lgmnal_data_t   *nal_data;
394         lgmnal_stxd_t   *stxd = NULL;
395         void            *buffer = NULL;
396         lgmnal_msghdr_t *msghdr = NULL;
397         unsigned int    local_nid;
398         int             mlen = 0;       /* the size of the init message data */
399
400
401         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_large_transmit1 nal_cb [%p] private [%p], cookie [%p] hdr [%p], type [%d] global_nid [%u], pid [%d], 
402                                         niov [%d], iov [%p], size [%d]\n",
403                                         nal_cb, private, cookie, hdr, type, global_nid, pid, niov, iov, size));
404
405         if (nal_cb)
406                 nal_data = (lgmnal_data_t*)nal_cb->nal_data;
407         else  {
408                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_cb.\n"));
409                 return(LGMNAL_STATUS_FAIL);
410         }
411         
412
413         /*
414          *      TO DO large transmit uses stxd. Should it have control descriptor?
415          */
416         stxd = lgmnal_get_stxd(nal_data, 1);
417         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
418
419         stxd->type = LGMNAL_LARGE_MESSAGE_INIT;
420         stxd->cookie = cookie;
421
422         /*
423          *      Copy lgmnal_msg_hdr and portals header to the transmit buffer
424          *      Then copy the iov in
425          */
426         buffer = stxd->buffer;
427         msghdr = (lgmnal_msghdr_t*)buffer;
428
429         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
430
431         msghdr->magic = LGMNAL_MAGIC;
432         msghdr->type = LGMNAL_LARGE_MESSAGE_INIT;
433         msghdr->sender_node_id = nal_data->gm_global_nid;
434         msghdr->stxd = stxd;
435         buffer += sizeof(lgmnal_msghdr_t);
436         mlen = sizeof(lgmnal_msghdr_t);
437
438
439         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing  portals hdr at [%p]\n", buffer));
440
441         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
442         buffer += sizeof(ptl_hdr_t);
443         mlen += sizeof(ptl_hdr_t); 
444
445         /*
446          *      Store the iovs in the stxd for we can get them later
447          *      in large_transmit2
448          */
449         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Copying iov [%p] to [%p]\n", iov, stxd->iov));
450         gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
451         stxd->niov = niov;
452         
453         /*
454          *      Send the init message to the target
455          */
456         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending mlen [%d]\n", mlen));
457         LGMNAL_GM_LOCK(nal_data);
458         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, mlen, GM_LOW_PRIORITY, local_nid, lgmnal_large_tx1_done, (void*)stxd);
459         LGMNAL_GM_UNLOCK(nal_data);
460         
461         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
462                 
463         return(PTL_OK);
464 }
465
466
467
468
/*
 *      Symbols exported from this file, listed in the order in which
 *      the functions are defined above
 */
EXPORT_SYMBOL(lgmnal_requeue_rxbuffer);
EXPORT_SYMBOL(lgmnal_badrx_message);
EXPORT_SYMBOL(lgmnal_small_receive1);
EXPORT_SYMBOL(lgmnal_small_receive2);
EXPORT_SYMBOL(lgmnal_receive_thread);
EXPORT_SYMBOL(lgmnal_small_transmit);
EXPORT_SYMBOL(lgmnal_small_tx_done);
EXPORT_SYMBOL(lgmnal_large_tx1_done);
EXPORT_SYMBOL(lgmnal_large_transmit1);