Whamcloud - gitweb
ea6a8d142549e6e4a734962351892f9a5366a051
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  *  Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  *
6  *   This file is part of Lustre, http://www.lustre.org/
7  *
8  *   Lustre is free software; you can redistribute it and/or
9  *   modify it under the terms of version 2 of the GNU General Public
10  *   License as published by the Free Software Foundation.
11  *
12  *   Lustre is distributed in the hope that it will be useful,
13  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *   GNU General Public License for more details.
16  *
17  *   You should have received a copy of the GNU General Public License
18  *   along with Lustre; if not, write to the Free Software
19  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 /*
23  *      This file contains all gmnal send and receive functions
24  */
25
26 #include "gmlnd.h"
27
28 void
29 gmnal_notify_peer_down(gmnal_tx_t *tx)
30 {
31         struct timeval     now;
32         time_t             then;
33
34         do_gettimeofday (&now);
35         then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ;
36
37         lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
38 }
39
40 void
41 gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
42                lnet_nid_t dstnid, int type)
43 {
44         /* CAVEAT EMPTOR! this only sets the common message fields. */
45         msg->gmm_magic    = GMNAL_MSG_MAGIC;
46         msg->gmm_version  = GMNAL_MSG_VERSION;
47         msg->gmm_type     = type;
48         msg->gmm_srcnid   = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid,
49                                                   dstnid);
50         msg->gmm_dstnid   = dstnid;
51 }
52
53 int
54 gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
55 {
56         gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
57         const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
58         int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
59                                                 gmni->gmni_small_msgsize;
60         int          flip;
61
62         /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
63
64         /* GM may not overflow our buffer */
65         LASSERT (rx->rx_recv_nob <= buffnob);
66
67         /* 6 bytes are enough to have received magic + version */
68         if (rx->rx_recv_nob < 6) {
69                 CERROR("Short message from gmid %u: %d\n", 
70                        rx->rx_recv_gmid, rx->rx_recv_nob);
71                 return -EPROTO;
72         }
73
74         if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
75                 flip = 0;
76         } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
77                 flip = 1;
78         } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
79                    msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
80                 return EPROTO;
81         } else {
82                 CERROR("Bad magic from gmid %u: %08x\n", 
83                        rx->rx_recv_gmid, msg->gmm_magic);
84                 return -EPROTO;
85         }
86
87         if (msg->gmm_version != 
88             (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
89                 return EPROTO;
90         }
91
92         if (rx->rx_recv_nob < hdr_size) {
93                 CERROR("Short message from %u: %d\n",
94                        rx->rx_recv_gmid, rx->rx_recv_nob);
95                 return -EPROTO;
96         }
97
98         if (flip) {
99                 /* leave magic unflipped as a clue to peer endianness */
100                 __swab16s(&msg->gmm_version);
101                 __swab16s(&msg->gmm_type);
102                 __swab64s(&msg->gmm_srcnid);
103                 __swab64s(&msg->gmm_dstnid);
104         }
105         
106         if (msg->gmm_srcnid == LNET_NID_ANY) {
107                 CERROR("Bad src nid from %u: %s\n", 
108                        rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
109                 return -EPROTO;
110         }
111
112         if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid, 
113                                      msg->gmm_dstnid)) {
114                 CERROR("Bad dst nid from %u: %s\n",
115                        rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
116                 return -EPROTO;
117         }
118         
119         switch (msg->gmm_type) {
120         default:
121                 CERROR("Unknown message type from %u: %x\n", 
122                        rx->rx_recv_gmid, msg->gmm_type);
123                 return -EPROTO;
124                 
125         case GMNAL_MSG_IMMEDIATE:
126                 if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
127                         CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", 
128                                rx->rx_recv_gmid, rx->rx_recv_nob, 
129                                offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
130                         return -EPROTO;
131                 }
132                 break;
133         }
134         return 0;
135 }
136
137 gmnal_tx_t *
138 gmnal_get_tx(gmnal_ni_t *gmni)
139 {
140         gmnal_tx_t       *tx = NULL;
141
142         spin_lock(&gmni->gmni_tx_lock);
143
144         if (gmni->gmni_shutdown ||
145             list_empty(&gmni->gmni_idle_txs)) {
146                 spin_unlock(&gmni->gmni_tx_lock);
147                 return NULL;
148         }
149         
150         tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
151         list_del(&tx->tx_list);
152
153         spin_unlock(&gmni->gmni_tx_lock);
154
155         LASSERT (tx->tx_lntmsg == NULL);
156         LASSERT (tx->tx_ltxb == NULL);
157         LASSERT (!tx->tx_credit);
158         
159         return tx;
160 }
161
162 void
163 gmnal_tx_done(gmnal_tx_t *tx, int rc)
164 {
165         gmnal_ni_t *gmni = tx->tx_gmni;
166         int         wake_sched = 0;
167         lnet_msg_t *lnetmsg = tx->tx_lntmsg;
168         
169         tx->tx_lntmsg = NULL;
170
171         spin_lock(&gmni->gmni_tx_lock);
172         
173         if (tx->tx_ltxb != NULL) {
174                 wake_sched = 1;
175                 list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
176                 tx->tx_ltxb = NULL;
177         }
178         
179         if (tx->tx_credit) {
180                 wake_sched = 1;
181                 gmni->gmni_tx_credits++;
182                 tx->tx_credit = 0;
183         }
184         
185         list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
186
187         if (wake_sched)
188                 gmnal_check_txqueues_locked(gmni);
189
190         spin_unlock(&gmni->gmni_tx_lock);
191
192         /* Delay finalize until tx is free */
193         if (lnetmsg != NULL)
194                 lnet_finalize(gmni->gmni_ni, lnetmsg, 0);
195 }
196
197 void 
198 gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
199                           gm_status_t status)
200 {
201         gmnal_tx_t      *tx = (gmnal_tx_t*)context;
202
203         LASSERT(!in_interrupt());
204          
205         CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n", 
206                tx, status, gmnal_gmstatus2str(status),
207                libcfs_nid2str(tx->tx_nid));
208
209         gmnal_tx_done(tx, -EIO);
210 }
211
212 void 
213 gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
214 {
215         gmnal_tx_t      *tx = (gmnal_tx_t*)context;
216         gmnal_ni_t      *gmni = tx->tx_gmni;
217
218         LASSERT(!in_interrupt());
219
220         switch(status) {
221         case GM_SUCCESS:
222                 gmnal_tx_done(tx, 0);
223                 return;
224
225         case GM_SEND_DROPPED:
226                 CDEBUG(D_NETERROR, "Dropped tx %p to %s\n", 
227                        tx, libcfs_nid2str(tx->tx_nid));
228                 /* Another tx failed and called gm_drop_sends() which made this
229                  * one complete immediately */
230                 gmnal_tx_done(tx, -EIO);
231                 return;
232                         
233         default:
234                 /* Some error; NB don't complete tx yet; we need its credit for
235                  * gm_drop_sends() */
236                 CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
237                        tx, status, gmnal_gmstatus2str(status), 
238                        libcfs_nid2str(tx->tx_nid));
239
240                 gmnal_notify_peer_down(tx);
241
242                 spin_lock(&gmni->gmni_gm_lock);
243                 gm_drop_sends(gmni->gmni_port, 
244                               tx->tx_ltxb != NULL ?
245                               GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
246                               tx->tx_gmlid, *gmnal_tunables.gm_port, 
247                               gmnal_drop_sends_callback, tx);
248                 spin_unlock(&gmni->gmni_gm_lock);
249                 return;
250         }
251
252         /* not reached */
253         LBUG();
254 }
255
256 void
257 gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
258 {
259         gmnal_tx_t    *tx;
260         gmnal_txbuf_t *ltxb;
261         int            gmsize;
262         int            pri;
263         void          *netaddr;
264         
265         tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
266              list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
267
268         if (tx != NULL &&
269             (tx->tx_large_nob == 0 || 
270              !list_empty(&gmni->gmni_idle_ltxbs))) {
271
272                 /* consume tx */
273                 list_del(&tx->tx_list);
274                 
275                 LASSERT (tx->tx_ltxb == NULL);
276
277                 if (tx->tx_large_nob != 0) {
278                         ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
279                                           gmnal_txbuf_t, txb_list);
280
281                         /* consume large buffer */
282                         list_del(&ltxb->txb_list);
283
284                         spin_unlock(&gmni->gmni_tx_lock);
285
286                         /* Unlocking here allows sends to get re-ordered,
287                          * but we want to allow other CPUs to progress... */
288
289                         tx->tx_ltxb = ltxb;
290
291                         /* marshall message in tx_ltxb...
292                          * 1. Copy what was marshalled so far (in tx_buf) */
293                         memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
294                                GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
295
296                         /* 2. Copy the payload */
297                         if (tx->tx_large_iskiov)
298                                 lnet_copy_kiov2kiov(
299                                         gmni->gmni_large_pages,
300                                         ltxb->txb_buf.nb_kiov,
301                                         tx->tx_msgnob,
302                                         tx->tx_large_niov,
303                                         tx->tx_large_frags.kiov,
304                                         tx->tx_large_offset,
305                                         tx->tx_large_nob);
306                         else
307                                 lnet_copy_iov2kiov(
308                                         gmni->gmni_large_pages,
309                                         ltxb->txb_buf.nb_kiov,
310                                         tx->tx_msgnob,
311                                         tx->tx_large_niov,
312                                         tx->tx_large_frags.iov,
313                                         tx->tx_large_offset,
314                                         tx->tx_large_nob);
315
316                         tx->tx_msgnob += tx->tx_large_nob;
317
318                         spin_lock(&gmni->gmni_tx_lock);
319                 }
320
321                 list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
322         }
323
324         if (!list_empty(&gmni->gmni_cred_txq) &&
325             gmni->gmni_tx_credits != 0) {
326
327                 tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);
328
329                 /* consume tx and 1 credit */
330                 list_del(&tx->tx_list);
331                 gmni->gmni_tx_credits--;
332
333                 spin_unlock(&gmni->gmni_tx_lock);
334
335                 /* Unlocking here allows sends to get re-ordered, but we want
336                  * to allow other CPUs to progress... */
337
338                 LASSERT(!tx->tx_credit);
339                 tx->tx_credit = 1;
340
341                 tx->tx_launchtime = jiffies;
342
343                 if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
344                         LASSERT (tx->tx_ltxb == NULL);
345                         netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
346                         gmsize = gmni->gmni_small_gmsize;
347                         pri = GMNAL_SMALL_PRIORITY;
348                 } else {
349                         LASSERT (tx->tx_ltxb != NULL);
350                         netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
351                         gmsize = gmni->gmni_large_gmsize;
352                         pri = GMNAL_LARGE_PRIORITY;
353                 }
354
355                 spin_lock(&gmni->gmni_gm_lock);
356
357                 gm_send_to_peer_with_callback(gmni->gmni_port, 
358                                               netaddr, gmsize, 
359                                               tx->tx_msgnob,
360                                               pri, 
361                                               tx->tx_gmlid,
362                                               gmnal_tx_callback, 
363                                               (void*)tx);
364
365                 spin_unlock(&gmni->gmni_gm_lock);
366                 spin_lock(&gmni->gmni_tx_lock);
367         }
368 }
369
370 void
371 gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
372 {
373         int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
374                                         gmni->gmni_small_gmsize;
375         int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
376                                         GMNAL_SMALL_PRIORITY;
377         void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
378
379         CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
380
381         spin_lock(&gmni->gmni_gm_lock);
382         gm_provide_receive_buffer_with_tag(gmni->gmni_port, 
383                                            buffer, gmsize, pri, 0);
384         spin_unlock(&gmni->gmni_gm_lock);
385 }
386
387 void
388 gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
389 {
390         /* Future protocol version compatibility support!
391          * The next gmlnd-specific protocol rev will first send a message to
392          * check version; I reply with a stub message containing my current
393          * magic+version... */
394         gmnal_msg_t *msg;
395         gmnal_tx_t  *tx = gmnal_get_tx(gmni);
396
397         if (tx == NULL) {
398                 CERROR("Can't allocate tx to send version info to %u\n",
399                        rx->rx_recv_gmid);
400                 return;
401         }
402
403         LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */
404
405         tx->tx_nid = LNET_NID_ANY;
406         tx->tx_gmlid = rx->rx_recv_gmid;
407
408         msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
409         msg->gmm_magic   = GMNAL_MSG_MAGIC;
410         msg->gmm_version = GMNAL_MSG_VERSION;
411
412         /* just send magic + version */
413         tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
414         tx->tx_large_nob = 0;
415
416         spin_lock(&gmni->gmni_tx_lock);
417
418         list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
419         gmnal_check_txqueues_locked(gmni);
420
421         spin_unlock(&gmni->gmni_tx_lock);
422 }
423
424 int
425 gmnal_rx_thread(void *arg)
426 {
427         gmnal_ni_t      *gmni = arg;
428         gm_recv_event_t *rxevent = NULL;
429         gm_recv_t       *recv = NULL;
430         gmnal_rx_t      *rx;
431         int              rc;
432
433         cfs_daemonize("gmnal_rxd");
434
435         down(&gmni->gmni_rx_mutex);
436
437         while (!gmni->gmni_shutdown) {
438
439                 spin_lock(&gmni->gmni_gm_lock);
440                 rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
441                 spin_unlock(&gmni->gmni_gm_lock);
442
443                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
444                 default:
445                         gm_unknown(gmni->gmni_port, rxevent);
446                         continue;
447
448                 case GM_FAST_RECV_EVENT:
449                 case GM_FAST_PEER_RECV_EVENT:
450                 case GM_PEER_RECV_EVENT:
451                 case GM_FAST_HIGH_RECV_EVENT:
452                 case GM_FAST_HIGH_PEER_RECV_EVENT:
453                 case GM_HIGH_PEER_RECV_EVENT:
454                 case GM_RECV_EVENT:
455                 case GM_HIGH_RECV_EVENT:
456                         break;
457                 }
458                 
459                 recv = &rxevent->recv;
460                 rx = gm_hash_find(gmni->gmni_rx_hash, 
461                                   gm_ntohp(recv->buffer));
462                 LASSERT (rx != NULL);
463
464                 rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
465                 rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
466                 rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
467                 rx->rx_recv_type = gm_ntoh_u8(recv->type);
468
469                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
470                 case GM_FAST_RECV_EVENT:
471                 case GM_FAST_PEER_RECV_EVENT:
472                 case GM_FAST_HIGH_RECV_EVENT:
473                 case GM_FAST_HIGH_PEER_RECV_EVENT:
474                         LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
475
476                         memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
477                                gm_ntohp(recv->message), rx->rx_recv_nob);
478                         break;
479                 }
480
481                 up(&gmni->gmni_rx_mutex);
482
483                 CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx, 
484                         GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
485                         gm_ntohp(recv->buffer), rx->rx_recv_nob);
486
487                 /* We're connectionless: simply drop packets with
488                  * errors */
489                 rc = gmnal_unpack_msg(gmni, rx);
490
491                 if (rc == 0) {
492                         gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
493                         
494                         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
495                         rc =  lnet_parse(gmni->gmni_ni, 
496                                          &msg->gmm_u.immediate.gmim_hdr,
497                                          msg->gmm_srcnid,
498                                          rx, 0);
499                 } else if (rc > 0) {
500                         gmnal_version_reply(gmni, rx);
501                         rc = -EPROTO;           /* repost rx */
502                 }
503
504                 if (rc < 0)                     /* parse failure */
505                         gmnal_post_rx(gmni, rx);
506
507                 down(&gmni->gmni_rx_mutex);
508         }
509
510         up(&gmni->gmni_rx_mutex);
511
512         CDEBUG(D_NET, "exiting\n");
513         atomic_dec(&gmni->gmni_nthreads);
514         return 0;
515 }
516
517 void
518 gmnal_stop_threads(gmnal_ni_t *gmni)
519 {
520         int count = 2;
521
522         gmni->gmni_shutdown = 1;
523         mb();
524         
525         /* wake rxthread owning gmni_rx_mutex with an alarm. */
526         spin_lock(&gmni->gmni_gm_lock);
527         gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
528         spin_unlock(&gmni->gmni_gm_lock);
529
530         while (atomic_read(&gmni->gmni_nthreads) != 0) {
531                 count++;
532                 if ((count & (count - 1)) == 0)
533                         CWARN("Waiting for %d threads to stop\n",
534                               atomic_read(&gmni->gmni_nthreads));
535                 gmnal_yield(1);
536         }
537 }
538
539 int
540 gmnal_start_threads(gmnal_ni_t *gmni)
541 {
542         int     i;
543         int     pid;
544
545         LASSERT (!gmni->gmni_shutdown);
546         LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);
547
548         gm_initialize_alarm(&gmni->gmni_alarm);
549
550         for (i = 0; i < num_online_cpus(); i++) {
551
552                 pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
553                 if (pid < 0) {
554                         CERROR("rx thread failed to start: %d\n", pid);
555                         gmnal_stop_threads(gmni);
556                         return pid;
557                 }
558
559                 atomic_inc(&gmni->gmni_nthreads);
560         }
561
562         return 0;
563 }