Whamcloud - gitweb
* GMNAL
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /*
23  *      This file contains all gmnal send and receive functions
24  */
25
26 #include "gmnal.h"
27
/*
 *      The caretaker thread
 *      This is the main thread of execution for the NAL side.
 *      This guy waits in gm_blocking_receive and gets
 *      woken up when the myrinet adaptor gets an interrupt.
 *      Hands off receive operations to the receive thread.
 *      This thread looks after gm_callbacks etc. inline.
 */
int
gmnal_ct_thread(void *arg)
{
        gmnal_ni_t              *gmnalni;
        gm_recv_event_t         *rxevent = NULL;
        gm_recv_t               *recv = NULL;

        /* arg is the gmnal_ni_t supplied by the thread starter; refuse
         * to run without one. */
        if (!arg) {
                CDEBUG(D_NET, "NO gmnalni. Exiting\n");
                return(-1);
        }

        gmnalni = (gmnal_ni_t*)arg;
        CDEBUG(D_NET, "gmnalni is [%p]\n", arg);

        sprintf(current->comm, "gmnal_ct");

        kportal_daemonize("gmnalctd");

        gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;

        /* The GM lock is held whenever calling into the GM library and
         * dropped around work handed off to other threads (see below). */
        spin_lock(&gmnalni->gmni_gm_lock);
        while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
                CDEBUG(D_NET, "waiting\n");
                rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
                /* Shutdown may have been requested while blocked inside
                 * the GM library; re-check the flag before touching the
                 * event. */
                if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
                        CDEBUG(D_NET, "time to exit\n");
                        break;
                }
                CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent(rxevent));
                switch (GM_RECV_EVENT_TYPE(rxevent)) {

                        case(GM_RECV_EVENT):
                                /* Queue the receive for the rx threads;
                                 * drop the GM lock so they can run. */
                                CDEBUG(D_NET, "CTTHREAD:: GM_RECV_EVENT\n");
                                recv = (gm_recv_t*)&rxevent->recv;
                                spin_unlock(&gmnalni->gmni_gm_lock);
                                gmnal_add_rxtwe(gmnalni, recv);
                                spin_lock(&gmnalni->gmni_gm_lock);
                                CDEBUG(D_NET, "CTTHREAD:: Added event to Q\n");
                        break;
                        case(_GM_SLEEP_EVENT):
                                /*
                                 *      Blocking receive above just returns
                                 *      immediately with _GM_SLEEP_EVENT
                                 *      Don't know what this is
                                 */
                                CDEBUG(D_NET, "Sleeping in gm_unknown\n");
                                spin_unlock(&gmnalni->gmni_gm_lock);
                                gm_unknown(gmnalni->gmni_port, rxevent);
                                spin_lock(&gmnalni->gmni_gm_lock);
                                CDEBUG(D_NET, "Awake from gm_unknown\n");
                                break;

                        default:
                                /*
                                 *      Don't know what this is
                                 *      gm_unknown will make sense of it
                                 *      Should be able to do something with
                                 *      FAST_RECV_EVENTS here.
                                 */
                                CDEBUG(D_NET, "Passing event to gm_unknown\n");
                                spin_unlock(&gmnalni->gmni_gm_lock);
                                gm_unknown(gmnalni->gmni_port, rxevent);
                                spin_lock(&gmnalni->gmni_gm_lock);
                                CDEBUG(D_NET, "Processed unknown event\n");
                }
        }
        spin_unlock(&gmnalni->gmni_gm_lock);
        /* Tell the shutdown path this thread has gone. */
        gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
        CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);

        return 0;
}
109
110
111 /*
112  *      process a receive event
113  */
int 
gmnal_rx_thread(void *arg)
{
        char                     name[16];
        gmnal_ni_t              *gmnalni;
        void                    *buffer;
        gmnal_rxtwe_t           *we = NULL;
        int                     rank;

        if (!arg) {
                CDEBUG(D_NET, "NO gmnalni. Exiting\n");
                return(-1);
        }

        gmnalni = (gmnal_ni_t*)arg;
        CDEBUG(D_NET, "gmnalni is [%p]\n", arg);

        /* Find this thread's slot in the pid table so the daemon name
         * carries a stable per-thread index.
         * NOTE(review): if the pid is not found, rank ends up equal to
         * num_rx_threads -- harmless for naming, but worth confirming
         * the starter always records the pid first. */
        for (rank=0; rank<num_rx_threads; rank++)
                if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
                        break;

        snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
        kportal_daemonize(name);

        /*
         *      set 1 bit for each thread started
         *      doesn't matter which bit
         */
        spin_lock(&gmnalni->gmni_rxthread_flag_lock);
        if (gmnalni->gmni_rxthread_flag)
                gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
        else
                gmnalni->gmni_rxthread_flag = 1;
        CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
        spin_unlock(&gmnalni->gmni_rxthread_flag_lock);

        /* Main loop: block for work entries queued by the caretaker
         * thread and dispatch them by message type. */
        while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
                CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
                we = gmnal_get_rxtwe(gmnalni);
                /* A NULL work entry is the shutdown signal. */
                if (!we) {
                        CDEBUG(D_NET, "Receive thread time to exit\n");
                        break;
                }

                buffer = we->buffer;
                switch(((gmnal_msghdr_t*)buffer)->gmm_type) {
                case(GMNAL_SMALL_MESSAGE):
                        gmnal_pre_receive(gmnalni, we, GMNAL_SMALL_MESSAGE);
                break;
                default:
#warning better handling
                        CERROR("Unsupported message type\n");
                        gmnal_rx_bad(gmnalni, we);
                }
                /* The work entry was allocated by the caretaker thread;
                 * ownership passed to us, so free it here. */
                PORTAL_FREE(we, sizeof(gmnal_rxtwe_t));
        }

        /* Clear one bit to tell the shutdown path this thread is done. */
        spin_lock(&gmnalni->gmni_rxthread_flag_lock);
        gmnalni->gmni_rxthread_flag/=2;
        CDEBUG(D_NET, "rxthread flag is [%ld]\n", gmnalni->gmni_rxthread_flag);
        spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
        CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);

        return 0;
}
179
180
181
182 /*
183  *      Start processing a small message receive
184  *      Get here from gmnal_receive_thread
185  *      Hand off to lib_parse, which calls cb_recv
186  *      which hands back to gmnal_small_receive
187  *      Deal with all endian stuff here.
188  */
189 void
190 gmnal_pre_receive(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we, int gmnal_type)
191 {
192         gmnal_srxd_t    *srxd = NULL;
193         void            *buffer = NULL;
194         gmnal_msghdr_t  *gmnal_msghdr;
195         ptl_hdr_t       *portals_hdr;
196
197         CDEBUG(D_NET, "gmnalni [%p], we[%p] type [%d]\n",
198                gmnalni, we, gmnal_type);
199
200         buffer = we->buffer;
201
202         gmnal_msghdr = (gmnal_msghdr_t*)buffer;
203         portals_hdr = (ptl_hdr_t*)(buffer+sizeof(gmnal_msghdr_t));
204
205         CDEBUG(D_NET, "rx_event:: Sender node [%d], Sender Port [%d], "
206                "type [%d], length [%d], buffer [%p]\n",
207                we->snode, we->sport, we->type, we->length, buffer);
208         CDEBUG(D_NET, "gmnal_msghdr:: Sender node [%u], magic [%d], "
209                "gmnal_type [%d]\n", gmnal_msghdr->gmm_sender_gmid,
210                gmnal_msghdr->gmm_magic, gmnal_msghdr->gmm_type);
211         CDEBUG(D_NET, "portals_hdr:: Sender node ["LPD64"], "
212                "dest_node ["LPD64"]\n", portals_hdr->src_nid,
213                portals_hdr->dest_nid);
214
215         /*
216          *      Get a receive descriptor for this message
217          */
218         srxd = gmnal_rxbuffer_to_srxd(gmnalni, buffer);
219         CDEBUG(D_NET, "Back from gmnal_rxbuffer_to_srxd\n");
220         if (!srxd) {
221                 CERROR("Failed to get receive descriptor\n");
222                 LBUG();
223         }
224
225         srxd->rx_gmni = gmnalni;
226         srxd->rx_type = gmnal_type;
227         srxd->rx_nsiov = gmnal_msghdr->gmm_niov;
228         srxd->rx_sender_gmid = gmnal_msghdr->gmm_sender_gmid;
229
230         CDEBUG(D_PORTALS, "Calling lib_parse buffer is [%p]\n",
231                buffer+sizeof(gmnal_msghdr_t));
232
233         (void)lib_parse(gmnalni->gmni_libnal, portals_hdr, srxd);
234         /* Ignore error; we're connectionless */
235
236         gmnal_rx_requeue_buffer(gmnalni, srxd);
237 }
238
239
240
241 /*
242  *      After a receive has been processed, 
243  *      hang out the receive buffer again.
244  *      This implicitly returns a receive token.
245  */
246 void
247 gmnal_rx_requeue_buffer(gmnal_ni_t *gmnalni, gmnal_srxd_t *srxd)
248 {
249         CDEBUG(D_NET, "requeueing srxd[%p] gmnalni[%p]\n", srxd, gmnalni);
250
251         spin_lock(&gmnalni->gmni_gm_lock);
252         gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, srxd->rx_buffer,
253                                            srxd->rx_gmsize, GM_LOW_PRIORITY, 0 );
254         spin_unlock(&gmnalni->gmni_gm_lock);
255 }
256
257
258 /*
259  *      Handle a bad message
260  *      A bad message is one we don't expect or can't interpret
261  */
262 void
263 gmnal_rx_bad(gmnal_ni_t *gmnalni, gmnal_rxtwe_t *we)
264 {
265         gmnal_srxd_t *srxd = gmnal_rxbuffer_to_srxd(gmnalni, 
266                                                     we->buffer);
267         if (srxd == NULL) {
268                 CERROR("Can't find a descriptor for this buffer\n");
269                 return;
270         }
271
272         gmnal_rx_requeue_buffer(gmnalni, srxd);
273 }
274
275
276
277 /*
278  *      Start a small transmit. 
279  *      Use the given send token (and wired transmit buffer).
280  *      Copy headers to wired buffer and initiate gm_send from the wired buffer.
281  *      The callback function informs when the send is complete.
282  */
283 ptl_err_t
284 gmnal_small_tx(lib_nal_t *libnal, void *private, lib_msg_t *cookie,
285                 ptl_hdr_t *hdr, int type, ptl_nid_t nid,
286                 gmnal_stxd_t *stxd, int size)
287 {
288         gmnal_ni_t      *gmnalni = (gmnal_ni_t*)libnal->libnal_data;
289         void            *buffer = NULL;
290         gmnal_msghdr_t  *msghdr = NULL;
291         int             tot_size = 0;
292         gm_status_t     gm_status = GM_SUCCESS;
293
294         CDEBUG(D_NET, "gmnal_small_tx libnal [%p] private [%p] cookie [%p] "
295                "hdr [%p] type [%d] nid ["LPU64"] stxd [%p] "
296                "size [%d]\n", libnal, private, cookie, hdr, type,
297                nid, stxd, size);
298
299         CDEBUG(D_NET, "portals_hdr:: dest_nid ["LPU64"], src_nid ["LPU64"]\n",
300                hdr->dest_nid, hdr->src_nid);
301
302         LASSERT ((nid >> 32) == 0);
303         LASSERT (gmnalni != NULL);
304
305         spin_lock(&gmnalni->gmni_gm_lock);
306         gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid, 
307                                             &stxd->tx_gmlid);
308         spin_unlock(&gmnalni->gmni_gm_lock);
309
310         if (gm_status != GM_SUCCESS) {
311                 CERROR("Failed to obtain local id\n");
312                 return(PTL_FAIL);
313         }
314
315         CDEBUG(D_NET, "Local Node_id is [%u][%x]\n", 
316                stxd->tx_gmlid, stxd->tx_gmlid);
317
318         stxd->tx_nid = nid;
319         stxd->tx_cookie = cookie;
320         stxd->tx_type = GMNAL_SMALL_MESSAGE;
321         stxd->tx_gm_priority = GM_LOW_PRIORITY;
322
323         /*
324          *      Copy gmnal_msg_hdr and portals header to the transmit buffer
325          *      Then send the message, as the data has previously been copied in
326          *      (HP SFS 1380).
327          */
328         buffer = stxd->tx_buffer;
329         msghdr = (gmnal_msghdr_t*)buffer;
330
331         msghdr->gmm_magic = GMNAL_MAGIC;
332         msghdr->gmm_type = GMNAL_SMALL_MESSAGE;
333         msghdr->gmm_sender_gmid = gmnalni->gmni_global_gmid;
334         CDEBUG(D_NET, "processing msghdr at [%p]\n", buffer);
335
336         buffer += sizeof(gmnal_msghdr_t);
337
338         CDEBUG(D_NET, "processing  portals hdr at [%p]\n", buffer);
339         gm_bcopy(hdr, buffer, sizeof(ptl_hdr_t));
340
341         buffer += sizeof(ptl_hdr_t);
342
343         CDEBUG(D_NET, "sending\n");
344         tot_size = size+sizeof(ptl_hdr_t)+sizeof(gmnal_msghdr_t);
345         stxd->tx_msg_size = tot_size;
346
347         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
348                "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
349                "stxd [%p]\n", gmnalni->gmni_port, stxd->tx_buffer, 
350                stxd->tx_gm_size, stxd->tx_msg_size, nid, stxd->tx_gmlid, 
351                stxd);
352
353         spin_lock(&gmnalni->gmni_gm_lock);
354         gm_send_to_peer_with_callback(gmnalni->gmni_port, stxd->tx_buffer,
355                                       stxd->tx_gm_size, stxd->tx_msg_size,
356                                       stxd->tx_gm_priority, stxd->tx_gmlid,
357                                       gmnal_small_tx_callback, (void*)stxd);
358         spin_unlock(&gmnalni->gmni_gm_lock);
359         CDEBUG(D_NET, "done\n");
360
361         return(PTL_OK);
362 }
363
364
/*
 *      A callback to indicate the small transmit operation is complete.
 *      Check for errors and try to deal with them.
 *      Call lib_finalize to inform the client application that the send
 *      is complete and the memory can be reused.
 *      Return the stxd when finished with it (returns a send token)
 */
