Whamcloud - gitweb
b3d48becd87db8ac3cf256803f9445207e57e468
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
1 /*
2  * -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
3  * vim:expandtab:shiftwidth=8:tabstop=8:
4  *
5  * GPL HEADER START
6  *
7  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License version 2 only,
11  * as published by the Free Software Foundation.
12  *
13  * This program is distributed in the hope that it will be useful, but
14  * WITHOUT ANY WARRANTY; without even the implied warranty of
15  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
16  * General Public License version 2 for more details (a copy is included
17  * in the LICENSE file that accompanied this code).
18  *
19  * You should have received a copy of the GNU General Public License
20  * version 2 along with this program; If not, see [sun.com URL with a
21  * copy of GPLv2].
22  *
23  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
24  * CA 95054 USA or visit www.sun.com if you need additional information or
25  * have any questions.
26  *
27  * GPL HEADER END
28  */
29 /*
30  * Copyright  2008 Sun Microsystems, Inc. All rights reserved
31  * Use is subject to license terms.
32  *
33  * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
34  */
35 /*
36  * This file is part of Lustre, http://www.lustre.org/
37  * Lustre is a trademark of Sun Microsystems, Inc.
38  */
39
40 /*
41  *      This file contains all gmnal send and receive functions
42  */
43
44 #include "gmlnd.h"
45
46 void
47 gmnal_notify_peer_down(gmnal_tx_t *tx)
48 {
49         time_t             then;
50
51         then = cfs_time_current_sec() -
52                 cfs_duration_sec(cfs_time_current() -
53                                  tx->tx_launchtime);
54
55         lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
56 }
57
58 void
59 gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
60                lnet_nid_t dstnid, int type)
61 {
62         /* CAVEAT EMPTOR! this only sets the common message fields. */
63         msg->gmm_magic    = GMNAL_MSG_MAGIC;
64         msg->gmm_version  = GMNAL_MSG_VERSION;
65         msg->gmm_type     = type;
66         msg->gmm_srcnid   = gmni->gmni_ni->ni_nid;
67         msg->gmm_dstnid   = dstnid;
68 }
69
70 int
71 gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
72 {
73         gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
74         const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
75         int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
76                                                 gmni->gmni_small_msgsize;
77         int          flip;
78
79         /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
80
81         /* GM may not overflow our buffer */
82         LASSERT (rx->rx_recv_nob <= buffnob);
83
84         /* 6 bytes are enough to have received magic + version */
85         if (rx->rx_recv_nob < 6) {
86                 CERROR("Short message from gmid %u: %d\n",
87                        rx->rx_recv_gmid, rx->rx_recv_nob);
88                 return -EPROTO;
89         }
90
91         if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
92                 flip = 0;
93         } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
94                 flip = 1;
95         } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
96                    msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
97                 return EPROTO;
98         } else {
99                 CERROR("Bad magic from gmid %u: %08x\n",
100                        rx->rx_recv_gmid, msg->gmm_magic);
101                 return -EPROTO;
102         }
103
104         if (msg->gmm_version !=
105             (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
106                 return EPROTO;
107         }
108
109         if (rx->rx_recv_nob < hdr_size) {
110                 CERROR("Short message from %u: %d\n",
111                        rx->rx_recv_gmid, rx->rx_recv_nob);
112                 return -EPROTO;
113         }
114
115         if (flip) {
116                 /* leave magic unflipped as a clue to peer endianness */
117                 __swab16s(&msg->gmm_version);
118                 __swab16s(&msg->gmm_type);
119                 __swab64s(&msg->gmm_srcnid);
120                 __swab64s(&msg->gmm_dstnid);
121         }
122
123         if (msg->gmm_srcnid == LNET_NID_ANY) {
124                 CERROR("Bad src nid from %u: %s\n",
125                        rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
126                 return -EPROTO;
127         }
128
129         if (gmni->gmni_ni->ni_nid != msg->gmm_dstnid) {
130                 CERROR("Bad dst nid from %u: %s\n",
131                        rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
132                 return -EPROTO;
133         }
134
135         switch (msg->gmm_type) {
136         default:
137                 CERROR("Unknown message type from %u: %x\n",
138                        rx->rx_recv_gmid, msg->gmm_type);
139                 return -EPROTO;
140
141         case GMNAL_MSG_IMMEDIATE:
142                 if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
143                         CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n",
144                                rx->rx_recv_gmid, rx->rx_recv_nob,
145                                offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
146                         return -EPROTO;
147                 }
148                 break;
149         }
150         return 0;
151 }
152
153 gmnal_tx_t *
154 gmnal_get_tx(gmnal_ni_t *gmni)
155 {
156         gmnal_tx_t *tx = NULL;
157
158         spin_lock(&gmni->gmni_tx_lock);
159
160         if (gmni->gmni_shutdown ||
161             list_empty(&gmni->gmni_idle_txs)) {
162                 spin_unlock(&gmni->gmni_tx_lock);
163                 return NULL;
164         }
165
166         tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
167         list_del(&tx->tx_list);
168
169         spin_unlock(&gmni->gmni_tx_lock);
170
171         LASSERT (tx->tx_lntmsg == NULL);
172         LASSERT (tx->tx_ltxb == NULL);
173         LASSERT (!tx->tx_credit);
174
175         return tx;
176 }
177
178 void
179 gmnal_tx_done(gmnal_tx_t *tx, int rc)
180 {
181         gmnal_ni_t *gmni = tx->tx_gmni;
182         int         wake_sched = 0;
183         lnet_msg_t *lnetmsg = tx->tx_lntmsg;
184
185         tx->tx_lntmsg = NULL;
186
187         spin_lock(&gmni->gmni_tx_lock);
188
189         if (tx->tx_ltxb != NULL) {
190                 wake_sched = 1;
191                 list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
192                 tx->tx_ltxb = NULL;
193         }
194
195         if (tx->tx_credit) {
196                 wake_sched = 1;
197                 gmni->gmni_tx_credits++;
198                 tx->tx_credit = 0;
199         }
200
201         list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
202
203         if (wake_sched)
204                 gmnal_check_txqueues_locked(gmni);
205
206         spin_unlock(&gmni->gmni_tx_lock);
207
208         /* Delay finalize until tx is free */
209         if (lnetmsg != NULL)
210                 lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
211 }
212
213 void
214 gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
215                           gm_status_t status)
216 {
217         gmnal_tx_t *tx = (gmnal_tx_t*)context;
218
219         LASSERT(!in_interrupt());
220
221         CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",
222                tx, status, gmnal_gmstatus2str(status),
223                libcfs_nid2str(tx->tx_nid));
224
225         gmnal_tx_done(tx, -EIO);
226 }
227
228 void
229 gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
230 {
231         gmnal_tx_t *tx = (gmnal_tx_t*)context;
232         gmnal_ni_t *gmni = tx->tx_gmni;
233
234         LASSERT(!in_interrupt());
235
236         switch(status) {
237         case GM_SUCCESS:
238                 gmnal_tx_done(tx, 0);
239                 return;
240
241         case GM_SEND_DROPPED:
242                 CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",
243                        tx, libcfs_nid2str(tx->tx_nid));
244                 /* Another tx failed and called gm_drop_sends() which made this
245                  * one complete immediately */
246                 gmnal_tx_done(tx, -EIO);
247                 return;
248
249         default:
250                 /* Some error; NB don't complete tx yet; we need its credit for
251                  * gm_drop_sends() */
252                 CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
253                        tx, status, gmnal_gmstatus2str(status),
254                        libcfs_nid2str(tx->tx_nid));
255
256                 gmnal_notify_peer_down(tx);
257
258                 spin_lock(&gmni->gmni_gm_lock);
259                 gm_drop_sends(gmni->gmni_port,
260                               tx->tx_ltxb != NULL ?
261                               GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
262                               tx->tx_gmlid, *gmnal_tunables.gm_port,
263                               gmnal_drop_sends_callback, tx);
264                 spin_unlock(&gmni->gmni_gm_lock);
265                 return;
266         }
267
268         /* not reached */
269         LBUG();
270 }
271
272 void
273 gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
274 {
275         gmnal_tx_t    *tx;
276         gmnal_txbuf_t *ltxb;
277         int            gmsize;
278         int            pri;
279         void          *netaddr;
280
281         tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
282              list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
283
284         if (tx != NULL &&
285             (tx->tx_large_nob == 0 ||
286              !list_empty(&gmni->gmni_idle_ltxbs))) {
287
288                 /* consume tx */
289                 list_del(&tx->tx_list);
290
291                 LASSERT (tx->tx_ltxb == NULL);
292
293                 if (tx->tx_large_nob != 0) {
294                         ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
295                                           gmnal_txbuf_t, txb_list);
296
297                         /* consume large buffer */
298                         list_del(&ltxb->txb_list);
299
300                         spin_unlock(&gmni->gmni_tx_lock);
301
302                         /* Unlocking here allows sends to get re-ordered,
303                          * but we want to allow other CPUs to progress... */
304
305                         tx->tx_ltxb = ltxb;
306
307                         /* marshall message in tx_ltxb...
308                          * 1. Copy what was marshalled so far (in tx_buf) */
309                         memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
310                                GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
311
312                         /* 2. Copy the payload */
313                         if (tx->tx_large_iskiov)
314                                 lnet_copy_kiov2kiov(
315                                         gmni->gmni_large_pages,
316                                         ltxb->txb_buf.nb_kiov,
317                                         tx->tx_msgnob,
318                                         tx->tx_large_niov,
319                                         tx->tx_large_frags.kiov,
320                                         tx->tx_large_offset,
321                                         tx->tx_large_nob);
322                         else
323                                 lnet_copy_iov2kiov(
324                                         gmni->gmni_large_pages,
325                                         ltxb->txb_buf.nb_kiov,
326                                         tx->tx_msgnob,
327                                         tx->tx_large_niov,
328                                         tx->tx_large_frags.iov,
329                                         tx->tx_large_offset,
330                                         tx->tx_large_nob);
331
332                         tx->tx_msgnob += tx->tx_large_nob;
333
334                         spin_lock(&gmni->gmni_tx_lock);
335                 }
336
337                 list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
338         }
339
340         if (!list_empty(&gmni->gmni_cred_txq) &&
341             gmni->gmni_tx_credits != 0) {
342
343                 tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);
344
345                 /* consume tx and 1 credit */
346                 list_del(&tx->tx_list);
347                 gmni->gmni_tx_credits--;
348
349                 spin_unlock(&gmni->gmni_tx_lock);
350
351                 /* Unlocking here allows sends to get re-ordered, but we want
352                  * to allow other CPUs to progress... */
353
354                 LASSERT(!tx->tx_credit);
355                 tx->tx_credit = 1;
356
357                 tx->tx_launchtime = cfs_time_current();
358
359                 if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
360                         LASSERT (tx->tx_ltxb == NULL);
361                         netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
362                         gmsize = gmni->gmni_small_gmsize;
363                         pri = GMNAL_SMALL_PRIORITY;
364                 } else {
365                         LASSERT (tx->tx_ltxb != NULL);
366                         netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
367                         gmsize = gmni->gmni_large_gmsize;
368                         pri = GMNAL_LARGE_PRIORITY;
369                 }
370
371                 spin_lock(&gmni->gmni_gm_lock);
372
373                 gm_send_to_peer_with_callback(gmni->gmni_port,
374                                               netaddr, gmsize,
375                                               tx->tx_msgnob,
376                                               pri,
377                                               tx->tx_gmlid,
378                                               gmnal_tx_callback,
379                                               (void*)tx);
380
381                 spin_unlock(&gmni->gmni_gm_lock);
382                 spin_lock(&gmni->gmni_tx_lock);
383         }
384 }
385
386 void
387 gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
388 {
389         int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
390                                         gmni->gmni_small_gmsize;
391         int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
392                                         GMNAL_SMALL_PRIORITY;
393         void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
394
395         CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
396
397         spin_lock(&gmni->gmni_gm_lock);
398         gm_provide_receive_buffer_with_tag(gmni->gmni_port,
399                                            buffer, gmsize, pri, 0);
400         spin_unlock(&gmni->gmni_gm_lock);
401 }
402
403 void
404 gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
405 {
406         /* Future protocol version compatibility support!
407          * The next gmlnd-specific protocol rev will first send a message to
408          * check version; I reply with a stub message containing my current
409          * magic+version... */
410         gmnal_msg_t *msg;
411         gmnal_tx_t  *tx = gmnal_get_tx(gmni);
412
413         if (tx == NULL) {
414                 CERROR("Can't allocate tx to send version info to %u\n",
415                        rx->rx_recv_gmid);
416                 return;
417         }
418
419         LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */
420
421         tx->tx_nid = LNET_NID_ANY;
422         tx->tx_gmlid = rx->rx_recv_gmid;
423
424         msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
425         msg->gmm_magic   = GMNAL_MSG_MAGIC;
426         msg->gmm_version = GMNAL_MSG_VERSION;
427
428         /* just send magic + version */
429         tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
430         tx->tx_large_nob = 0;
431
432         spin_lock(&gmni->gmni_tx_lock);
433
434         list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
435         gmnal_check_txqueues_locked(gmni);
436
437         spin_unlock(&gmni->gmni_tx_lock);
438 }
439
440 int
441 gmnal_rx_thread(void *arg)
442 {
443         gmnal_ni_t      *gmni = arg;
444         gm_recv_event_t *rxevent = NULL;
445         gm_recv_t       *recv = NULL;
446         gmnal_rx_t      *rx;
447         int              rc;
448
449         cfs_daemonize("gmnal_rxd");
450
451         while (!gmni->gmni_shutdown) {
452                 rc = down_interruptible(&gmni->gmni_rx_mutex);
453                 LASSERT (rc == 0 || rc == -EINTR);
454                 if (rc != 0)
455                         continue;
456
457                 spin_lock(&gmni->gmni_gm_lock);
458                 rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
459                 spin_unlock(&gmni->gmni_gm_lock);
460
461                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
462                 default:
463                         gm_unknown(gmni->gmni_port, rxevent);
464                         up(&gmni->gmni_rx_mutex);
465                         continue;
466
467                 case GM_FAST_RECV_EVENT:
468                 case GM_FAST_PEER_RECV_EVENT:
469                 case GM_PEER_RECV_EVENT:
470                 case GM_FAST_HIGH_RECV_EVENT:
471                 case GM_FAST_HIGH_PEER_RECV_EVENT:
472                 case GM_HIGH_PEER_RECV_EVENT:
473                 case GM_RECV_EVENT:
474                 case GM_HIGH_RECV_EVENT:
475                         break;
476                 }
477
478                 recv = &rxevent->recv;
479                 rx = gm_hash_find(gmni->gmni_rx_hash,
480                                   gm_ntohp(recv->buffer));
481                 LASSERT (rx != NULL);
482
483                 rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
484                 rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
485                 rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
486                 rx->rx_recv_type = gm_ntoh_u8(recv->type);
487
488                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
489                 case GM_FAST_RECV_EVENT:
490                 case GM_FAST_PEER_RECV_EVENT:
491                 case GM_FAST_HIGH_RECV_EVENT:
492                 case GM_FAST_HIGH_PEER_RECV_EVENT:
493                         LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
494
495                         memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
496                                gm_ntohp(recv->message), rx->rx_recv_nob);
497                         break;
498                 }
499
500                 up(&gmni->gmni_rx_mutex);
501
502                 CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
503                         GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
504                         gm_ntohp(recv->buffer), rx->rx_recv_nob);
505
506                 /* We're connectionless: simply drop packets with
507                  * errors */
508                 rc = gmnal_unpack_msg(gmni, rx);
509
510                 if (rc == 0) {
511                         gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
512
513                         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
514                         rc = lnet_parse(gmni->gmni_ni,
515                                         &msg->gmm_u.immediate.gmim_hdr,
516                                         msg->gmm_srcnid, rx, 0);
517                 } else if (rc > 0) {
518                         gmnal_version_reply(gmni, rx);
519                         rc = -EPROTO;           /* repost rx */
520                 }
521
522                 if (rc < 0)                     /* parse failure */
523                         gmnal_post_rx(gmni, rx);
524         }
525
526         CDEBUG(D_NET, "exiting\n");
527         atomic_dec(&gmni->gmni_nthreads);
528         return 0;
529 }
530
531 void
532 gmnal_stop_threads(gmnal_ni_t *gmni)
533 {
534         int count = 2;
535
536         gmni->gmni_shutdown = 1;
537         mb();
538
539         /* wake rxthread owning gmni_rx_mutex with an alarm. */
540         spin_lock(&gmni->gmni_gm_lock);
541         gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
542         spin_unlock(&gmni->gmni_gm_lock);
543
544         while (atomic_read(&gmni->gmni_nthreads) != 0) {
545                 count++;
546                 if ((count & (count - 1)) == 0)
547                         CWARN("Waiting for %d threads to stop\n",
548                               atomic_read(&gmni->gmni_nthreads));
549                 gmnal_yield(1);
550         }
551 }
552
553 int
554 gmnal_start_threads(gmnal_ni_t *gmni)
555 {
556         int     i;
557         int     pid;
558
559         LASSERT (!gmni->gmni_shutdown);
560         LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);
561
562         gm_initialize_alarm(&gmni->gmni_alarm);
563
564         for (i = 0; i < num_online_cpus(); i++) {
565
566                 pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
567                 if (pid < 0) {
568                         CERROR("rx thread failed to start: %d\n", pid);
569                         gmnal_stop_threads(gmni);
570                         return pid;
571                 }
572
573                 atomic_inc(&gmni->gmni_nthreads);
574         }
575
576         return 0;
577 }