Whamcloud - gitweb
- merge 2 weeks of b1_4 fixes onto HEAD
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
28 # include <linux/syscalls.h>
29 #endif
30
31 /*
32  *  LIB functions follow
33  *
34  */
35 int
36 ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
37 {
38         /* I would guess that if ksocknal_get_peer (nid) == NULL,
39            and we're not routing, then 'nid' is very distant :) */
40         if (nal->libnal_ni.ni_pid.nid == nid) {
41                 *dist = 0;
42         } else {
43                 *dist = 1;
44         }
45
46         return 0;
47 }
48
49 void
50 ksocknal_free_ltx (ksock_ltx_t *ltx)
51 {
52         atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
53         PORTAL_FREE(ltx, ltx->ltx_desc_size);
54 }
55
56 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
57 struct page *
58 ksocknal_kvaddr_to_page (unsigned long vaddr)
59 {
60         struct page *page;
61
62         if (vaddr >= VMALLOC_START &&
63             vaddr < VMALLOC_END)
64                 page = vmalloc_to_page ((void *)vaddr);
65 #if CONFIG_HIGHMEM
66         else if (vaddr >= PKMAP_BASE &&
67                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
68                 page = vmalloc_to_page ((void *)vaddr);
69                 /* in 2.4 ^ just walks the page tables */
70 #endif
71         else
72                 page = virt_to_page (vaddr);
73
74         if (page == NULL ||
75             !VALID_PAGE (page))
76                 return (NULL);
77
78         return (page);
79 }
80 #endif
81
82 int
83 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
84 {
85         struct socket *sock = conn->ksnc_sock;
86         struct iovec  *iov = tx->tx_iov;
87         int            fragsize = iov->iov_len;
88         unsigned long  vaddr = (unsigned long)iov->iov_base;
89         int            more = (tx->tx_niov > 1) || 
90                               (tx->tx_nkiov > 0) ||
91                               (!list_empty (&conn->ksnc_tx_queue));
92 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
93         int            offset = vaddr & (PAGE_SIZE - 1);
94         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
95         struct page   *page;
96 #endif
97         int            rc;
98
99         /* NB we can't trust socket ops to either consume our iovs
100          * or leave them alone, so we only send 1 frag at a time. */
101         LASSERT (fragsize <= tx->tx_resid);
102         LASSERT (tx->tx_niov > 0);
103         
104 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
105         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
106             (sock->sk->route_caps & NETIF_F_SG) &&
107             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
108             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
109                 
110                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
111                        (void *)vaddr, page, page_address(page), offset, zcsize);
112
113                 if (fragsize > zcsize) {
114                         more = 1;
115                         fragsize = zcsize;
116                 }
117
118                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
119                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
120                                        &tx->tx_zccd);
121         } else
122 #endif
123         {
124                 /* NB don't pass tx's iov; sendmsg may or may not update it */
125                 struct iovec fragiov = { .iov_base = (void *)vaddr,
126                                          .iov_len  = fragsize};
127                 struct msghdr msg = {
128                         .msg_name       = NULL,
129                         .msg_namelen    = 0,
130                         .msg_iov        = &fragiov,
131                         .msg_iovlen     = 1,
132                         .msg_control    = NULL,
133                         .msg_controllen = 0,
134                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
135                 };
136                 mm_segment_t oldmm = get_fs();
137
138                 set_fs (KERNEL_DS);
139                 rc = sock_sendmsg(sock, &msg, fragsize);
140                 set_fs (oldmm);
141         } 
142
143         if (rc > 0) {
144                 tx->tx_resid -= rc;
145
146                 if (rc < iov->iov_len) {
147                         /* didn't send whole iov entry... */
148                         iov->iov_base = (void *)(vaddr + rc);
149                         iov->iov_len -= rc;
150                 } else {
151                         tx->tx_iov++;
152                         tx->tx_niov--;
153                 }
154         }
155         
156         return (rc);
157 }
158
159 int
160 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
161 {
162         struct socket *sock = conn->ksnc_sock;
163         ptl_kiov_t    *kiov = tx->tx_kiov;
164         int            fragsize = kiov->kiov_len;
165         struct page   *page = kiov->kiov_page;
166         int            offset = kiov->kiov_offset;
167         int            more = (tx->tx_nkiov > 1) ||
168                               (!list_empty (&conn->ksnc_tx_queue));
169         int            rc;
170
171         /* NB we can't trust socket ops to either consume our iovs
172          * or leave them alone, so we only send 1 frag at a time. */
173         LASSERT (fragsize <= tx->tx_resid);
174         LASSERT (offset + fragsize <= PAGE_SIZE);
175         LASSERT (tx->tx_niov == 0);
176         LASSERT (tx->tx_nkiov > 0);
177
178 #if SOCKNAL_ZC
179         if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
180             (sock->sk->route_caps & NETIF_F_SG) &&
181             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
182
183                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
184                                page, offset, fragsize);
185
186                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
187                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
188                                        &tx->tx_zccd);
189         } else
190 #endif
191         {
192                 char *addr = ((char *)kmap (page)) + offset;
193                 struct iovec fragiov = {.iov_base = addr,
194                                         .iov_len  = fragsize};
195                 struct msghdr msg = {
196                         .msg_name       = NULL,
197                         .msg_namelen    = 0,
198                         .msg_iov        = &fragiov,
199                         .msg_iovlen     = 1,
200                         .msg_control    = NULL,
201                         .msg_controllen = 0,
202                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
203                 };
204                 mm_segment_t  oldmm = get_fs();
205                 
206                 set_fs (KERNEL_DS);
207                 rc = sock_sendmsg(sock, &msg, fragsize);
208                 set_fs (oldmm);
209
210                 kunmap (page);
211         }
212
213         if (rc > 0) {
214                 tx->tx_resid -= rc;
215  
216                 if (rc < fragsize) {
217                         kiov->kiov_offset = offset + rc;
218                         kiov->kiov_len    = fragsize - rc;
219                 } else {
220                         tx->tx_kiov++;
221                         tx->tx_nkiov--;
222                 }
223         }
224
225         return (rc);
226 }
227
228 int
229 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
230 {
231         int      rc;
232         
233         if (ksocknal_data.ksnd_stall_tx != 0) {
234                 set_current_state (TASK_UNINTERRUPTIBLE);
235                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
236         }
237
238         LASSERT (tx->tx_resid != 0);
239
240         rc = ksocknal_getconnsock (conn);
241         if (rc != 0) {
242                 LASSERT (conn->ksnc_closing);
243                 return (-ESHUTDOWN);
244         }
245
246         do {
247                 if (ksocknal_data.ksnd_enomem_tx > 0) {
248                         /* testing... */
249                         ksocknal_data.ksnd_enomem_tx--;
250                         rc = -EAGAIN;
251                 } else if (tx->tx_niov != 0) {
252                         rc = ksocknal_send_iov (conn, tx);
253                 } else {
254                         rc = ksocknal_send_kiov (conn, tx);
255                 }
256
257                 if (rc <= 0) {
258                         /* Didn't write anything.
259                          *
260                          * NB: rc == 0 and rc == -EAGAIN both mean try
261                          * again later (linux stack returns -EAGAIN for
262                          * this, but Adaptech TOE returns 0).
263                          *
264                          * Also, sends never fail with -ENOMEM, just
265                          * -EAGAIN, but with the added bonus that we can't
266                          * expect write_space() to call us back to tell us
267                          * when to try sending again.  We use the
268                          * SOCK_NOSPACE flag to diagnose...  */
269
270                         LASSERT(rc != -ENOMEM);
271
272                         if (rc == 0 || rc == -EAGAIN) {
273                                 if (test_bit(SOCK_NOSPACE, 
274                                              &conn->ksnc_sock->flags)) {
275                                         rc = -EAGAIN;
276                                 } else {
277                                         static int counter;
278                          
279                                         counter++;
280                                         if ((counter & (-counter)) == counter)
281                                                 CWARN("%d ENOMEM tx %p\n", 
282                                                       counter, conn);
283                                         rc = -ENOMEM;
284                                 }
285                         }
286                         break;
287                 }
288
289                 rc = 0;
290
291                 /* Consider the connection alive since we managed to chuck
292                  * more data into it.  Really, we'd like to consider it
293                  * alive only when the peer ACKs something, but
294                  * write_space() only gets called back while SOCK_NOSPACE
295                  * is set.  Instead, we presume peer death has occurred if
296                  * the socket doesn't drain within a timout */
297                 conn->ksnc_tx_deadline = jiffies + 
298                                          ksocknal_tunables.ksnd_io_timeout * HZ;
299                 conn->ksnc_peer->ksnp_last_alive = jiffies;
300
301         } while (tx->tx_resid != 0);
302
303         ksocknal_putconnsock (conn);
304         return (rc);
305 }
306
307 void
308 ksocknal_eager_ack (ksock_conn_t *conn)
309 {
310         int            opt = 1;
311         mm_segment_t   oldmm = get_fs();
312         struct socket *sock = conn->ksnc_sock;
313         
314         /* Remind the socket to ACK eagerly.  If I don't, the socket might
315          * think I'm about to send something it could piggy-back the ACK
316          * on, introducing delay in completing zero-copy sends in my
317          * peer. */
318
319         set_fs(KERNEL_DS);
320         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
321                                (char *)&opt, sizeof (opt));
322         set_fs(oldmm);
323 }
324
325 int
326 ksocknal_recv_iov (ksock_conn_t *conn)
327 {
328         struct iovec *iov = conn->ksnc_rx_iov;
329         int           fragsize  = iov->iov_len;
330         unsigned long vaddr = (unsigned long)iov->iov_base;
331         struct iovec  fragiov = { .iov_base = (void *)vaddr,
332                                   .iov_len  = fragsize};
333         struct msghdr msg = {
334                 .msg_name       = NULL,
335                 .msg_namelen    = 0,
336                 .msg_iov        = &fragiov,
337                 .msg_iovlen     = 1,
338                 .msg_control    = NULL,
339                 .msg_controllen = 0,
340                 .msg_flags      = 0
341         };
342         mm_segment_t oldmm = get_fs();
343         int          rc;
344
345         /* NB we can't trust socket ops to either consume our iovs
346          * or leave them alone, so we only receive 1 frag at a time. */
347         LASSERT (conn->ksnc_rx_niov > 0);
348         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
349
350         set_fs (KERNEL_DS);
351         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
352         /* NB this is just a boolean............................^ */
353         set_fs (oldmm);
354
355         if (rc <= 0)
356                 return (rc);
357
358         /* received something... */
359         conn->ksnc_peer->ksnp_last_alive = jiffies;
360         conn->ksnc_rx_deadline = jiffies + 
361                                  ksocknal_tunables.ksnd_io_timeout * HZ;
362         mb();                           /* order with setting rx_started */
363         conn->ksnc_rx_started = 1;
364
365         conn->ksnc_rx_nob_wanted -= rc;
366         conn->ksnc_rx_nob_left -= rc;
367                 
368         if (rc < fragsize) {
369                 iov->iov_base = (void *)(vaddr + rc);
370                 iov->iov_len = fragsize - rc;
371                 return (-EAGAIN);
372         }
373
374         conn->ksnc_rx_iov++;
375         conn->ksnc_rx_niov--;
376         return (1);
377 }
378
379 int
380 ksocknal_recv_kiov (ksock_conn_t *conn)
381 {
382         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
383         struct page  *page = kiov->kiov_page;
384         int           offset = kiov->kiov_offset;
385         int           fragsize = kiov->kiov_len;
386         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
387         struct iovec  fragiov = { .iov_base = (void *)vaddr,
388                                   .iov_len  = fragsize};
389         struct msghdr msg = {
390                 .msg_name       = NULL,
391                 .msg_namelen    = 0,
392                 .msg_iov        = &fragiov,
393                 .msg_iovlen     = 1,
394                 .msg_control    = NULL,
395                 .msg_controllen = 0,
396                 .msg_flags      = 0
397         };
398         mm_segment_t oldmm = get_fs();
399         int          rc;
400
401         /* NB we can't trust socket ops to either consume our iovs
402          * or leave them alone, so we only receive 1 frag at a time. */
403         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
404         LASSERT (conn->ksnc_rx_nkiov > 0);
405         LASSERT (offset + fragsize <= PAGE_SIZE);
406
407         set_fs (KERNEL_DS);
408         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
409         /* NB this is just a boolean............................^ */
410         set_fs (oldmm);
411
412         kunmap (page);
413         
414         if (rc <= 0)
415                 return (rc);
416         
417         /* received something... */
418         conn->ksnc_peer->ksnp_last_alive = jiffies;
419         conn->ksnc_rx_deadline = jiffies + 
420                                  ksocknal_tunables.ksnd_io_timeout * HZ;
421         mb();                           /* order with setting rx_started */
422         conn->ksnc_rx_started = 1;
423
424         conn->ksnc_rx_nob_wanted -= rc;
425         conn->ksnc_rx_nob_left -= rc;
426                 
427         if (rc < fragsize) {
428                 kiov->kiov_offset = offset + rc;
429                 kiov->kiov_len = fragsize - rc;
430                 return (-EAGAIN);
431         }
432
433         conn->ksnc_rx_kiov++;
434         conn->ksnc_rx_nkiov--;
435         return (1);
436 }
437
438 int
439 ksocknal_receive (ksock_conn_t *conn) 
440 {
441         /* Return 1 on success, 0 on EOF, < 0 on error.
442          * Caller checks ksnc_rx_nob_wanted to determine
443          * progress/completion. */
444         int     rc;
445         ENTRY;
446         
447         if (ksocknal_data.ksnd_stall_rx != 0) {
448                 set_current_state (TASK_UNINTERRUPTIBLE);
449                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
450         }
451
452         rc = ksocknal_getconnsock (conn);
453         if (rc != 0) {
454                 LASSERT (conn->ksnc_closing);
455                 return (-ESHUTDOWN);
456         }
457
458         for (;;) {
459                 if (conn->ksnc_rx_niov != 0)
460                         rc = ksocknal_recv_iov (conn);
461                 else
462                         rc = ksocknal_recv_kiov (conn);
463
464                 if (rc <= 0) {
465                         /* error/EOF or partial receive */
466                         if (rc == -EAGAIN) {
467                                 rc = 1;
468                         } else if (rc == 0 && conn->ksnc_rx_started) {
469                                 /* EOF in the middle of a message */
470                                 rc = -EPROTO;
471                         }
472                         break;
473                 }
474
475                 /* Completed a fragment */
476
477                 if (conn->ksnc_rx_nob_wanted == 0) {
478                         /* Completed a message segment (header or payload) */
479                         if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
480                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
481                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
482                                 /* Remind the socket to ack eagerly... */
483                                 ksocknal_eager_ack(conn);
484                         }
485                         rc = 1;
486                         break;
487                 }
488         }
489
490         ksocknal_putconnsock (conn);
491         RETURN (rc);
492 }
493
494 #if SOCKNAL_ZC
495 void
496 ksocknal_zc_callback (zccd_t *zcd)
497 {
498         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
499         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
500         unsigned long  flags;
501         ENTRY;
502
503         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
504
505         spin_lock_irqsave (&sched->kss_lock, flags);
506
507         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
508         wake_up (&sched->kss_waitq);
509
510         spin_unlock_irqrestore (&sched->kss_lock, flags);
511         EXIT;
512 }
513 #endif
514
515 void
516 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
517 {
518         ksock_ltx_t   *ltx;
519         ENTRY;
520
521         if (tx->tx_conn != NULL) {
522                 /* This tx got queued on a conn; do the accounting... */
523                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
524 #if SOCKNAL_ZC
525                 /* zero copy completion isn't always from
526                  * process_transmit() so it needs to keep a ref on
527                  * tx_conn... */
528                 if (asynch)
529                         ksocknal_put_conn (tx->tx_conn);
530 #else
531                 LASSERT (!asynch);
532 #endif
533         }
534
535         if (tx->tx_isfwd) {             /* was a forwarded packet? */
536                 kpr_fwd_done (&ksocknal_data.ksnd_router,
537                               KSOCK_TX_2_KPR_FWD_DESC (tx), 
538                               (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
539                 EXIT;
540                 return;
541         }
542
543         /* local send */
544         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
545
546         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
547                       (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
548
549         ksocknal_free_ltx (ltx);
550         EXIT;
551 }
552
553 void
554 ksocknal_tx_launched (ksock_tx_t *tx) 
555 {
556 #if SOCKNAL_ZC
557         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
558                 ksock_conn_t  *conn = tx->tx_conn;
559                 
560                 /* zccd skbufs are still in-flight.  First take a ref on
561                  * conn, so it hangs about for ksocknal_tx_done... */
562                 atomic_inc (&conn->ksnc_refcount);
563
564                 /* ...then drop the initial ref on zccd, so the zero copy
565                  * callback can occur */
566                 zccd_put (&tx->tx_zccd);
567                 return;
568         }
569 #endif
570         /* Any zero-copy-ness (if any) has completed; I can complete the
571          * transmit now, avoiding an extra schedule */
572         ksocknal_tx_done (tx, 0);
573 }
574
575 int
576 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
577 {
578         unsigned long  flags;
579         int            rc;
580        
581         rc = ksocknal_transmit (conn, tx);
582
583         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
584
585         if (tx->tx_resid == 0) {
586                 /* Sent everything OK */
587                 LASSERT (rc == 0);
588
589                 ksocknal_tx_launched (tx);
590                 return (0);
591         }
592
593         if (rc == -EAGAIN)
594                 return (rc);
595
596         if (rc == -ENOMEM) {
597                 /* Queue on ksnd_enomem_conns for retry after a timeout */
598                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
599
600                 /* enomem list takes over scheduler's ref... */
601                 LASSERT (conn->ksnc_tx_scheduled);
602                 list_add_tail(&conn->ksnc_tx_list,
603                               &ksocknal_data.ksnd_enomem_conns);
604                 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
605                                    ksocknal_data.ksnd_reaper_waketime))
606                         wake_up (&ksocknal_data.ksnd_reaper_waitq);
607                 
608                 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
609                 return (rc);
610         }
611
612         /* Actual error */
613         LASSERT (rc < 0);
614
615         if (!conn->ksnc_closing)
616                 CERROR("[%p] Error %d on write to "LPX64
617                        " ip %d.%d.%d.%d:%d\n", conn, rc,
618                        conn->ksnc_peer->ksnp_nid,
619                        HIPQUAD(conn->ksnc_ipaddr),
620                        conn->ksnc_port);
621
622         ksocknal_close_conn_and_siblings (conn, rc);
623         ksocknal_tx_launched (tx);
624
625         return (rc);
626 }
627
628 void
629 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
630 {
631         unsigned long     flags;
632
633         /* called holding write lock on ksnd_global_lock */
634
635         LASSERT (!route->ksnr_deleted);
636         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
637         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
638         LASSERT (!route->ksnr_connecting);
639         
640         if (ksocknal_tunables.ksnd_typed_conns)
641                 route->ksnr_connecting = 
642                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
643         else
644                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
645
646         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
647         
648         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
649         
650         list_add_tail (&route->ksnr_connect_list,
651                        &ksocknal_data.ksnd_autoconnectd_routes);
652         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
653         
654         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
655 }
656
657 ksock_peer_t *
658 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
659 {
660         char          ipbuf[PTL_NALFMT_SIZE];
661         ptl_nid_t     target_nid;
662         int           rc;
663         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
664
665         if (peer != NULL)
666                 return (peer);
667
668         if (tx->tx_isfwd) {
669                 CERROR ("Can't send packet to "LPX64
670                        " %s: routed target is not a peer\n",
671                         nid, portals_nid2str(SOCKNAL, nid, ipbuf));
672                 return (NULL);
673         }
674
675         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
676                          &target_nid);
677         if (rc != 0) {
678                 CERROR ("Can't route to "LPX64" %s: router error %d\n",
679                         nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
680                 return (NULL);
681         }
682
683         peer = ksocknal_find_peer_locked (target_nid);
684         if (peer != NULL)
685                 return (peer);
686
687         CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
688                 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
689         return (NULL);
690 }
691
692 ksock_conn_t *
693 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
694 {
695         struct list_head *tmp;
696         ksock_conn_t     *typed = NULL;
697         int               tnob  = 0;
698         ksock_conn_t     *fallback = NULL;
699         int               fnob     = 0;
700
701         /* Find the conn with the shortest tx queue */
702         list_for_each (tmp, &peer->ksnp_conns) {
703                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
704                 int           nob = atomic_read(&c->ksnc_tx_nob) +
705                                         c->ksnc_sock->sk->sk_wmem_queued;
706
707                 LASSERT (!c->ksnc_closing);
708
709                 if (fallback == NULL || nob < fnob) {
710                         fallback = c;
711                         fnob     = nob;
712                 }
713
714                 if (!ksocknal_tunables.ksnd_typed_conns)
715                         continue;
716
717                 switch (c->ksnc_type) {
718                 default:
719                         LBUG();
720                 case SOCKNAL_CONN_ANY:
721                         break;
722                 case SOCKNAL_CONN_BULK_IN:
723                         continue;
724                 case SOCKNAL_CONN_BULK_OUT:
725                         if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
726                                 continue;
727                         break;
728                 case SOCKNAL_CONN_CONTROL:
729                         if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
730                                 continue;
731                         break;
732                 }
733
734                 if (typed == NULL || nob < tnob) {
735                         typed = c;
736                         tnob  = nob;
737                 }
738         }
739
740         /* prefer the typed selection */
741         return ((typed != NULL) ? typed : fallback);
742 }
743
744 void
745 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
746 {
747         unsigned long  flags;
748         ksock_sched_t *sched = conn->ksnc_scheduler;
749
750         /* called holding global lock (read or irq-write) and caller may
751          * not have dropped this lock between finding conn and calling me,
752          * so we don't need the {get,put}connsock dance to deref
753          * ksnc_sock... */
754         LASSERT(!conn->ksnc_closing);
755         LASSERT(tx->tx_resid == tx->tx_nob);
756         
757         CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", 
758                 conn->ksnc_peer->ksnp_nid,
759                 HIPQUAD(conn->ksnc_ipaddr),
760                 conn->ksnc_port);
761
762         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
763         tx->tx_conn = conn;
764
765 #if SOCKNAL_ZC
766         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
767         /* NB this sets 1 ref on zccd, so the callback can only occur after
768          * I've released this ref. */
769 #endif
770         spin_lock_irqsave (&sched->kss_lock, flags);
771
772         conn->ksnc_tx_deadline = jiffies + 
773                                  ksocknal_tunables.ksnd_io_timeout * HZ;
774         mb();                                   /* order with list_add_tail */
775
776         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
777                 
778         if (conn->ksnc_tx_ready &&      /* able to send */
779             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
780                 /* +1 ref for scheduler */
781                 atomic_inc (&conn->ksnc_refcount);
782                 list_add_tail (&conn->ksnc_tx_list, 
783                                &sched->kss_tx_conns);
784                 conn->ksnc_tx_scheduled = 1;
785                 wake_up (&sched->kss_waitq);
786         }
787
788         spin_unlock_irqrestore (&sched->kss_lock, flags);
789 }
790
791 ksock_route_t *
792 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
793 {
794         struct list_head  *tmp;
795         ksock_route_t     *route;
796         ksock_route_t     *first_lazy = NULL;
797         int                found_connecting_or_connected = 0;
798         int                bits;
799         
800         list_for_each (tmp, &peer->ksnp_routes) {
801                 route = list_entry (tmp, ksock_route_t, ksnr_list);
802                 bits  = route->ksnr_connected;
803                 
804                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
805                     (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
806                     route->ksnr_connecting != 0) {
807                         /* All typed connections have been established, or
808                          * an untyped connection has been established, or
809                          * connections are currently being established */
810                         found_connecting_or_connected = 1;
811                         continue;
812                 }
813
814                 /* too soon to retry this guy? */
815                 if (!time_after_eq (jiffies, route->ksnr_timeout))
816                         continue;
817                 
818                 /* eager routes always want to be connected */
819                 if (route->ksnr_eager)
820                         return (route);
821
822                 if (first_lazy == NULL)
823                         first_lazy = route;
824         }
825         
826         /* No eager routes need to be connected.  If some connection has
827          * already been established, or is being established there's nothing to
828          * do.  Otherwise we return the first lazy route we found.  If it fails
829          * to connect, it will go to the end of the list. */
830
831         if (!list_empty (&peer->ksnp_conns) ||
832             found_connecting_or_connected)
833                 return (NULL);
834         
835         return (first_lazy);
836 }
837
838 ksock_route_t *
839 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
840 {
841         struct list_head  *tmp;
842         ksock_route_t     *route;
843
844         list_for_each (tmp, &peer->ksnp_routes) {
845                 route = list_entry (tmp, ksock_route_t, ksnr_list);
846                 
847                 if (route->ksnr_connecting != 0)
848                         return (route);
849         }
850         
851         return (NULL);
852 }
853
854 int
855 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
856 {
857         unsigned long     flags;
858         ksock_peer_t     *peer;
859         ksock_conn_t     *conn;
860         ksock_route_t    *route;
861         rwlock_t         *g_lock;
862         
863         /* Ensure the frags we've been given EXACTLY match the number of
864          * bytes we want to send.  Many TCP/IP stacks disregard any total
865          * size parameters passed to them and just look at the frags. 
866          *
867          * We always expect at least 1 mapped fragment containing the
868          * complete portals header. */
869         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
870                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
871         LASSERT (tx->tx_niov >= 1);
872         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
873
874         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
875                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
876                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
877
878         tx->tx_conn = NULL;                     /* only set when assigned a conn */
879         tx->tx_resid = tx->tx_nob;
880         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
881
882         g_lock = &ksocknal_data.ksnd_global_lock;
883         read_lock (g_lock);
884         
885         peer = ksocknal_find_target_peer_locked (tx, nid);
886         if (peer == NULL) {
887                 read_unlock (g_lock);
888                 return (-EHOSTUNREACH);
889         }
890
891         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
892                 conn = ksocknal_find_conn_locked (tx, peer);
893                 if (conn != NULL) {
894                         /* I've got no autoconnect routes that need to be
895                          * connecting and I do have an actual connection... */
896                         ksocknal_queue_tx_locked (tx, conn);
897                         read_unlock (g_lock);
898                         return (0);
899                 }
900         }
901         
902         /* Making one or more connections; I'll need a write lock... */
903
904         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
905         read_unlock (g_lock);
906         write_lock_irqsave (g_lock, flags);
907         
908         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
909                 write_unlock_irqrestore (g_lock, flags);
910                 ksocknal_put_peer (peer);
911                 return (-EHOSTUNREACH);
912         }
913         ksocknal_put_peer (peer);               /* drop ref I got above */
914
915         for (;;) {
916                 /* launch any/all autoconnections that need it */
917                 route = ksocknal_find_connectable_route_locked (peer);
918                 if (route == NULL)
919                         break;
920
921                 ksocknal_launch_autoconnect_locked (route);
922         }
923
924         conn = ksocknal_find_conn_locked (tx, peer);
925         if (conn != NULL) {
926                 /* Connection exists; queue message on it */
927                 ksocknal_queue_tx_locked (tx, conn);
928                 write_unlock_irqrestore (g_lock, flags);
929                 return (0);
930         }
931
932         route = ksocknal_find_connecting_route_locked (peer);
933         if (route != NULL) {
934                 /* At least 1 connection is being established; queue the
935                  * message... */
936                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
937                 write_unlock_irqrestore (g_lock, flags);
938                 return (0);
939         }
940         
941         write_unlock_irqrestore (g_lock, flags);
942         return (-EHOSTUNREACH);
943 }
944
945 ptl_err_t
946 ksocknal_sendmsg(lib_nal_t     *nal, 
947                  void         *private, 
948                  lib_msg_t    *cookie,
949                  ptl_hdr_t    *hdr, 
950                  int           type, 
951                  ptl_nid_t     nid, 
952                  ptl_pid_t     pid,
953                  unsigned int  payload_niov, 
954                  struct iovec *payload_iov, 
955                  ptl_kiov_t   *payload_kiov,
956                  size_t        payload_offset,
957                  size_t        payload_nob)
958 {
959         ksock_ltx_t  *ltx;
960         int           desc_size;
961         int           rc;
962
963         /* NB 'private' is different depending on what we're sending.
964          * Just ignore it... */
965
966         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
967                " pid %d\n", payload_nob, payload_niov, nid , pid);
968
969         LASSERT (payload_nob == 0 || payload_niov > 0);
970         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
971
972         /* It must be OK to kmap() if required */
973         LASSERT (payload_kiov == NULL || !in_interrupt ());
974         /* payload is either all vaddrs or all pages */
975         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
976         
977         if (payload_iov != NULL)
978                 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
979         else
980                 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
981         
982         if (in_interrupt() ||
983             type == PTL_MSG_ACK ||
984             type == PTL_MSG_REPLY) {
985                 /* Can't block if in interrupt or responding to an incoming
986                  * message */
987                 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
988         } else {
989                 PORTAL_ALLOC(ltx, desc_size);
990         }
991         
992         if (ltx == NULL) {
993                 CERROR("Can't allocate tx desc type %d size %d %s\n",
994                        type, desc_size, in_interrupt() ? "(intr)" : "");
995                 return (PTL_NO_SPACE);
996         }
997
998         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
999
1000         ltx->ltx_desc_size = desc_size;
1001         
1002         /* We always have 1 mapped frag for the header */
1003         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1004         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1005         ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1006         ltx->ltx_hdr = *hdr;
1007         
1008         ltx->ltx_private = private;
1009         ltx->ltx_cookie = cookie;
1010         
1011         ltx->ltx_tx.tx_isfwd = 0;
1012         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1013
1014         if (payload_iov != NULL) {
1015                 /* payload is all mapped */
1016                 ltx->ltx_tx.tx_kiov  = NULL;
1017                 ltx->ltx_tx.tx_nkiov = 0;
1018
1019                 ltx->ltx_tx.tx_niov = 
1020                         1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1021                                             payload_niov, payload_iov,
1022                                             payload_offset, payload_nob);
1023         } else {
1024                 /* payload is all pages */
1025                 ltx->ltx_tx.tx_niov = 1;
1026
1027                 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1028                 ltx->ltx_tx.tx_nkiov =
1029                         lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1030                                          payload_niov, payload_kiov,
1031                                          payload_offset, payload_nob);
1032         }
1033
1034         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1035         if (rc == 0)
1036                 return (PTL_OK);
1037         
1038         ksocknal_free_ltx(ltx);
1039         return (PTL_FAIL);
1040 }
1041
1042 ptl_err_t
1043 ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1044                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1045                unsigned int payload_niov, struct iovec *payload_iov,
1046                size_t payload_offset, size_t payload_len)
1047 {
1048         return (ksocknal_sendmsg(nal, private, cookie,
1049                                  hdr, type, nid, pid,
1050                                  payload_niov, payload_iov, NULL,
1051                                  payload_offset, payload_len));
1052 }
1053
1054 ptl_err_t
1055 ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1056                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1057                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1058                      size_t payload_offset, size_t payload_len)
1059 {
1060         return (ksocknal_sendmsg(nal, private, cookie,
1061                                  hdr, type, nid, pid,
1062                                  payload_niov, NULL, payload_kiov,
1063                                  payload_offset, payload_len));
1064 }
1065
1066 void
1067 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1068 {
1069         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1070         ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1071         int           rc;
1072         
1073         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1074                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1075
1076         /* I'm the gateway; must be the last hop */
1077         if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
1078                 nid = fwd->kprfd_target_nid;
1079
1080         /* setup iov for hdr */
1081         ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1082         ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1083
1084         ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
1085         ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1086         ftx->ftx_tx.tx_niov  = 1;
1087         ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
1088         ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1089         ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
1090
1091         rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1092         if (rc != 0)
1093                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1094 }
1095
1096 int
1097 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1098 {
1099         long    pid = kernel_thread (fn, arg, 0);
1100
1101         if (pid < 0)
1102                 return ((int)pid);
1103
1104         atomic_inc (&ksocknal_data.ksnd_nthreads);
1105         return (0);
1106 }
1107
1108 void
1109 ksocknal_thread_fini (void)
1110 {
1111         atomic_dec (&ksocknal_data.ksnd_nthreads);
1112 }
1113
1114 void
1115 ksocknal_fmb_callback (void *arg, int error)
1116 {
1117         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1118         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1119         ptl_hdr_t         *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
1120         ksock_conn_t      *conn = NULL;
1121         ksock_sched_t     *sched;
1122         unsigned long      flags;
1123         char               ipbuf[PTL_NALFMT_SIZE];
1124         char               ipbuf2[PTL_NALFMT_SIZE];
1125
1126         if (error != 0)
1127                 CERROR("Failed to route packet from "
1128                        LPX64" %s to "LPX64" %s: %d\n",
1129                        NTOH__u64(hdr->src_nid),
1130                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf),
1131                        NTOH__u64(hdr->dest_nid),
1132                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf2),
1133                        error);
1134         else
1135                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1136                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1137
1138         /* drop peer ref taken on init */
1139         ksocknal_put_peer (fmb->fmb_peer);
1140
1141         spin_lock_irqsave (&fmp->fmp_lock, flags);
1142
1143         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1144         fmp->fmp_nactive_fmbs--;
1145
1146         if (!list_empty (&fmp->fmp_blocked_conns)) {
1147                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1148                                    ksock_conn_t, ksnc_rx_list);
1149                 list_del (&conn->ksnc_rx_list);
1150         }
1151
1152         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1153
1154         if (conn == NULL)
1155                 return;
1156
1157         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1158         LASSERT (conn->ksnc_rx_scheduled);
1159         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1160
1161         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1162
1163         sched = conn->ksnc_scheduler;
1164
1165         spin_lock_irqsave (&sched->kss_lock, flags);
1166
1167         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1168         wake_up (&sched->kss_waitq);
1169
1170         spin_unlock_irqrestore (&sched->kss_lock, flags);
1171 }
1172
1173 ksock_fmb_t *
1174 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1175 {
1176         int               payload_nob = conn->ksnc_rx_nob_left;
1177         unsigned long     flags;
1178         ksock_fmb_pool_t *pool;
1179         ksock_fmb_t      *fmb;
1180
1181         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1182         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1183
1184         if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1185                 pool = &ksocknal_data.ksnd_small_fmp;
1186         else
1187                 pool = &ksocknal_data.ksnd_large_fmp;
1188
1189         spin_lock_irqsave (&pool->fmp_lock, flags);
1190
1191         if (!list_empty (&pool->fmp_idle_fmbs)) {
1192                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1193                                  ksock_fmb_t, fmb_list);
1194                 list_del (&fmb->fmb_list);
1195                 pool->fmp_nactive_fmbs++;
1196                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1197
1198                 return (fmb);
1199         }
1200
1201         /* deschedule until fmb free */
1202
1203         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1204
1205         list_add_tail (&conn->ksnc_rx_list,
1206                        &pool->fmp_blocked_conns);
1207
1208         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1209         return (NULL);
1210 }
1211
1212 int
1213 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1214 {
1215         int       payload_nob = conn->ksnc_rx_nob_left;
1216         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1217         int       niov = 0;
1218         int       nob = payload_nob;
1219
1220         LASSERT (conn->ksnc_rx_scheduled);
1221         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1222         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1223         LASSERT (payload_nob >= 0);
1224         LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1225         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1226         LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1227
1228         /* Take a ref on the conn's peer to prevent module unload before
1229          * forwarding completes. */
1230         fmb->fmb_peer = conn->ksnc_peer;
1231         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1232
1233         /* Copy the header we just read into the forwarding buffer.  If
1234          * there's payload, start reading reading it into the buffer,
1235          * otherwise the forwarding buffer can be kicked off
1236          * immediately. */
1237         fmb->fmb_hdr = conn->ksnc_hdr;
1238
1239         while (nob > 0) {
1240                 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1241                 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1242                 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1243                 nob -= PAGE_SIZE;
1244                 niov++;
1245         }
1246
1247         kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1248                      payload_nob, niov, fmb->fmb_kiov,
1249                      ksocknal_fmb_callback, fmb);
1250
1251         if (payload_nob == 0) {         /* got complete packet already */
1252                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1253                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
1254
1255                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1256
1257                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1258                 return (1);
1259         }
1260
1261         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1262         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1263         
1264         /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1265          * buffer */
1266         LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1267
1268         conn->ksnc_rx_niov = 0;
1269         conn->ksnc_rx_nkiov = niov;
1270         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1271         memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1272         
1273         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1274                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1275         return (0);
1276 }
1277
1278 void
1279 ksocknal_fwd_parse (ksock_conn_t *conn)
1280 {
1281         ksock_peer_t *peer;
1282         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1283         ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1284         int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1285         char str[PTL_NALFMT_SIZE];
1286         char str2[PTL_NALFMT_SIZE];
1287
1288         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1289                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1290
1291         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1292         LASSERT (conn->ksnc_rx_scheduled);
1293
1294         if (body_len < 0) {                 /* length corrupt (overflow) */
1295                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1296                        "packet size %d illegal\n",
1297                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1298                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1299                        body_len);
1300
1301                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1302                 return;
1303         }
1304
1305         if (!kpr_routing(&ksocknal_data.ksnd_router)) {    /* not forwarding */
1306                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1307                        " (%s): not forwarding\n",
1308                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1309                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1310                 /* on to new packet (skip this one's body) */
1311                 ksocknal_new_packet (conn, body_len);
1312                 return;
1313         }
1314
1315         if (body_len > PTL_MTU) {      /* too big to forward */
1316                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1317                         "(%s): packet size %d too big\n",
1318                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1319                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1320                         body_len);
1321                 /* on to new packet (skip this one's body) */
1322                 ksocknal_new_packet (conn, body_len);
1323                 return;
1324         }
1325
1326         /* should have gone direct */
1327         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1328         if (peer != NULL) {
1329                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1330                         "(%s): target is a peer\n",
1331                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1332                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1333                 ksocknal_put_peer (peer);  /* drop ref from get above */
1334
1335                 /* on to next packet (skip this one's body) */
1336                 ksocknal_new_packet (conn, body_len);
1337                 return;
1338         }
1339
1340         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1341         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1342         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1343 }
1344
1345 int
1346 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1347 {
1348         static char ksocknal_slop_buffer[4096];
1349
1350         int   nob;
1351         int   niov;
1352         int   skipped;
1353
1354         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1355                 conn->ksnc_rx_started = 0;
1356                 mb ();                          /* racing with timeout thread */
1357                 
1358                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1359                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1360                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1361
1362                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1363                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1364                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1365                 conn->ksnc_rx_niov = 1;
1366
1367                 conn->ksnc_rx_kiov = NULL;
1368                 conn->ksnc_rx_nkiov = 0;
1369                 return (1);
1370         }
1371
1372         /* Set up to skip as much a possible now.  If there's more left
1373          * (ran out of iov entries) we'll get called again */
1374
1375         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1376         conn->ksnc_rx_nob_left = nob_to_skip;
1377         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1378         skipped = 0;
1379         niov = 0;
1380
1381         do {
1382                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1383
1384                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1385                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1386                 niov++;
1387                 skipped += nob;
1388                 nob_to_skip -=nob;
1389
1390         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1391                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1392
1393         conn->ksnc_rx_niov = niov;
1394         conn->ksnc_rx_kiov = NULL;
1395         conn->ksnc_rx_nkiov = 0;
1396         conn->ksnc_rx_nob_wanted = skipped;
1397         return (0);
1398 }
1399
1400 int
1401 ksocknal_process_receive (ksock_conn_t *conn)
1402 {
1403         ksock_fmb_t  *fmb;
1404         int           rc;
1405         
1406         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1407
1408         /* doesn't need a forwarding buffer */
1409         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1410                 goto try_read;
1411
1412  get_fmb:
1413         fmb = ksocknal_get_idle_fmb (conn);
1414         if (fmb == NULL) {
1415                 /* conn descheduled waiting for idle fmb */
1416                 return (0);
1417         }
1418
1419         if (ksocknal_init_fmb (conn, fmb)) {
1420                 /* packet forwarded */
1421                 return (0);
1422         }
1423
1424  try_read:
1425         /* NB: sched lock NOT held */
1426         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1427                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1428                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1429                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1430
1431         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1432
1433         rc = ksocknal_receive(conn);
1434
1435         if (rc <= 0) {
1436                 LASSERT (rc != -EAGAIN);
1437
1438                 if (rc == 0)
1439                         CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1440                                conn, conn->ksnc_peer->ksnp_nid,
1441                                HIPQUAD(conn->ksnc_ipaddr),
1442                                conn->ksnc_port);
1443                 else if (!conn->ksnc_closing)
1444                         CERROR ("[%p] Error %d on read from "LPX64
1445                                 " ip %d.%d.%d.%d:%d\n",
1446                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1447                                 HIPQUAD(conn->ksnc_ipaddr),
1448                                 conn->ksnc_port);
1449
1450                 ksocknal_close_conn_and_siblings (conn, rc);
1451                 return (rc == 0 ? -ESHUTDOWN : rc);
1452         }
1453
1454         if (conn->ksnc_rx_nob_wanted != 0) {
1455                 /* short read */
1456                 return (-EAGAIN);
1457         }
1458         
1459         switch (conn->ksnc_rx_state) {
1460         case SOCKNAL_RX_HEADER:
1461                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1462                     NTOH__u64(conn->ksnc_hdr.dest_nid) != 
1463                     ksocknal_lib.libnal_ni.ni_pid.nid) {
1464                         /* This packet isn't for me */
1465                         ksocknal_fwd_parse (conn);
1466                         switch (conn->ksnc_rx_state) {
1467                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1468                                 return (0);     /* => come back later */
1469                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1470                                 goto try_read;  /* => go read it */
1471                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1472                                 goto get_fmb;   /* => go get a fwd msg buffer */
1473                         default:
1474                                 LBUG ();
1475                         }
1476                         /* Not Reached */
1477                 }
1478
1479                 /* sets wanted_len, iovs etc */
1480                 rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1481
1482                 if (rc != PTL_OK) {
1483                         /* I just received garbage: give up on this conn */
1484                         ksocknal_close_conn_and_siblings (conn, rc);
1485                         return (-EPROTO);
1486                 }
1487
1488                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1489                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1490                         goto try_read;          /* go read the payload */
1491                 }
1492                 /* Fall through (completed packet for me) */
1493
1494         case SOCKNAL_RX_BODY:
1495                 /* payload all received */
1496                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
1497                 /* Fall through */
1498
1499         case SOCKNAL_RX_SLOP:
1500                 /* starting new packet? */
1501                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1502                         return (0);     /* come back later */
1503                 goto try_read;          /* try to finish reading slop now */
1504
1505         case SOCKNAL_RX_BODY_FWD:
1506                 /* payload all received */
1507                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1508                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1509                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1510                         conn->ksnc_rx_nob_left);
1511
1512                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1513                  * conn->ksnc_cookie */
1514                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1515                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1516
1517                 /* no slop in forwarded packets */
1518                 LASSERT (conn->ksnc_rx_nob_left == 0);
1519
1520                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1521                 return (0);                     /* (later) */
1522
1523         default:
1524                 break;
1525         }
1526
1527         /* Not Reached */
1528         LBUG ();
1529         return (-EINVAL);                       /* keep gcc happy */
1530 }
1531
1532 ptl_err_t
1533 ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1534                unsigned int niov, struct iovec *iov, 
1535                size_t offset, size_t mlen, size_t rlen)
1536 {
1537         ksock_conn_t *conn = (ksock_conn_t *)private;
1538
1539         LASSERT (mlen <= rlen);
1540         LASSERT (niov <= PTL_MD_MAX_IOV);
1541         
1542         conn->ksnc_cookie = msg;
1543         conn->ksnc_rx_nob_wanted = mlen;
1544         conn->ksnc_rx_nob_left   = rlen;
1545
1546         conn->ksnc_rx_nkiov = 0;
1547         conn->ksnc_rx_kiov = NULL;
1548         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1549         conn->ksnc_rx_niov =
1550                 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1551                                 niov, iov, offset, mlen);
1552
1553         LASSERT (mlen == 
1554                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1555                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1556
1557         return (PTL_OK);
1558 }
1559
1560 ptl_err_t
1561 ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1562                      unsigned int niov, ptl_kiov_t *kiov, 
1563                      size_t offset, size_t mlen, size_t rlen)
1564 {
1565         ksock_conn_t *conn = (ksock_conn_t *)private;
1566
1567         LASSERT (mlen <= rlen);
1568         LASSERT (niov <= PTL_MD_MAX_IOV);
1569         
1570         conn->ksnc_cookie = msg;
1571         conn->ksnc_rx_nob_wanted = mlen;
1572         conn->ksnc_rx_nob_left   = rlen;
1573
1574         conn->ksnc_rx_niov = 0;
1575         conn->ksnc_rx_iov  = NULL;
1576         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1577         conn->ksnc_rx_nkiov = 
1578                 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1579                                  niov, kiov, offset, mlen);
1580
1581         LASSERT (mlen == 
1582                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1583                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1584
1585         return (PTL_OK);
1586 }
1587
1588 int ksocknal_scheduler (void *arg)
1589 {
1590         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1591         ksock_conn_t      *conn;
1592         ksock_tx_t        *tx;
1593         unsigned long      flags;
1594         int                rc;
1595         int                nloops = 0;
1596         int                id = sched - ksocknal_data.ksnd_schedulers;
1597         char               name[16];
1598
1599         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1600         kportal_daemonize (name);
1601         kportal_blockallsigs ();
1602
1603 #if (CONFIG_SMP && CPU_AFFINITY)
1604         if ((cpu_online_map & (1 << id)) != 0) {
1605 #if 1
1606                 current->cpus_allowed = (1 << id);
1607 #else
1608                 set_cpus_allowed (current, 1<<id);
1609 #endif
1610         } else {
1611                 CERROR ("Can't set CPU affinity for %s\n", name);
1612         }
1613 #endif /* CONFIG_SMP && CPU_AFFINITY */
1614         
1615         spin_lock_irqsave (&sched->kss_lock, flags);
1616
1617         while (!ksocknal_data.ksnd_shuttingdown) {
1618                 int did_something = 0;
1619
1620                 /* Ensure I progress everything semi-fairly */
1621
1622                 if (!list_empty (&sched->kss_rx_conns)) {
1623                         conn = list_entry(sched->kss_rx_conns.next,
1624                                           ksock_conn_t, ksnc_rx_list);
1625                         list_del(&conn->ksnc_rx_list);
1626
1627                         LASSERT(conn->ksnc_rx_scheduled);
1628                         LASSERT(conn->ksnc_rx_ready);
1629
1630                         /* clear rx_ready in case receive isn't complete.
1631                          * Do it BEFORE we call process_recv, since
1632                          * data_ready can set it any time after we release
1633                          * kss_lock. */
1634                         conn->ksnc_rx_ready = 0;
1635                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1636                         
1637                         rc = ksocknal_process_receive(conn);
1638                         
1639                         spin_lock_irqsave(&sched->kss_lock, flags);
1640
1641                         /* I'm the only one that can clear this flag */
1642                         LASSERT(conn->ksnc_rx_scheduled);
1643
1644                         /* Did process_receive get everything it wanted? */
1645                         if (rc == 0)
1646                                 conn->ksnc_rx_ready = 1;
1647                         
1648                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1649                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1650                                 /* Conn blocked for a forwarding buffer.
1651                                  * It will get queued for my attention when
1652                                  * one becomes available (and it might just
1653                                  * already have been!).  Meanwhile my ref
1654                                  * on it stays put. */
1655                         } else if (conn->ksnc_rx_ready) {
1656                                 /* reschedule for rx */
1657                                 list_add_tail (&conn->ksnc_rx_list,
1658                                                &sched->kss_rx_conns);
1659                         } else {
1660                                 conn->ksnc_rx_scheduled = 0;
1661                                 /* drop my ref */
1662                                 ksocknal_put_conn(conn);
1663                         }
1664
1665                         did_something = 1;
1666                 }
1667
1668                 if (!list_empty (&sched->kss_tx_conns)) {
1669                         conn = list_entry(sched->kss_tx_conns.next,
1670                                           ksock_conn_t, ksnc_tx_list);
1671                         list_del (&conn->ksnc_tx_list);
1672                         
1673                         LASSERT(conn->ksnc_tx_scheduled);
1674                         LASSERT(conn->ksnc_tx_ready);
1675                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1676                         
1677                         tx = list_entry(conn->ksnc_tx_queue.next,
1678                                         ksock_tx_t, tx_list);
1679                         /* dequeue now so empty list => more to send */
1680                         list_del(&tx->tx_list);
1681                         
1682                         /* Clear tx_ready in case send isn't complete.  Do
1683                          * it BEFORE we call process_transmit, since
1684                          * write_space can set it any time after we release
1685                          * kss_lock. */
1686                         conn->ksnc_tx_ready = 0;
1687                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1688
1689                         rc = ksocknal_process_transmit(conn, tx);
1690
1691                         spin_lock_irqsave (&sched->kss_lock, flags);
1692
1693                         if (rc == -ENOMEM || rc == -EAGAIN) {
1694                                 /* Incomplete send: replace tx on HEAD of tx_queue */
1695                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1696                         } else {
1697                                 /* Complete send; assume space for more */
1698                                 conn->ksnc_tx_ready = 1;
1699                         }
1700
1701                         if (rc == -ENOMEM) {
1702                                 /* Do nothing; after a short timeout, this
1703                                  * conn will be reposted on kss_tx_conns. */
1704                         } else if (conn->ksnc_tx_ready &&
1705                                    !list_empty (&conn->ksnc_tx_queue)) {
1706                                 /* reschedule for tx */
1707                                 list_add_tail (&conn->ksnc_tx_list, 
1708                                                &sched->kss_tx_conns);
1709                         } else {
1710                                 conn->ksnc_tx_scheduled = 0;
1711                                 /* drop my ref */
1712                                 ksocknal_put_conn (conn);
1713                         }
1714                                 
1715                         did_something = 1;
1716                 }
1717 #if SOCKNAL_ZC
1718                 if (!list_empty (&sched->kss_zctxdone_list)) {
1719                         ksock_tx_t *tx =
1720                                 list_entry(sched->kss_zctxdone_list.next,
1721                                            ksock_tx_t, tx_list);
1722                         did_something = 1;
1723
1724                         list_del (&tx->tx_list);
1725                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1726
1727                         ksocknal_tx_done (tx, 1);
1728
1729                         spin_lock_irqsave (&sched->kss_lock, flags);
1730                 }
1731 #endif
1732                 if (!did_something ||           /* nothing to do */
1733                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1734                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1735
1736                         nloops = 0;
1737
1738                         if (!did_something) {   /* wait for something to do */
1739 #if SOCKNAL_ZC
1740                                 rc = wait_event_interruptible (sched->kss_waitq,
1741                                                                ksocknal_data.ksnd_shuttingdown ||
1742                                                                !list_empty(&sched->kss_rx_conns) ||
1743                                                                !list_empty(&sched->kss_tx_conns) ||
1744                                                                !list_empty(&sched->kss_zctxdone_list));
1745 #else
1746                                 rc = wait_event_interruptible (sched->kss_waitq,
1747                                                                ksocknal_data.ksnd_shuttingdown ||
1748                                                                !list_empty(&sched->kss_rx_conns) ||
1749                                                                !list_empty(&sched->kss_tx_conns));
1750 #endif
1751                                 LASSERT (rc == 0);
1752                         } else
1753                                our_cond_resched();
1754
1755                         spin_lock_irqsave (&sched->kss_lock, flags);
1756                 }
1757         }
1758
1759         spin_unlock_irqrestore (&sched->kss_lock, flags);
1760         ksocknal_thread_fini ();
1761         return (0);
1762 }
1763
1764 void
1765 ksocknal_data_ready (struct sock *sk, int n)
1766 {
1767         unsigned long  flags;
1768         ksock_conn_t  *conn;
1769         ksock_sched_t *sched;
1770         ENTRY;
1771
1772         /* interleave correctly with closing sockets... */
1773         read_lock (&ksocknal_data.ksnd_global_lock);
1774
1775         conn = sk->sk_user_data;
1776         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1777                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1778                 sk->sk_data_ready (sk, n);
1779         } else {
1780                 sched = conn->ksnc_scheduler;
1781
1782                 spin_lock_irqsave (&sched->kss_lock, flags);
1783
1784                 conn->ksnc_rx_ready = 1;
1785
1786                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1787                         list_add_tail(&conn->ksnc_rx_list,
1788                                       &sched->kss_rx_conns);
1789                         conn->ksnc_rx_scheduled = 1;
1790                         /* extra ref for scheduler */
1791                         atomic_inc (&conn->ksnc_refcount);
1792
1793                         wake_up (&sched->kss_waitq);
1794                 }
1795
1796                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1797         }
1798
1799         read_unlock (&ksocknal_data.ksnd_global_lock);
1800
1801         EXIT;
1802 }
1803
1804 void
1805 ksocknal_write_space (struct sock *sk)
1806 {
1807         unsigned long  flags;
1808         ksock_conn_t  *conn;
1809         ksock_sched_t *sched;
1810
1811         /* interleave correctly with closing sockets... */
1812         read_lock (&ksocknal_data.ksnd_global_lock);
1813
1814         conn = sk->sk_user_data;
1815
1816         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1817                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1818                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1819                                       " ready" : " blocked"),
1820                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1821                                       " scheduled" : " idle"),
1822                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1823                                       " empty" : " queued"));
1824
1825         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1826                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1827                 sk->sk_write_space (sk);
1828
1829                 read_unlock (&ksocknal_data.ksnd_global_lock);
1830                 return;
1831         }
1832
1833         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1834                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1835
1836                 sched = conn->ksnc_scheduler;
1837
1838                 spin_lock_irqsave (&sched->kss_lock, flags);
1839
1840                 conn->ksnc_tx_ready = 1;
1841
1842                 if (!conn->ksnc_tx_scheduled && // not being progressed
1843                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1844                         list_add_tail (&conn->ksnc_tx_list,
1845                                        &sched->kss_tx_conns);
1846                         conn->ksnc_tx_scheduled = 1;
1847                         /* extra ref for scheduler */
1848                         atomic_inc (&conn->ksnc_refcount);
1849
1850                         wake_up (&sched->kss_waitq);
1851                 }
1852
1853                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1854         }
1855
1856         read_unlock (&ksocknal_data.ksnd_global_lock);
1857 }
1858
1859 int
1860 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1861 {
1862         int           rc;
1863         mm_segment_t  oldmm = get_fs();
1864
1865         while (nob > 0) {
1866                 struct iovec  iov = {
1867                         .iov_base = buffer,
1868                         .iov_len  = nob
1869                 };
1870                 struct msghdr msg = {
1871                         .msg_name       = NULL,
1872                         .msg_namelen    = 0,
1873                         .msg_iov        = &iov,
1874                         .msg_iovlen     = 1,
1875                         .msg_control    = NULL,
1876                         .msg_controllen = 0,
1877                         .msg_flags      = 0
1878                 };
1879
1880                 set_fs (KERNEL_DS);
1881                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1882                 set_fs (oldmm);
1883                 
1884                 if (rc < 0)
1885                         return (rc);
1886
1887                 if (rc == 0) {
1888                         CERROR ("Unexpected zero rc\n");
1889                         return (-ECONNABORTED);
1890                 }
1891
1892                 buffer = ((char *)buffer) + rc;
1893                 nob -= rc;
1894         }
1895         
1896         return (0);
1897 }
1898
1899 int
1900 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1901 {
1902         int           rc;
1903         mm_segment_t  oldmm = get_fs();
1904         
1905         while (nob > 0) {
1906                 struct iovec  iov = {
1907                         .iov_base = buffer,
1908                         .iov_len  = nob
1909                 };
1910                 struct msghdr msg = {
1911                         .msg_name       = NULL,
1912                         .msg_namelen    = 0,
1913                         .msg_iov        = &iov,
1914                         .msg_iovlen     = 1,
1915                         .msg_control    = NULL,
1916                         .msg_controllen = 0,
1917                         .msg_flags      = 0
1918                 };
1919
1920                 set_fs (KERNEL_DS);
1921                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1922                 set_fs (oldmm);
1923                 
1924                 if (rc < 0)
1925                         return (rc);
1926
1927                 if (rc == 0)
1928                         return (-ECONNABORTED);
1929
1930                 buffer = ((char *)buffer) + rc;
1931                 nob -= rc;
1932         }
1933         
1934         return (0);
1935 }
1936
1937 int
1938 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type,
1939                 __u64 *incarnation)
1940 {
1941         int                 rc;
1942         ptl_hdr_t           hdr;
1943         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1944         char                ipbuf[PTL_NALFMT_SIZE];
1945         char                ipbuf2[PTL_NALFMT_SIZE];
1946
1947         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1948
1949         memset (&hdr, 0, sizeof (hdr));
1950         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1951         hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
1952         hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
1953
1954         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
1955         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
1956
1957         hdr.msg.hello.type = __cpu_to_le32 (*type);
1958         hdr.msg.hello.incarnation =
1959                 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
1960
1961         /* Assume sufficient socket buffering for this message */
1962         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
1963         if (rc != 0) {
1964                 CERROR ("Error %d sending HELLO to "LPX64" %s\n",
1965                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
1966                 return (rc);
1967         }
1968
1969         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
1970         if (rc != 0) {
1971                 CERROR ("Error %d reading HELLO from "LPX64" %s\n",
1972                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
1973                 return (rc);
1974         }
1975
1976         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
1977                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
1978                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
1979                         portals_nid2str(SOCKNAL, *nid, ipbuf));
1980                 return (-EPROTO);
1981         }
1982
1983         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
1984             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
1985                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
1986                         " from "LPX64" %s\n",
1987                         __le16_to_cpu (hmv->version_major),
1988                         __le16_to_cpu (hmv->version_minor),
1989                         PORTALS_PROTO_VERSION_MAJOR,
1990                         PORTALS_PROTO_VERSION_MINOR,
1991                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
1992                 return (-EPROTO);
1993         }
1994
1995 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
1996 # error "This code only understands protocol version 0.x"
1997 #endif
1998         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
1999          * so read the rest of it in now... */
2000
2001         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2002         if (rc != 0) {
2003                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n",
2004                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2005                 return (rc);
2006         }
2007
2008         /* ...and check we got what we expected */
2009         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2010             hdr.payload_length != __cpu_to_le32 (0)) {
2011                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2012                         " but got type %d with %d payload from "LPX64" %s\n",
2013                         __le32_to_cpu (hdr.type),
2014                         __le32_to_cpu (hdr.payload_length), *nid,
2015                         portals_nid2str(SOCKNAL, *nid, ipbuf));
2016                 return (-EPROTO);
2017         }
2018
2019         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2020                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2021                 return (-EPROTO);
2022         }
2023
2024         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2025                 *nid = __le64_to_cpu(hdr.src_nid);
2026         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2027                 CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
2028                         __le64_to_cpu (hdr.src_nid),
2029                         portals_nid2str(SOCKNAL,
2030                                         __le64_to_cpu(hdr.src_nid),
2031                                         ipbuf),
2032                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf2));
2033                 return (-EPROTO);
2034         }
2035
2036         if (*type == SOCKNAL_CONN_NONE) {
2037                 /* I've accepted this connection; peer determines type */
2038                 *type = __le32_to_cpu(hdr.msg.hello.type);
2039                 switch (*type) {
2040                 case SOCKNAL_CONN_ANY:
2041                 case SOCKNAL_CONN_CONTROL:
2042                         break;
2043                 case SOCKNAL_CONN_BULK_IN:
2044                         *type = SOCKNAL_CONN_BULK_OUT;
2045                         break;
2046                 case SOCKNAL_CONN_BULK_OUT:
2047                         *type = SOCKNAL_CONN_BULK_IN;
2048                         break;
2049                 default:
2050                         CERROR ("Unexpected type %d from "LPX64" %s\n",
2051                                 *type, *nid,
2052                                 portals_nid2str(SOCKNAL, *nid, ipbuf));
2053                         return (-EPROTO);
2054                 }
2055         } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2056                 CERROR ("Mismatched types: me %d "LPX64" %s %d\n",
2057                         *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf),
2058                         __le32_to_cpu(hdr.msg.hello.type));
2059                 return (-EPROTO);
2060         }
2061
2062         *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
2063
2064         return (0);
2065 }
2066
2067 int
2068 ksocknal_setup_sock (struct socket *sock)
2069 {
2070         mm_segment_t    oldmm = get_fs ();
2071         int             rc;
2072         int             option;
2073         struct linger   linger;
2074
2075 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2076         sock->sk->sk_allocation = GFP_NOFS;
2077 #else
2078         sock->sk->allocation = GFP_NOFS;
2079 #endif
2080
2081         /* Ensure this socket aborts active sends immediately when we close
2082          * it. */
2083
2084         linger.l_onoff = 0;
2085         linger.l_linger = 0;
2086
2087         set_fs (KERNEL_DS);
2088         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2089                               (char *)&linger, sizeof (linger));
2090         set_fs (oldmm);
2091         if (rc != 0) {
2092                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2093                 return (rc);
2094         }
2095
2096         option = -1;
2097         set_fs (KERNEL_DS);
2098         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2099                                     (char *)&option, sizeof (option));
2100         set_fs (oldmm);
2101         if (rc != 0) {
2102                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2103                 return (rc);
2104         }
2105
2106 #if SOCKNAL_USE_KEEPALIVES
2107         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2108          * second until the timeout elapses. */
2109
2110         option = (ksocknal_tunables.ksnd_io_timeout * 3) / 4;
2111         set_fs (KERNEL_DS);
2112         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2113                                     (char *)&option, sizeof (option));
2114         set_fs (oldmm);
2115         if (rc != 0) {
2116                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2117                 return (rc);
2118         }
2119         
2120         option = 1;
2121         set_fs (KERNEL_DS);
2122         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2123                                     (char *)&option, sizeof (option));
2124         set_fs (oldmm);
2125         if (rc != 0) {
2126                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2127                 return (rc);
2128         }
2129         
2130         option = ksocknal_tunables.ksnd_io_timeout / 4;
2131         set_fs (KERNEL_DS);
2132         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2133                                     (char *)&option, sizeof (option));
2134         set_fs (oldmm);
2135         if (rc != 0) {
2136                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2137                 return (rc);
2138         }
2139
2140         option = 1;
2141         set_fs (KERNEL_DS);
2142         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2143                               (char *)&option, sizeof (option));
2144         set_fs (oldmm);
2145         if (rc != 0) {
2146                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2147                 return (rc);
2148         }
2149 #endif
2150         return (0);
2151 }
2152
2153 int
2154 ksocknal_connect_peer (ksock_route_t *route, int type)
2155 {
2156         struct sockaddr_in  peer_addr;
2157         mm_segment_t        oldmm = get_fs();
2158         struct timeval      tv;
2159         int                 fd;
2160         struct socket      *sock;
2161         int                 rc;
2162         char                ipbuf[PTL_NALFMT_SIZE];
2163
2164         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2165         if (rc != 0) {
2166                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2167                 return (rc);
2168         }
2169
2170         /* Ugh; have to map_fd for compatibility with sockets passed in
2171          * from userspace.  And we actually need the sock->file refcounting
2172          * that this gives you :) */
2173
2174         fd = sock_map_fd (sock);
2175         if (fd < 0) {
2176                 sock_release (sock);
2177                 CERROR ("sock_map_fd error %d\n", fd);
2178                 return (fd);
2179         }
2180
2181         /* NB the fd now owns the ref on sock->file */
2182         LASSERT (sock->file != NULL);
2183         LASSERT (file_count(sock->file) == 1);
2184
2185         /* Set the socket timeouts, so our connection attempt completes in
2186          * finite time */
2187         tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
2188         tv.tv_usec = 0;
2189
2190         set_fs (KERNEL_DS);
2191         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2192                               (char *)&tv, sizeof (tv));
2193         set_fs (oldmm);
2194         if (rc != 0) {
2195                 CERROR ("Can't set send timeout %d: %d\n", 
2196                         ksocknal_tunables.ksnd_io_timeout, rc);
2197                 goto out;
2198         }
2199         
2200         set_fs (KERNEL_DS);
2201         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2202                               (char *)&tv, sizeof (tv));
2203         set_fs (oldmm);
2204         if (rc != 0) {
2205                 CERROR ("Can't set receive timeout %d: %d\n",
2206                         ksocknal_tunables.ksnd_io_timeout, rc);
2207                 goto out;
2208         }
2209
2210         {
2211                 int  option = 1;
2212                 
2213                 set_fs (KERNEL_DS);
2214                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2215                                             (char *)&option, sizeof (option));
2216                 set_fs (oldmm);
2217                 if (rc != 0) {
2218                         CERROR ("Can't disable nagle: %d\n", rc);
2219                         goto out;
2220                 }
2221         }
2222         
2223         if (route->ksnr_buffer_size != 0) {
2224                 int option = route->ksnr_buffer_size;
2225                 
2226                 set_fs (KERNEL_DS);
2227                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2228                                       (char *)&option, sizeof (option));
2229                 set_fs (oldmm);
2230                 if (rc != 0) {
2231                         CERROR ("Can't set send buffer %d: %d\n",
2232                                 route->ksnr_buffer_size, rc);
2233                         goto out;
2234                 }
2235
2236                 set_fs (KERNEL_DS);
2237                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2238                                       (char *)&option, sizeof (option));
2239                 set_fs (oldmm);
2240                 if (rc != 0) {
2241                         CERROR ("Can't set receive buffer %d: %d\n",
2242                                 route->ksnr_buffer_size, rc);
2243                         goto out;
2244                 }
2245         }
2246         
2247         memset (&peer_addr, 0, sizeof (peer_addr));
2248         peer_addr.sin_family = AF_INET;
2249         peer_addr.sin_port = htons (route->ksnr_port);
2250         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2251         
2252         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2253                                  sizeof (peer_addr), sock->file->f_flags);
2254         if (rc != 0) {
2255                 CERROR ("Error %d connecting to "LPX64" %s\n", rc,
2256                         route->ksnr_peer->ksnp_nid,
2257                         portals_nid2str(SOCKNAL,
2258                                         route->ksnr_peer->ksnp_nid,
2259                                         ipbuf));
2260                 goto out;
2261         }
2262         
2263         rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2264         if (rc == 0) {
2265                 /* Take an extra ref on sock->file to compensate for the
2266                  * upcoming close which will lose fd's ref on it. */
2267                 get_file (sock->file);
2268         }
2269
2270  out:
2271         sys_close (fd);
2272         return (rc);
2273 }
2274
2275 void
2276 ksocknal_autoconnect (ksock_route_t *route)
2277 {
2278         LIST_HEAD        (zombies);
2279         ksock_tx_t       *tx;
2280         ksock_peer_t     *peer;
2281         unsigned long     flags;
2282         int               rc;
2283         int               type;
2284         
2285         for (;;) {
2286                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2287                         if ((route->ksnr_connecting & (1 << type)) != 0)
2288                                 break;
2289                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2290
2291                 rc = ksocknal_connect_peer (route, type);
2292
2293                 if (rc != 0)
2294                         break;
2295                 
2296                 /* successfully autoconnected: create_conn did the
2297                  * route/conn binding and scheduled any blocked packets */
2298
2299                 if (route->ksnr_connecting == 0) {
2300                         /* No more connections required */
2301                         return;
2302                 }
2303         }
2304
2305         /* Connection attempt failed */
2306
2307         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2308
2309         peer = route->ksnr_peer;
2310         route->ksnr_connecting = 0;
2311
2312         /* This is a retry rather than a new connection */
2313         LASSERT (route->ksnr_retry_interval != 0);
2314         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2315         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2316                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2317
2318         if (!list_empty (&peer->ksnp_tx_queue) &&
2319             ksocknal_find_connecting_route_locked (peer) == NULL) {
2320                 LASSERT (list_empty (&peer->ksnp_conns));
2321
2322                 /* None of the connections that the blocked packets are
2323                  * waiting for have been successful.  Complete them now... */
2324                 do {
2325                         tx = list_entry (peer->ksnp_tx_queue.next,
2326                                          ksock_tx_t, tx_list);
2327                         list_del (&tx->tx_list);
2328                         list_add_tail (&tx->tx_list, &zombies);
2329                 } while (!list_empty (&peer->ksnp_tx_queue));
2330         }
2331
2332         /* make this route least-favourite for re-selection */
2333         if (!route->ksnr_deleted) {
2334                 list_del(&route->ksnr_list);
2335                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2336         }
2337         
2338         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2339
2340         while (!list_empty (&zombies)) {
2341                 char ipbuf[PTL_NALFMT_SIZE];
2342                 char ipbuf2[PTL_NALFMT_SIZE];
2343                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2344
2345                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2346                         NTOH__u32 (tx->tx_hdr->type),
2347                         NTOH__u32 (tx->tx_hdr->payload_length),
2348                         NTOH__u64 (tx->tx_hdr->src_nid),
2349                         portals_nid2str(SOCKNAL,
2350                                         NTOH__u64(tx->tx_hdr->src_nid),
2351                                         ipbuf),
2352                         NTOH__u64 (tx->tx_hdr->dest_nid),
2353                         portals_nid2str(SOCKNAL,
2354                                         NTOH__u64(tx->tx_hdr->src_nid),
2355                                         ipbuf2));
2356
2357                 list_del (&tx->tx_list);
2358                 /* complete now */
2359                 ksocknal_tx_done (tx, 0);
2360         }
2361 }
2362
2363 int
2364 ksocknal_autoconnectd (void *arg)
2365 {
2366         long               id = (long)arg;
2367         char               name[16];
2368         unsigned long      flags;
2369         ksock_route_t     *route;
2370         int                rc;
2371
2372         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2373         kportal_daemonize (name);
2374         kportal_blockallsigs ();
2375
2376         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2377
2378         while (!ksocknal_data.ksnd_shuttingdown) {
2379
2380                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2381                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2382                                             ksock_route_t, ksnr_connect_list);
2383                         
2384                         list_del (&route->ksnr_connect_list);
2385                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2386
2387                         ksocknal_autoconnect (route);
2388                         ksocknal_put_route (route);
2389
2390                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2391                         continue;
2392                 }
2393                 
2394                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2395
2396                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2397                                                ksocknal_data.ksnd_shuttingdown ||
2398                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2399
2400                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2401         }
2402
2403         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2404
2405         ksocknal_thread_fini ();
2406         return (0);
2407 }
2408
2409 ksock_conn_t *
2410 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2411 {
2412         /* We're called with a shared lock on ksnd_global_lock */
2413         ksock_conn_t      *conn;
2414         struct list_head  *ctmp;
2415         ksock_sched_t     *sched;
2416
2417         list_for_each (ctmp, &peer->ksnp_conns) {
2418                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2419                 sched = conn->ksnc_scheduler;
2420
2421                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2422                 LASSERT (!conn->ksnc_closing);
2423                 
2424                 if (conn->ksnc_rx_started &&
2425                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2426                         /* Timed out incomplete incoming message */
2427                         atomic_inc (&conn->ksnc_refcount);
2428                         CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2429                                 peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr));
2430                         return (conn);
2431                 }
2432                 
2433                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2434                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2435                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2436                         /* Timed out messages queued for sending, or
2437                          * messages buffered in the socket's send buffer */
2438                         atomic_inc (&conn->ksnc_refcount);
2439                         CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n", 
2440                                 peer->ksnp_nid, 
2441                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2442                                 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2443                                 HIPQUAD(conn->ksnc_ipaddr));
2444                         return (conn);
2445                 }
2446         }
2447
2448         return (NULL);
2449 }
2450
2451 void
2452 ksocknal_check_peer_timeouts (int idx)
2453 {
2454         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2455         struct list_head *ptmp;
2456         ksock_peer_t     *peer;
2457         ksock_conn_t     *conn;
2458
2459  again:
2460         /* NB. We expect to have a look at all the peers and not find any
2461          * connections to time out, so we just use a shared lock while we
2462          * take a look... */
2463         read_lock (&ksocknal_data.ksnd_global_lock);
2464
2465         list_for_each (ptmp, peers) {
2466                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2467                 conn = ksocknal_find_timed_out_conn (peer);
2468                 
2469                 if (conn != NULL) {
2470                         read_unlock (&ksocknal_data.ksnd_global_lock);
2471
2472                         CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2473                                 peer->ksnp_nid,
2474                                 HIPQUAD(conn->ksnc_ipaddr),
2475                                 conn->ksnc_port);
2476                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2477                         
2478                         /* NB we won't find this one again, but we can't
2479                          * just proceed with the next peer, since we dropped
2480                          * ksnd_global_lock and it might be dead already! */
2481                         ksocknal_put_conn (conn);
2482                         goto again;
2483                 }
2484         }
2485
2486         read_unlock (&ksocknal_data.ksnd_global_lock);
2487 }
2488
2489 int
2490 ksocknal_reaper (void *arg)
2491 {
2492         wait_queue_t       wait;
2493         unsigned long      flags;
2494         ksock_conn_t      *conn;
2495         ksock_sched_t     *sched;
2496         struct list_head   enomem_conns;
2497         int                nenomem_conns;
2498         int                timeout;
2499         int                i;
2500         int                peer_index = 0;
2501         unsigned long      deadline = jiffies;
2502         
2503         kportal_daemonize ("ksocknal_reaper");
2504         kportal_blockallsigs ();
2505
2506         INIT_LIST_HEAD(&enomem_conns);
2507         init_waitqueue_entry (&wait, current);
2508
2509         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2510
2511         while (!ksocknal_data.ksnd_shuttingdown) {
2512
2513                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2514                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2515                                            ksock_conn_t, ksnc_list);
2516                         list_del (&conn->ksnc_list);
2517                         
2518                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2519
2520                         ksocknal_terminate_conn (conn);
2521                         ksocknal_put_conn (conn);
2522
2523                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2524                         continue;
2525                 }
2526
2527                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2528                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2529                                            ksock_conn_t, ksnc_list);
2530                         list_del (&conn->ksnc_list);
2531                         
2532                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2533
2534                         ksocknal_destroy_conn (conn);
2535
2536                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2537                         continue;
2538                 }
2539
2540                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2541                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2542                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2543                 }
2544
2545                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2546
2547                 /* reschedule all the connections that stalled with ENOMEM... */
2548                 nenomem_conns = 0;
2549                 while (!list_empty (&enomem_conns)) {
2550                         conn = list_entry (enomem_conns.next,
2551                                            ksock_conn_t, ksnc_tx_list);
2552                         list_del (&conn->ksnc_tx_list);
2553
2554                         sched = conn->ksnc_scheduler;
2555
2556                         spin_lock_irqsave (&sched->kss_lock, flags);
2557
2558                         LASSERT (conn->ksnc_tx_scheduled);
2559                         conn->ksnc_tx_ready = 1;
2560                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2561                         wake_up (&sched->kss_waitq);
2562
2563                         spin_unlock_irqrestore (&sched->kss_lock, flags);
2564                         nenomem_conns++;
2565                 }
2566                 
2567                 /* careful with the jiffy wrap... */
2568                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2569                         const int n = 4;
2570                         const int p = 1;
2571                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2572                         
2573                         /* Time to check for timeouts on a few more peers: I do
2574                          * checks every 'p' seconds on a proportion of the peer
2575                          * table and I need to check every connection 'n' times
2576                          * within a timeout interval, to ensure I detect a
2577                          * timeout on any connection within (n+1)/n times the
2578                          * timeout interval. */
2579
2580                         if (ksocknal_tunables.ksnd_io_timeout > n * p)
2581                                 chunk = (chunk * n * p) / 
2582                                         ksocknal_tunables.ksnd_io_timeout;
2583                         if (chunk == 0)
2584                                 chunk = 1;
2585
2586                         for (i = 0; i < chunk; i++) {
2587                                 ksocknal_check_peer_timeouts (peer_index);
2588                                 peer_index = (peer_index + 1) % 
2589                                              ksocknal_data.ksnd_peer_hash_size;
2590                         }
2591
2592                         deadline += p * HZ;
2593                 }
2594
2595                 if (nenomem_conns != 0) {
2596                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2597                          * This also prevents me getting woken immediately
2598                          * if any go back on my enomem list. */
2599                         timeout = SOCKNAL_ENOMEM_RETRY;
2600                 }
2601                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2602
2603                 set_current_state (TASK_INTERRUPTIBLE);
2604                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2605
2606                 if (!ksocknal_data.ksnd_shuttingdown &&
2607                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2608                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2609                         schedule_timeout (timeout);
2610
2611                 set_current_state (TASK_RUNNING);
2612                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2613
2614                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2615         }
2616
2617         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2618
2619         ksocknal_thread_fini ();
2620         return (0);
2621 }
2622
2623 lib_nal_t ksocknal_lib = {
2624         libnal_data:       &ksocknal_data,      /* NAL private data */
2625         libnal_send:        ksocknal_send,
2626         libnal_send_pages:  ksocknal_send_pages,
2627         libnal_recv:        ksocknal_recv,
2628         libnal_recv_pages:  ksocknal_recv_pages,
2629         libnal_dist:        ksocknal_dist
2630 };