/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 *
 * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/gmlnd/gmlnd_comm.c
 */

/*
 *      This file contains all gmnal send and receive functions
 */

#include "gmlnd.h"

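/* Tell LNet that the peer this tx was destined for appears to be dead.
 * "then" approximates when the peer was last known alive: the wall-clock
 * time at which this tx was launched. */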
void
gmnal_notify_peer_down(gmnal_tx_t *tx)
{
        time_t             then;

        then = cfs_time_current_sec() -
                cfs_duration_sec(cfs_time_current() -
                                 tx->tx_launchtime);

        lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
}

void
gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
               lnet_nid_t dstnid, int type)
{
        /* CAVEAT EMPTOR! this only sets the common message fields. */
        msg->gmm_magic    = GMNAL_MSG_MAGIC;
        msg->gmm_version  = GMNAL_MSG_VERSION;
        msg->gmm_type     = type;
        msg->gmm_srcnid   = gmni->gmni_ni->ni_nid;
        msg->gmm_dstnid   = dstnid;
}

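/* Validate the common header of an incoming message and byte-swap it if the
 * peer's endianness differs (the magic is left unflipped as the clue).
 * Returns 0 on success, -ve errno on a malformed message, and +ve (EPROTO)
 * on a magic/version mismatch that should be answered with
 * gmnal_version_reply(). */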
int
gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
        const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
        int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
                                                gmni->gmni_small_msgsize;
        int          flip;

        /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */

        /* GM may not overflow our buffer */
        LASSERT (rx->rx_recv_nob <= buffnob);

        /* 6 bytes are enough to have received magic + version */
        if (rx->rx_recv_nob < 6) {
                CERROR("Short message from gmid %u: %d\n",
                       rx->rx_recv_gmid, rx->rx_recv_nob);
                return -EPROTO;
        }

        if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
                flip = 0;
        } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
                flip = 1;
        } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
                   msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
                return EPROTO;
        } else {
                CERROR("Bad magic from gmid %u: %08x\n",
                       rx->rx_recv_gmid, msg->gmm_magic);
                return -EPROTO;
        }

        if (msg->gmm_version !=
            (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
                return EPROTO;
        }

        if (rx->rx_recv_nob < hdr_size) {
                CERROR("Short message from %u: %d\n",
                       rx->rx_recv_gmid, rx->rx_recv_nob);
                return -EPROTO;
        }

        if (flip) {
                /* leave magic unflipped as a clue to peer endianness */
                __swab16s(&msg->gmm_version);
                __swab16s(&msg->gmm_type);
                __swab64s(&msg->gmm_srcnid);
                __swab64s(&msg->gmm_dstnid);
        }

        if (msg->gmm_srcnid == LNET_NID_ANY) {
                CERROR("Bad src nid from %u: %s\n",
                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
                return -EPROTO;
        }

        if (gmni->gmni_ni->ni_nid != msg->gmm_dstnid) {
                CERROR("Bad dst nid from %u: %s\n",
                       rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
                return -EPROTO;
        }

        switch (msg->gmm_type) {
        default:
                CERROR("Unknown message type from %u: %x\n",
                       rx->rx_recv_gmid, msg->gmm_type);
                return -EPROTO;

        case GMNAL_MSG_IMMEDIATE:
                if (rx->rx_recv_nob <
                    offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
                        CERROR("Short IMMEDIATE from %u: %d(%lu)\n",
                               rx->rx_recv_gmid, rx->rx_recv_nob,
                               (long)offsetof(gmnal_msg_t,
                                              gmm_u.immediate.gmim_payload[0]));
                        return -EPROTO;
                }
                break;
        }
        return 0;
}

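/* Take a tx descriptor off the idle list.  Returns NULL if the NI is
 * shutting down or no descriptor is currently idle. */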
gmnal_tx_t *
gmnal_get_tx(gmnal_ni_t *gmni)
{
        gmnal_tx_t *tx = NULL;

        cfs_spin_lock(&gmni->gmni_tx_lock);

        if (gmni->gmni_shutdown ||
            cfs_list_empty(&gmni->gmni_idle_txs)) {
                cfs_spin_unlock(&gmni->gmni_tx_lock);
                return NULL;
        }

        tx = cfs_list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);

        cfs_spin_unlock(&gmni->gmni_tx_lock);

        LASSERT (tx->tx_lntmsg == NULL);
        LASSERT (tx->tx_ltxb == NULL);
        LASSERT (!tx->tx_credit);

        return tx;
}

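/* Complete a tx: return its large buffer and send credit (if held), put the
 * descriptor back on the idle list, and only then finalize the LNet message.
 * Freed resources may unblock queued sends, so the tx queues are rechecked
 * while the lock is still held. */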
void
gmnal_tx_done(gmnal_tx_t *tx, int rc)
{
        gmnal_ni_t *gmni = tx->tx_gmni;
        int         wake_sched = 0;
        lnet_msg_t *lnetmsg = tx->tx_lntmsg;

        tx->tx_lntmsg = NULL;

        cfs_spin_lock(&gmni->gmni_tx_lock);

        if (tx->tx_ltxb != NULL) {
                wake_sched = 1;
                cfs_list_add_tail(&tx->tx_ltxb->txb_list,
                                  &gmni->gmni_idle_ltxbs);
                tx->tx_ltxb = NULL;
        }

        if (tx->tx_credit) {
                wake_sched = 1;
                gmni->gmni_tx_credits++;
                tx->tx_credit = 0;
        }

        cfs_list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);

        if (wake_sched)
                gmnal_check_txqueues_locked(gmni);

        cfs_spin_unlock(&gmni->gmni_tx_lock);

        /* Delay finalize until tx is free */
        if (lnetmsg != NULL)
                lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
}

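/* Completion callback for gm_drop_sends(): the failed tx that triggered the
 * drop can now be completed with an I/O error. */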
void
gmnal_drop_sends_callback(struct gm_port *gm_port, void *context,
                          gm_status_t status)
{
        gmnal_tx_t *tx = (gmnal_tx_t*)context;

        LASSERT(!cfs_in_interrupt());

        CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n",
               tx, status, gmnal_gmstatus2str(status),
               libcfs_nid2str(tx->tx_nid));

        gmnal_tx_done(tx, -EIO);
}

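/* GM send completion callback.  Successful and dropped sends complete the tx
 * immediately; any other error reports the peer down and issues
 * gm_drop_sends(), keeping this tx (and its credit) alive until
 * gmnal_drop_sends_callback() runs. */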
void
gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
{
        gmnal_tx_t *tx = (gmnal_tx_t*)context;
        gmnal_ni_t *gmni = tx->tx_gmni;

        LASSERT(!cfs_in_interrupt());

        switch(status) {
        case GM_SUCCESS:
                gmnal_tx_done(tx, 0);
                return;

        case GM_SEND_DROPPED:
                CDEBUG(D_NETERROR, "Dropped tx %p to %s\n",
                       tx, libcfs_nid2str(tx->tx_nid));
                /* Another tx failed and called gm_drop_sends() which made this
                 * one complete immediately */
                gmnal_tx_done(tx, -EIO);
                return;

        default:
                /* Some error; NB don't complete tx yet; we need its credit for
                 * gm_drop_sends() */
                CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
                       tx, status, gmnal_gmstatus2str(status),
                       libcfs_nid2str(tx->tx_nid));

                gmnal_notify_peer_down(tx);

                cfs_spin_lock(&gmni->gmni_gm_lock);
                gm_drop_sends(gmni->gmni_port,
                              tx->tx_ltxb != NULL ?
                              GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
                              tx->tx_gmlid, *gmnal_tunables.gm_port,
                              gmnal_drop_sends_callback, tx);
                cfs_spin_unlock(&gmni->gmni_gm_lock);
                return;
        }

        /* not reached */
        LBUG();
}

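/* Advance the tx queues; called with gmni_tx_lock held (and returns with it
 * held, though it is dropped internally around the copies and the send).
 * If the tx at the head of the buffer queue can get the large tx buffer it
 * needs (or needs none), its payload is staged and it moves to the credit
 * queue; then, if a send credit is available, the first tx on the credit
 * queue is launched with gm_send_to_peer_with_callback().  At most one tx is
 * moved and one launched per call. */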
void
gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
{
        gmnal_tx_t    *tx;
        gmnal_txbuf_t *ltxb;
        int            gmsize;
        int            pri;
        void          *netaddr;

        tx = cfs_list_empty(&gmni->gmni_buf_txq) ? NULL :
             cfs_list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);

        if (tx != NULL &&
            (tx->tx_large_nob == 0 ||
             !cfs_list_empty(&gmni->gmni_idle_ltxbs))) {

                /* consume tx */
                cfs_list_del(&tx->tx_list);

                LASSERT (tx->tx_ltxb == NULL);

                if (tx->tx_large_nob != 0) {
                        ltxb = cfs_list_entry(gmni->gmni_idle_ltxbs.next,
                                              gmnal_txbuf_t, txb_list);

                        /* consume large buffer */
                        cfs_list_del(&ltxb->txb_list);

                        cfs_spin_unlock(&gmni->gmni_tx_lock);

                        /* Unlocking here allows sends to get re-ordered,
                         * but we want to allow other CPUs to progress... */

                        tx->tx_ltxb = ltxb;

                        /* marshall message in tx_ltxb...
                         * 1. Copy what was marshalled so far (in tx_buf) */
                        memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
                               GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);

                        /* 2. Copy the payload */
                        if (tx->tx_large_iskiov)
                                lnet_copy_kiov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_niov,
                                        tx->tx_large_frags.kiov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);
                        else
                                lnet_copy_iov2kiov(
                                        gmni->gmni_large_pages,
                                        ltxb->txb_buf.nb_kiov,
                                        tx->tx_msgnob,
                                        tx->tx_large_niov,
                                        tx->tx_large_frags.iov,
                                        tx->tx_large_offset,
                                        tx->tx_large_nob);

                        tx->tx_msgnob += tx->tx_large_nob;

                        cfs_spin_lock(&gmni->gmni_tx_lock);
                }

                cfs_list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
        }

        if (!cfs_list_empty(&gmni->gmni_cred_txq) &&
            gmni->gmni_tx_credits != 0) {

                tx = cfs_list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t,
                                    tx_list);

                /* consume tx and 1 credit */
                cfs_list_del(&tx->tx_list);
                gmni->gmni_tx_credits--;

                cfs_spin_unlock(&gmni->gmni_tx_lock);

                /* Unlocking here allows sends to get re-ordered, but we want
                 * to allow other CPUs to progress... */

                LASSERT(!tx->tx_credit);
                tx->tx_credit = 1;

                tx->tx_launchtime = cfs_time_current();

                if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
                        LASSERT (tx->tx_ltxb == NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
                        gmsize = gmni->gmni_small_gmsize;
                        pri = GMNAL_SMALL_PRIORITY;
                } else {
                        LASSERT (tx->tx_ltxb != NULL);
                        netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
                        gmsize = gmni->gmni_large_gmsize;
                        pri = GMNAL_LARGE_PRIORITY;
                }

                cfs_spin_lock(&gmni->gmni_gm_lock);

                gm_send_to_peer_with_callback(gmni->gmni_port,
                                              netaddr, gmsize,
                                              tx->tx_msgnob,
                                              pri,
                                              tx->tx_gmlid,
                                              gmnal_tx_callback,
                                              (void*)tx);

                cfs_spin_unlock(&gmni->gmni_gm_lock);
                cfs_spin_lock(&gmni->gmni_tx_lock);
        }
}

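/* Hand a receive buffer back to GM so it can be filled by the next incoming
 * message of the matching size/priority class. */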
void
gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
                                        gmni->gmni_small_gmsize;
        int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
                                        GMNAL_SMALL_PRIORITY;
        void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);

        CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);

        cfs_spin_lock(&gmni->gmni_gm_lock);
        gm_provide_receive_buffer_with_tag(gmni->gmni_port,
                                           buffer, gmsize, pri, 0);
        cfs_spin_unlock(&gmni->gmni_gm_lock);
}

void
gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
{
        /* Future protocol version compatibility support!
         * The next gmlnd-specific protocol rev will first send a message to
         * check version; I reply with a stub message containing my current
         * magic+version... */
        gmnal_msg_t *msg;
        gmnal_tx_t  *tx = gmnal_get_tx(gmni);

        if (tx == NULL) {
                CERROR("Can't allocate tx to send version info to %u\n",
                       rx->rx_recv_gmid);
                return;
        }

        LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */

        tx->tx_nid = LNET_NID_ANY;
        tx->tx_gmlid = rx->rx_recv_gmid;

        msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
        msg->gmm_magic   = GMNAL_MSG_MAGIC;
        msg->gmm_version = GMNAL_MSG_VERSION;

        /* just send magic + version */
        tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
        tx->tx_large_nob = 0;

        cfs_spin_lock(&gmni->gmni_tx_lock);

        cfs_list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
        gmnal_check_txqueues_locked(gmni);

        cfs_spin_unlock(&gmni->gmni_tx_lock);
}

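/* Receiver thread.  Serialised on gmni_rx_mutex, each thread blocks in
 * gm_blocking_receive_no_spin(), looks up the gmnal_rx_t for the buffer GM
 * filled, copies "fast" receives into that buffer and hands valid immediate
 * messages to lnet_parse().  The buffer is re-posted here on any error;
 * otherwise it is re-posted once LNet has consumed the receive. */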
int
gmnal_rx_thread(void *arg)
{
        gmnal_ni_t      *gmni = arg;
        gm_recv_event_t *rxevent = NULL;
        gm_recv_t       *recv = NULL;
        gmnal_rx_t      *rx;
        int              rc;

        cfs_daemonize("gmnal_rxd");

        while (!gmni->gmni_shutdown) {
                rc = down_interruptible(&gmni->gmni_rx_mutex);
                LASSERT (rc == 0 || rc == -EINTR);
                if (rc != 0)
                        continue;

                cfs_spin_lock(&gmni->gmni_gm_lock);
                rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
                cfs_spin_unlock(&gmni->gmni_gm_lock);

                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                default:
                        gm_unknown(gmni->gmni_port, rxevent);
                        cfs_up(&gmni->gmni_rx_mutex);
                        continue;

                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                case GM_HIGH_PEER_RECV_EVENT:
                case GM_RECV_EVENT:
                case GM_HIGH_RECV_EVENT:
                        break;
                }

                recv = &rxevent->recv;
                rx = gm_hash_find(gmni->gmni_rx_hash,
                                  gm_ntohp(recv->buffer));
                LASSERT (rx != NULL);

                rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
                rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
                rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
                rx->rx_recv_type = gm_ntoh_u8(recv->type);

                switch (GM_RECV_EVENT_TYPE(rxevent)) {
                case GM_FAST_RECV_EVENT:
                case GM_FAST_PEER_RECV_EVENT:
                case GM_FAST_HIGH_RECV_EVENT:
                case GM_FAST_HIGH_PEER_RECV_EVENT:
                        LASSERT (rx->rx_recv_nob <= PAGE_SIZE);

                        memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
                               gm_ntohp(recv->message), rx->rx_recv_nob);
                        break;
                }

                cfs_up(&gmni->gmni_rx_mutex);

                CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx,
                        GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
                        gm_ntohp(recv->buffer), rx->rx_recv_nob);

                /* We're connectionless: simply drop packets with
                 * errors */
                rc = gmnal_unpack_msg(gmni, rx);

                if (rc == 0) {
                        gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);

                        LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
                        rc = lnet_parse(gmni->gmni_ni,
                                        &msg->gmm_u.immediate.gmim_hdr,
                                        msg->gmm_srcnid, rx, 0);
                } else if (rc > 0) {
                        gmnal_version_reply(gmni, rx);
                        rc = -EPROTO;           /* repost rx */
                }

                if (rc < 0)                     /* parse failure */
                        gmnal_post_rx(gmni, rx);
        }

        CDEBUG(D_NET, "exiting\n");
        cfs_atomic_dec(&gmni->gmni_nthreads);
        return 0;
}

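/* Set gmni_shutdown, prod the rx thread blocked in
 * gm_blocking_receive_no_spin() with a GM alarm, then spin (warning
 * periodically) until every rx thread has exited. */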
void
gmnal_stop_threads(gmnal_ni_t *gmni)
{
        int count = 2;

        gmni->gmni_shutdown = 1;
        cfs_mb();

        /* wake rxthread owning gmni_rx_mutex with an alarm. */
        cfs_spin_lock(&gmni->gmni_gm_lock);
        gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
        cfs_spin_unlock(&gmni->gmni_gm_lock);

        while (cfs_atomic_read(&gmni->gmni_nthreads) != 0) {
                count++;
                if ((count & (count - 1)) == 0)
                        CWARN("Waiting for %d threads to stop\n",
                              cfs_atomic_read(&gmni->gmni_nthreads));
                gmnal_yield(1);
        }
}

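/* Spawn one rx thread per online CPU.  If any thread fails to start, the
 * threads already running are torn down via gmnal_stop_threads(). */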
int
gmnal_start_threads(gmnal_ni_t *gmni)
{
        int     i;
        int     pid;

        LASSERT (!gmni->gmni_shutdown);
        LASSERT (cfs_atomic_read(&gmni->gmni_nthreads) == 0);

        gm_initialize_alarm(&gmni->gmni_alarm);

        for (i = 0; i < cfs_num_online_cpus(); i++) {

                pid = cfs_kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
                if (pid < 0) {
                        CERROR("rx thread failed to start: %d\n", pid);
                        gmnal_stop_threads(gmni);
                        return pid;
                }

                cfs_atomic_inc(&gmni->gmni_nthreads);
        }

        return 0;
}