Whamcloud - gitweb
* cleaned up startup/shutdown handling
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /*
23  *      This file contains all gmnal send and receive functions
24  */
25
26 #include "gmnal.h"
27
28 void
29 gmnal_pack_msg(gmnal_ni_t *gmnalni, gmnal_tx_t *tx,
30                ptl_nid_t dstnid, int type)
31 {
32         gmnal_msg_t *msg = tx->tx_msg;
33
34         /* CAVEAT EMPTOR! this only sets the common message fields. */
35         msg->gmm_magic    = GMNAL_MSG_MAGIC;
36         msg->gmm_version  = GMNAL_MSG_VERSION;
37         msg->gmm_type     = type;
38         msg->gmm_srcnid   = gmnalni->gmni_libnal->libnal_ni.ni_pid.nid;
39         msg->gmm_dstnid   = dstnid;
40 }
41
42 int
43 gmnal_unpack_msg(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
44 {
45         gmnal_msg_t *msg = rx->rx_msg;
46         const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
47         int          flip;
48
49         /* 6 bytes are enough to have received magic + version */
50         if (rx->rx_recv_nob < 6) {
51                 CERROR("Short message from gmid %u: %d\n", 
52                        rx->rx_recv_gmid, rx->rx_recv_nob);
53                 return -EPROTO;
54         }
55
56         if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
57                 flip = 0;
58         } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
59                 flip = 1;
60         } else {
61                 CERROR("Bad magic from gmid %u: %08x\n", 
62                        rx->rx_recv_gmid, msg->gmm_magic);
63                 return -EPROTO;
64         }
65
66         if (msg->gmm_version != 
67             (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
68                 CERROR("Bad version from gmid %u: %d\n", 
69                        rx->rx_recv_gmid, msg->gmm_version);
70                 return -EPROTO;
71         }
72
73         if (rx->rx_recv_nob < hdr_size) {
74                 CERROR("Short message from %u: %d\n",
75                        rx->rx_recv_gmid, rx->rx_recv_nob);
76                 return -EPROTO;
77         }
78
79         if (flip) {
80                 /* leave magic unflipped as a clue to peer endianness */
81                 __swab16s(&msg->gmm_version);
82                 __swab16s(&msg->gmm_type);
83                 __swab64s(&msg->gmm_srcnid);
84                 __swab64s(&msg->gmm_dstnid);
85         }
86         
87         if (msg->gmm_srcnid == PTL_NID_ANY) {
88                 CERROR("Bad src nid from %u: "LPX64"\n", 
89                        rx->rx_recv_gmid, msg->gmm_srcnid);
90                 return -EPROTO;
91         }
92
93         if (msg->gmm_dstnid != gmnalni->gmni_libnal->libnal_ni.ni_pid.nid) {
94                 CERROR("Bad dst nid from %u: "LPX64"\n",
95                        rx->rx_recv_gmid, msg->gmm_dstnid);
96                 return -EPROTO;
97         }
98         
99         switch (msg->gmm_type) {
100         default:
101                 CERROR("Unknown message type from %u: %x\n", 
102                        rx->rx_recv_gmid, msg->gmm_type);
103                 return -EPROTO;
104                 
105         case GMNAL_MSG_IMMEDIATE:
106                 if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
107                         CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", 
108                                rx->rx_recv_gmid, rx->rx_recv_nob, 
109                                offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
110                         return -EPROTO;
111                 }
112                 break;
113         }
114         return 0;
115 }
116
117
118 /*
119  *      The caretaker thread
120  *      This is main thread of execution for the NAL side
121  *      This guy waits in gm_blocking_recvive and gets
122  *      woken up when the myrinet adaptor gets an interrupt.
123  *      Hands off receive operations to the receive thread 
124  *      This thread Looks after gm_callbacks etc inline.
125  */
126 int
127 gmnal_ct_thread(void *arg)
128 {
129         gmnal_ni_t              *gmnalni = arg;
130         gm_recv_event_t         *rxevent = NULL;
131         gm_recv_t               *recv = NULL;
132
133         sprintf(current->comm, "gmnal_ct");
134         kportal_daemonize("gmnalctd");
135
136         gmnalni->gmni_ctthread_flag = GMNAL_CTTHREAD_STARTED;
137
138         while(gmnalni->gmni_ctthread_flag == GMNAL_CTTHREAD_STARTED) {
139
140                 spin_lock(&gmnalni->gmni_gm_lock);
141                 rxevent = gm_blocking_receive_no_spin(gmnalni->gmni_port);
142                 spin_unlock(&gmnalni->gmni_gm_lock);
143
144                 if (gmnalni->gmni_ctthread_flag == GMNAL_THREAD_STOP) {
145                         CDEBUG(D_NET, "time to exit\n");
146                         break;
147                 }
148
149                 CDEBUG(D_NET, "got [%s]\n", gmnal_rxevent2str(rxevent));
150
151                 if (GM_RECV_EVENT_TYPE(rxevent) == GM_RECV_EVENT) {
152                         recv = (gm_recv_t*)&rxevent->recv;
153                         gmnal_enqueue_rx(gmnalni, recv);
154                 } else {
155                         gm_unknown(gmnalni->gmni_port, rxevent);
156                 }
157         }
158
159         gmnalni->gmni_ctthread_flag = GMNAL_THREAD_RESET;
160         CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
161         return 0;
162 }
163
164
165 /*
166  *      process a receive event
167  */
168 int 
169 gmnal_rx_thread(void *arg)
170 {
171         gmnal_ni_t    *gmnalni = arg;
172         char           name[16];
173         gmnal_rx_t    *rx;
174         int            rank;
175
176         for (rank=0; rank<num_rx_threads; rank++)
177                 if (gmnalni->gmni_rxthread_pid[rank] == current->pid)
178                         break;
179
180         snprintf(name, sizeof(name), "gmnal_rx_%d", rank);
181         kportal_daemonize(name);
182
183         /*
184          *      set 1 bit for each thread started
185          *      doesn't matter which bit
186          */
187         spin_lock(&gmnalni->gmni_rxthread_flag_lock);
188         if (gmnalni->gmni_rxthread_flag)
189                 gmnalni->gmni_rxthread_flag = gmnalni->gmni_rxthread_flag*2 + 1;
190         else
191                 gmnalni->gmni_rxthread_flag = 1;
192         spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
193
194         while(gmnalni->gmni_rxthread_stop_flag != GMNAL_THREAD_STOP) {
195                 CDEBUG(D_NET, "RXTHREAD:: Receive thread waiting\n");
196
197                 rx = gmnal_dequeue_rx(gmnalni);
198                 if (rx == NULL) {
199                         CDEBUG(D_NET, "Receive thread time to exit\n");
200                         break;
201                 }
202                 
203                 /* We're connectionless: simply ignore packets on error */
204                 
205                 if (gmnal_unpack_msg(gmnalni, rx) == 0) {
206                         
207                         LASSERT (rx->rx_msg->gmm_type == GMNAL_MSG_IMMEDIATE);
208                         (void)lib_parse(gmnalni->gmni_libnal, 
209                                         &rx->rx_msg->gmm_u.immediate.gmim_hdr,
210                                         rx);
211                 }
212
213                 gmnal_post_rx(gmnalni, rx);
214         }
215
216         spin_lock(&gmnalni->gmni_rxthread_flag_lock);
217         gmnalni->gmni_rxthread_flag /= 2;
218         spin_unlock(&gmnalni->gmni_rxthread_flag_lock);
219
220         CDEBUG(D_NET, "thread gmnalni [%p] is exiting\n", gmnalni);
221         return 0;
222 }
223
224 void
225 gmnal_post_rx(gmnal_ni_t *gmnalni, gmnal_rx_t *rx)
226 {
227         CDEBUG(D_NET, "requeueing rx[%p] gmnalni[%p]\n", rx, gmnalni);
228
229         spin_lock(&gmnalni->gmni_gm_lock);
230         gm_provide_receive_buffer_with_tag(gmnalni->gmni_port, rx->rx_msg,
231                                            rx->rx_gmsize, GM_LOW_PRIORITY, 0 );
232         spin_unlock(&gmnalni->gmni_gm_lock);
233 }
234
235 void 
236 gmnal_resume_sending_callback(struct gm_port *gm_port, void *context,
237                               gm_status_t status)
238 {
239         gmnal_tx_t      *tx = (gmnal_tx_t*)context;
240         gmnal_ni_t      *gmnalni = tx->tx_gmni;
241         lib_msg_t       *libmsg = tx->tx_libmsg;
242
243         CWARN("status for tx [%p] is [%d][%s]\n", 
244               tx, status, gmnal_gmstatus2str(status));
245
246         gmnal_return_tx(gmnalni, tx);
247         lib_finalize(gmnalni->gmni_libnal, NULL, libmsg, PTL_FAIL);
248 }
249
250 void 
251 gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
252                           gm_status_t status)
253 {
254         gmnal_tx_t      *tx = (gmnal_tx_t*)context;
255         gmnal_ni_t      *gmnalni = tx->tx_gmni;
256
257         CERROR("status for tx [%p] is [%d][%s]\n", 
258                tx, status, gmnal_gmstatus2str(status));
259
260         gm_resume_sending(gmnalni->gmni_port, tx->tx_gm_priority,
261                           tx->tx_gmlid, gm_port_id,
262                           gmnal_resume_sending_callback, tx);
263 }
264
265 void 
266 gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
267 {
268         gmnal_tx_t      *tx = (gmnal_tx_t*)context;
269         gmnal_ni_t      *gmnalni = tx->tx_gmni;
270         lib_nal_t       *libnal = gmnalni->gmni_libnal;
271         lib_msg_t       *libmsg = tx->tx_libmsg;
272         ptl_err_t        rc;
273
274         if (!tx) {
275                 CERROR("send completion event for unknown tx\n");
276                 return;
277         }
278
279         switch(status) {
280         case(GM_SUCCESS):
281                 rc = PTL_OK;
282                 break;
283
284         case(GM_SEND_DROPPED):
285                 rc = PTL_FAIL;
286                 break;
287                         
288         default:
289                 CERROR("Error %d(%s), nid "LPD64"\n",
290                        status, gmnal_gmstatus2str(status), tx->tx_nid);
291
292                 spin_lock(&gmnalni->gmni_gm_lock);
293                 gm_drop_sends(gmnalni->gmni_port, tx->tx_gm_priority, 
294                               tx->tx_gmlid, gm_port_id, 
295                               gmnal_drop_sends_callback, tx);
296                 spin_unlock(&gmnalni->gmni_gm_lock);
297                 return;
298         }
299
300         gmnal_return_tx(gmnalni, tx);
301         lib_finalize(libnal, NULL, libmsg, rc);
302         return;
303 }
304
305 ptl_err_t
306 gmnal_post_tx (gmnal_ni_t *gmnalni, gmnal_tx_t *tx, 
307                lib_msg_t *libmsg, ptl_nid_t nid, int nob)
308 {
309         gm_status_t  gm_status;
310
311         CDEBUG(D_NET, "send %d bytes to "LPU64"\n", nob, nid);
312
313         LASSERT ((nid >> 32) == 0);
314
315         spin_lock(&gmnalni->gmni_gm_lock);
316         gm_status = gm_global_id_to_node_id(gmnalni->gmni_port, (__u32)nid, 
317                                             &tx->tx_gmlid);
318         spin_unlock(&gmnalni->gmni_gm_lock);
319
320         if (gm_status != GM_SUCCESS) {
321                 CERROR("Failed to obtain local id\n");
322                 gmnal_return_tx(gmnalni, tx);
323                 return PTL_FAIL;
324         }
325
326         CDEBUG(D_NET, "Local Node_id is [%u][%x]\n", 
327                tx->tx_gmlid, tx->tx_gmlid);
328
329         tx->tx_nid = nid;
330         tx->tx_libmsg = libmsg;
331         tx->tx_gm_priority = GM_LOW_PRIORITY;
332         tx->tx_msg_size = nob;
333
334         CDEBUG(D_NET, "Calling gm_send_to_peer port [%p] buffer [%p] "
335                "gmsize [%lu] msize [%d] nid ["LPU64"] local_gmid[%d] "
336                "tx [%p]\n", gmnalni->gmni_port, tx->tx_msg, 
337                tx->tx_gm_size, tx->tx_msg_size, 
338                tx->tx_nid, tx->tx_gmlid, tx);
339
340         spin_lock(&gmnalni->gmni_gm_lock);
341         gm_send_to_peer_with_callback(gmnalni->gmni_port, tx->tx_msg,
342                                       tx->tx_gm_size, tx->tx_msg_size,
343                                       tx->tx_gm_priority, tx->tx_gmlid,
344                                       gmnal_tx_callback, (void*)tx);
345         spin_unlock(&gmnalni->gmni_gm_lock);
346
347         return PTL_OK;
348 }