Whamcloud - gitweb
22345fee01f42a85bc756ee98ab9f585c9fc4004
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 /*
29  *  LIB functions follow
30  *
31  */
32 int
33 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
34               user_ptr src_addr, size_t len)
35 {
36         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
37                nal->ni.nid, (long)len, src_addr, dst_addr);
38
39         memcpy( dst_addr, src_addr, len );
40         return 0;
41 }
42
43 int
44 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
45                void *src_addr, size_t len)
46 {
47         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
48                nal->ni.nid, (long)len, src_addr, dst_addr);
49
50         memcpy( dst_addr, src_addr, len );
51         return 0;
52 }
53
54 int
55 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
56                          ptl_event_t *ev)
57 {
58         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
59                nal->ni.nid, eq, ev);
60
61         if (eq->event_callback != NULL)
62                 eq->event_callback(ev);
63
64         return 0;
65 }
66
67 void *
68 ksocknal_malloc(nal_cb_t *nal, size_t len)
69 {
70         void *buf;
71
72         PORTAL_ALLOC(buf, len);
73
74         if (buf != NULL)
75                 memset(buf, 0, len);
76
77         return (buf);
78 }
79
80 void
81 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
82 {
83         PORTAL_FREE(buf, len);
84 }
85
86 void
87 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
88 {
89         va_list ap;
90         char msg[256];
91
92         va_start (ap, fmt);
93         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
94         va_end (ap);
95
96         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
97
98         CDEBUG (D_NET, "%s", msg);
99 }
100
101 void
102 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
103 {
104         ksock_nal_data_t *data = nal->nal_data;
105
106         spin_lock(&data->ksnd_nal_cb_lock);
107 }
108
109 void
110 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data;
113         data = nal->nal_data;
114
115         spin_unlock(&data->ksnd_nal_cb_lock);
116 }
117
118 int
119 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
120 {
121         /* I would guess that if ksocknal_get_peer (nid) == NULL,
122            and we're not routing, then 'nid' is very distant :) */
123         if ( nal->ni.nid == nid ) {
124                 *dist = 0;
125         } else {
126                 *dist = 1;
127         }
128
129         return 0;
130 }
131
132 void
133 ksocknal_free_ltx (ksock_ltx_t *ltx)
134 {
135         atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
136         PORTAL_FREE(ltx, ltx->ltx_desc_size);
137 }
138
139 #if SOCKNAL_ZC
140 struct page *
141 ksocknal_kvaddr_to_page (unsigned long vaddr)
142 {
143         struct page *page;
144
145         if (vaddr >= VMALLOC_START &&
146             vaddr < VMALLOC_END)
147                 page = vmalloc_to_page ((void *)vaddr);
148 #if CONFIG_HIGHMEM
149         else if (vaddr >= PKMAP_BASE &&
150                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
151                 page = vmalloc_to_page ((void *)vaddr);
152                 /* in 2.4 ^ just walks the page tables */
153 #endif
154         else
155                 page = virt_to_page (vaddr);
156
157         if (page == NULL ||
158             !VALID_PAGE (page))
159                 return (NULL);
160
161         return (page);
162 }
163 #endif
164
165 int
166 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
167 {
168         struct socket *sock = conn->ksnc_sock;
169         struct iovec  *iov = tx->tx_iov;
170         int            fragsize = iov->iov_len;
171         unsigned long  vaddr = (unsigned long)iov->iov_base;
172         int            more = (tx->tx_niov > 1) || 
173                               (tx->tx_nkiov > 0) ||
174                               (!list_empty (&conn->ksnc_tx_queue));
175 #if SOCKNAL_ZC
176         int            offset = vaddr & (PAGE_SIZE - 1);
177         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
178         struct page   *page;
179 #endif
180         int            rc;
181
182         /* NB we can't trust socket ops to either consume our iovs
183          * or leave them alone, so we only send 1 frag at a time. */
184         LASSERT (fragsize <= tx->tx_resid);
185         LASSERT (tx->tx_niov > 0);
186         
187 #if SOCKNAL_ZC
188         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
189             (sock->sk->route_caps & NETIF_F_SG) &&
190             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
191             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
192                 
193                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
194                        (void *)vaddr, page, page_address(page), offset, zcsize);
195
196                 if (fragsize > zcsize) {
197                         more = 1;
198                         fragsize = zcsize;
199                 }
200
201                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
202                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
203                                        &tx->tx_zccd);
204         } else
205 #endif
206         {
207                 /* NB don't pass tx's iov; sendmsg may or may not update it */
208                 struct iovec fragiov = { .iov_base = (void *)vaddr,
209                                          .iov_len  = fragsize};
210                 struct msghdr msg = {
211                         .msg_name       = NULL,
212                         .msg_namelen    = 0,
213                         .msg_iov        = &fragiov,
214                         .msg_iovlen     = 1,
215                         .msg_control    = NULL,
216                         .msg_controllen = 0,
217                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
218                 };
219                 mm_segment_t oldmm = get_fs();
220
221                 set_fs (KERNEL_DS);
222                 rc = sock_sendmsg(sock, &msg, fragsize);
223                 set_fs (oldmm);
224         } 
225
226         if (rc <= 0)
227                 return (rc);
228
229         tx->tx_resid -= rc;
230
231         if (rc < iov->iov_len) {
232                 /* didn't send whole iov entry... */
233                 iov->iov_base = (void *)(vaddr + rc);
234                 iov->iov_len -= rc;
235                 /* ...but did we send everything we tried to send? */
236                 return ((rc == fragsize) ? 1 : -EAGAIN);
237         }
238
239         tx->tx_iov++;
240         tx->tx_niov--;
241         return (1);
242 }
243
244 int
245 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
246 {
247         struct socket *sock = conn->ksnc_sock;
248         ptl_kiov_t    *kiov = tx->tx_kiov;
249         int            fragsize = kiov->kiov_len;
250         struct page   *page = kiov->kiov_page;
251         int            offset = kiov->kiov_offset;
252         int            more = (tx->tx_nkiov > 1) ||
253                               (!list_empty (&conn->ksnc_tx_queue));
254         int            rc;
255
256         /* NB we can't trust socket ops to either consume our iovs
257          * or leave them alone, so we only send 1 frag at a time. */
258         LASSERT (fragsize <= tx->tx_resid);
259         LASSERT (offset + fragsize <= PAGE_SIZE);
260         LASSERT (tx->tx_niov == 0);
261         LASSERT (tx->tx_nkiov > 0);
262
263 #if SOCKNAL_ZC
264         if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
265             (sock->sk->route_caps & NETIF_F_SG) &&
266             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
267
268                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
269                                page, offset, fragsize);
270
271                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
272                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
273                                        &tx->tx_zccd);
274         } else
275 #endif
276         {
277                 char *addr = ((char *)kmap (page)) + offset;
278                 struct iovec fragiov = {.iov_base = addr,
279                                         .iov_len  = fragsize};
280                 struct msghdr msg = {
281                         .msg_name       = NULL,
282                         .msg_namelen    = 0,
283                         .msg_iov        = &fragiov,
284                         .msg_iovlen     = 1,
285                         .msg_control    = NULL,
286                         .msg_controllen = 0,
287                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
288                 };
289                 mm_segment_t  oldmm = get_fs();
290                 
291                 set_fs (KERNEL_DS);
292                 rc = sock_sendmsg(sock, &msg, fragsize);
293                 set_fs (oldmm);
294
295                 kunmap (page);
296         }
297
298         if (rc <= 0)
299                 return (rc);
300
301         tx->tx_resid -= rc;
302  
303         if (rc < fragsize) {
304                 /* didn't send whole frag */
305                 kiov->kiov_offset = offset + rc;
306                 kiov->kiov_len    = fragsize - rc;
307                 return (-EAGAIN);
308         }
309
310         /* everything went */
311         LASSERT (rc == fragsize);
312         tx->tx_kiov++;
313         tx->tx_nkiov--;
314         return (1);
315 }
316
317 int
318 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
319 {
320         /* Return 0 on success, < 0 on error.
321          * caller checks tx_resid to determine progress/completion */
322         int      rc;
323         ENTRY;
324         
325         if (ksocknal_data.ksnd_stall_tx != 0) {
326                 set_current_state (TASK_UNINTERRUPTIBLE);
327                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
328         }
329
330         rc = ksocknal_getconnsock (conn);
331         if (rc != 0) {
332                 LASSERT (conn->ksnc_closing);
333                 return (rc);
334         }
335
336         for (;;) {
337                 LASSERT (tx->tx_resid != 0);
338
339                 if (tx->tx_niov != 0)
340                         rc = ksocknal_send_iov (conn, tx);
341                 else
342                         rc = ksocknal_send_kiov (conn, tx);
343
344                 if (rc <= 0) {                  /* error or socket full? */
345                         /* NB: rc == 0 and rc == -EAGAIN both mean try
346                          * again later (linux stack returns -EAGAIN for
347                          * this, but Adaptech TOE returns 0) */
348                         if (rc == -EAGAIN)
349                                 rc = 0;
350                         break;
351                 }
352
353                 /* Consider the connection alive since we managed to chuck
354                  * more data into it.  Really, we'd like to consider it
355                  * alive only when the peer ACKs something, but
356                  * write_space() only gets called back while SOCK_NOSPACE
357                  * is set.  Instead, we presume peer death has occurred if
358                  * the socket doesn't drain within a timout */
359                 conn->ksnc_tx_deadline = jiffies + 
360                                          ksocknal_data.ksnd_io_timeout * HZ;
361                 conn->ksnc_peer->ksnp_last_alive = jiffies;
362
363                 if (tx->tx_resid == 0) {        /* sent everything */
364                         rc = 0;
365                         break;
366                 }
367         }
368
369         ksocknal_putconnsock (conn);
370         RETURN (rc);
371 }
372
373 void
374 ksocknal_eager_ack (ksock_conn_t *conn)
375 {
376         int            opt = 1;
377         mm_segment_t   oldmm = get_fs();
378         struct socket *sock = conn->ksnc_sock;
379         
380         /* Remind the socket to ACK eagerly.  If I don't, the socket might
381          * think I'm about to send something it could piggy-back the ACK
382          * on, introducing delay in completing zero-copy sends in my
383          * peer. */
384
385         set_fs(KERNEL_DS);
386         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
387                                (char *)&opt, sizeof (opt));
388         set_fs(oldmm);
389 }
390
391 int
392 ksocknal_recv_iov (ksock_conn_t *conn)
393 {
394         struct iovec *iov = conn->ksnc_rx_iov;
395         int           fragsize  = iov->iov_len;
396         unsigned long vaddr = (unsigned long)iov->iov_base;
397         struct iovec  fragiov = { .iov_base = (void *)vaddr,
398                                   .iov_len  = fragsize};
399         struct msghdr msg = {
400                 .msg_name       = NULL,
401                 .msg_namelen    = 0,
402                 .msg_iov        = &fragiov,
403                 .msg_iovlen     = 1,
404                 .msg_control    = NULL,
405                 .msg_controllen = 0,
406                 .msg_flags      = 0
407         };
408         mm_segment_t oldmm = get_fs();
409         int          rc;
410
411         /* NB we can't trust socket ops to either consume our iovs
412          * or leave them alone, so we only receive 1 frag at a time. */
413         LASSERT (conn->ksnc_rx_niov > 0);
414         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
415
416         set_fs (KERNEL_DS);
417         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
418         /* NB this is just a boolean............................^ */
419         set_fs (oldmm);
420
421         if (rc <= 0)
422                 return (rc);
423
424         /* received something... */
425         conn->ksnc_peer->ksnp_last_alive = jiffies;
426         conn->ksnc_rx_deadline = jiffies + 
427                                  ksocknal_data.ksnd_io_timeout * HZ;
428         mb();                           /* order with setting rx_started */
429         conn->ksnc_rx_started = 1;
430
431         conn->ksnc_rx_nob_wanted -= rc;
432         conn->ksnc_rx_nob_left -= rc;
433                 
434         if (rc < fragsize) {
435                 iov->iov_base = (void *)(vaddr + rc);
436                 iov->iov_len = fragsize - rc;
437                 return (-EAGAIN);
438         }
439
440         conn->ksnc_rx_iov++;
441         conn->ksnc_rx_niov--;
442         return (1);
443 }
444
445 int
446 ksocknal_recv_kiov (ksock_conn_t *conn)
447 {
448         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
449         struct page  *page = kiov->kiov_page;
450         int           offset = kiov->kiov_offset;
451         int           fragsize = kiov->kiov_len;
452         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
453         struct iovec  fragiov = { .iov_base = (void *)vaddr,
454                                   .iov_len  = fragsize};
455         struct msghdr msg = {
456                 .msg_name       = NULL,
457                 .msg_namelen    = 0,
458                 .msg_iov        = &fragiov,
459                 .msg_iovlen     = 1,
460                 .msg_control    = NULL,
461                 .msg_controllen = 0,
462                 .msg_flags      = 0
463         };
464         mm_segment_t oldmm = get_fs();
465         int          rc;
466
467         /* NB we can't trust socket ops to either consume our iovs
468          * or leave them alone, so we only receive 1 frag at a time. */
469         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
470         LASSERT (conn->ksnc_rx_nkiov > 0);
471         LASSERT (offset + fragsize <= PAGE_SIZE);
472
473         set_fs (KERNEL_DS);
474         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
475         /* NB this is just a boolean............................^ */
476         set_fs (oldmm);
477
478         kunmap (page);
479         
480         if (rc <= 0)
481                 return (rc);
482         
483         /* received something... */
484         conn->ksnc_peer->ksnp_last_alive = jiffies;
485         conn->ksnc_rx_deadline = jiffies + 
486                                  ksocknal_data.ksnd_io_timeout * HZ;
487         mb();                           /* order with setting rx_started */
488         conn->ksnc_rx_started = 1;
489
490         conn->ksnc_rx_nob_wanted -= rc;
491         conn->ksnc_rx_nob_left -= rc;
492                 
493         if (rc < fragsize) {
494                 kiov->kiov_offset = offset + rc;
495                 kiov->kiov_len = fragsize - rc;
496                 return (-EAGAIN);
497         }
498
499         conn->ksnc_rx_kiov++;
500         conn->ksnc_rx_nkiov--;
501         return (1);
502 }
503
504 int
505 ksocknal_receive (ksock_conn_t *conn) 
506 {
507         /* Return 1 on success, 0 on EOF, < 0 on error.
508          * Caller checks ksnc_rx_nob_wanted to determine
509          * progress/completion. */
510         int     rc;
511         ENTRY;
512         
513         if (ksocknal_data.ksnd_stall_rx != 0) {
514                 set_current_state (TASK_UNINTERRUPTIBLE);
515                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
516         }
517
518         rc = ksocknal_getconnsock (conn);
519         if (rc != 0) {
520                 LASSERT (conn->ksnc_closing);
521                 return (rc);
522         }
523
524         for (;;) {
525                 if (conn->ksnc_rx_niov != 0)
526                         rc = ksocknal_recv_iov (conn);
527                 else
528                         rc = ksocknal_recv_kiov (conn);
529
530                 if (rc <= 0) {
531                         /* error/EOF or partial receive */
532                         if (rc == -EAGAIN) {
533                                 rc = 1;
534                         } else if (rc == 0 && conn->ksnc_rx_started) {
535                                 /* EOF in the middle of a message */
536                                 rc = -EPROTO;
537                         }
538                         break;
539                 }
540
541                 /* Completed a fragment */
542
543                 if (conn->ksnc_rx_nob_wanted == 0) {
544                         /* Completed a message segment (header or payload) */
545                         if ((ksocknal_data.ksnd_eager_ack & conn->ksnc_type) != 0 &&
546                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
547                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
548                                 /* Remind the socket to ack eagerly... */
549                                 ksocknal_eager_ack(conn);
550                         }
551                         rc = 1;
552                         break;
553                 }
554         }
555
556         ksocknal_putconnsock (conn);
557         RETURN (rc);
558 }
559
560 #if SOCKNAL_ZC
561 void
562 ksocknal_zc_callback (zccd_t *zcd)
563 {
564         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
565         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
566         unsigned long  flags;
567         ENTRY;
568
569         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
570
571         spin_lock_irqsave (&sched->kss_lock, flags);
572
573         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
574         wake_up (&sched->kss_waitq);
575
576         spin_unlock_irqrestore (&sched->kss_lock, flags);
577         EXIT;
578 }
579 #endif
580
581 void
582 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
583 {
584         ksock_ltx_t   *ltx;
585         ENTRY;
586
587         if (tx->tx_conn != NULL) {
588                 /* This tx got queued on a conn; do the accounting... */
589                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
590 #if SOCKNAL_ZC
591                 /* zero copy completion isn't always from
592                  * process_transmit() so it needs to keep a ref on
593                  * tx_conn... */
594                 if (asynch)
595                         ksocknal_put_conn (tx->tx_conn);
596 #else
597                 LASSERT (!asynch);
598 #endif
599         }
600
601         if (tx->tx_isfwd) {             /* was a forwarded packet? */
602                 kpr_fwd_done (&ksocknal_data.ksnd_router,
603                               KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
604                 EXIT;
605                 return;
606         }
607
608         /* local send */
609         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
610
611         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
612
613         ksocknal_free_ltx (ltx);
614         EXIT;
615 }
616
617 void
618 ksocknal_tx_launched (ksock_tx_t *tx) 
619 {
620 #if SOCKNAL_ZC
621         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
622                 ksock_conn_t  *conn = tx->tx_conn;
623                 
624                 /* zccd skbufs are still in-flight.  First take a ref on
625                  * conn, so it hangs about for ksocknal_tx_done... */
626                 atomic_inc (&conn->ksnc_refcount);
627
628                 /* ...then drop the initial ref on zccd, so the zero copy
629                  * callback can occur */
630                 zccd_put (&tx->tx_zccd);
631                 return;
632         }
633 #endif
634         /* Any zero-copy-ness (if any) has completed; I can complete the
635          * transmit now, avoiding an extra schedule */
636         ksocknal_tx_done (tx, 0);
637 }
638
639 int
640 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
641 {
642         int            rc;
643        
644         rc = ksocknal_transmit (conn, tx);
645
646         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
647         LASSERT (rc != -EAGAIN);
648
649         if (rc == 0) {
650                 /* no errors */
651                 if (tx->tx_resid != 0) {
652                         /* didn't send everything */
653                         return (-EAGAIN);
654                 }
655                 
656                 ksocknal_tx_launched (tx);
657                 return (0);
658         }
659
660         if (!conn->ksnc_closing)
661                 CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
662                         conn, rc, conn->ksnc_peer->ksnp_nid,
663                         conn->ksnc_ipaddr, conn->ksnc_port);
664
665         ksocknal_close_conn_and_siblings (conn, rc);
666         ksocknal_tx_launched (tx);
667
668         return (rc);
669
670
671 void
672 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
673 {
674         unsigned long     flags;
675
676         /* called holding write lock on ksnd_global_lock */
677
678         LASSERT (!route->ksnr_deleted);
679         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
680         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
681         LASSERT (!route->ksnr_connecting);
682         
683         if (ksocknal_data.ksnd_typed_conns)
684                 route->ksnr_connecting = 
685                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
686         else
687                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
688
689         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
690         
691         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
692         
693         list_add_tail (&route->ksnr_connect_list,
694                        &ksocknal_data.ksnd_autoconnectd_routes);
695         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
696         
697         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
698 }
699
700 ksock_peer_t *
701 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
702 {
703         ptl_nid_t     target_nid;
704         int           rc;
705         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
706         
707         if (peer != NULL)
708                 return (peer);
709         
710         if (tx->tx_isfwd) {
711                 CERROR ("Can't send packet to "LPX64
712                         ": routed target is not a peer\n", nid);
713                 return (NULL);
714         }
715         
716         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
717                          &target_nid);
718         if (rc != 0) {
719                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
720                 return (NULL);
721         }
722
723         peer = ksocknal_find_peer_locked (target_nid);
724         if (peer != NULL)
725                 return (peer);
726
727         CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
728         return (NULL);
729 }
730
731 ksock_conn_t *
732 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
733 {
734         struct list_head *tmp;
735         ksock_conn_t     *typed = NULL;
736         int               tnob  = 0;
737         ksock_conn_t     *fallback = NULL;
738         int               fnob     = 0;
739         
740         /* Find the conn with the shortest tx queue */
741         list_for_each (tmp, &peer->ksnp_conns) {
742                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
743                 int           nob = atomic_read(&c->ksnc_tx_nob);
744
745                 LASSERT (!c->ksnc_closing);
746
747                 if (fallback == NULL || nob < fnob) {
748                         fallback = c;
749                         fnob     = nob;
750                 }
751
752                 if (!ksocknal_data.ksnd_typed_conns)
753                         continue;
754
755                 switch (c->ksnc_type) {
756                 default:
757                         LBUG();
758                 case SOCKNAL_CONN_ANY:
759                         break;
760                 case SOCKNAL_CONN_BULK_IN:
761                         continue;
762                 case SOCKNAL_CONN_BULK_OUT:
763                         if (tx->tx_nob < ksocknal_data.ksnd_min_bulk)
764                                 continue;
765                         break;
766                 case SOCKNAL_CONN_CONTROL:
767                         if (tx->tx_nob >= ksocknal_data.ksnd_min_bulk)
768                                 continue;
769                         break;
770                 }
771
772                 if (typed == NULL || nob < tnob) {
773                         typed = c;
774                         tnob  = nob;
775                 }
776         }
777
778         /* prefer the typed selection */
779         return ((typed != NULL) ? typed : fallback);
780 }
781
782 void
783 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
784 {
785         unsigned long  flags;
786         ksock_sched_t *sched = conn->ksnc_scheduler;
787
788         /* called holding global lock (read or irq-write) and caller may
789          * not have dropped this lock between finding conn and calling me,
790          * so we don't need the {get,put}connsock dance to deref
791          * ksnc_sock... */
792         LASSERT(!conn->ksnc_closing);
793         LASSERT(tx->tx_resid == tx->tx_nob);
794         
795         CDEBUG (D_NET, "Sending to "LPX64" on port %d\n", 
796                 conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
797
798         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
799         tx->tx_conn = conn;
800
801 #if SOCKNAL_ZC
802         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
803         /* NB this sets 1 ref on zccd, so the callback can only occur after
804          * I've released this ref. */
805 #endif
806         spin_lock_irqsave (&sched->kss_lock, flags);
807
808         conn->ksnc_tx_deadline = jiffies + 
809                                  ksocknal_data.ksnd_io_timeout * HZ;
810         mb();                                   /* order with list_add_tail */
811
812         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
813                 
814         if (conn->ksnc_tx_ready &&      /* able to send */
815             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
816                 /* +1 ref for scheduler */
817                 atomic_inc (&conn->ksnc_refcount);
818                 list_add_tail (&conn->ksnc_tx_list, 
819                                &sched->kss_tx_conns);
820                 conn->ksnc_tx_scheduled = 1;
821                 wake_up (&sched->kss_waitq);
822         }
823
824         spin_unlock_irqrestore (&sched->kss_lock, flags);
825 }
826
827 ksock_route_t *
828 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
829 {
830         struct list_head  *tmp;
831         ksock_route_t     *route;
832         ksock_route_t     *candidate = NULL;
833         int                found = 0;
834         int                bits;
835         
836         list_for_each (tmp, &peer->ksnp_routes) {
837                 route = list_entry (tmp, ksock_route_t, ksnr_list);
838                 bits  = route->ksnr_connected;
839                 
840                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
841                     (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
842                     route->ksnr_connecting != 0) {
843                         /* All typed connections have been established, or
844                          * an untyped connection has been established, or
845                          * connections are currently being established */
846                         found = 1;
847                         continue;
848                 }
849
850                 /* too soon to retry this guy? */
851                 if (!time_after_eq (jiffies, route->ksnr_timeout))
852                         continue;
853                 
854                 /* always do eager routes */
855                 if (route->ksnr_eager)
856                         return (route);
857
858                 if (candidate == NULL) {
859                         /* If we don't find any other route that is fully
860                          * connected or connecting, the first connectable
861                          * route is returned.  If it fails to connect, it
862                          * will get placed at the end of the list */
863                         candidate = route;
864                 }
865         }
866  
867         return (found ? NULL : candidate);
868 }
869
870 ksock_route_t *
871 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
872 {
873         struct list_head  *tmp;
874         ksock_route_t     *route;
875
876         list_for_each (tmp, &peer->ksnp_routes) {
877                 route = list_entry (tmp, ksock_route_t, ksnr_list);
878                 
879                 if (route->ksnr_connecting != 0)
880                         return (route);
881         }
882         
883         return (NULL);
884 }
885
886 int
887 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
888 {
889         unsigned long     flags;
890         ksock_peer_t     *peer;
891         ksock_conn_t     *conn;
892         ksock_route_t    *route;
893         rwlock_t         *g_lock;
894         
895         /* Ensure the frags we've been given EXACTLY match the number of
896          * bytes we want to send.  Many TCP/IP stacks disregard any total
897          * size parameters passed to them and just look at the frags. 
898          *
899          * We always expect at least 1 mapped fragment containing the
900          * complete portals header. */
901         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
902                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
903         LASSERT (tx->tx_niov >= 1);
904         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
905
906         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
907                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
908                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
909
910         tx->tx_conn = NULL;                     /* only set when assigned a conn */
911         tx->tx_resid = tx->tx_nob;
912         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
913
914         g_lock = &ksocknal_data.ksnd_global_lock;
915         read_lock (g_lock);
916         
917         peer = ksocknal_find_target_peer_locked (tx, nid);
918         if (peer == NULL) {
919                 read_unlock (g_lock);
920                 return (-EHOSTUNREACH);
921         }
922
923         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
924                 conn = ksocknal_find_conn_locked (tx, peer);
925                 if (conn != NULL) {
926                         /* I've got no autoconnect routes that need to be
927                          * connecting and I do have an actual connection... */
928                         ksocknal_queue_tx_locked (tx, conn);
929                         read_unlock (g_lock);
930                         return (0);
931                 }
932         }
933         
934         /* Making one or more connections; I'll need a write lock... */
935
936         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
937         read_unlock (g_lock);
938         write_lock_irqsave (g_lock, flags);
939         
940         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
941                 write_unlock_irqrestore (g_lock, flags);
942                 ksocknal_put_peer (peer);
943                 return (-EHOSTUNREACH);
944         }
945         ksocknal_put_peer (peer);               /* drop ref I got above */
946
947         for (;;) {
948                 /* launch any/all autoconnections that need it */
949                 route = ksocknal_find_connectable_route_locked (peer);
950                 if (route == NULL)
951                         break;
952
953                 ksocknal_launch_autoconnect_locked (route);
954         }
955
956         conn = ksocknal_find_conn_locked (tx, peer);
957         if (conn != NULL) {
958                 /* Connection exists; queue message on it */
959                 ksocknal_queue_tx_locked (tx, conn);
960                 write_unlock_irqrestore (g_lock, flags);
961                 return (0);
962         }
963
964         route = ksocknal_find_connecting_route_locked (peer);
965         if (route != NULL) {
966                 /* At least 1 connection is being established; queue the
967                  * message... */
968                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
969                 write_unlock_irqrestore (g_lock, flags);
970                 return (0);
971         }
972         
973         write_unlock_irqrestore (g_lock, flags);
974         return (-EHOSTUNREACH);
975 }
976
977 int
978 ksocknal_sendmsg(nal_cb_t     *nal, 
979                  void         *private, 
980                  lib_msg_t    *cookie,
981                  ptl_hdr_t    *hdr, 
982                  int           type, 
983                  ptl_nid_t     nid, 
984                  ptl_pid_t     pid,
985                  unsigned int  payload_niov, 
986                  struct iovec *payload_iov, 
987                  ptl_kiov_t   *payload_kiov,
988                  size_t        payload_nob)
989 {
990         ksock_ltx_t  *ltx;
991         int           desc_size;
992         int           rc;
993
994         /* NB 'private' is different depending on what we're sending.
995          * Just ignore it... */
996
997         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
998                " pid %d\n", payload_nob, payload_niov, nid , pid);
999
1000         LASSERT (payload_nob == 0 || payload_niov > 0);
1001         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1002
1003         /* It must be OK to kmap() if required */
1004         LASSERT (payload_kiov == NULL || !in_interrupt ());
1005         /* payload is either all vaddrs or all pages */
1006         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1007         
1008         if (payload_iov != NULL)
1009                 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1010         else
1011                 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1012         
1013         if (in_interrupt() ||
1014             type == PTL_MSG_ACK ||
1015             type == PTL_MSG_REPLY) {
1016                 /* Can't block if in interrupt or responding to an incoming
1017                  * message */
1018                 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1019         } else {
1020                 PORTAL_ALLOC(ltx, desc_size);
1021         }
1022         
1023         if (ltx == NULL) {
1024                 CERROR("Can't allocate tx desc type %d size %d %s\n",
1025                        type, desc_size, in_interrupt() ? "(intr)" : "");
1026                 return (PTL_NOSPACE);
1027         }
1028
1029         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1030
1031         ltx->ltx_desc_size = desc_size;
1032         
1033         /* We always have 1 mapped frag for the header */
1034         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1035         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1036         ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1037         ltx->ltx_hdr = *hdr;
1038         
1039         ltx->ltx_private = private;
1040         ltx->ltx_cookie = cookie;
1041         
1042         ltx->ltx_tx.tx_isfwd = 0;
1043         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1044
1045         if (payload_iov != NULL) {
1046                 /* payload is all mapped */
1047                 ltx->ltx_tx.tx_kiov  = NULL;
1048                 ltx->ltx_tx.tx_nkiov = 0;
1049
1050                 ltx->ltx_tx.tx_niov = 1 + payload_niov;
1051
1052                 memcpy(ltx->ltx_iov + 1, payload_iov,
1053                        payload_niov * sizeof (*payload_iov));
1054
1055         } else {
1056                 /* payload is all pages */
1057                 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1058                 ltx->ltx_tx.tx_nkiov = payload_niov;
1059
1060                 ltx->ltx_tx.tx_niov = 1;
1061
1062                 memcpy(ltx->ltx_kiov, payload_kiov, 
1063                        payload_niov * sizeof (*payload_kiov));
1064         }
1065
1066         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1067         if (rc == 0)
1068                 return (PTL_OK);
1069         
1070         ksocknal_free_ltx(ltx);
1071         return (PTL_FAIL);
1072 }
1073
1074 int
1075 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1076                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1077                unsigned int payload_niov, struct iovec *payload_iov,
1078                size_t payload_len)
1079 {
1080         return (ksocknal_sendmsg(nal, private, cookie,
1081                                  hdr, type, nid, pid,
1082                                  payload_niov, payload_iov, NULL,
1083                                  payload_len));
1084 }
1085
1086 int
1087 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1088                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1089                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1090                      size_t payload_len)
1091 {
1092         return (ksocknal_sendmsg(nal, private, cookie,
1093                                  hdr, type, nid, pid,
1094                                  payload_niov, NULL, payload_kiov,
1095                                  payload_len));
1096 }
1097
1098 void
1099 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1100 {
1101         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1102         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
1103         int           rc;
1104         
1105         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1106                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1107
1108         /* I'm the gateway; must be the last hop */
1109         if (nid == ksocknal_lib.ni.nid)
1110                 nid = fwd->kprfd_target_nid;
1111
1112         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
1113         tx->tx_nob   = fwd->kprfd_nob;
1114         tx->tx_niov  = fwd->kprfd_niov;
1115         tx->tx_iov   = fwd->kprfd_iov;
1116         tx->tx_nkiov = 0;
1117         tx->tx_kiov  = NULL;
1118
1119         rc = ksocknal_launch_packet (tx, nid);
1120         if (rc != 0)
1121                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1122 }
1123
1124 int
1125 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1126 {
1127         long    pid = kernel_thread (fn, arg, 0);
1128
1129         if (pid < 0)
1130                 return ((int)pid);
1131
1132         atomic_inc (&ksocknal_data.ksnd_nthreads);
1133         return (0);
1134 }
1135
1136 void
1137 ksocknal_thread_fini (void)
1138 {
1139         atomic_dec (&ksocknal_data.ksnd_nthreads);
1140 }
1141
1142 void
1143 ksocknal_fmb_callback (void *arg, int error)
1144 {
1145         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1146         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1147         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1148         ksock_conn_t      *conn = NULL;
1149         ksock_sched_t     *sched;
1150         unsigned long      flags;
1151
1152         if (error != 0)
1153                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1154                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
1155                        error);
1156         else
1157                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1158                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1159
1160         /* drop peer ref taken on init */
1161         ksocknal_put_peer (fmb->fmb_peer);
1162         
1163         spin_lock_irqsave (&fmp->fmp_lock, flags);
1164
1165         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1166         fmp->fmp_nactive_fmbs--;
1167
1168         if (!list_empty (&fmp->fmp_blocked_conns)) {
1169                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1170                                    ksock_conn_t, ksnc_rx_list);
1171                 list_del (&conn->ksnc_rx_list);
1172         }
1173
1174         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1175
1176         if (conn == NULL)
1177                 return;
1178
1179         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1180         LASSERT (conn->ksnc_rx_scheduled);
1181         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1182
1183         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1184
1185         sched = conn->ksnc_scheduler;
1186
1187         spin_lock_irqsave (&sched->kss_lock, flags);
1188
1189         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1190         wake_up (&sched->kss_waitq);
1191
1192         spin_unlock_irqrestore (&sched->kss_lock, flags);
1193 }
1194
1195 ksock_fmb_t *
1196 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1197 {
1198         int               payload_nob = conn->ksnc_rx_nob_left;
1199         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1200         unsigned long     flags;
1201         ksock_fmb_pool_t *pool;
1202         ksock_fmb_t      *fmb;
1203
1204         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1205         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1206
1207         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1208                 pool = &ksocknal_data.ksnd_small_fmp;
1209         else
1210                 pool = &ksocknal_data.ksnd_large_fmp;
1211
1212         spin_lock_irqsave (&pool->fmp_lock, flags);
1213
1214         if (!list_empty (&pool->fmp_idle_fmbs)) {
1215                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1216                                  ksock_fmb_t, fmb_list);
1217                 list_del (&fmb->fmb_list);
1218                 pool->fmp_nactive_fmbs++;
1219                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1220
1221                 return (fmb);
1222         }
1223
1224         /* deschedule until fmb free */
1225
1226         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1227
1228         list_add_tail (&conn->ksnc_rx_list,
1229                        &pool->fmp_blocked_conns);
1230
1231         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1232         return (NULL);
1233 }
1234
1235 int
1236 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1237 {
1238         int payload_nob = conn->ksnc_rx_nob_left;
1239         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1240         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1241         int niov;                               /* at least the header */
1242         int nob;
1243
1244         LASSERT (conn->ksnc_rx_scheduled);
1245         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1246         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1247         LASSERT (payload_nob >= 0);
1248         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1249         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1250
1251         /* Got a forwarding buffer; copy the header we just read into the
1252          * forwarding buffer.  If there's payload, start reading reading it
1253          * into the buffer, otherwise the forwarding buffer can be kicked
1254          * off immediately.
1255          *
1256          * NB fmb->fmb_iov spans the WHOLE packet.
1257          *    conn->ksnc_rx_iov spans just the payload.
1258          */
1259         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1260
1261         /* copy header */
1262         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1263
1264         /* Take a ref on the conn's peer to prevent module unload before
1265          * forwarding completes.  NB we ref peer and not conn since because
1266          * all refs on conn after it has been closed must remove themselves
1267          * in finite time */
1268         fmb->fmb_peer = conn->ksnc_peer;
1269         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1270
1271         if (payload_nob == 0) {         /* got complete packet already */
1272                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1273                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1274                         dest_nid, packet_nob);
1275
1276                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1277
1278                 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1279                               packet_nob, 1, fmb->fmb_iov,
1280                               ksocknal_fmb_callback, fmb);
1281
1282                 /* forward it now */
1283                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1284
1285                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1286                 return (1);
1287         }
1288
1289         niov = 1;
1290         if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
1291                 fmb->fmb_iov[0].iov_len = packet_nob;
1292         } else {
1293                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1294                 nob = packet_nob - PAGE_SIZE;
1295
1296                 do {
1297                         LASSERT (niov < fmb->fmb_npages);
1298                         fmb->fmb_iov[niov].iov_base =
1299                                 page_address (fmb->fmb_pages[niov]);
1300                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1301                         nob -= PAGE_SIZE;
1302                         niov++;
1303                 } while (nob > 0);
1304         }
1305
1306         kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1307                       packet_nob, niov, fmb->fmb_iov,
1308                       ksocknal_fmb_callback, fmb);
1309
1310         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1311         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1312         
1313         /* payload is desc's iov-ed buffer, but skipping the hdr */
1314         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1315                  sizeof (struct iovec));
1316
1317         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1318         conn->ksnc_rx_iov[0].iov_base =
1319                 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1320                          sizeof (ptl_hdr_t));
1321         conn->ksnc_rx_iov[0].iov_len =
1322                 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1323
1324         if (niov > 1)
1325                 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1326                        (niov - 1) * sizeof (struct iovec));
1327
1328         conn->ksnc_rx_niov = niov;
1329
1330         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1331                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1332         return (0);
1333 }
1334
1335 void
1336 ksocknal_fwd_parse (ksock_conn_t *conn)
1337 {
1338         ksock_peer_t *peer;
1339         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1340         ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1341         int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1342         char str[PTL_NALFMT_SIZE];
1343
1344         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1345                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1346
1347         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1348         LASSERT (conn->ksnc_rx_scheduled);
1349
1350         if (body_len < 0) {                 /* length corrupt (overflow) */
1351                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1352                        "packet size %d illegal\n",
1353                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1354                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1355                        body_len);
1356
1357                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1358                 return;
1359         }
1360
1361         if (!kpr_routing(&ksocknal_data.ksnd_router)) {    /* not forwarding */
1362                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1363                        " (%s): not forwarding\n",
1364                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1365                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1366                 /* on to new packet (skip this one's body) */
1367                 ksocknal_new_packet (conn, body_len);
1368                 return;
1369         }
1370
1371         if (body_len > PTL_MTU) {      /* too big to forward */
1372                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1373                         "(%s): packet size %d too big\n",
1374                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1375                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1376                         body_len);
1377                 /* on to new packet (skip this one's body) */
1378                 ksocknal_new_packet (conn, body_len);
1379                 return;
1380         }
1381
1382         /* should have gone direct */
1383         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1384         if (peer != NULL) {
1385                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1386                         "(%s): target is a peer\n",
1387                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1388                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1389                 ksocknal_put_peer (peer);  /* drop ref from get above */
1390
1391                 /* on to next packet (skip this one's body) */
1392                 ksocknal_new_packet (conn, body_len);
1393                 return;
1394         }
1395
1396         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1397         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1398         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1399 }
1400
1401 int
1402 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1403 {
1404         static char ksocknal_slop_buffer[4096];
1405
1406         int   nob;
1407         int   niov;
1408         int   skipped;
1409
1410         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1411                 conn->ksnc_rx_started = 0;
1412                 mb ();                          /* racing with timeout thread */
1413                 
1414                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1415                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1416                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1417
1418                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1419                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1420                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1421                 conn->ksnc_rx_niov = 1;
1422
1423                 conn->ksnc_rx_kiov = NULL;
1424                 conn->ksnc_rx_nkiov = 0;
1425                 return (1);
1426         }
1427
1428         /* Set up to skip as much a possible now.  If there's more left
1429          * (ran out of iov entries) we'll get called again */
1430
1431         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1432         conn->ksnc_rx_nob_left = nob_to_skip;
1433         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1434         skipped = 0;
1435         niov = 0;
1436
1437         do {
1438                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1439
1440                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1441                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1442                 niov++;
1443                 skipped += nob;
1444                 nob_to_skip -=nob;
1445
1446         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1447                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1448
1449         conn->ksnc_rx_niov = niov;
1450         conn->ksnc_rx_kiov = NULL;
1451         conn->ksnc_rx_nkiov = 0;
1452         conn->ksnc_rx_nob_wanted = skipped;
1453         return (0);
1454 }
1455
1456 int
1457 ksocknal_process_receive (ksock_conn_t *conn)
1458 {
1459         ksock_fmb_t  *fmb;
1460         int           rc;
1461         
1462         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1463
1464         /* doesn't need a forwarding buffer */
1465         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1466                 goto try_read;
1467
1468  get_fmb:
1469         fmb = ksocknal_get_idle_fmb (conn);
1470         if (fmb == NULL) {
1471                 /* conn descheduled waiting for idle fmb */
1472                 return (0);
1473         }
1474
1475         if (ksocknal_init_fmb (conn, fmb)) {
1476                 /* packet forwarded */
1477                 return (0);
1478         }
1479
1480  try_read:
1481         /* NB: sched lock NOT held */
1482         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1483                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1484                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1485                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1486
1487         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1488
1489         rc = ksocknal_receive(conn);
1490
1491         if (rc <= 0) {
1492                 LASSERT (rc != -EAGAIN);
1493
1494                 if (rc == 0)
1495                         CWARN ("[%p] EOF from "LPX64" ip %08x:%d\n",
1496                                conn, conn->ksnc_peer->ksnp_nid,
1497                                conn->ksnc_ipaddr, conn->ksnc_port);
1498                 else if (!conn->ksnc_closing)
1499                         CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
1500                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1501                                 conn->ksnc_ipaddr, conn->ksnc_port);
1502
1503                 ksocknal_close_conn_and_siblings (conn, rc);
1504                 return (rc == 0 ? -ESHUTDOWN : rc);
1505         }
1506
1507         if (conn->ksnc_rx_nob_wanted != 0) {
1508                 /* short read */
1509                 return (-EAGAIN);
1510         }
1511         
1512         switch (conn->ksnc_rx_state) {
1513         case SOCKNAL_RX_HEADER:
1514                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1515                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1516                         /* This packet isn't for me */
1517                         ksocknal_fwd_parse (conn);
1518                         switch (conn->ksnc_rx_state) {
1519                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1520                                 return (0);     /* => come back later */
1521                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1522                                 goto try_read;  /* => go read it */
1523                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1524                                 goto get_fmb;   /* => go get a fwd msg buffer */
1525                         default:
1526                                 LBUG ();
1527                         }
1528                         /* Not Reached */
1529                 }
1530
1531                 /* sets wanted_len, iovs etc */
1532                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1533
1534                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1535                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1536                         goto try_read;          /* go read the payload */
1537                 }
1538                 /* Fall through (completed packet for me) */
1539
1540         case SOCKNAL_RX_BODY:
1541                 /* payload all received */
1542                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1543                 /* Fall through */
1544
1545         case SOCKNAL_RX_SLOP:
1546                 /* starting new packet? */
1547                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1548                         return (0);     /* come back later */
1549                 goto try_read;          /* try to finish reading slop now */
1550
1551         case SOCKNAL_RX_BODY_FWD:
1552                 /* payload all received */
1553                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1554                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1555                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1556                         conn->ksnc_rx_nob_left);
1557
1558                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1559                  * conn->ksnc_cookie */
1560                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1561                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1562
1563                 /* no slop in forwarded packets */
1564                 LASSERT (conn->ksnc_rx_nob_left == 0);
1565
1566                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1567                 return (0);                     /* (later) */
1568
1569         default:
1570                 break;
1571         }
1572
1573         /* Not Reached */
1574         LBUG ();
1575         return (-EINVAL);                       /* keep gcc happy */
1576 }
1577
1578 int
1579 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1580                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1581 {
1582         ksock_conn_t *conn = (ksock_conn_t *)private;
1583
1584         LASSERT (mlen <= rlen);
1585         LASSERT (niov <= PTL_MD_MAX_IOV);
1586         
1587         conn->ksnc_cookie = msg;
1588         conn->ksnc_rx_nob_wanted = mlen;
1589         conn->ksnc_rx_nob_left   = rlen;
1590
1591         conn->ksnc_rx_nkiov = 0;
1592         conn->ksnc_rx_kiov = NULL;
1593         conn->ksnc_rx_niov = niov;
1594         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1595         memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1596
1597         LASSERT (mlen == 
1598                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1599                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1600
1601         return (rlen);
1602 }
1603
1604 int
1605 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1606                      unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1607 {
1608         ksock_conn_t *conn = (ksock_conn_t *)private;
1609
1610         LASSERT (mlen <= rlen);
1611         LASSERT (niov <= PTL_MD_MAX_IOV);
1612         
1613         conn->ksnc_cookie = msg;
1614         conn->ksnc_rx_nob_wanted = mlen;
1615         conn->ksnc_rx_nob_left   = rlen;
1616
1617         conn->ksnc_rx_niov = 0;
1618         conn->ksnc_rx_iov  = NULL;
1619         conn->ksnc_rx_nkiov = niov;
1620         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1621         memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1622
1623         LASSERT (mlen == 
1624                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1625                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1626
1627         return (rlen);
1628 }
1629
1630 int ksocknal_scheduler (void *arg)
1631 {
1632         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1633         ksock_conn_t      *conn;
1634         ksock_tx_t        *tx;
1635         unsigned long      flags;
1636         int                rc;
1637         int                nloops = 0;
1638         int                id = sched - ksocknal_data.ksnd_schedulers;
1639         char               name[16];
1640
1641         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1642         kportal_daemonize (name);
1643         kportal_blockallsigs ();
1644
1645         current->flags |= PF_MEMALLOC;
1646
1647 #if (CONFIG_SMP && CPU_AFFINITY)
1648         if ((cpu_online_map & (1 << id)) != 0) {
1649 #if 1
1650                 current->cpus_allowed = (1 << id);
1651 #else
1652                 set_cpus_allowed (current, 1<<id);
1653 #endif
1654         } else {
1655                 CERROR ("Can't set CPU affinity for %s\n", name);
1656         }
1657 #endif /* CONFIG_SMP && CPU_AFFINITY */
1658         
1659         spin_lock_irqsave (&sched->kss_lock, flags);
1660
1661         while (!ksocknal_data.ksnd_shuttingdown) {
1662                 int did_something = 0;
1663
1664                 /* Ensure I progress everything semi-fairly */
1665
1666                 if (!list_empty (&sched->kss_rx_conns)) {
1667                         conn = list_entry(sched->kss_rx_conns.next,
1668                                           ksock_conn_t, ksnc_rx_list);
1669                         list_del(&conn->ksnc_rx_list);
1670
1671                         LASSERT(conn->ksnc_rx_scheduled);
1672                         LASSERT(conn->ksnc_rx_ready);
1673
1674                         /* clear rx_ready in case receive isn't complete.
1675                          * Do it BEFORE we call process_recv, since
1676                          * data_ready can set it any time after we release
1677                          * kss_lock. */
1678                         conn->ksnc_rx_ready = 0;
1679                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1680                         
1681                         rc = ksocknal_process_receive(conn);
1682                         
1683                         spin_lock_irqsave(&sched->kss_lock, flags);
1684
1685                         /* I'm the only one that can clear this flag */
1686                         LASSERT(conn->ksnc_rx_scheduled);
1687
1688                         /* Did process_receive get everything it wanted? */
1689                         if (rc == 0)
1690                                 conn->ksnc_rx_ready = 1;
1691                         
1692                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1693                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1694                                 /* Conn blocked for a forwarding buffer.
1695                                  * It will get queued for my attention when
1696                                  * one becomes available (and it might just
1697                                  * already have been!).  Meanwhile my ref
1698                                  * on it stays put. */
1699                         } else if (conn->ksnc_rx_ready) {
1700                                 /* reschedule for rx */
1701                                 list_add_tail (&conn->ksnc_rx_list,
1702                                                &sched->kss_rx_conns);
1703                         } else {
1704                                 conn->ksnc_rx_scheduled = 0;
1705                                 /* drop my ref */
1706                                 ksocknal_put_conn(conn);
1707                         }
1708
1709                         did_something = 1;
1710                 }
1711
1712                 if (!list_empty (&sched->kss_tx_conns)) {
1713                         conn = list_entry(sched->kss_tx_conns.next,
1714                                           ksock_conn_t, ksnc_tx_list);
1715                         list_del (&conn->ksnc_tx_list);
1716                         
1717                         LASSERT(conn->ksnc_tx_scheduled);
1718                         LASSERT(conn->ksnc_tx_ready);
1719                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1720                         
1721                         tx = list_entry(conn->ksnc_tx_queue.next,
1722                                         ksock_tx_t, tx_list);
1723                         /* dequeue now so empty list => more to send */
1724                         list_del(&tx->tx_list);
1725                         
1726                         /* Clear tx_ready in case send isn't complete.  Do
1727                          * it BEFORE we call process_transmit, since
1728                          * write_space can set it any time after we release
1729                          * kss_lock. */
1730                         conn->ksnc_tx_ready = 0;
1731                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1732
1733                         rc = ksocknal_process_transmit(conn, tx);
1734
1735                         spin_lock_irqsave (&sched->kss_lock, flags);
1736
1737                         if (rc != -EAGAIN) {
1738                                 /* error or everything went: assume more can go */
1739                                 conn->ksnc_tx_ready = 1;
1740                         } else {
1741                                  /* back onto HEAD of tx_queue */
1742                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1743                         }
1744                         
1745                         if (conn->ksnc_tx_ready &&
1746                             !list_empty (&conn->ksnc_tx_queue)) {
1747                                 /* reschedule for tx */
1748                                 list_add_tail (&conn->ksnc_tx_list, 
1749                                                &sched->kss_tx_conns);
1750                         } else {
1751                                 conn->ksnc_tx_scheduled = 0;
1752                                 /* drop my ref */
1753                                 ksocknal_put_conn (conn);
1754                         }
1755                                 
1756                         did_something = 1;
1757                 }
1758 #if SOCKNAL_ZC
1759                 if (!list_empty (&sched->kss_zctxdone_list)) {
1760                         ksock_tx_t *tx =
1761                                 list_entry(sched->kss_zctxdone_list.next,
1762                                            ksock_tx_t, tx_list);
1763                         did_something = 1;
1764
1765                         list_del (&tx->tx_list);
1766                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1767
1768                         ksocknal_tx_done (tx, 1);
1769
1770                         spin_lock_irqsave (&sched->kss_lock, flags);
1771                 }
1772 #endif
1773                 if (!did_something ||           /* nothing to do */
1774                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1775                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1776
1777                         nloops = 0;
1778
1779                         if (!did_something) {   /* wait for something to do */
1780 #if SOCKNAL_ZC
1781                                 rc = wait_event_interruptible (sched->kss_waitq,
1782                                                                ksocknal_data.ksnd_shuttingdown ||
1783                                                                !list_empty(&sched->kss_rx_conns) ||
1784                                                                !list_empty(&sched->kss_tx_conns) ||
1785                                                                !list_empty(&sched->kss_zctxdone_list));
1786 #else
1787                                 rc = wait_event_interruptible (sched->kss_waitq,
1788                                                                ksocknal_data.ksnd_shuttingdown ||
1789                                                                !list_empty(&sched->kss_rx_conns) ||
1790                                                                !list_empty(&sched->kss_tx_conns));
1791 #endif
1792                                 LASSERT (rc == 0);
1793                         } else
1794                                our_cond_resched();
1795
1796                         spin_lock_irqsave (&sched->kss_lock, flags);
1797                 }
1798         }
1799
1800         spin_unlock_irqrestore (&sched->kss_lock, flags);
1801         ksocknal_thread_fini ();
1802         return (0);
1803 }
1804
1805 void
1806 ksocknal_data_ready (struct sock *sk, int n)
1807 {
1808         unsigned long  flags;
1809         ksock_conn_t  *conn;
1810         ksock_sched_t *sched;
1811         ENTRY;
1812
1813         /* interleave correctly with closing sockets... */
1814         read_lock (&ksocknal_data.ksnd_global_lock);
1815
1816         conn = sk->sk_user_data;
1817         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1818                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1819                 sk->sk_data_ready (sk, n);
1820         } else {
1821                 sched = conn->ksnc_scheduler;
1822
1823                 spin_lock_irqsave (&sched->kss_lock, flags);
1824
1825                 conn->ksnc_rx_ready = 1;
1826
1827                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1828                         list_add_tail(&conn->ksnc_rx_list,
1829                                       &sched->kss_rx_conns);
1830                         conn->ksnc_rx_scheduled = 1;
1831                         /* extra ref for scheduler */
1832                         atomic_inc (&conn->ksnc_refcount);
1833
1834                         wake_up (&sched->kss_waitq);
1835                 }
1836
1837                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1838         }
1839
1840         read_unlock (&ksocknal_data.ksnd_global_lock);
1841
1842         EXIT;
1843 }
1844
1845 void
1846 ksocknal_write_space (struct sock *sk)
1847 {
1848         unsigned long  flags;
1849         ksock_conn_t  *conn;
1850         ksock_sched_t *sched;
1851
1852         /* interleave correctly with closing sockets... */
1853         read_lock (&ksocknal_data.ksnd_global_lock);
1854
1855         conn = sk->sk_user_data;
1856
1857         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1858                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1859                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1860                                       " ready" : " blocked"),
1861                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1862                                       " scheduled" : " idle"),
1863                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1864                                       " empty" : " queued"));
1865
1866         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1867                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1868                 sk->sk_write_space (sk);
1869
1870                 read_unlock (&ksocknal_data.ksnd_global_lock);
1871                 return;
1872         }
1873
1874         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1875                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1876
1877                 sched = conn->ksnc_scheduler;
1878
1879                 spin_lock_irqsave (&sched->kss_lock, flags);
1880
1881                 conn->ksnc_tx_ready = 1;
1882
1883                 if (!conn->ksnc_tx_scheduled && // not being progressed
1884                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1885                         list_add_tail (&conn->ksnc_tx_list,
1886                                        &sched->kss_tx_conns);
1887                         conn->ksnc_tx_scheduled = 1;
1888                         /* extra ref for scheduler */
1889                         atomic_inc (&conn->ksnc_refcount);
1890
1891                         wake_up (&sched->kss_waitq);
1892                 }
1893
1894                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1895         }
1896
1897         read_unlock (&ksocknal_data.ksnd_global_lock);
1898 }
1899
1900 int
1901 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1902 {
1903         int           rc;
1904         mm_segment_t  oldmm = get_fs();
1905
1906         while (nob > 0) {
1907                 struct iovec  iov = {
1908                         .iov_base = buffer,
1909                         .iov_len  = nob
1910                 };
1911                 struct msghdr msg = {
1912                         .msg_name       = NULL,
1913                         .msg_namelen    = 0,
1914                         .msg_iov        = &iov,
1915                         .msg_iovlen     = 1,
1916                         .msg_control    = NULL,
1917                         .msg_controllen = 0,
1918                         .msg_flags      = 0
1919                 };
1920
1921                 set_fs (KERNEL_DS);
1922                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1923                 set_fs (oldmm);
1924                 
1925                 if (rc < 0)
1926                         return (rc);
1927
1928                 if (rc == 0) {
1929                         CERROR ("Unexpected zero rc\n");
1930                         return (-ECONNABORTED);
1931                 }
1932
1933                 buffer = ((char *)buffer) + rc;
1934                 nob -= rc;
1935         }
1936         
1937         return (0);
1938 }
1939
1940 int
1941 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1942 {
1943         int           rc;
1944         mm_segment_t  oldmm = get_fs();
1945         
1946         while (nob > 0) {
1947                 struct iovec  iov = {
1948                         .iov_base = buffer,
1949                         .iov_len  = nob
1950                 };
1951                 struct msghdr msg = {
1952                         .msg_name       = NULL,
1953                         .msg_namelen    = 0,
1954                         .msg_iov        = &iov,
1955                         .msg_iovlen     = 1,
1956                         .msg_control    = NULL,
1957                         .msg_controllen = 0,
1958                         .msg_flags      = 0
1959                 };
1960
1961                 set_fs (KERNEL_DS);
1962                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1963                 set_fs (oldmm);
1964                 
1965                 if (rc < 0)
1966                         return (rc);
1967
1968                 if (rc == 0)
1969                         return (-ECONNABORTED);
1970
1971                 buffer = ((char *)buffer) + rc;
1972                 nob -= rc;
1973         }
1974         
1975         return (0);
1976 }
1977
1978 int
1979 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnation)
1980 {
1981         int                 rc;
1982         ptl_hdr_t           hdr;
1983         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1984
1985         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1986
1987         memset (&hdr, 0, sizeof (hdr));
1988         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1989         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
1990         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
1991         
1992         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
1993         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
1994
1995         hdr.msg.hello.type = __cpu_to_le32 (*type);
1996         hdr.msg.hello.incarnation = 
1997                 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
1998
1999         /* Assume sufficient socket buffering for this message */
2000         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
2001         if (rc != 0) {
2002                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
2003                 return (rc);
2004         }
2005
2006         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2007         if (rc != 0) {
2008                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
2009                 return (rc);
2010         }
2011         
2012         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2013                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
2014                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
2015                 return (-EPROTO);
2016         }
2017
2018         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2019             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2020                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2021                         " from "LPX64"\n",
2022                         __le16_to_cpu (hmv->version_major),
2023                         __le16_to_cpu (hmv->version_minor),
2024                         PORTALS_PROTO_VERSION_MAJOR,
2025                         PORTALS_PROTO_VERSION_MINOR,
2026                         *nid);
2027                 return (-EPROTO);
2028         }
2029
2030 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
2031 # error "This code only understands protocol version 0.x"
2032 #endif
2033         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
2034          * so read the rest of it in now... */
2035
2036         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2037         if (rc != 0) {
2038                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
2039                         rc, *nid);
2040                 return (rc);
2041         }
2042
2043         /* ...and check we got what we expected */
2044         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2045             hdr.payload_length != __cpu_to_le32 (0)) {
2046                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2047                         " but got type %d with %d payload from "LPX64"\n",
2048                         __le32_to_cpu (hdr.type),
2049                         __le32_to_cpu (hdr.payload_length), *nid);
2050                 return (-EPROTO);
2051         }
2052
2053         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2054                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2055                 return (-EPROTO);
2056         }
2057
2058         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2059                 *nid = __le64_to_cpu(hdr.src_nid);
2060         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2061                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
2062                         __le64_to_cpu (hdr.src_nid), *nid);
2063                 return (-EPROTO);
2064         }
2065
2066         if (*type == SOCKNAL_CONN_NONE) {
2067                 /* I've accepted this connection; peer determines type */
2068                 *type = __le32_to_cpu(hdr.msg.hello.type);
2069                 switch (*type) {
2070                 case SOCKNAL_CONN_ANY:
2071                 case SOCKNAL_CONN_CONTROL:
2072                         break;
2073                 case SOCKNAL_CONN_BULK_IN:
2074                         *type = SOCKNAL_CONN_BULK_OUT;
2075                         break;
2076                 case SOCKNAL_CONN_BULK_OUT:
2077                         *type = SOCKNAL_CONN_BULK_IN;
2078                         break;
2079                 default:
2080                         CERROR ("Unexpected type %d from "LPX64"\n", *type, *nid);
2081                         return (-EPROTO);
2082                 }
2083         } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2084                 CERROR ("Mismatched types: me %d "LPX64" %d\n",
2085                         *type, *nid, __le32_to_cpu(hdr.msg.hello.type));
2086                 return (-EPROTO);
2087         }
2088
2089         *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
2090
2091         return (0);
2092 }
2093
2094 int
2095 ksocknal_setup_sock (struct socket *sock)
2096 {
2097         mm_segment_t    oldmm = get_fs ();
2098         int             rc;
2099         int             option;
2100         struct linger   linger;
2101
2102         sock->sk->allocation = GFP_MEMALLOC;
2103
2104         /* Ensure this socket aborts active sends immediately when we close
2105          * it. */
2106
2107         linger.l_onoff = 0;
2108         linger.l_linger = 0;
2109
2110         set_fs (KERNEL_DS);
2111         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2112                               (char *)&linger, sizeof (linger));
2113         set_fs (oldmm);
2114         if (rc != 0) {
2115                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2116                 return (rc);
2117         }
2118
2119         option = -1;
2120         set_fs (KERNEL_DS);
2121         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2122                                     (char *)&option, sizeof (option));
2123         set_fs (oldmm);
2124         if (rc != 0) {
2125                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2126                 return (rc);
2127         }
2128
2129 #if SOCKNAL_USE_KEEPALIVES
2130         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2131          * second until the timeout elapses. */
2132
2133         option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
2134         set_fs (KERNEL_DS);
2135         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2136                                     (char *)&option, sizeof (option));
2137         set_fs (oldmm);
2138         if (rc != 0) {
2139                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2140                 return (rc);
2141         }
2142         
2143         option = 1;
2144         set_fs (KERNEL_DS);
2145         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2146                                     (char *)&option, sizeof (option));
2147         set_fs (oldmm);
2148         if (rc != 0) {
2149                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2150                 return (rc);
2151         }
2152         
2153         option = ksocknal_data.ksnd_io_timeout / 4;
2154         set_fs (KERNEL_DS);
2155         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2156                                     (char *)&option, sizeof (option));
2157         set_fs (oldmm);
2158         if (rc != 0) {
2159                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2160                 return (rc);
2161         }
2162
2163         option = 1;
2164         set_fs (KERNEL_DS);
2165         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2166                               (char *)&option, sizeof (option));
2167         set_fs (oldmm);
2168         if (rc != 0) {
2169                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2170                 return (rc);
2171         }
2172 #endif
2173         return (0);
2174 }
2175
2176 int
2177 ksocknal_connect_peer (ksock_route_t *route, int type)
2178 {
2179         struct sockaddr_in  peer_addr;
2180         mm_segment_t        oldmm = get_fs();
2181         struct timeval      tv;
2182         int                 fd;
2183         struct socket      *sock;
2184         int                 rc;
2185
2186         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2187         if (rc != 0) {
2188                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2189                 return (rc);
2190         }
2191
2192         /* Ugh; have to map_fd for compatibility with sockets passed in
2193          * from userspace.  And we actually need the sock->file refcounting
2194          * that this gives you :) */
2195
2196         fd = sock_map_fd (sock);
2197         if (fd < 0) {
2198                 sock_release (sock);
2199                 CERROR ("sock_map_fd error %d\n", fd);
2200                 return (fd);
2201         }
2202
2203         /* NB the fd now owns the ref on sock->file */
2204         LASSERT (sock->file != NULL);
2205         LASSERT (file_count(sock->file) == 1);
2206
2207         /* Set the socket timeouts, so our connection attempt completes in
2208          * finite time */
2209         tv.tv_sec = ksocknal_data.ksnd_io_timeout;
2210         tv.tv_usec = 0;
2211
2212         set_fs (KERNEL_DS);
2213         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2214                               (char *)&tv, sizeof (tv));
2215         set_fs (oldmm);
2216         if (rc != 0) {
2217                 CERROR ("Can't set send timeout %d: %d\n", 
2218                         ksocknal_data.ksnd_io_timeout, rc);
2219                 goto out;
2220         }
2221         
2222         set_fs (KERNEL_DS);
2223         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2224                               (char *)&tv, sizeof (tv));
2225         set_fs (oldmm);
2226         if (rc != 0) {
2227                 CERROR ("Can't set receive timeout %d: %d\n",
2228                         ksocknal_data.ksnd_io_timeout, rc);
2229                 goto out;
2230         }
2231
2232         if (route->ksnr_nonagel) {
2233                 int  option = 1;
2234                 
2235                 set_fs (KERNEL_DS);
2236                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2237                                             (char *)&option, sizeof (option));
2238                 set_fs (oldmm);
2239                 if (rc != 0) {
2240                         CERROR ("Can't disable nagel: %d\n", rc);
2241                         goto out;
2242                 }
2243         }
2244         
2245         if (route->ksnr_buffer_size != 0) {
2246                 int option = route->ksnr_buffer_size;
2247                 
2248                 set_fs (KERNEL_DS);
2249                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2250                                       (char *)&option, sizeof (option));
2251                 set_fs (oldmm);
2252                 if (rc != 0) {
2253                         CERROR ("Can't set send buffer %d: %d\n",
2254                                 route->ksnr_buffer_size, rc);
2255                         goto out;
2256                 }
2257
2258                 set_fs (KERNEL_DS);
2259                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2260                                       (char *)&option, sizeof (option));
2261                 set_fs (oldmm);
2262                 if (rc != 0) {
2263                         CERROR ("Can't set receive buffer %d: %d\n",
2264                                 route->ksnr_buffer_size, rc);
2265                         goto out;
2266                 }
2267         }
2268         
2269         memset (&peer_addr, 0, sizeof (peer_addr));
2270         peer_addr.sin_family = AF_INET;
2271         peer_addr.sin_port = htons (route->ksnr_port);
2272         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2273         
2274         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2275                                  sizeof (peer_addr), sock->file->f_flags);
2276         if (rc != 0) {
2277                 CERROR ("Error %d connecting to "LPX64"\n", rc,
2278                         route->ksnr_peer->ksnp_nid);
2279                 goto out;
2280         }
2281         
2282         rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2283         if (rc == 0) {
2284                 /* Take an extra ref on sock->file to compensate for the
2285                  * upcoming close which will lose fd's ref on it. */
2286                 get_file (sock->file);
2287         }
2288
2289  out:
2290         sys_close (fd);
2291         return (rc);
2292 }
2293
2294 void
2295 ksocknal_autoconnect (ksock_route_t *route)
2296 {
2297         LIST_HEAD        (zombies);
2298         ksock_tx_t       *tx;
2299         ksock_peer_t     *peer;
2300         unsigned long     flags;
2301         int               rc;
2302         int               type;
2303         
2304         for (;;) {
2305                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2306                         if ((route->ksnr_connecting & (1 << type)) != 0)
2307                                 break;
2308                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2309
2310                 rc = ksocknal_connect_peer (route, type);
2311
2312                 if (rc != 0)
2313                         break;
2314                 
2315                 /* successfully autoconnected: create_conn did the
2316                  * route/conn binding and scheduled any blocked packets */
2317
2318                 if (route->ksnr_connecting == 0) {
2319                         /* No more connections required */
2320                         return;
2321                 }
2322         }
2323
2324         /* Connection attempt failed */
2325
2326         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2327
2328         peer = route->ksnr_peer;
2329         route->ksnr_connecting = 0;
2330
2331         /* This is a retry rather than a new connection */
2332         LASSERT (route->ksnr_retry_interval != 0);
2333         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2334         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2335                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2336
2337         if (!list_empty (&peer->ksnp_tx_queue) &&
2338             ksocknal_find_connecting_route_locked (peer) == NULL) {
2339                 LASSERT (list_empty (&peer->ksnp_conns));
2340
2341                 /* None of the connections that the blocked packets are
2342                  * waiting for have been successful.  Complete them now... */
2343                 do {
2344                         tx = list_entry (peer->ksnp_tx_queue.next,
2345                                          ksock_tx_t, tx_list);
2346                         list_del (&tx->tx_list);
2347                         list_add_tail (&tx->tx_list, &zombies);
2348                 } while (!list_empty (&peer->ksnp_tx_queue));
2349         }
2350
2351         /* make this route least-favourite for re-selection */
2352         if (!route->ksnr_deleted) {
2353                 list_del(&route->ksnr_list);
2354                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2355         }
2356         
2357         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2358
2359         while (!list_empty (&zombies)) {
2360                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2361                 
2362                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
2363                         NTOH__u32 (tx->tx_hdr->type),
2364                         NTOH__u32 (tx->tx_hdr->payload_length),
2365                         NTOH__u64 (tx->tx_hdr->src_nid),
2366                         NTOH__u64 (tx->tx_hdr->dest_nid));
2367
2368                 list_del (&tx->tx_list);
2369                 /* complete now */
2370                 ksocknal_tx_done (tx, 0);
2371         }
2372 }
2373
2374 int
2375 ksocknal_autoconnectd (void *arg)
2376 {
2377         long               id = (long)arg;
2378         char               name[16];
2379         unsigned long      flags;
2380         ksock_route_t     *route;
2381         int                rc;
2382
2383         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2384         kportal_daemonize (name);
2385         kportal_blockallsigs ();
2386
2387         current->flags |= PF_MEMALLOC;
2388
2389         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2390
2391         while (!ksocknal_data.ksnd_shuttingdown) {
2392
2393                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2394                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2395                                             ksock_route_t, ksnr_connect_list);
2396                         
2397                         list_del (&route->ksnr_connect_list);
2398                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2399
2400                         ksocknal_autoconnect (route);
2401                         ksocknal_put_route (route);
2402
2403                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2404                         continue;
2405                 }
2406                 
2407                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2408
2409                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2410                                                ksocknal_data.ksnd_shuttingdown ||
2411                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2412
2413                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2414         }
2415
2416         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2417
2418         ksocknal_thread_fini ();
2419         return (0);
2420 }
2421
2422 ksock_conn_t *
2423 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2424 {
2425         /* We're called with a shared lock on ksnd_global_lock */
2426         ksock_conn_t      *conn;
2427         struct list_head  *ctmp;
2428         ksock_sched_t     *sched;
2429
2430         list_for_each (ctmp, &peer->ksnp_conns) {
2431                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2432                 sched = conn->ksnc_scheduler;
2433
2434                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2435                 LASSERT (!conn->ksnc_closing);
2436                 
2437                 if (conn->ksnc_rx_started &&
2438                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2439                         /* Timed out incomplete incoming message */
2440                         atomic_inc (&conn->ksnc_refcount);
2441                         CERROR ("Timed out RX from "LPX64" %p\n", 
2442                                 peer->ksnp_nid, conn);
2443                         return (conn);
2444                 }
2445                 
2446                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2447                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2448                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2449                         /* Timed out messages queued for sending, or
2450                          * messages buffered in the socket's send buffer */
2451                         atomic_inc (&conn->ksnc_refcount);
2452                         CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
2453                                 peer->ksnp_nid, 
2454                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2455                                 conn->ksnc_sock->sk->sk_wmem_queued, conn);
2456                         return (conn);
2457                 }
2458         }
2459
2460         return (NULL);
2461 }
2462
2463 void
2464 ksocknal_check_peer_timeouts (int idx)
2465 {
2466         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2467         struct list_head *ptmp;
2468         ksock_peer_t     *peer;
2469         ksock_conn_t     *conn;
2470
2471  again:
2472         /* NB. We expect to have a look at all the peers and not find any
2473          * connections to time out, so we just use a shared lock while we
2474          * take a look... */
2475         read_lock (&ksocknal_data.ksnd_global_lock);
2476
2477         list_for_each (ptmp, peers) {
2478                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2479                 conn = ksocknal_find_timed_out_conn (peer);
2480                 
2481                 if (conn != NULL) {
2482                         read_unlock (&ksocknal_data.ksnd_global_lock);
2483
2484                         CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
2485                                 peer->ksnp_nid, conn->ksnc_ipaddr,
2486                                 conn->ksnc_port);
2487                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2488                         
2489                         /* NB we won't find this one again, but we can't
2490                          * just proceed with the next peer, since we dropped
2491                          * ksnd_global_lock and it might be dead already! */
2492                         ksocknal_put_conn (conn);
2493                         goto again;
2494                 }
2495         }
2496
2497         read_unlock (&ksocknal_data.ksnd_global_lock);
2498 }
2499
2500 int
2501 ksocknal_reaper (void *arg)
2502 {
2503         wait_queue_t       wait;
2504         unsigned long      flags;
2505         ksock_conn_t      *conn;
2506         int                timeout;
2507         int                i;
2508         int                peer_index = 0;
2509         unsigned long      deadline = jiffies;
2510         
2511         kportal_daemonize ("ksocknal_reaper");
2512         kportal_blockallsigs ();
2513
2514         init_waitqueue_entry (&wait, current);
2515
2516         current->flags |= PF_MEMALLOC;
2517
2518         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2519
2520         while (!ksocknal_data.ksnd_shuttingdown) {
2521
2522                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2523                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2524                                            ksock_conn_t, ksnc_list);
2525                         list_del (&conn->ksnc_list);
2526                         
2527                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2528
2529                         ksocknal_terminate_conn (conn);
2530                         ksocknal_put_conn (conn);
2531
2532                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2533                         continue;
2534                 }
2535
2536                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2537                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2538                                            ksock_conn_t, ksnc_list);
2539                         list_del (&conn->ksnc_list);
2540                         
2541                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2542
2543                         ksocknal_destroy_conn (conn);
2544
2545                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2546                         continue;
2547                 }
2548                 
2549                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2550
2551                 /* careful with the jiffy wrap... */
2552                 while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
2553                         const int n = 4;
2554                         const int p = 1;
2555                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2556                         
2557                         /* Time to check for timeouts on a few more peers: I do
2558                          * checks every 'p' seconds on a proportion of the peer
2559                          * table and I need to check every connection 'n' times
2560                          * within a timeout interval, to ensure I detect a
2561                          * timeout on any connection within (n+1)/n times the
2562                          * timeout interval. */
2563
2564                         if (ksocknal_data.ksnd_io_timeout > n * p)
2565                                 chunk = (chunk * n * p) / 
2566                                         ksocknal_data.ksnd_io_timeout;
2567                         if (chunk == 0)
2568                                 chunk = 1;
2569
2570                         for (i = 0; i < chunk; i++) {
2571                                 ksocknal_check_peer_timeouts (peer_index);
2572                                 peer_index = (peer_index + 1) % 
2573                                              ksocknal_data.ksnd_peer_hash_size;
2574                         }
2575
2576                         deadline += p * HZ;
2577                 }
2578
2579                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2580                 set_current_state (TASK_INTERRUPTIBLE);
2581
2582                 if (!ksocknal_data.ksnd_shuttingdown &&
2583                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2584                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2585                         schedule_timeout (timeout);
2586
2587                 set_current_state (TASK_RUNNING);
2588                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2589
2590                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2591         }
2592
2593         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2594
2595         ksocknal_thread_fini ();
2596         return (0);
2597 }
2598
2599 nal_cb_t ksocknal_lib = {
2600         nal_data:       &ksocknal_data,                /* NAL private data */
2601         cb_send:         ksocknal_send,
2602         cb_send_pages:   ksocknal_send_pages,
2603         cb_recv:         ksocknal_recv,
2604         cb_recv_pages:   ksocknal_recv_pages,
2605         cb_read:         ksocknal_read,
2606         cb_write:        ksocknal_write,
2607         cb_callback:     ksocknal_callback,
2608         cb_malloc:       ksocknal_malloc,
2609         cb_free:         ksocknal_free,
2610         cb_printf:       ksocknal_printf,
2611         cb_cli:          ksocknal_cli,
2612         cb_sti:          ksocknal_sti,
2613         cb_dist:         ksocknal_dist
2614 };