372 void 
373 gmnal_small_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
374 {
375         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
376         lib_msg_t       *cookie = stxd->tx_cookie;
377         gmnal_ni_t      *gmnalni = stxd->tx_gmni;
378         lib_nal_t       *libnal = gmnalni->gmni_libnal;
379
380         if (!stxd) {
381                 CDEBUG(D_NET, "send completion event for unknown stxd\n");
382                 return;
383         }
384         if (status != GM_SUCCESS)
385                 CERROR("Result of send stxd [%p] is [%s] to ["LPU64"]\n",
386                        stxd, gmnal_gm_error(status), stxd->tx_nid);
387
388         switch(status) {
389                 case(GM_SUCCESS):
390                 break;
391
392
393
394                 case(GM_SEND_DROPPED):
395                 /*
396                  *      do a resend on the dropped ones
397                  */
398                         CERROR("send stxd [%p] dropped, resending\n", context);
399                         spin_lock(&gmnalni->gmni_gm_lock);
400                         gm_send_to_peer_with_callback(gmnalni->gmni_port,
401                                                       stxd->tx_buffer,
402                                                       stxd->tx_gm_size,
403                                                       stxd->tx_msg_size,
404                                                       stxd->tx_gm_priority,
405                                                       stxd->tx_gmlid,
406                                                       gmnal_small_tx_callback,
407                                                       context);
408                         spin_unlock(&gmnalni->gmni_gm_lock);
409                 return;
410                 case(GM_TIMED_OUT):
411                 case(GM_SEND_TIMED_OUT):
412                 /*
413                  *      drop these ones
414                  */
415                         CDEBUG(D_NET, "calling gm_drop_sends\n");
416                         spin_lock(&gmnalni->gmni_gm_lock);
417                         gm_drop_sends(gmnalni->gmni_port, stxd->tx_gm_priority, 
418                                       stxd->tx_gmlid, gm_port_id, 
419                                       gmnal_drop_sends_callback, context);
420                         spin_unlock(&gmnalni->gmni_gm_lock);
421
422                 return;
423
424
425                 /*
426                  *      abort on these ?
427                  */
428                 case(GM_TRY_AGAIN):
429                 case(GM_INTERRUPTED):
430                 case(GM_FAILURE):
431                 case(GM_INPUT_BUFFER_TOO_SMALL):
432                 case(GM_OUTPUT_BUFFER_TOO_SMALL):
433                 case(GM_BUSY):
434                 case(GM_MEMORY_FAULT):
435                 case(GM_INVALID_PARAMETER):
436                 case(GM_OUT_OF_MEMORY):
437                 case(GM_INVALID_COMMAND):
438                 case(GM_PERMISSION_DENIED):
439                 case(GM_INTERNAL_ERROR):
440                 case(GM_UNATTACHED):
441                 case(GM_UNSUPPORTED_DEVICE):
442                 case(GM_SEND_REJECTED):
443                 case(GM_SEND_TARGET_PORT_CLOSED):
444                 case(GM_SEND_TARGET_NODE_UNREACHABLE):
445                 case(GM_SEND_PORT_CLOSED):
446                 case(GM_NODE_ID_NOT_YET_SET):
447                 case(GM_STILL_SHUTTING_DOWN):
448                 case(GM_CLONE_BUSY):
449                 case(GM_NO_SUCH_DEVICE):
450                 case(GM_ABORTED):
451                 case(GM_INCOMPATIBLE_LIB_AND_DRIVER):
452                 case(GM_UNTRANSLATED_SYSTEM_ERROR):
453                 case(GM_ACCESS_DENIED):
454                 case(GM_NO_DRIVER_SUPPORT):
455                 case(GM_PTE_REF_CNT_OVERFLOW):
456                 case(GM_NOT_SUPPORTED_IN_KERNEL):
457                 case(GM_NOT_SUPPORTED_ON_ARCH):
458                 case(GM_NO_MATCH):
459                 case(GM_USER_ERROR):
460                 case(GM_DATA_CORRUPTED):
461                 case(GM_HARDWARE_FAULT):
462                 case(GM_SEND_ORPHANED):
463                 case(GM_MINOR_OVERFLOW):
464                 case(GM_PAGE_TABLE_FULL):
465                 case(GM_UC_ERROR):
466                 case(GM_INVALID_PORT_NUMBER):
467                 case(GM_DEV_NOT_FOUND):
468                 case(GM_FIRMWARE_NOT_RUNNING):
469                 case(GM_YP_NO_MATCH):
470                 default:
471                 gm_resume_sending(gmnalni->gmni_port, stxd->tx_gm_priority,
472                                   stxd->tx_gmlid, gm_port_id,
473                                   gmnal_resume_sending_callback, context);
474                 return;
475
476         }
477
478         gmnal_return_stxd(gmnalni, stxd);
479         lib_finalize(libnal, stxd, cookie, PTL_OK);
480         return;
481 }
482
483 /*
484  *      After an error on the port
485  *      call this to allow future sends to complete
486  */
487 void gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
488                                  gm_status_t status)
489 {
490         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
491         gmnal_ni_t     *gmnalni = stxd->tx_gmni;
492
493         CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
494         gmnal_return_stxd(gmnalni, stxd);
495         lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
496         return;
497 }
498
499
500 void gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
501                                 gm_status_t status)
502 {
503         gmnal_stxd_t    *stxd = (gmnal_stxd_t*)context;
504         gmnal_ni_t      *gmnalni = stxd->tx_gmni;
505
506         CDEBUG(D_NET, "status is [%d] context is [%p]\n", status, context);
507         if (status == GM_SUCCESS) {
508                 spin_lock(&gmnalni->gmni_gm_lock);
509                 gm_send_to_peer_with_callback(gm_port, stxd->tx_buffer, 
510                                               stxd->tx_gm_size, 
511                                               stxd->tx_msg_size, 
512                                               stxd->tx_gm_priority, 
513                                               stxd->tx_gmlid, 
514                                               gmnal_small_tx_callback, 
515                                               context);
516                 spin_unlock(&gmnalni->gmni_gm_lock);
517         } else {
518                 CERROR("send_to_peer status for stxd [%p] is "
519                        "[%d][%s]\n", stxd, status, gmnal_gm_error(status));
520                 /* Recycle the stxd */
521                 gmnal_return_stxd(gmnalni, stxd);
522                 lib_finalize(gmnalni->gmni_libnal, stxd, stxd->tx_cookie, PTL_FAIL);
523         }
524
525         return;
526 }
527
528