Whamcloud - gitweb
Portals NAL for Myrinet GM2 for Lustre (lgmnal)
[fs/lustre-release.git] / lustre / portals / knals / lgmnal / lgmnal_comm.c
1 /*
2  * This program was prepared by the Regents of the University of
3  * California at Los Alamos National Laboratory (the University) under 
4  * contract number W-7405-ENG-36 with the U.S. Department of Energy
5  * (DoE). Neither the U.S. Government nor the
6  * University makes any warranty, express or implied, or assumes any
7  * liability or responsibility for the use of this software.
8  */
9
10 /*
11  *      This file contains all lgmnal send and receive functions
12  */
13
14 #include "lgmnal.h"
15
16 int
17 lgmnal_requeue_rxbuffer(lgmnal_data_t *nal_data, lgmnal_srxd_t *srxd)
18 {
19         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_requeue_rxbuffer\n"));
20
21         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("requeueing srxd[%p] nal_data[%p]\n", srxd, nal_data));
22
23         LGMNAL_GM_LOCK(nal_data);
24         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer,
25                                         srxd->gmsize, GM_LOW_PRIORITY, 0 );
26         LGMNAL_GM_UNLOCK(nal_data);
27
28         return(LGMNAL_STATUS_OK);
29 }
30
31
32 /*
33  *      Handle a bad message
34  *      A bad message is one we don't expect or can't interpret
35  */
36 int
37 lgmnal_badrx_message(lgmnal_data_t *nal_data, gm_recv_t *recv, lgmnal_srxd_t *srxd)
38 {
39         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("Can't handle message\n"));
40
41         if (!srxd)
42                 srxd = lgmnal_rxbuffer_to_srxd(nal_data, gm_ntohp(recv->buffer));
43         if (srxd) {
44                 lgmnal_requeue_rxbuffer(nal_data, srxd);
45         } else {
46                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Can't find a descriptor for this buffer\n"));
47                 /*
48                  *      get rid of it ?
49                  */
50                 return(LGMNAL_STATUS_FAIL);
51         }
52
53         return(LGMNAL_STATUS_OK);
54 }
55
56
57 /*
58  *      Start processing a small message receive
59  *      Get here from lgmnal_receive_thread
60  *      Hand off to lib_parse, which calls cb_recv
61  *      which hands back to lgmnal_small_receive2
62  *      Deal with all endian stuff here (if we can!)
63  */
64 int
65 lgmnal_small_receive1(lgmnal_data_t *nal_data, gm_recv_t *recv)
66 {
67         lgmnal_srxd_t   *srxd = NULL;
68         void            *buffer = NULL;
69         unsigned int snode, sport, type, length;
70         lgmnal_msghdr_t *lgmnal_msghdr;
71         ptl_hdr_t       *portals_hdr;
72
73         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive1 nal_data [%p], recv [%p]\n", nal_data, recv));
74
75         buffer = gm_ntohp(recv->buffer);;
76         snode = (int)gm_ntoh_u16(recv->sender_node_id);
77         sport = (int)gm_ntoh_u8(recv->sender_port_id);
78         type = (int)gm_ntoh_u8(recv->type);
79         buffer = gm_ntohp(recv->buffer);
80         length = (int) gm_ntohl(recv->length);
81
82         lgmnal_msghdr = (lgmnal_msghdr_t*)buffer;
83         portals_hdr = (ptl_hdr_t*)(buffer+LGMNAL_MSGHDR_SIZE);
84
85         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("rx_event:: Sender node [%d], Sender Port [%d], type [%d], length [%d], buffer [%p]\n",
86                                 snode, sport, type, length, buffer));
87         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lgmnal_msghdr:: Sender node [%u], magic [%lx], type [%d]\n",
88                                 lgmnal_msghdr->sender_node_id, lgmnal_msghdr->magic, lgmnal_msghdr->type));
89         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: Sender node [%ul], dest_node [%ul]\n",
90                                 portals_hdr->src_nid, portals_hdr->dest_nid));
91
92
93         /*
94          *      Get a transmit descriptor for this message
95          */
96         srxd = lgmnal_rxbuffer_to_srxd(nal_data, buffer);
97         LGMNAL_PRINT(LGMNAL_DEBUG, ("Back from lgmnal_rxbuffer_to_srxd\n"));
98         if (!srxd) {
99                 LGMNAL_PRINT(LGMNAL_DEBUG, ("Failed to get receive descriptor for this buffer\n"));
100                 lib_parse(nal_data->nal_cb, portals_hdr, srxd);
101                 return(LGMNAL_STATUS_FAIL);
102         }
103         srxd->type = LGMNAL_SMALL_MESSAGE;
104         
105         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling lib_parse buffer is [%p]\n", buffer+LGMNAL_MSGHDR_SIZE));
106         /*
107          *      control passes to lib, which calls cb_recv 
108          *      cb_recv is responsible for returning the buffer 
109          *      for future receive
110          */
111         lib_parse(nal_data->nal_cb, portals_hdr, srxd);
112
113         return(LGMNAL_STATUS_OK);
114 }
115
116 /*
117  *      Get here from lgmnal_receive_thread, lgmnal_small_receive1
118  *      lib_parse, cb_recv
119  *      Put data from prewired receive buffer into users buffer(s)
120  *      Hang out the receive buffer again for another receive
121  *      Call lib_finalize
122  */
123 int
124 lgmnal_small_receive2(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, unsigned int niov, 
125                                                         struct iovec *iov, size_t mlen, size_t rlen)
126 {
127         lgmnal_srxd_t   *srxd = NULL;
128         void    *buffer = NULL;
129         lgmnal_data_t   *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
130
131
132         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_receive2 niov [%d] mlen[%d]\n", niov, mlen));
133
134         if (!private) {
135                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("lgmnal_small_receive2 no context\n"));
136                 lib_finalize(nal_cb, private, cookie);
137                 return(PTL_FAIL);
138         }
139
140         srxd = (lgmnal_srxd_t*)private;
141         buffer = srxd->buffer;
142         buffer += sizeof(lgmnal_msghdr_t);
143         buffer += sizeof(ptl_hdr_t);
144
145         while(niov--) {
146                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing [%p] len [%d]\n", iov, iov->iov_len));
147                 gm_bcopy(buffer, iov->iov_base, iov->iov_len);                  
148                 buffer += iov->iov_len;
149                 iov++;
150         }
151
152
153         /*
154          *      let portals library know receive is complete
155          */
156         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling lib_finalize\n"));
157         if (lib_finalize(nal_cb, private, cookie) != PTL_OK) {
158                 /* TO DO what to do with failed lib_finalise? */
159                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("lib_finalize failed\n"));
160         }
161         /*
162          *      return buffer so it can be used again
163          */
164         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("calling gm_provide_receive_buffer\n"));
165         LGMNAL_GM_LOCK(nal_data);
166         gm_provide_receive_buffer_with_tag(nal_data->gm_port, srxd->buffer, srxd->gmsize, GM_LOW_PRIORITY, 0);  
167         LGMNAL_GM_UNLOCK(nal_data);
168
169         return(PTL_OK);
170 }
171
172
173
174 /*
175  *      The recevive thread
176  *      This guy wait in gm_blocking_recvive and gets
177  *      woken up when the myrinet adaptor gets an interrupt.
178  *      Hands off processing of small messages and blocks again
179  */
180 int
181 lgmnal_receive_thread(void *arg)
182 {
183         lgmnal_data_t           *nal_data;
184         gm_recv_event_t         *rxevent = NULL;
185         gm_recv_t               *recv = NULL;
186         void                    *buffer;
187
188         if (!arg) {
189                 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread. NO nal_data. Exiting\n", arg));
190                 return(-1);
191         }
192
193         nal_data = (lgmnal_data_t*)arg;
194         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("RXTHREAD:: This is the lgmnal_receive_thread nal_data is [%p]\n", arg));
195
196         nal_data->rxthread_flag = LGMNAL_THREAD_STARTED;
197         while (nal_data->rxthread_flag == LGMNAL_THREAD_STARTED) {
198                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: lgmnal_receive_threads waiting for LGMNAL_CONTINUE flag\n"));
199                 set_current_state(TASK_INTERRUPTIBLE);
200                 schedule_timeout(1024);
201                 
202         }
203
204         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: calling daemonize\n"));
205         daemonize();
206         LGMNAL_GM_LOCK(nal_data);
207         while(nal_data->rxthread_flag == LGMNAL_THREAD_CONTINUE) {
208                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Receive thread waiting\n"));
209                 rxevent = gm_blocking_receive_no_spin(nal_data->gm_port);
210                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: receive thread got [%s]\n", lgmnal_rxevent(rxevent)));
211                 if (nal_data->rxthread_flag != LGMNAL_THREAD_CONTINUE) {
212                         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Receive thread time to exit\n"));
213                         break;
214                 }
215                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
216
217                         case(GM_RECV_EVENT):
218                                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: GM_RECV_EVENT\n"));
219                                 recv = (gm_recv_t*)&(rxevent->recv);
220                                 buffer = gm_ntohp(recv->buffer);
221                                 if (((lgmnal_msghdr_t*)buffer)->type == LGMNAL_SMALL_MESSAGE) {
222                                         LGMNAL_GM_UNLOCK(nal_data);
223                                         lgmnal_small_receive1(nal_data, recv);
224                                         LGMNAL_GM_LOCK(nal_data);
225                                 } else {
226                                         LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: Unsupported message type\n"));
227                                         lgmnal_badrx_message(nal_data, recv, NULL);
228                                 }
229                         break;
230                         case(_GM_SLEEP_EVENT):
231                                 /*
232                                  *      Blocking receive above just returns
233                                  *      immediatly with _GM_SLEEP_EVENT
234                                  *      Don't know what this is
235                                  */
236                                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Sleeping in gm_unknown\n"));
237                                 LGMNAL_GM_UNLOCK(nal_data);
238                                 gm_unknown(nal_data->gm_port, rxevent);
239                                 LGMNAL_GM_LOCK(nal_data);
240                                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Awake from gm_unknown\n"));
241                                 break;
242                                 
243                         default:
244                                 /*
245                                  *      Don't know what this is
246                                  *      gm_unknown will make sense of it
247                                  */
248                                 LGMNAL_PRINT(LGMNAL_DEBUG_V, ("RXTHREAD:: Passing event to gm_unknown\n"));
249                                 gm_unknown(nal_data->gm_port, rxevent);
250                                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("RXTHREAD:: Processed unknown event\n"));
251                                 
252                 }
253
254                 
255         }
256         LGMNAL_GM_UNLOCK(nal_data);
257         nal_data->rxthread_flag = LGMNAL_THREAD_STOPPED;
258         LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("RXTHREAD:: The lgmnal_receive_thread nal_data [%p] is exiting\n", nal_data));
259         return(LGMNAL_STATUS_OK);
260 }
261
262
263 int
264 lgmnal_small_transmit(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
265         ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
266 {
267         lgmnal_data_t   *nal_data = (lgmnal_data_t*)nal_cb->nal_data;
268         lgmnal_stxd_t   *stxd = NULL;
269         void            *buffer = NULL;
270         lgmnal_msghdr_t *msghdr = NULL;
271         int             tot_size = 0;
272         unsigned int    local_nid;
273         gm_status_t     gm_status = GM_SUCCESS;
274
275         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_small_transmit nal_cb [%p] private [%p] cookie [%p] hdr [%p] type [%d] global_nid [%u][%x] pid [%d] niov [%d] iov [%p] size [%d]\n", nal_cb, private, cookie, hdr, type, global_nid, global_nid, pid, niov, iov, size));
276
277         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("portals_hdr:: dest_nid [%lu], src_nid [%lu]\n", hdr->dest_nid, hdr->src_nid));
278
279         if (!nal_data) {
280                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_data\n"));
281                 return(LGMNAL_STATUS_FAIL);
282         } else {
283                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("nal_data [%p]\n", nal_data));
284         }
285
286         LGMNAL_GM_LOCK(nal_data);
287         gm_status = gm_global_id_to_node_id(nal_data->gm_port, global_nid, &local_nid);
288         LGMNAL_GM_UNLOCK(nal_data);
289         if (gm_status != GM_SUCCESS) {
290                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("Failed to obtain local id\n"));
291                 return(LGMNAL_STATUS_FAIL);
292         }
293         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Local Node_id is [%u][%x]\n", local_nid, local_nid));
294
295         stxd = lgmnal_get_stxd(nal_data, 1);
296         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
297
298         stxd->type = LGMNAL_SMALL_MESSAGE;
299         stxd->cookie = cookie;
300
301         /*
302          *      Copy lgmnal_msg_hdr and portals header to the transmit buffer
303          *      Then copy the data in
304          */
305         buffer = stxd->buffer;
306         msghdr = (lgmnal_msghdr_t*)buffer;
307
308         msghdr->magic = LGMNAL_MAGIC;
309         msghdr->type = LGMNAL_SMALL_MESSAGE;
310         msghdr->sender_node_id = nal_data->gm_global_nid;
311         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
312
313         buffer += sizeof(lgmnal_msghdr_t);
314         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Advancing buffer pointer by [%x] to [%p]\n", sizeof(lgmnal_msghdr_t), buffer));
315
316         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing  portals hdr at [%p]\n", buffer));
317         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
318
319         buffer += sizeof(ptl_hdr_t);
320
321         while(niov--) {
322                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing iov [%p] len [%d] to [%p]\n", iov, iov->iov_len, buffer));
323                 gm_bcopy(iov->iov_base, buffer, iov->iov_len);
324                 buffer+= iov->iov_len;
325                 iov++;
326         }
327
328         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending\n"));
329         tot_size = size+sizeof(ptl_hdr_t)+sizeof(lgmnal_msghdr_t);
330
331
332         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Calling gm_send_to_peer port [%p] buffer [%p] gmsize [%d] msize [%d] global_nid [%u][%x] local_nid[%d] stxd [%p]\n",
333                         nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, global_nid, global_nid, local_nid, stxd));
334         LGMNAL_GM_LOCK(nal_data);
335         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, tot_size, GM_LOW_PRIORITY, local_nid, lgmnal_small_tx_done, (void*)stxd);
336         
337         LGMNAL_GM_UNLOCK(nal_data);
338         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
339                 
340         return(PTL_OK);
341 }
342
343
344 void 
345 lgmnal_small_tx_done(gm_port_t *gm_port, void *context, gm_status_t status)
346 {
347         lgmnal_stxd_t   *stxd = (lgmnal_stxd_t*)context;
348         lib_msg_t       *cookie = stxd->cookie;
349         lgmnal_data_t   *nal_data = (lgmnal_data_t*)stxd->nal_data;
350         nal_cb_t        *nal_cb = nal_data->nal_cb;
351
352         if (!stxd) {
353                 LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("send completion event for unknown stxd\n"));
354                 return;
355         }
356         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Result of send stxd [%p] is [%s]\n", stxd, lgmnal_gm_error(status)));
357         /* TO DO figure out which sends are worth retrying and get a send token to retry */
358         if (lib_finalize(nal_cb, stxd, cookie) != PTL_OK) {
359                 LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("Call to lib_finalize failed for stxd [%p]\n", stxd));
360         }
361         lgmnal_return_stxd(nal_data, stxd);
362         return;
363 }
364
365
366 void 
367 lgmnal_large_tx1_done(gm_port_t *gm_port, void *context, gm_status_t status)
368 {
369
370 }
371
372 /*
373  *      Begin a large transmit
374  */
375 int
376 lgmnal_large_transmit1(nal_cb_t *nal_cb, void *private, lib_msg_t *cookie, ptl_hdr_t *hdr, int type,
377         ptl_nid_t global_nid, ptl_pid_t pid, unsigned int niov, struct iovec *iov, int size)
378 {
379
380         lgmnal_data_t   *nal_data;
381         lgmnal_stxd_t   *stxd = NULL;
382         void            *buffer = NULL;
383         lgmnal_msghdr_t *msghdr = NULL;
384         unsigned int    local_nid;
385         int             mlen = 0;       /* the size of the init message data */
386
387
388         LGMNAL_PRINT(LGMNAL_DEBUG_TRACE, ("lgmnal_large_transmit1 nal_cb [%p] private [%p], cookie [%p] hdr [%p], type [%d] global_nid [%u], pid [%d], 
389                                         niov [%d], iov [%p], size [%d]\n",
390                                         nal_cb, private, cookie, hdr, type, global_nid, pid, niov, iov, size));
391
392         if (nal_cb)
393                 nal_data = (lgmnal_data_t*)nal_cb->nal_data;
394         else  {
395                 LGMNAL_PRINT(LGMNAL_DEBUG_ERR, ("no nal_cb.\n"));
396                 return(LGMNAL_STATUS_FAIL);
397         }
398         
399
400         /*
401          *      TO DO large transmit uses stxd. Should it have control descriptor?
402          */
403         stxd = lgmnal_get_stxd(nal_data, 1);
404         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("stxd [%p]\n", stxd));
405
406         stxd->type = LGMNAL_LARGE_MESSAGE_INIT;
407         stxd->cookie = cookie;
408
409         /*
410          *      Copy lgmnal_msg_hdr and portals header to the transmit buffer
411          *      Then copy the iov in
412          */
413         buffer = stxd->buffer;
414         msghdr = (lgmnal_msghdr_t*)buffer;
415
416         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing msghdr at [%p]\n", buffer));
417
418         msghdr->magic = LGMNAL_MAGIC;
419         msghdr->type = LGMNAL_LARGE_MESSAGE_INIT;
420         msghdr->sender_node_id = nal_data->gm_global_nid;
421         msghdr->stxd = stxd;
422         buffer += sizeof(lgmnal_msghdr_t);
423         mlen = sizeof(lgmnal_msghdr_t);
424
425
426         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("processing  portals hdr at [%p]\n", buffer));
427
428         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
429         buffer += sizeof(ptl_hdr_t);
430         mlen += sizeof(ptl_hdr_t); 
431
432         /*
433          *      Store the iovs in the stxd for we can get them later
434          *      in large_transmit2
435          */
436         LGMNAL_PRINT(LGMNAL_DEBUG_V, ("Copying iov [%p] to [%p]\n", iov, stxd->iov));
437         gm_bcopy(iov, stxd->iov, niov*sizeof(struct iovec));
438         stxd->niov = niov;
439         
440         /*
441          *      Send the init message to the target
442          */
443         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("sending mlen [%d]\n", mlen));
444         LGMNAL_GM_LOCK(nal_data);
445         gm_send_to_peer_with_callback(nal_data->gm_port, stxd->buffer, stxd->gmsize, mlen, GM_LOW_PRIORITY, local_nid, lgmnal_large_tx1_done, (void*)stxd);
446         LGMNAL_GM_UNLOCK(nal_data);
447         
448         LGMNAL_PRINT(LGMNAL_DEBUG_VV, ("done\n"));
449                 
450         return(PTL_OK);
451 }
452
453
454
455
/*
 *      Symbols exported by this file, in the order they are defined above
 */
EXPORT_SYMBOL(lgmnal_requeue_rxbuffer);
EXPORT_SYMBOL(lgmnal_badrx_message);
EXPORT_SYMBOL(lgmnal_small_receive1);
EXPORT_SYMBOL(lgmnal_small_receive2);
EXPORT_SYMBOL(lgmnal_receive_thread);
EXPORT_SYMBOL(lgmnal_small_transmit);
EXPORT_SYMBOL(lgmnal_small_tx_done);
EXPORT_SYMBOL(lgmnal_large_tx1_done);
EXPORT_SYMBOL(lgmnal_large_transmit1);