Whamcloud - gitweb
4a669064762a4dc2b9a8e4a50172a4af7f9bc747
[fs/lustre-release.git] / lnet / klnds / gmlnd / gmlnd_comm.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (c) 2003 Los Alamos National Laboratory (LANL)
5  * Copyright (C) 2005 Cluster File Systems, Inc. All rights reserved.
6  *
7  *   This file is part of Lustre, http://www.lustre.org/
8  *
9  *   Lustre is free software; you can redistribute it and/or
10  *   modify it under the terms of version 2 of the GNU General Public
11  *   License as published by the Free Software Foundation.
12  *
13  *   Lustre is distributed in the hope that it will be useful,
14  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
15  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
16  *   GNU General Public License for more details.
17  *
18  *   You should have received a copy of the GNU General Public License
19  *   along with Lustre; if not, write to the Free Software
20  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
21  */
22
23 /*
24  *      This file contains all gmnal send and receive functions
25  */
26
27 #include "gmlnd.h"
28
29 void
30 gmnal_notify_peer_down(gmnal_tx_t *tx)
31 {
32         struct timeval     now;
33         time_t             then;
34
35         do_gettimeofday (&now);
36         then = now.tv_sec - (jiffies - tx->tx_launchtime)/HZ;
37
38         lnet_notify(tx->tx_gmni->gmni_ni, tx->tx_nid, 0, then);
39 }
40
41 void
42 gmnal_pack_msg(gmnal_ni_t *gmni, gmnal_msg_t *msg,
43                lnet_nid_t dstnid, int type)
44 {
45         /* CAVEAT EMPTOR! this only sets the common message fields. */
46         msg->gmm_magic    = GMNAL_MSG_MAGIC;
47         msg->gmm_version  = GMNAL_MSG_VERSION;
48         msg->gmm_type     = type;
49         msg->gmm_srcnid   = lnet_ptlcompat_srcnid(gmni->gmni_ni->ni_nid,
50                                                   dstnid);
51         msg->gmm_dstnid   = dstnid;
52 }
53
54 int
55 gmnal_unpack_msg(gmnal_ni_t *gmni, gmnal_rx_t *rx)
56 {
57         gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
58         const int    hdr_size = offsetof(gmnal_msg_t, gmm_u);
59         int          buffnob = rx->rx_islarge ? gmni->gmni_large_msgsize :
60                                                 gmni->gmni_small_msgsize;
61         int          flip;
62
63         /* rc = 0:SUCCESS -ve:failure +ve:version mismatch */
64
65         /* GM may not overflow our buffer */
66         LASSERT (rx->rx_recv_nob <= buffnob);
67
68         /* 6 bytes are enough to have received magic + version */
69         if (rx->rx_recv_nob < 6) {
70                 CERROR("Short message from gmid %u: %d\n", 
71                        rx->rx_recv_gmid, rx->rx_recv_nob);
72                 return -EPROTO;
73         }
74
75         if (msg->gmm_magic == GMNAL_MSG_MAGIC) {
76                 flip = 0;
77         } else if (msg->gmm_magic == __swab32(GMNAL_MSG_MAGIC)) {
78                 flip = 1;
79         } else if (msg->gmm_magic == LNET_PROTO_MAGIC ||
80                    msg->gmm_magic == __swab32(LNET_PROTO_MAGIC)) {
81                 return EPROTO;
82         } else {
83                 CERROR("Bad magic from gmid %u: %08x\n", 
84                        rx->rx_recv_gmid, msg->gmm_magic);
85                 return -EPROTO;
86         }
87
88         if (msg->gmm_version != 
89             (flip ? __swab16(GMNAL_MSG_VERSION) : GMNAL_MSG_VERSION)) {
90                 return EPROTO;
91         }
92
93         if (rx->rx_recv_nob < hdr_size) {
94                 CERROR("Short message from %u: %d\n",
95                        rx->rx_recv_gmid, rx->rx_recv_nob);
96                 return -EPROTO;
97         }
98
99         if (flip) {
100                 /* leave magic unflipped as a clue to peer endianness */
101                 __swab16s(&msg->gmm_version);
102                 __swab16s(&msg->gmm_type);
103                 __swab64s(&msg->gmm_srcnid);
104                 __swab64s(&msg->gmm_dstnid);
105         }
106         
107         if (msg->gmm_srcnid == LNET_NID_ANY) {
108                 CERROR("Bad src nid from %u: %s\n", 
109                        rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_srcnid));
110                 return -EPROTO;
111         }
112
113         if (!lnet_ptlcompat_matchnid(gmni->gmni_ni->ni_nid, 
114                                      msg->gmm_dstnid)) {
115                 CERROR("Bad dst nid from %u: %s\n",
116                        rx->rx_recv_gmid, libcfs_nid2str(msg->gmm_dstnid));
117                 return -EPROTO;
118         }
119         
120         switch (msg->gmm_type) {
121         default:
122                 CERROR("Unknown message type from %u: %x\n", 
123                        rx->rx_recv_gmid, msg->gmm_type);
124                 return -EPROTO;
125                 
126         case GMNAL_MSG_IMMEDIATE:
127                 if (rx->rx_recv_nob < offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0])) {
128                         CERROR("Short IMMEDIATE from %u: %d("LPSZ")\n", 
129                                rx->rx_recv_gmid, rx->rx_recv_nob, 
130                                offsetof(gmnal_msg_t, gmm_u.immediate.gmim_payload[0]));
131                         return -EPROTO;
132                 }
133                 break;
134         }
135         return 0;
136 }
137
138 gmnal_tx_t *
139 gmnal_get_tx(gmnal_ni_t *gmni)
140 {
141         gmnal_tx_t       *tx = NULL;
142
143         spin_lock(&gmni->gmni_tx_lock);
144
145         if (gmni->gmni_shutdown ||
146             list_empty(&gmni->gmni_idle_txs)) {
147                 spin_unlock(&gmni->gmni_tx_lock);
148                 return NULL;
149         }
150         
151         tx = list_entry(gmni->gmni_idle_txs.next, gmnal_tx_t, tx_list);
152         list_del(&tx->tx_list);
153
154         spin_unlock(&gmni->gmni_tx_lock);
155
156         LASSERT (tx->tx_lntmsg == NULL);
157         LASSERT (tx->tx_ltxb == NULL);
158         LASSERT (!tx->tx_credit);
159         
160         return tx;
161 }
162
163 void
164 gmnal_tx_done(gmnal_tx_t *tx, int rc)
165 {
166         gmnal_ni_t *gmni = tx->tx_gmni;
167         int         wake_sched = 0;
168         lnet_msg_t *lnetmsg = tx->tx_lntmsg;
169         
170         tx->tx_lntmsg = NULL;
171
172         spin_lock(&gmni->gmni_tx_lock);
173         
174         if (tx->tx_ltxb != NULL) {
175                 wake_sched = 1;
176                 list_add_tail(&tx->tx_ltxb->txb_list, &gmni->gmni_idle_ltxbs);
177                 tx->tx_ltxb = NULL;
178         }
179         
180         if (tx->tx_credit) {
181                 wake_sched = 1;
182                 gmni->gmni_tx_credits++;
183                 tx->tx_credit = 0;
184         }
185         
186         list_add_tail(&tx->tx_list, &gmni->gmni_idle_txs);
187
188         if (wake_sched)
189                 gmnal_check_txqueues_locked(gmni);
190
191         spin_unlock(&gmni->gmni_tx_lock);
192
193         /* Delay finalize until tx is free */
194         if (lnetmsg != NULL)
195                 lnet_finalize(gmni->gmni_ni, lnetmsg, rc);
196 }
197
198 void 
199 gmnal_drop_sends_callback(struct gm_port *gm_port, void *context, 
200                           gm_status_t status)
201 {
202         gmnal_tx_t      *tx = (gmnal_tx_t*)context;
203
204         LASSERT(!in_interrupt());
205          
206         CDEBUG(D_NET, "status for tx [%p] is [%d][%s], nid %s\n", 
207                tx, status, gmnal_gmstatus2str(status),
208                libcfs_nid2str(tx->tx_nid));
209
210         gmnal_tx_done(tx, -EIO);
211 }
212
213 void 
214 gmnal_tx_callback(gm_port_t *gm_port, void *context, gm_status_t status)
215 {
216         gmnal_tx_t      *tx = (gmnal_tx_t*)context;
217         gmnal_ni_t      *gmni = tx->tx_gmni;
218
219         LASSERT(!in_interrupt());
220
221         switch(status) {
222         case GM_SUCCESS:
223                 gmnal_tx_done(tx, 0);
224                 return;
225
226         case GM_SEND_DROPPED:
227                 CDEBUG(D_NETERROR, "Dropped tx %p to %s\n", 
228                        tx, libcfs_nid2str(tx->tx_nid));
229                 /* Another tx failed and called gm_drop_sends() which made this
230                  * one complete immediately */
231                 gmnal_tx_done(tx, -EIO);
232                 return;
233                         
234         default:
235                 /* Some error; NB don't complete tx yet; we need its credit for
236                  * gm_drop_sends() */
237                 CDEBUG(D_NETERROR, "tx %p error %d(%s), nid %s\n",
238                        tx, status, gmnal_gmstatus2str(status), 
239                        libcfs_nid2str(tx->tx_nid));
240
241                 gmnal_notify_peer_down(tx);
242
243                 spin_lock(&gmni->gmni_gm_lock);
244                 gm_drop_sends(gmni->gmni_port, 
245                               tx->tx_ltxb != NULL ?
246                               GMNAL_LARGE_PRIORITY : GMNAL_SMALL_PRIORITY,
247                               tx->tx_gmlid, *gmnal_tunables.gm_port, 
248                               gmnal_drop_sends_callback, tx);
249                 spin_unlock(&gmni->gmni_gm_lock);
250                 return;
251         }
252
253         /* not reached */
254         LBUG();
255 }
256
257 void
258 gmnal_check_txqueues_locked (gmnal_ni_t *gmni)
259 {
260         gmnal_tx_t    *tx;
261         gmnal_txbuf_t *ltxb;
262         int            gmsize;
263         int            pri;
264         void          *netaddr;
265         
266         tx = list_empty(&gmni->gmni_buf_txq) ? NULL :
267              list_entry(gmni->gmni_buf_txq.next, gmnal_tx_t, tx_list);
268
269         if (tx != NULL &&
270             (tx->tx_large_nob == 0 || 
271              !list_empty(&gmni->gmni_idle_ltxbs))) {
272
273                 /* consume tx */
274                 list_del(&tx->tx_list);
275                 
276                 LASSERT (tx->tx_ltxb == NULL);
277
278                 if (tx->tx_large_nob != 0) {
279                         ltxb = list_entry(gmni->gmni_idle_ltxbs.next,
280                                           gmnal_txbuf_t, txb_list);
281
282                         /* consume large buffer */
283                         list_del(&ltxb->txb_list);
284
285                         spin_unlock(&gmni->gmni_tx_lock);
286
287                         /* Unlocking here allows sends to get re-ordered,
288                          * but we want to allow other CPUs to progress... */
289
290                         tx->tx_ltxb = ltxb;
291
292                         /* marshall message in tx_ltxb...
293                          * 1. Copy what was marshalled so far (in tx_buf) */
294                         memcpy(GMNAL_NETBUF_MSG(&ltxb->txb_buf),
295                                GMNAL_NETBUF_MSG(&tx->tx_buf), tx->tx_msgnob);
296
297                         /* 2. Copy the payload */
298                         if (tx->tx_large_iskiov)
299                                 lnet_copy_kiov2kiov(
300                                         gmni->gmni_large_pages,
301                                         ltxb->txb_buf.nb_kiov,
302                                         tx->tx_msgnob,
303                                         tx->tx_large_niov,
304                                         tx->tx_large_frags.kiov,
305                                         tx->tx_large_offset,
306                                         tx->tx_large_nob);
307                         else
308                                 lnet_copy_iov2kiov(
309                                         gmni->gmni_large_pages,
310                                         ltxb->txb_buf.nb_kiov,
311                                         tx->tx_msgnob,
312                                         tx->tx_large_niov,
313                                         tx->tx_large_frags.iov,
314                                         tx->tx_large_offset,
315                                         tx->tx_large_nob);
316
317                         tx->tx_msgnob += tx->tx_large_nob;
318
319                         spin_lock(&gmni->gmni_tx_lock);
320                 }
321
322                 list_add_tail(&tx->tx_list, &gmni->gmni_cred_txq);
323         }
324
325         if (!list_empty(&gmni->gmni_cred_txq) &&
326             gmni->gmni_tx_credits != 0) {
327
328                 tx = list_entry(gmni->gmni_cred_txq.next, gmnal_tx_t, tx_list);
329
330                 /* consume tx and 1 credit */
331                 list_del(&tx->tx_list);
332                 gmni->gmni_tx_credits--;
333
334                 spin_unlock(&gmni->gmni_tx_lock);
335
336                 /* Unlocking here allows sends to get re-ordered, but we want
337                  * to allow other CPUs to progress... */
338
339                 LASSERT(!tx->tx_credit);
340                 tx->tx_credit = 1;
341
342                 tx->tx_launchtime = jiffies;
343
344                 if (tx->tx_msgnob <= gmni->gmni_small_msgsize) {
345                         LASSERT (tx->tx_ltxb == NULL);
346                         netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_buf);
347                         gmsize = gmni->gmni_small_gmsize;
348                         pri = GMNAL_SMALL_PRIORITY;
349                 } else {
350                         LASSERT (tx->tx_ltxb != NULL);
351                         netaddr = GMNAL_NETBUF_LOCAL_NETADDR(&tx->tx_ltxb->txb_buf);
352                         gmsize = gmni->gmni_large_gmsize;
353                         pri = GMNAL_LARGE_PRIORITY;
354                 }
355
356                 spin_lock(&gmni->gmni_gm_lock);
357
358                 gm_send_to_peer_with_callback(gmni->gmni_port, 
359                                               netaddr, gmsize, 
360                                               tx->tx_msgnob,
361                                               pri, 
362                                               tx->tx_gmlid,
363                                               gmnal_tx_callback, 
364                                               (void*)tx);
365
366                 spin_unlock(&gmni->gmni_gm_lock);
367                 spin_lock(&gmni->gmni_tx_lock);
368         }
369 }
370
371 void
372 gmnal_post_rx(gmnal_ni_t *gmni, gmnal_rx_t *rx)
373 {
374         int   gmsize = rx->rx_islarge ? gmni->gmni_large_gmsize :
375                                         gmni->gmni_small_gmsize;
376         int   pri    = rx->rx_islarge ? GMNAL_LARGE_PRIORITY :
377                                         GMNAL_SMALL_PRIORITY;
378         void *buffer = GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf);
379
380         CDEBUG(D_NET, "posting rx %p buf %p\n", rx, buffer);
381
382         spin_lock(&gmni->gmni_gm_lock);
383         gm_provide_receive_buffer_with_tag(gmni->gmni_port, 
384                                            buffer, gmsize, pri, 0);
385         spin_unlock(&gmni->gmni_gm_lock);
386 }
387
388 void
389 gmnal_version_reply (gmnal_ni_t *gmni, gmnal_rx_t *rx)
390 {
391         /* Future protocol version compatibility support!
392          * The next gmlnd-specific protocol rev will first send a message to
393          * check version; I reply with a stub message containing my current
394          * magic+version... */
395         gmnal_msg_t *msg;
396         gmnal_tx_t  *tx = gmnal_get_tx(gmni);
397
398         if (tx == NULL) {
399                 CERROR("Can't allocate tx to send version info to %u\n",
400                        rx->rx_recv_gmid);
401                 return;
402         }
403
404         LASSERT (tx->tx_lntmsg == NULL);        /* no finalize */
405
406         tx->tx_nid = LNET_NID_ANY;
407         tx->tx_gmlid = rx->rx_recv_gmid;
408
409         msg = GMNAL_NETBUF_MSG(&tx->tx_buf);
410         msg->gmm_magic   = GMNAL_MSG_MAGIC;
411         msg->gmm_version = GMNAL_MSG_VERSION;
412
413         /* just send magic + version */
414         tx->tx_msgnob = offsetof(gmnal_msg_t, gmm_type);
415         tx->tx_large_nob = 0;
416
417         spin_lock(&gmni->gmni_tx_lock);
418
419         list_add_tail(&tx->tx_list, &gmni->gmni_buf_txq);
420         gmnal_check_txqueues_locked(gmni);
421
422         spin_unlock(&gmni->gmni_tx_lock);
423 }
424
425 int
426 gmnal_rx_thread(void *arg)
427 {
428         gmnal_ni_t      *gmni = arg;
429         gm_recv_event_t *rxevent = NULL;
430         gm_recv_t       *recv = NULL;
431         gmnal_rx_t      *rx;
432         int              rc;
433
434         cfs_daemonize("gmnal_rxd");
435
436         down(&gmni->gmni_rx_mutex);
437
438         while (!gmni->gmni_shutdown) {
439
440                 spin_lock(&gmni->gmni_gm_lock);
441                 rxevent = gm_blocking_receive_no_spin(gmni->gmni_port);
442                 spin_unlock(&gmni->gmni_gm_lock);
443
444                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
445                 default:
446                         gm_unknown(gmni->gmni_port, rxevent);
447                         continue;
448
449                 case GM_FAST_RECV_EVENT:
450                 case GM_FAST_PEER_RECV_EVENT:
451                 case GM_PEER_RECV_EVENT:
452                 case GM_FAST_HIGH_RECV_EVENT:
453                 case GM_FAST_HIGH_PEER_RECV_EVENT:
454                 case GM_HIGH_PEER_RECV_EVENT:
455                 case GM_RECV_EVENT:
456                 case GM_HIGH_RECV_EVENT:
457                         break;
458                 }
459                 
460                 recv = &rxevent->recv;
461                 rx = gm_hash_find(gmni->gmni_rx_hash, 
462                                   gm_ntohp(recv->buffer));
463                 LASSERT (rx != NULL);
464
465                 rx->rx_recv_nob  = gm_ntoh_u32(recv->length);
466                 rx->rx_recv_gmid = gm_ntoh_u16(recv->sender_node_id);
467                 rx->rx_recv_port = gm_ntoh_u8(recv->sender_port_id);
468                 rx->rx_recv_type = gm_ntoh_u8(recv->type);
469
470                 switch (GM_RECV_EVENT_TYPE(rxevent)) {
471                 case GM_FAST_RECV_EVENT:
472                 case GM_FAST_PEER_RECV_EVENT:
473                 case GM_FAST_HIGH_RECV_EVENT:
474                 case GM_FAST_HIGH_PEER_RECV_EVENT:
475                         LASSERT (rx->rx_recv_nob <= PAGE_SIZE);
476
477                         memcpy(GMNAL_NETBUF_MSG(&rx->rx_buf),
478                                gm_ntohp(recv->message), rx->rx_recv_nob);
479                         break;
480                 }
481
482                 up(&gmni->gmni_rx_mutex);
483
484                 CDEBUG (D_NET, "rx %p: buf %p(%p) nob %d\n", rx, 
485                         GMNAL_NETBUF_LOCAL_NETADDR(&rx->rx_buf),
486                         gm_ntohp(recv->buffer), rx->rx_recv_nob);
487
488                 /* We're connectionless: simply drop packets with
489                  * errors */
490                 rc = gmnal_unpack_msg(gmni, rx);
491
492                 if (rc == 0) {
493                         gmnal_msg_t *msg = GMNAL_NETBUF_MSG(&rx->rx_buf);
494                         
495                         LASSERT (msg->gmm_type == GMNAL_MSG_IMMEDIATE);
496                         rc =  lnet_parse(gmni->gmni_ni, 
497                                          &msg->gmm_u.immediate.gmim_hdr,
498                                          msg->gmm_srcnid,
499                                          rx, 0);
500                 } else if (rc > 0) {
501                         gmnal_version_reply(gmni, rx);
502                         rc = -EPROTO;           /* repost rx */
503                 }
504
505                 if (rc < 0)                     /* parse failure */
506                         gmnal_post_rx(gmni, rx);
507
508                 down(&gmni->gmni_rx_mutex);
509         }
510
511         up(&gmni->gmni_rx_mutex);
512
513         CDEBUG(D_NET, "exiting\n");
514         atomic_dec(&gmni->gmni_nthreads);
515         return 0;
516 }
517
518 void
519 gmnal_stop_threads(gmnal_ni_t *gmni)
520 {
521         int count = 2;
522
523         gmni->gmni_shutdown = 1;
524         mb();
525         
526         /* wake rxthread owning gmni_rx_mutex with an alarm. */
527         spin_lock(&gmni->gmni_gm_lock);
528         gm_set_alarm(gmni->gmni_port, &gmni->gmni_alarm, 0, NULL, NULL);
529         spin_unlock(&gmni->gmni_gm_lock);
530
531         while (atomic_read(&gmni->gmni_nthreads) != 0) {
532                 count++;
533                 if ((count & (count - 1)) == 0)
534                         CWARN("Waiting for %d threads to stop\n",
535                               atomic_read(&gmni->gmni_nthreads));
536                 gmnal_yield(1);
537         }
538 }
539
540 int
541 gmnal_start_threads(gmnal_ni_t *gmni)
542 {
543         int     i;
544         int     pid;
545
546         LASSERT (!gmni->gmni_shutdown);
547         LASSERT (atomic_read(&gmni->gmni_nthreads) == 0);
548
549         gm_initialize_alarm(&gmni->gmni_alarm);
550
551         for (i = 0; i < num_online_cpus(); i++) {
552
553                 pid = kernel_thread(gmnal_rx_thread, (void*)gmni, 0);
554                 if (pid < 0) {
555                         CERROR("rx thread failed to start: %d\n", pid);
556                         gmnal_stop_threads(gmni);
557                         return pid;
558                 }
559
560                 atomic_inc(&gmni->gmni_nthreads);
561         }
562
563         return 0;
564 }