Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 /*
29  *  LIB functions follow
30  *
31  */
32 int
33 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
34               user_ptr src_addr, size_t len)
35 {
36         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
37                nal->ni.nid, (long)len, src_addr, dst_addr);
38
39         memcpy( dst_addr, src_addr, len );
40         return 0;
41 }
42
43 int
44 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
45                void *src_addr, size_t len)
46 {
47         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
48                nal->ni.nid, (long)len, src_addr, dst_addr);
49
50         memcpy( dst_addr, src_addr, len );
51         return 0;
52 }
53
54 int
55 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
56                          ptl_event_t *ev)
57 {
58         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
59                nal->ni.nid, eq, ev);
60
61         if (eq->event_callback != NULL)
62                 eq->event_callback(ev);
63
64         return 0;
65 }
66
67 void *
68 ksocknal_malloc(nal_cb_t *nal, size_t len)
69 {
70         void *buf;
71
72         PORTAL_ALLOC(buf, len);
73
74         if (buf != NULL)
75                 memset(buf, 0, len);
76
77         return (buf);
78 }
79
80 void
81 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
82 {
83         PORTAL_FREE(buf, len);
84 }
85
86 void
87 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
88 {
89         va_list ap;
90         char msg[256];
91
92         va_start (ap, fmt);
93         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
94         va_end (ap);
95
96         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
97
98         CDEBUG (D_NET, "%s", msg);
99 }
100
101 void
102 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
103 {
104         ksock_nal_data_t *data = nal->nal_data;
105
106         spin_lock(&data->ksnd_nal_cb_lock);
107 }
108
109 void
110 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data;
113         data = nal->nal_data;
114
115         spin_unlock(&data->ksnd_nal_cb_lock);
116 }
117
118 int
119 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
120 {
121         /* I would guess that if ksocknal_get_peer (nid) == NULL,
122            and we're not routing, then 'nid' is very distant :) */
123         if ( nal->ni.nid == nid ) {
124                 *dist = 0;
125         } else {
126                 *dist = 1;
127         }
128
129         return 0;
130 }
131
132 ksock_ltx_t *
133 ksocknal_get_ltx (int may_block)
134 {
135         unsigned long flags;
136         ksock_ltx_t *ltx = NULL;
137
138         for (;;) {
139                 spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
140
141                 if (!list_empty (&ksocknal_data.ksnd_idle_ltx_list)) {
142                         ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
143                                          ksock_ltx_t, ltx_tx.tx_list);
144                         list_del (&ltx->ltx_tx.tx_list);
145                         ksocknal_data.ksnd_active_ltxs++;
146                         break;
147                 }
148
149                 if (!may_block) {
150                         if (!list_empty(&ksocknal_data.ksnd_idle_nblk_ltx_list)) {
151                                 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
152                                                  ksock_ltx_t, ltx_tx.tx_list);
153                                 list_del (&ltx->ltx_tx.tx_list);
154                                 ksocknal_data.ksnd_active_ltxs++;
155                         }
156                         break;
157                 }
158
159                 spin_unlock_irqrestore(&ksocknal_data.ksnd_idle_ltx_lock,
160                                        flags);
161
162                 wait_event (ksocknal_data.ksnd_idle_ltx_waitq,
163                             !list_empty (&ksocknal_data.ksnd_idle_ltx_list));
164         }
165
166         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
167
168         return (ltx);
169 }
170
171 void
172 ksocknal_put_ltx (ksock_ltx_t *ltx)
173 {
174         unsigned long   flags;
175         
176         spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
177
178         ksocknal_data.ksnd_active_ltxs--;
179         list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
180
181         /* normal tx desc => wakeup anyone blocking for one */
182         if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list)
183                 wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
184
185         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
186 }
187
188 #if SOCKNAL_ZC
189 struct page *
190 ksocknal_kvaddr_to_page (unsigned long vaddr)
191 {
192         struct page *page;
193
194         if (vaddr >= VMALLOC_START &&
195             vaddr < VMALLOC_END)
196                 page = vmalloc_to_page ((void *)vaddr);
197 #if CONFIG_HIGHMEM
198         else if (vaddr >= PKMAP_BASE &&
199                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
200                 page = vmalloc_to_page ((void *)vaddr);
201                 /* in 2.4 ^ just walks the page tables */
202 #endif
203         else
204                 page = virt_to_page (vaddr);
205
206         if (page == NULL ||
207             !VALID_PAGE (page))
208                 return (NULL);
209
210         return (page);
211 }
212 #endif
213
214 int
215 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
216 {
217         struct socket *sock = conn->ksnc_sock;
218         struct iovec  *iov = tx->tx_iov;
219         int            fragsize = iov->iov_len;
220         unsigned long  vaddr = (unsigned long)iov->iov_base;
221         int            more = (tx->tx_niov > 1) || 
222                               (tx->tx_nkiov > 0) ||
223                               (!list_empty (&conn->ksnc_tx_queue));
224 #if SOCKNAL_ZC
225         int            offset = vaddr & (PAGE_SIZE - 1);
226         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
227         struct page   *page;
228 #endif
229         int            rc;
230
231         /* NB we can't trust socket ops to either consume our iovs
232          * or leave them alone, so we only send 1 frag at a time. */
233         LASSERT (fragsize <= tx->tx_resid);
234         LASSERT (tx->tx_niov > 0);
235         
236 #if SOCKNAL_ZC
237         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
238             (sock->sk->route_caps & NETIF_F_SG) &&
239             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
240             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
241                 
242                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
243                        (void *)vaddr, page, page_address(page), offset, zcsize);
244
245                 if (fragsize > zcsize) {
246                         more = 1;
247                         fragsize = zcsize;
248                 }
249
250                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
251                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
252                                        &tx->tx_zccd);
253         } else
254 #endif
255         {
256                 /* NB don't pass tx's iov; sendmsg may or may not update it */
257                 struct iovec fragiov = { .iov_base = (void *)vaddr,
258                                          .iov_len  = fragsize};
259                 struct msghdr msg = {
260                         .msg_name       = NULL,
261                         .msg_namelen    = 0,
262                         .msg_iov        = &fragiov,
263                         .msg_iovlen     = 1,
264                         .msg_control    = NULL,
265                         .msg_controllen = 0,
266                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
267                 };
268                 mm_segment_t oldmm = get_fs();
269
270                 set_fs (KERNEL_DS);
271                 rc = sock_sendmsg(sock, &msg, fragsize);
272                 set_fs (oldmm);
273         } 
274
275         if (rc <= 0)
276                 return (rc);
277
278         tx->tx_resid -= rc;
279
280         if (rc < iov->iov_len) {
281                 /* didn't send whole iov entry... */
282                 iov->iov_base = (void *)(vaddr + rc);
283                 iov->iov_len -= rc;
284                 /* ...but did we send everything we tried to send? */
285                 return ((rc == fragsize) ? 1 : -EAGAIN);
286         }
287
288         tx->tx_iov++;
289         tx->tx_niov--;
290         return (1);
291 }
292
293 int
294 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
295 {
296         struct socket *sock = conn->ksnc_sock;
297         ptl_kiov_t    *kiov = tx->tx_kiov;
298         int            fragsize = kiov->kiov_len;
299         struct page   *page = kiov->kiov_page;
300         int            offset = kiov->kiov_offset;
301         int            more = (tx->tx_nkiov > 1) ||
302                               (!list_empty (&conn->ksnc_tx_queue));
303         int            rc;
304
305         /* NB we can't trust socket ops to either consume our iovs
306          * or leave them alone, so we only send 1 frag at a time. */
307         LASSERT (fragsize <= tx->tx_resid);
308         LASSERT (offset + fragsize <= PAGE_SIZE);
309         LASSERT (tx->tx_niov == 0);
310         LASSERT (tx->tx_nkiov > 0);
311
312 #if SOCKNAL_ZC
313         if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
314             (sock->sk->route_caps & NETIF_F_SG) &&
315             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
316
317                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
318                                page, offset, fragsize);
319
320                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
321                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
322                                        &tx->tx_zccd);
323         } else
324 #endif
325         {
326                 char *addr = ((char *)kmap (page)) + offset;
327                 struct iovec fragiov = {.iov_base = addr,
328                                         .iov_len  = fragsize};
329                 struct msghdr msg = {
330                         .msg_name       = NULL,
331                         .msg_namelen    = 0,
332                         .msg_iov        = &fragiov,
333                         .msg_iovlen     = 1,
334                         .msg_control    = NULL,
335                         .msg_controllen = 0,
336                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
337                 };
338                 mm_segment_t  oldmm = get_fs();
339                 
340                 set_fs (KERNEL_DS);
341                 rc = sock_sendmsg(sock, &msg, fragsize);
342                 set_fs (oldmm);
343
344                 kunmap (page);
345         }
346
347         if (rc <= 0)
348                 return (rc);
349
350         tx->tx_resid -= rc;
351  
352         if (rc < fragsize) {
353                 /* didn't send whole frag */
354                 kiov->kiov_offset = offset + rc;
355                 kiov->kiov_len    = fragsize - rc;
356                 return (-EAGAIN);
357         }
358
359         /* everything went */
360         LASSERT (rc == fragsize);
361         tx->tx_kiov++;
362         tx->tx_nkiov--;
363         return (1);
364 }
365
366 int
367 ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
368 {
369         /* Return 0 on success, < 0 on error.
370          * caller checks tx_resid to determine progress/completion */
371         int      rc;
372         ENTRY;
373         
374         if (ksocknal_data.ksnd_stall_tx != 0) {
375                 set_current_state (TASK_UNINTERRUPTIBLE);
376                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
377         }
378
379         rc = ksocknal_getconnsock (conn);
380         if (rc != 0)
381                 return (rc);
382
383         for (;;) {
384                 LASSERT (tx->tx_resid != 0);
385
386                 if (conn->ksnc_closing) {
387                         rc = -ESHUTDOWN;
388                         break;
389                 }
390
391                 if (tx->tx_niov != 0)
392                         rc = ksocknal_send_iov (conn, tx);
393                 else
394                         rc = ksocknal_send_kiov (conn, tx);
395
396                 if (rc <= 0) {                  /* error or socket full? */
397                         /* NB: rc == 0 and rc == -EAGAIN both mean try
398                          * again later (linux stack returns -EAGAIN for
399                          * this, but Adaptech TOE returns 0) */
400                         if (rc == -EAGAIN)
401                                 rc = 0;
402                         break;
403                 }
404
405                 /* Consider the connection alive since we managed to chuck
406                  * more data into it.  Really, we'd like to consider it
407                  * alive only when the peer ACKs something, but
408                  * write_space() only gets called back while SOCK_NOSPACE
409                  * is set.  Instead, we presume peer death has occurred if
410                  * the socket doesn't drain within a timout */
411                 conn->ksnc_tx_deadline = jiffies + 
412                                          ksocknal_data.ksnd_io_timeout * HZ;
413                 conn->ksnc_peer->ksnp_last_alive = jiffies;
414
415                 if (tx->tx_resid == 0) {        /* sent everything */
416                         rc = 0;
417                         break;
418                 }
419         }
420
421         ksocknal_putconnsock (conn);
422         RETURN (rc);
423 }
424
425 void
426 ksocknal_eager_ack (ksock_conn_t *conn)
427 {
428         int            opt = 1;
429         mm_segment_t   oldmm = get_fs();
430         struct socket *sock = conn->ksnc_sock;
431         
432         /* Remind the socket to ACK eagerly.  If I don't, the socket might
433          * think I'm about to send something it could piggy-back the ACK
434          * on, introducing delay in completing zero-copy sends in my
435          * peer. */
436
437         set_fs(KERNEL_DS);
438         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
439                                (char *)&opt, sizeof (opt));
440         set_fs(oldmm);
441 }
442
443 int
444 ksocknal_recv_iov (ksock_conn_t *conn)
445 {
446         struct iovec *iov = conn->ksnc_rx_iov;
447         int           fragsize  = iov->iov_len;
448         unsigned long vaddr = (unsigned long)iov->iov_base;
449         struct iovec  fragiov = { .iov_base = (void *)vaddr,
450                                   .iov_len  = fragsize};
451         struct msghdr msg = {
452                 .msg_name       = NULL,
453                 .msg_namelen    = 0,
454                 .msg_iov        = &fragiov,
455                 .msg_iovlen     = 1,
456                 .msg_control    = NULL,
457                 .msg_controllen = 0,
458                 .msg_flags      = 0
459         };
460         mm_segment_t oldmm = get_fs();
461         int          rc;
462
463         /* NB we can't trust socket ops to either consume our iovs
464          * or leave them alone, so we only receive 1 frag at a time. */
465         LASSERT (conn->ksnc_rx_niov > 0);
466         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
467
468         set_fs (KERNEL_DS);
469         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
470         /* NB this is just a boolean............................^ */
471         set_fs (oldmm);
472
473         if (rc <= 0)
474                 return (rc);
475
476         /* received something... */
477         conn->ksnc_peer->ksnp_last_alive = jiffies;
478         conn->ksnc_rx_deadline = jiffies + 
479                                  ksocknal_data.ksnd_io_timeout * HZ;
480         mb();                           /* order with setting rx_started */
481         conn->ksnc_rx_started = 1;
482
483         conn->ksnc_rx_nob_wanted -= rc;
484         conn->ksnc_rx_nob_left -= rc;
485                 
486         if (rc < fragsize) {
487                 iov->iov_base = (void *)(vaddr + rc);
488                 iov->iov_len = fragsize - rc;
489                 return (-EAGAIN);
490         }
491
492         conn->ksnc_rx_iov++;
493         conn->ksnc_rx_niov--;
494         return (1);
495 }
496
497 int
498 ksocknal_recv_kiov (ksock_conn_t *conn)
499 {
500         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
501         struct page  *page = kiov->kiov_page;
502         int           offset = kiov->kiov_offset;
503         int           fragsize = kiov->kiov_len;
504         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
505         struct iovec  fragiov = { .iov_base = (void *)vaddr,
506                                   .iov_len  = fragsize};
507         struct msghdr msg = {
508                 .msg_name       = NULL,
509                 .msg_namelen    = 0,
510                 .msg_iov        = &fragiov,
511                 .msg_iovlen     = 1,
512                 .msg_control    = NULL,
513                 .msg_controllen = 0,
514                 .msg_flags      = 0
515         };
516         mm_segment_t oldmm = get_fs();
517         int          rc;
518
519         /* NB we can't trust socket ops to either consume our iovs
520          * or leave them alone, so we only receive 1 frag at a time. */
521         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
522         LASSERT (conn->ksnc_rx_nkiov > 0);
523         LASSERT (offset + fragsize <= PAGE_SIZE);
524
525         set_fs (KERNEL_DS);
526         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
527         /* NB this is just a boolean............................^ */
528         set_fs (oldmm);
529
530         kunmap (page);
531         
532         if (rc <= 0)
533                 return (rc);
534         
535         /* received something... */
536         conn->ksnc_peer->ksnp_last_alive = jiffies;
537         conn->ksnc_rx_deadline = jiffies + 
538                                  ksocknal_data.ksnd_io_timeout * HZ;
539         mb();                           /* order with setting rx_started */
540         conn->ksnc_rx_started = 1;
541
542         conn->ksnc_rx_nob_wanted -= rc;
543         conn->ksnc_rx_nob_left -= rc;
544                 
545         if (rc < fragsize) {
546                 kiov->kiov_offset = offset + rc;
547                 kiov->kiov_len = fragsize - rc;
548                 return (-EAGAIN);
549         }
550
551         conn->ksnc_rx_kiov++;
552         conn->ksnc_rx_nkiov--;
553         return (1);
554 }
555
556 int
557 ksocknal_recvmsg (ksock_conn_t *conn) 
558 {
559         /* Return 1 on success, 0 on EOF, < 0 on error.
560          * Caller checks ksnc_rx_nob_wanted to determine
561          * progress/completion. */
562         int     rc;
563         ENTRY;
564         
565         if (ksocknal_data.ksnd_stall_rx != 0) {
566                 set_current_state (TASK_UNINTERRUPTIBLE);
567                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
568         }
569
570         rc = ksocknal_getconnsock (conn);
571         if (rc != 0)
572                 return (rc);
573
574         for (;;) {
575                 if (conn->ksnc_closing) {
576                         rc = -ESHUTDOWN;
577                         break;
578                 }
579
580                 if (conn->ksnc_rx_niov != 0)
581                         rc = ksocknal_recv_iov (conn);
582                 else
583                         rc = ksocknal_recv_kiov (conn);
584
585                 if (rc <= 0) {
586                         /* error/EOF or partial receive */
587                         if (rc == -EAGAIN) {
588                                 rc = 1;
589                         } else if (rc == 0 && conn->ksnc_rx_started) {
590                                 /* EOF in the middle of a message */
591                                 rc = -EPROTO;
592                         }
593                         break;
594                 }
595
596                 /* Completed a fragment */
597
598                 if (conn->ksnc_rx_nob_wanted == 0) {
599                         /* Completed a message segment (header or payload) */
600                         if ((ksocknal_data.ksnd_eager_ack & conn->ksnc_type) != 0 &&
601                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
602                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
603                                 /* Remind the socket to ack eagerly... */
604                                 ksocknal_eager_ack(conn);
605                         }
606                         rc = 1;
607                         break;
608                 }
609         }
610
611         ksocknal_putconnsock (conn);
612         RETURN (rc);
613 }
614
615 #if SOCKNAL_ZC
616 void
617 ksocknal_zc_callback (zccd_t *zcd)
618 {
619         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
620         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
621         unsigned long  flags;
622         ENTRY;
623
624         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
625
626         spin_lock_irqsave (&sched->kss_lock, flags);
627
628         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
629         wake_up (&sched->kss_waitq);
630
631         spin_unlock_irqrestore (&sched->kss_lock, flags);
632         EXIT;
633 }
634 #endif
635
636 void
637 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
638 {
639         ksock_ltx_t   *ltx;
640         ENTRY;
641
642         if (tx->tx_conn != NULL) {
643                 /* This tx got queued on a conn; do the accounting... */
644                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
645 #if SOCKNAL_ZC
646                 /* zero copy completion isn't always from
647                  * process_transmit() so it needs to keep a ref on
648                  * tx_conn... */
649                 if (asynch)
650                         ksocknal_put_conn (tx->tx_conn);
651 #else
652                 LASSERT (!asynch);
653 #endif
654         }
655
656         if (tx->tx_isfwd) {             /* was a forwarded packet? */
657                 kpr_fwd_done (&ksocknal_data.ksnd_router,
658                               KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
659                 EXIT;
660                 return;
661         }
662
663         /* local send */
664         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
665
666         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
667
668         ksocknal_put_ltx (ltx);
669         EXIT;
670 }
671
672 void
673 ksocknal_tx_launched (ksock_tx_t *tx) 
674 {
675 #if SOCKNAL_ZC
676         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
677                 ksock_conn_t  *conn = tx->tx_conn;
678                 
679                 /* zccd skbufs are still in-flight.  First take a ref on
680                  * conn, so it hangs about for ksocknal_tx_done... */
681                 atomic_inc (&conn->ksnc_refcount);
682
683                 /* ...then drop the initial ref on zccd, so the zero copy
684                  * callback can occur */
685                 zccd_put (&tx->tx_zccd);
686                 return;
687         }
688 #endif
689         /* Any zero-copy-ness (if any) has completed; I can complete the
690          * transmit now, avoiding an extra schedule */
691         ksocknal_tx_done (tx, 0);
692 }
693
694 int
695 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
696 {
697         int            rc;
698        
699         rc = ksocknal_sendmsg (conn, tx);
700
701         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
702         LASSERT (rc != -EAGAIN);
703
704         if (rc == 0) {
705                 /* no errors */
706                 if (tx->tx_resid != 0) {
707                         /* didn't send everything */
708                         return (-EAGAIN);
709                 }
710                 
711                 ksocknal_tx_launched (tx);
712                 return (0);
713         }
714
715         if (!conn->ksnc_closing)
716                 CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
717                         conn, rc, conn->ksnc_peer->ksnp_nid,
718                         conn->ksnc_ipaddr, conn->ksnc_port);
719
720         ksocknal_close_conn_and_siblings (conn, rc);
721         ksocknal_tx_launched (tx);
722
723         return (rc);
724
725
726 void
727 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
728 {
729         unsigned long     flags;
730
731         /* called holding write lock on ksnd_global_lock */
732
733         LASSERT (!route->ksnr_deleted);
734         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
735         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
736         LASSERT (!route->ksnr_connecting);
737         
738         if (ksocknal_data.ksnd_typed_conns)
739                 route->ksnr_connecting = 
740                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
741         else
742                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
743
744         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
745         
746         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
747         
748         list_add_tail (&route->ksnr_connect_list,
749                        &ksocknal_data.ksnd_autoconnectd_routes);
750         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
751         
752         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
753 }
754
755 ksock_peer_t *
756 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
757 {
758         ptl_nid_t     target_nid;
759         int           rc;
760         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
761         
762         if (peer != NULL)
763                 return (peer);
764         
765         if (tx->tx_isfwd) {
766                 CERROR ("Can't send packet to "LPX64
767                         ": routed target is not a peer\n", nid);
768                 return (NULL);
769         }
770         
771         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
772                          &target_nid);
773         if (rc != 0) {
774                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
775                 return (NULL);
776         }
777
778         peer = ksocknal_find_peer_locked (target_nid);
779         if (peer != NULL)
780                 return (peer);
781
782         CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
783         return (NULL);
784 }
785
786 ksock_conn_t *
787 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
788 {
789         struct list_head *tmp;
790         ksock_conn_t     *typed = NULL;
791         int               tnob  = 0;
792         ksock_conn_t     *fallback = NULL;
793         int               fnob     = 0;
794         
795         /* Find the conn with the shortest tx queue */
796         list_for_each (tmp, &peer->ksnp_conns) {
797                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
798                 int           nob = atomic_read(&c->ksnc_tx_nob);
799
800                 LASSERT (!c->ksnc_closing);
801
802                 if (fallback == NULL || nob < fnob) {
803                         fallback = c;
804                         fnob     = nob;
805                 }
806
807                 if (!ksocknal_data.ksnd_typed_conns)
808                         continue;
809
810                 switch (c->ksnc_type) {
811                 default:
812                         LBUG();
813                 case SOCKNAL_CONN_ANY:
814                         break;
815                 case SOCKNAL_CONN_BULK_IN:
816                         continue;
817                 case SOCKNAL_CONN_BULK_OUT:
818                         if (tx->tx_nob < ksocknal_data.ksnd_min_bulk)
819                                 continue;
820                         break;
821                 case SOCKNAL_CONN_CONTROL:
822                         if (tx->tx_nob >= ksocknal_data.ksnd_min_bulk)
823                                 continue;
824                         break;
825                 }
826
827                 if (typed == NULL || nob < tnob) {
828                         typed = c;
829                         tnob  = nob;
830                 }
831         }
832
833         /* prefer the typed selection */
834         return ((typed != NULL) ? typed : fallback);
835 }
836
837 void
838 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
839 {
840         unsigned long  flags;
841         ksock_sched_t *sched = conn->ksnc_scheduler;
842
843         /* called holding global lock (read or irq-write) */
844
845         CDEBUG (D_NET, "Sending to "LPX64" on port %d\n", 
846                 conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
847
848         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
849         tx->tx_resid = tx->tx_nob;
850         tx->tx_conn = conn;
851
852 #if SOCKNAL_ZC
853         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
854         /* NB this sets 1 ref on zccd, so the callback can only occur after
855          * I've released this ref. */
856 #endif
857
858         spin_lock_irqsave (&sched->kss_lock, flags);
859
860         conn->ksnc_tx_deadline = jiffies + 
861                                  ksocknal_data.ksnd_io_timeout * HZ;
862         mb();                                   /* order with list_add_tail */
863
864         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
865                 
866         if (conn->ksnc_tx_ready &&      /* able to send */
867             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
868                 /* +1 ref for scheduler */
869                 atomic_inc (&conn->ksnc_refcount);
870                 list_add_tail (&conn->ksnc_tx_list, 
871                                &sched->kss_tx_conns);
872                 conn->ksnc_tx_scheduled = 1;
873                 wake_up (&sched->kss_waitq);
874         }
875
876         spin_unlock_irqrestore (&sched->kss_lock, flags);
877 }
878
879 ksock_route_t *
880 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
881 {
882         struct list_head  *tmp;
883         ksock_route_t     *route;
884         ksock_route_t     *candidate = NULL;
885         int                found = 0;
886         int                bits;
887         
888         list_for_each (tmp, &peer->ksnp_routes) {
889                 route = list_entry (tmp, ksock_route_t, ksnr_list);
890                 bits  = route->ksnr_connected;
891                 
892                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
893                     (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
894                     route->ksnr_connecting != 0) {
895                         /* All typed connections have been established, or
896                          * an untyped connection has been established, or
897                          * connections are currently being established */
898                         found = 1;
899                         continue;
900                 }
901
902                 /* too soon to retry this guy? */
903                 if (!time_after_eq (jiffies, route->ksnr_timeout))
904                         continue;
905                 
906                 /* always do eager routes */
907                 if (route->ksnr_eager)
908                         return (route);
909
910                 if (candidate == NULL) {
911                         /* If we don't find any other route that is fully
912                          * connected or connecting, the first connectable
913                          * route is returned.  If it fails to connect, it
914                          * will get placed at the end of the list */
915                         candidate = route;
916                 }
917         }
918  
919         return (found ? NULL : candidate);
920 }
921
922 ksock_route_t *
923 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
924 {
925         struct list_head  *tmp;
926         ksock_route_t     *route;
927
928         list_for_each (tmp, &peer->ksnp_routes) {
929                 route = list_entry (tmp, ksock_route_t, ksnr_list);
930                 
931                 if (route->ksnr_connecting != 0)
932                         return (route);
933         }
934         
935         return (NULL);
936 }
937
938 int
939 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
940 {
941         unsigned long     flags;
942         ksock_peer_t     *peer;
943         ksock_conn_t     *conn;
944         ksock_route_t    *route;
945         rwlock_t         *g_lock;
946         
947         /* Ensure the frags we've been given EXACTLY match the number of
948          * bytes we want to send.  Many TCP/IP stacks disregard any total
949          * size parameters passed to them and just look at the frags. 
950          *
951          * We always expect at least 1 mapped fragment containing the
952          * complete portals header. */
953         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
954                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
955         LASSERT (tx->tx_niov >= 1);
956         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
957
958         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
959                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
960                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
961
962         tx->tx_conn = NULL;                     /* only set when assigned a conn */
963
964         g_lock = &ksocknal_data.ksnd_global_lock;
965         read_lock (g_lock);
966         
967         peer = ksocknal_find_target_peer_locked (tx, nid);
968         if (peer == NULL) {
969                 read_unlock (g_lock);
970                 return (-EHOSTUNREACH);
971         }
972
973         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
974                 conn = ksocknal_find_conn_locked (tx, peer);
975                 if (conn != NULL) {
976                         /* I've got no autoconnect routes that need to be
977                          * connecting and I do have an actual connection... */
978                         ksocknal_queue_tx_locked (tx, conn);
979                         read_unlock (g_lock);
980                         return (0);
981                 }
982         }
983         
984         /* Making one or more connections; I'll need a write lock... */
985
986         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
987         read_unlock (g_lock);
988         write_lock_irqsave (g_lock, flags);
989         
990         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
991                 write_unlock_irqrestore (g_lock, flags);
992                 ksocknal_put_peer (peer);
993                 return (-EHOSTUNREACH);
994         }
995         ksocknal_put_peer (peer);               /* drop ref I got above */
996
997         for (;;) {
998                 /* launch any/all autoconnections that need it */
999                 route = ksocknal_find_connectable_route_locked (peer);
1000                 if (route == NULL)
1001                         break;
1002
1003                 ksocknal_launch_autoconnect_locked (route);
1004         }
1005
1006         conn = ksocknal_find_conn_locked (tx, peer);
1007         if (conn != NULL) {
1008                 /* Connection exists; queue message on it */
1009                 ksocknal_queue_tx_locked (tx, conn);
1010                 write_unlock_irqrestore (g_lock, flags);
1011                 return (0);
1012         }
1013
1014         route = ksocknal_find_connecting_route_locked (peer);
1015         if (route != NULL) {
1016                 /* At least 1 connection is being established; queue the
1017                  * message... */
1018                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1019                 write_unlock_irqrestore (g_lock, flags);
1020                 return (0);
1021         }
1022         
1023         write_unlock_irqrestore (g_lock, flags);
1024         return (-EHOSTUNREACH);
1025 }
1026
1027 ksock_ltx_t *
1028 ksocknal_setup_hdr (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1029                     ptl_hdr_t *hdr, int type)
1030 {
1031         ksock_ltx_t  *ltx;
1032
1033         /* I may not block for a transmit descriptor if I might block the
1034          * receiver, or an interrupt handler. */
1035         ltx = ksocknal_get_ltx (!(type == PTL_MSG_ACK ||
1036                                   type == PTL_MSG_REPLY ||
1037                                   in_interrupt ()));
1038         if (ltx == NULL) {
1039                 CERROR ("Can't allocate tx desc\n");
1040                 return (NULL);
1041         }
1042
1043         /* Init local send packet (storage for hdr, finalize() args) */
1044         ltx->ltx_hdr = *hdr;
1045         ltx->ltx_private = private;
1046         ltx->ltx_cookie = cookie;
1047         
1048         /* Init common ltx_tx */
1049         ltx->ltx_tx.tx_isfwd = 0;
1050         ltx->ltx_tx.tx_nob = sizeof (*hdr);
1051
1052         /* We always have 1 mapped frag for the header */
1053         ltx->ltx_tx.tx_niov = 1;
1054         ltx->ltx_tx.tx_iov = &ltx->ltx_iov_space.hdr;
1055         ltx->ltx_tx.tx_iov[0].iov_base = &ltx->ltx_hdr;
1056         ltx->ltx_tx.tx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
1057
1058         ltx->ltx_tx.tx_kiov  = NULL;
1059         ltx->ltx_tx.tx_nkiov = 0;
1060
1061         return (ltx);
1062 }
1063
1064 int
1065 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1066                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1067                unsigned int payload_niov, struct iovec *payload_iov,
1068                size_t payload_len)
1069 {
1070         ksock_ltx_t  *ltx;
1071         int           rc;
1072
1073         /* NB 'private' is different depending on what we're sending.
1074          * Just ignore it until we can rely on it
1075          */
1076
1077         CDEBUG(D_NET,
1078                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
1079                " pid %d\n", payload_len, payload_niov, nid, pid);
1080
1081         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1082         if (ltx == NULL)
1083                 return (PTL_FAIL);
1084
1085         /* append the payload_iovs to the one pointing at the header */
1086         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1087         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1088
1089         memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
1090                 payload_niov * sizeof (*payload_iov));
1091         ltx->ltx_tx.tx_niov = 1 + payload_niov;
1092         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1093
1094         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1095         if (rc == 0)
1096                 return (PTL_OK);
1097         
1098         ksocknal_put_ltx (ltx);
1099         return (PTL_FAIL);
1100 }
1101
1102 int
1103 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1104                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1105                      unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
1106 {
1107         ksock_ltx_t *ltx;
1108         int          rc;
1109
1110         /* NB 'private' is different depending on what we're sending.
1111          * Just ignore it until we can rely on it */
1112
1113         CDEBUG(D_NET,
1114                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
1115                payload_len, payload_niov, nid, pid);
1116
1117         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1118         if (ltx == NULL)
1119                 return (PTL_FAIL);
1120
1121         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1122         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1123         
1124         ltx->ltx_tx.tx_kiov = ltx->ltx_iov_space.payload.kiov;
1125         memcpy (ltx->ltx_tx.tx_kiov, payload_iov, 
1126                 payload_niov * sizeof (*payload_iov));
1127         ltx->ltx_tx.tx_nkiov = payload_niov;
1128         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1129
1130         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1131         if (rc == 0)
1132                 return (PTL_OK);
1133
1134         ksocknal_put_ltx (ltx);
1135         return (PTL_FAIL);
1136 }
1137
1138 void
1139 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1140 {
1141         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1142         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
1143         int           rc;
1144         
1145         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1146                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1147
1148         /* I'm the gateway; must be the last hop */
1149         if (nid == ksocknal_lib.ni.nid)
1150                 nid = fwd->kprfd_target_nid;
1151
1152         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
1153         tx->tx_nob   = fwd->kprfd_nob;
1154         tx->tx_niov  = fwd->kprfd_niov;
1155         tx->tx_iov   = fwd->kprfd_iov;
1156         tx->tx_nkiov = 0;
1157         tx->tx_kiov  = NULL;
1158         tx->tx_hdr   = (ptl_hdr_t *)fwd->kprfd_iov[0].iov_base;
1159
1160         rc = ksocknal_launch_packet (tx, nid);
1161         if (rc != 0)
1162                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1163 }
1164
1165 int
1166 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1167 {
1168         long    pid = kernel_thread (fn, arg, 0);
1169
1170         if (pid < 0)
1171                 return ((int)pid);
1172
1173         atomic_inc (&ksocknal_data.ksnd_nthreads);
1174         return (0);
1175 }
1176
1177 void
1178 ksocknal_thread_fini (void)
1179 {
1180         atomic_dec (&ksocknal_data.ksnd_nthreads);
1181 }
1182
1183 void
1184 ksocknal_fmb_callback (void *arg, int error)
1185 {
1186         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1187         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1188         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1189         ksock_conn_t      *conn = NULL;
1190         ksock_sched_t     *sched;
1191         unsigned long      flags;
1192
1193         if (error != 0)
1194                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1195                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
1196                        error);
1197         else
1198                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1199                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1200
1201         /* drop peer ref taken on init */
1202         ksocknal_put_peer (fmb->fmb_peer);
1203         
1204         spin_lock_irqsave (&fmp->fmp_lock, flags);
1205
1206         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1207
1208         if (!list_empty (&fmp->fmp_blocked_conns)) {
1209                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1210                                    ksock_conn_t, ksnc_rx_list);
1211                 list_del (&conn->ksnc_rx_list);
1212         }
1213
1214         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1215
1216         if (conn == NULL)
1217                 return;
1218
1219         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1220         LASSERT (conn->ksnc_rx_scheduled);
1221         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1222
1223         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1224
1225         sched = conn->ksnc_scheduler;
1226
1227         spin_lock_irqsave (&sched->kss_lock, flags);
1228
1229         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1230         wake_up (&sched->kss_waitq);
1231
1232         spin_unlock_irqrestore (&sched->kss_lock, flags);
1233 }
1234
1235 ksock_fmb_t *
1236 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1237 {
1238         int               payload_nob = conn->ksnc_rx_nob_left;
1239         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1240         unsigned long     flags;
1241         ksock_fmb_pool_t *pool;
1242         ksock_fmb_t      *fmb;
1243
1244         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1245         LASSERT (ksocknal_data.ksnd_fmbs != NULL);
1246
1247         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1248                 pool = &ksocknal_data.ksnd_small_fmp;
1249         else
1250                 pool = &ksocknal_data.ksnd_large_fmp;
1251
1252         spin_lock_irqsave (&pool->fmp_lock, flags);
1253
1254         if (!list_empty (&pool->fmp_idle_fmbs)) {
1255                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1256                                  ksock_fmb_t, fmb_list);
1257                 list_del (&fmb->fmb_list);
1258                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1259
1260                 return (fmb);
1261         }
1262
1263         /* deschedule until fmb free */
1264
1265         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1266
1267         list_add_tail (&conn->ksnc_rx_list,
1268                        &pool->fmp_blocked_conns);
1269
1270         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1271         return (NULL);
1272 }
1273
1274 int
1275 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1276 {
1277         int payload_nob = conn->ksnc_rx_nob_left;
1278         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1279         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1280         int niov;                               /* at least the header */
1281         int nob;
1282
1283         LASSERT (conn->ksnc_rx_scheduled);
1284         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1285         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1286         LASSERT (payload_nob >= 0);
1287         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1288         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1289
1290         /* Got a forwarding buffer; copy the header we just read into the
1291          * forwarding buffer.  If there's payload, start reading reading it
1292          * into the buffer, otherwise the forwarding buffer can be kicked
1293          * off immediately.
1294          *
1295          * NB fmb->fmb_iov spans the WHOLE packet.
1296          *    conn->ksnc_rx_iov spans just the payload.
1297          */
1298         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1299
1300         /* copy header */
1301         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1302
1303         /* Take a ref on the conn's peer to prevent module unload before
1304          * forwarding completes.  NB we ref peer and not conn since because
1305          * all refs on conn after it has been closed must remove themselves
1306          * in finite time */
1307         fmb->fmb_peer = conn->ksnc_peer;
1308         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1309
1310         if (payload_nob == 0) {         /* got complete packet already */
1311                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1312                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1313                         dest_nid, packet_nob);
1314
1315                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1316
1317                 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1318                               packet_nob, 1, fmb->fmb_iov,
1319                               ksocknal_fmb_callback, fmb);
1320
1321                 /* forward it now */
1322                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1323
1324                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1325                 return (1);
1326         }
1327
1328         niov = 1;
1329         if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
1330                 fmb->fmb_iov[0].iov_len = packet_nob;
1331         } else {
1332                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1333                 nob = packet_nob - PAGE_SIZE;
1334
1335                 do {
1336                         LASSERT (niov < fmb->fmb_npages);
1337                         fmb->fmb_iov[niov].iov_base =
1338                                 page_address (fmb->fmb_pages[niov]);
1339                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1340                         nob -= PAGE_SIZE;
1341                         niov++;
1342                 } while (nob > 0);
1343         }
1344
1345         kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1346                       packet_nob, niov, fmb->fmb_iov,
1347                       ksocknal_fmb_callback, fmb);
1348
1349         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1350         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1351         
1352         /* payload is desc's iov-ed buffer, but skipping the hdr */
1353         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1354                  sizeof (struct iovec));
1355
1356         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1357         conn->ksnc_rx_iov[0].iov_base =
1358                 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1359                          sizeof (ptl_hdr_t));
1360         conn->ksnc_rx_iov[0].iov_len =
1361                 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1362
1363         if (niov > 1)
1364                 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1365                        (niov - 1) * sizeof (struct iovec));
1366
1367         conn->ksnc_rx_niov = niov;
1368
1369         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1370                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1371         return (0);
1372 }
1373
1374 void
1375 ksocknal_fwd_parse (ksock_conn_t *conn)
1376 {
1377         ksock_peer_t *peer;
1378         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1379         ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1380         int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1381         char str[PTL_NALFMT_SIZE];
1382
1383         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1384                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1385
1386         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1387         LASSERT (conn->ksnc_rx_scheduled);
1388
1389         if (body_len < 0) {                 /* length corrupt (overflow) */
1390                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1391                        "packet size %d illegal\n",
1392                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1393                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1394                        body_len);
1395
1396                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1397                 return;
1398         }
1399
1400         if (ksocknal_data.ksnd_fmbs == NULL) {        /* not forwarding */
1401                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1402                        " (%s): not forwarding\n",
1403                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1404                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1405                 /* on to new packet (skip this one's body) */
1406                 ksocknal_new_packet (conn, body_len);
1407                 return;
1408         }
1409
1410         if (body_len > PTL_MTU) {      /* too big to forward */
1411                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1412                         "(%s): packet size %d too big\n",
1413                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1414                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1415                         body_len);
1416                 /* on to new packet (skip this one's body) */
1417                 ksocknal_new_packet (conn, body_len);
1418                 return;
1419         }
1420
1421         /* should have gone direct */
1422         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1423         if (peer != NULL) {
1424                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1425                         "(%s): target is a peer\n",
1426                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1427                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1428                 ksocknal_put_peer (peer);  /* drop ref from get above */
1429
1430                 /* on to next packet (skip this one's body) */
1431                 ksocknal_new_packet (conn, body_len);
1432                 return;
1433         }
1434
1435         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1436         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1437         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1438 }
1439
1440 int
1441 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1442 {
1443         static char ksocknal_slop_buffer[4096];
1444
1445         int   nob;
1446         int   niov;
1447         int   skipped;
1448
1449         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1450                 conn->ksnc_rx_started = 0;
1451                 mb ();                          /* racing with timeout thread */
1452                 
1453                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1454                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1455                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1456
1457                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1458                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1459                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1460                 conn->ksnc_rx_niov = 1;
1461
1462                 conn->ksnc_rx_kiov = NULL;
1463                 conn->ksnc_rx_nkiov = 0;
1464                 return (1);
1465         }
1466
1467         /* Set up to skip as much a possible now.  If there's more left
1468          * (ran out of iov entries) we'll get called again */
1469
1470         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1471         conn->ksnc_rx_nob_left = nob_to_skip;
1472         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1473         skipped = 0;
1474         niov = 0;
1475
1476         do {
1477                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1478
1479                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1480                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1481                 niov++;
1482                 skipped += nob;
1483                 nob_to_skip -=nob;
1484
1485         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1486                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1487
1488         conn->ksnc_rx_niov = niov;
1489         conn->ksnc_rx_kiov = NULL;
1490         conn->ksnc_rx_nkiov = 0;
1491         conn->ksnc_rx_nob_wanted = skipped;
1492         return (0);
1493 }
1494
1495 int
1496 ksocknal_process_receive (ksock_conn_t *conn)
1497 {
1498         ksock_fmb_t  *fmb;
1499         int           rc;
1500         
1501         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1502
1503         /* doesn't need a forwarding buffer */
1504         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1505                 goto try_read;
1506
1507  get_fmb:
1508         fmb = ksocknal_get_idle_fmb (conn);
1509         if (fmb == NULL) {
1510                 /* conn descheduled waiting for idle fmb */
1511                 return (0);
1512         }
1513
1514         if (ksocknal_init_fmb (conn, fmb)) {
1515                 /* packet forwarded */
1516                 return (0);
1517         }
1518
1519  try_read:
1520         /* NB: sched lock NOT held */
1521         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1522                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1523                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1524                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1525
1526         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1527
1528         rc = ksocknal_recvmsg(conn);
1529
1530         if (rc <= 0) {
1531                 if (rc == 0)
1532                         CWARN ("[%p] EOF from "LPX64" ip %08x:%d\n",
1533                                conn, conn->ksnc_peer->ksnp_nid,
1534                                conn->ksnc_ipaddr, conn->ksnc_port);
1535                 else if (!conn->ksnc_closing)
1536                         CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
1537                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1538                                 conn->ksnc_ipaddr, conn->ksnc_port);
1539
1540                 ksocknal_close_conn_and_siblings (conn, rc);
1541                 return (rc == 0 ? -ESHUTDOWN : rc);
1542         }
1543
1544         if (conn->ksnc_rx_nob_wanted != 0) {
1545                 /* short read */
1546                 return (-EAGAIN);
1547         }
1548         
1549         switch (conn->ksnc_rx_state) {
1550         case SOCKNAL_RX_HEADER:
1551                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1552                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1553                         /* This packet isn't for me */
1554                         ksocknal_fwd_parse (conn);
1555                         switch (conn->ksnc_rx_state) {
1556                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1557                                 return (0);     /* => come back later */
1558                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1559                                 goto try_read;  /* => go read it */
1560                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1561                                 goto get_fmb;   /* => go get a fwd msg buffer */
1562                         default:
1563                                 LBUG ();
1564                         }
1565                         /* Not Reached */
1566                 }
1567
1568                 /* sets wanted_len, iovs etc */
1569                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1570
1571                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1572                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1573                         goto try_read;          /* go read the payload */
1574                 }
1575                 /* Fall through (completed packet for me) */
1576
1577         case SOCKNAL_RX_BODY:
1578                 /* payload all received */
1579                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1580                 /* Fall through */
1581
1582         case SOCKNAL_RX_SLOP:
1583                 /* starting new packet? */
1584                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1585                         return (0);     /* come back later */
1586                 goto try_read;          /* try to finish reading slop now */
1587
1588         case SOCKNAL_RX_BODY_FWD:
1589                 /* payload all received */
1590                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1591                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1592                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1593                         conn->ksnc_rx_nob_left);
1594
1595                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1596                  * conn->ksnc_cookie */
1597                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1598                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1599
1600                 /* no slop in forwarded packets */
1601                 LASSERT (conn->ksnc_rx_nob_left == 0);
1602
1603                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1604                 return (0);                     /* (later) */
1605
1606         default:
1607                 break;
1608         }
1609
1610         /* Not Reached */
1611         LBUG ();
1612         return (-EINVAL);                       /* keep gcc happy */
1613 }
1614
1615 int
1616 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1617                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1618 {
1619         ksock_conn_t *conn = (ksock_conn_t *)private;
1620
1621         LASSERT (mlen <= rlen);
1622         LASSERT (niov <= PTL_MD_MAX_IOV);
1623         
1624         conn->ksnc_cookie = msg;
1625         conn->ksnc_rx_nob_wanted = mlen;
1626         conn->ksnc_rx_nob_left   = rlen;
1627
1628         conn->ksnc_rx_nkiov = 0;
1629         conn->ksnc_rx_kiov = NULL;
1630         conn->ksnc_rx_niov = niov;
1631         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1632         memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1633
1634         LASSERT (mlen == 
1635                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1636                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1637
1638         return (rlen);
1639 }
1640
1641 int
1642 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1643                      unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1644 {
1645         ksock_conn_t *conn = (ksock_conn_t *)private;
1646
1647         LASSERT (mlen <= rlen);
1648         LASSERT (niov <= PTL_MD_MAX_IOV);
1649         
1650         conn->ksnc_cookie = msg;
1651         conn->ksnc_rx_nob_wanted = mlen;
1652         conn->ksnc_rx_nob_left   = rlen;
1653
1654         conn->ksnc_rx_niov = 0;
1655         conn->ksnc_rx_iov  = NULL;
1656         conn->ksnc_rx_nkiov = niov;
1657         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1658         memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1659
1660         LASSERT (mlen == 
1661                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1662                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1663
1664         return (rlen);
1665 }
1666
1667 int ksocknal_scheduler (void *arg)
1668 {
1669         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1670         ksock_conn_t      *conn;
1671         ksock_tx_t        *tx;
1672         unsigned long      flags;
1673         int                rc;
1674         int                nloops = 0;
1675         int                id = sched - ksocknal_data.ksnd_schedulers;
1676         char               name[16];
1677
1678         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1679         kportal_daemonize (name);
1680         kportal_blockallsigs ();
1681
1682         current->flags |= PF_MEMALLOC;
1683
1684 #if (CONFIG_SMP && CPU_AFFINITY)
1685         if ((cpu_online_map & (1 << id)) != 0) {
1686 #if 1
1687                 current->cpus_allowed = (1 << id);
1688 #else
1689                 set_cpus_allowed (current, 1<<id);
1690 #endif
1691         } else {
1692                 CERROR ("Can't set CPU affinity for %s\n", name);
1693         }
1694 #endif /* CONFIG_SMP && CPU_AFFINITY */
1695         
1696         spin_lock_irqsave (&sched->kss_lock, flags);
1697
1698         while (!ksocknal_data.ksnd_shuttingdown) {
1699                 int did_something = 0;
1700
1701                 /* Ensure I progress everything semi-fairly */
1702
1703                 if (!list_empty (&sched->kss_rx_conns)) {
1704                         conn = list_entry(sched->kss_rx_conns.next,
1705                                           ksock_conn_t, ksnc_rx_list);
1706                         list_del(&conn->ksnc_rx_list);
1707
1708                         LASSERT(conn->ksnc_rx_scheduled);
1709                         LASSERT(conn->ksnc_rx_ready);
1710
1711                         /* clear rx_ready in case receive isn't complete.
1712                          * Do it BEFORE we call process_recv, since
1713                          * data_ready can set it any time after we release
1714                          * kss_lock. */
1715                         conn->ksnc_rx_ready = 0;
1716                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1717                         
1718                         rc = ksocknal_process_receive(conn);
1719                         
1720                         spin_lock_irqsave(&sched->kss_lock, flags);
1721
1722                         /* I'm the only one that can clear this flag */
1723                         LASSERT(conn->ksnc_rx_scheduled);
1724
1725                         /* Did process_receive get everything it wanted? */
1726                         if (rc == 0)
1727                                 conn->ksnc_rx_ready = 1;
1728                         
1729                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1730                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1731                                 /* Conn blocked for a forwarding buffer.
1732                                  * It will get queued for my attention when
1733                                  * one becomes available (and it might just
1734                                  * already have been!).  Meanwhile my ref
1735                                  * on it stays put. */
1736                         } else if (conn->ksnc_rx_ready) {
1737                                 /* reschedule for rx */
1738                                 list_add_tail (&conn->ksnc_rx_list,
1739                                                &sched->kss_rx_conns);
1740                         } else {
1741                                 conn->ksnc_rx_scheduled = 0;
1742                                 /* drop my ref */
1743                                 ksocknal_put_conn(conn);
1744                         }
1745
1746                         did_something = 1;
1747                 }
1748
1749                 if (!list_empty (&sched->kss_tx_conns)) {
1750                         conn = list_entry(sched->kss_tx_conns.next,
1751                                           ksock_conn_t, ksnc_tx_list);
1752                         list_del (&conn->ksnc_tx_list);
1753                         
1754                         LASSERT(conn->ksnc_tx_scheduled);
1755                         LASSERT(conn->ksnc_tx_ready);
1756                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1757                         
1758                         tx = list_entry(conn->ksnc_tx_queue.next,
1759                                         ksock_tx_t, tx_list);
1760                         /* dequeue now so empty list => more to send */
1761                         list_del(&tx->tx_list);
1762                         
1763                         /* Clear tx_ready in case send isn't complete.  Do
1764                          * it BEFORE we call process_transmit, since
1765                          * write_space can set it any time after we release
1766                          * kss_lock. */
1767                         conn->ksnc_tx_ready = 0;
1768                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1769                         
1770                         rc = ksocknal_process_transmit(conn, tx);
1771                         
1772                         spin_lock_irqsave (&sched->kss_lock, flags);
1773
1774                         if (rc != -EAGAIN) {
1775                                 /* error or everything went: assume more can go */
1776                                 conn->ksnc_tx_ready = 1;
1777                         } else {
1778                                  /* back onto HEAD of tx_queue */
1779                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1780                         }
1781                         
1782                         if (conn->ksnc_tx_ready &&
1783                             !list_empty (&conn->ksnc_tx_queue)) {
1784                                 /* reschedule for tx */
1785                                 list_add_tail (&conn->ksnc_tx_list, 
1786                                                &sched->kss_tx_conns);
1787                         } else {
1788                                 conn->ksnc_tx_scheduled = 0;
1789                                 /* drop my ref */
1790                                 ksocknal_put_conn (conn);
1791                         }
1792                                 
1793                         did_something = 1;
1794                 }
1795 #if SOCKNAL_ZC
1796                 if (!list_empty (&sched->kss_zctxdone_list)) {
1797                         ksock_tx_t *tx =
1798                                 list_entry(sched->kss_zctxdone_list.next,
1799                                            ksock_tx_t, tx_list);
1800                         did_something = 1;
1801
1802                         list_del (&tx->tx_list);
1803                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1804
1805                         ksocknal_tx_done (tx, 1);
1806
1807                         spin_lock_irqsave (&sched->kss_lock, flags);
1808                 }
1809 #endif
1810                 if (!did_something ||           /* nothing to do */
1811                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1812                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1813
1814                         nloops = 0;
1815
1816                         if (!did_something) {   /* wait for something to do */
1817 #if SOCKNAL_ZC
1818                                 rc = wait_event_interruptible (sched->kss_waitq,
1819                                                                ksocknal_data.ksnd_shuttingdown ||
1820                                                                !list_empty(&sched->kss_rx_conns) ||
1821                                                                !list_empty(&sched->kss_tx_conns) ||
1822                                                                !list_empty(&sched->kss_zctxdone_list));
1823 #else
1824                                 rc = wait_event_interruptible (sched->kss_waitq,
1825                                                                ksocknal_data.ksnd_shuttingdown ||
1826                                                                !list_empty(&sched->kss_rx_conns) ||
1827                                                                !list_empty(&sched->kss_tx_conns));
1828 #endif
1829                                 LASSERT (rc == 0);
1830                         } else
1831                                our_cond_resched();
1832
1833                         spin_lock_irqsave (&sched->kss_lock, flags);
1834                 }
1835         }
1836
1837         spin_unlock_irqrestore (&sched->kss_lock, flags);
1838         ksocknal_thread_fini ();
1839         return (0);
1840 }
1841
1842 void
1843 ksocknal_data_ready (struct sock *sk, int n)
1844 {
1845         unsigned long  flags;
1846         ksock_conn_t  *conn;
1847         ksock_sched_t *sched;
1848         ENTRY;
1849
1850         /* interleave correctly with closing sockets... */
1851         read_lock (&ksocknal_data.ksnd_global_lock);
1852
1853         conn = sk->sk_user_data;
1854         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1855                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1856                 sk->sk_data_ready (sk, n);
1857         } else {
1858                 sched = conn->ksnc_scheduler;
1859
1860                 spin_lock_irqsave (&sched->kss_lock, flags);
1861
1862                 conn->ksnc_rx_ready = 1;
1863
1864                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1865                         list_add_tail(&conn->ksnc_rx_list,
1866                                       &sched->kss_rx_conns);
1867                         conn->ksnc_rx_scheduled = 1;
1868                         /* extra ref for scheduler */
1869                         atomic_inc (&conn->ksnc_refcount);
1870
1871                         wake_up (&sched->kss_waitq);
1872                 }
1873
1874                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1875         }
1876
1877         read_unlock (&ksocknal_data.ksnd_global_lock);
1878
1879         EXIT;
1880 }
1881
1882 void
1883 ksocknal_write_space (struct sock *sk)
1884 {
1885         unsigned long  flags;
1886         ksock_conn_t  *conn;
1887         ksock_sched_t *sched;
1888
1889         /* interleave correctly with closing sockets... */
1890         read_lock (&ksocknal_data.ksnd_global_lock);
1891
1892         conn = sk->sk_user_data;
1893
1894         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1895                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1896                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1897                                       " ready" : " blocked"),
1898                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1899                                       " scheduled" : " idle"),
1900                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1901                                       " empty" : " queued"));
1902
1903         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1904                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1905                 sk->sk_write_space (sk);
1906
1907                 read_unlock (&ksocknal_data.ksnd_global_lock);
1908                 return;
1909         }
1910
1911         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1912                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1913
1914                 sched = conn->ksnc_scheduler;
1915
1916                 spin_lock_irqsave (&sched->kss_lock, flags);
1917
1918                 conn->ksnc_tx_ready = 1;
1919
1920                 if (!conn->ksnc_tx_scheduled && // not being progressed
1921                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1922                         list_add_tail (&conn->ksnc_tx_list,
1923                                        &sched->kss_tx_conns);
1924                         conn->ksnc_tx_scheduled = 1;
1925                         /* extra ref for scheduler */
1926                         atomic_inc (&conn->ksnc_refcount);
1927
1928                         wake_up (&sched->kss_waitq);
1929                 }
1930
1931                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1932         }
1933
1934         read_unlock (&ksocknal_data.ksnd_global_lock);
1935 }
1936
1937 int
1938 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1939 {
1940         int           rc;
1941         mm_segment_t  oldmm = get_fs();
1942
1943         while (nob > 0) {
1944                 struct iovec  iov = {
1945                         .iov_base = buffer,
1946                         .iov_len  = nob
1947                 };
1948                 struct msghdr msg = {
1949                         .msg_name       = NULL,
1950                         .msg_namelen    = 0,
1951                         .msg_iov        = &iov,
1952                         .msg_iovlen     = 1,
1953                         .msg_control    = NULL,
1954                         .msg_controllen = 0,
1955                         .msg_flags      = 0
1956                 };
1957
1958                 set_fs (KERNEL_DS);
1959                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1960                 set_fs (oldmm);
1961                 
1962                 if (rc < 0)
1963                         return (rc);
1964
1965                 if (rc == 0) {
1966                         CERROR ("Unexpected zero rc\n");
1967                         return (-ECONNABORTED);
1968                 }
1969
1970                 buffer = ((char *)buffer) + rc;
1971                 nob -= rc;
1972         }
1973         
1974         return (0);
1975 }
1976
1977 int
1978 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1979 {
1980         int           rc;
1981         mm_segment_t  oldmm = get_fs();
1982         
1983         while (nob > 0) {
1984                 struct iovec  iov = {
1985                         .iov_base = buffer,
1986                         .iov_len  = nob
1987                 };
1988                 struct msghdr msg = {
1989                         .msg_name       = NULL,
1990                         .msg_namelen    = 0,
1991                         .msg_iov        = &iov,
1992                         .msg_iovlen     = 1,
1993                         .msg_control    = NULL,
1994                         .msg_controllen = 0,
1995                         .msg_flags      = 0
1996                 };
1997
1998                 set_fs (KERNEL_DS);
1999                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
2000                 set_fs (oldmm);
2001                 
2002                 if (rc < 0)
2003                         return (rc);
2004
2005                 if (rc == 0)
2006                         return (-ECONNABORTED);
2007
2008                 buffer = ((char *)buffer) + rc;
2009                 nob -= rc;
2010         }
2011         
2012         return (0);
2013 }
2014
2015 int
2016 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnation)
2017 {
2018         int                 rc;
2019         ptl_hdr_t           hdr;
2020         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2021
2022         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2023
2024         memset (&hdr, 0, sizeof (hdr));
2025         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
2026         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
2027         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
2028         
2029         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
2030         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
2031
2032         hdr.msg.hello.type = __cpu_to_le32 (*type);
2033         hdr.msg.hello.incarnation = 
2034                 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2035
2036         /* Assume sufficient socket buffering for this message */
2037         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
2038         if (rc != 0) {
2039                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, *nid);
2040                 return (rc);
2041         }
2042
2043         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2044         if (rc != 0) {
2045                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, *nid);
2046                 return (rc);
2047         }
2048         
2049         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2050                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
2051                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid);
2052                 return (-EPROTO);
2053         }
2054
2055         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2056             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2057                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2058                         " from "LPX64"\n",
2059                         __le16_to_cpu (hmv->version_major),
2060                         __le16_to_cpu (hmv->version_minor),
2061                         PORTALS_PROTO_VERSION_MAJOR,
2062                         PORTALS_PROTO_VERSION_MINOR,
2063                         *nid);
2064                 return (-EPROTO);
2065         }
2066
2067 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
2068 # error "This code only understands protocol version 0.x"
2069 #endif
2070         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
2071          * so read the rest of it in now... */
2072
2073         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2074         if (rc != 0) {
2075                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
2076                         rc, *nid);
2077                 return (rc);
2078         }
2079
2080         /* ...and check we got what we expected */
2081         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2082             hdr.payload_length != __cpu_to_le32 (0)) {
2083                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2084                         " but got type %d with %d payload from "LPX64"\n",
2085                         __le32_to_cpu (hdr.type),
2086                         __le32_to_cpu (hdr.payload_length), *nid);
2087                 return (-EPROTO);
2088         }
2089
2090         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2091                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2092                 return (-EPROTO);
2093         }
2094
2095         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2096                 *nid = __le64_to_cpu(hdr.src_nid);
2097         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2098                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
2099                         __le64_to_cpu (hdr.src_nid), *nid);
2100                 return (-EPROTO);
2101         }
2102
2103         if (*type == SOCKNAL_CONN_NONE) {
2104                 /* I've accepted this connection; peer determines type */
2105                 *type = __le32_to_cpu(hdr.msg.hello.type);
2106                 switch (*type) {
2107                 case SOCKNAL_CONN_ANY:
2108                 case SOCKNAL_CONN_CONTROL:
2109                         break;
2110                 case SOCKNAL_CONN_BULK_IN:
2111                         *type = SOCKNAL_CONN_BULK_OUT;
2112                         break;
2113                 case SOCKNAL_CONN_BULK_OUT:
2114                         *type = SOCKNAL_CONN_BULK_IN;
2115                         break;
2116                 default:
2117                         CERROR ("Unexpected type %d from "LPX64"\n", *type, *nid);
2118                         return (-EPROTO);
2119                 }
2120         } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2121                 CERROR ("Mismatched types: me %d "LPX64" %d\n",
2122                         *type, *nid, __le32_to_cpu(hdr.msg.hello.type));
2123                 return (-EPROTO);
2124         }
2125
2126         *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
2127
2128         return (0);
2129 }
2130
2131 int
2132 ksocknal_setup_sock (struct socket *sock)
2133 {
2134         mm_segment_t    oldmm = get_fs ();
2135         int             rc;
2136         int             option;
2137         struct linger   linger;
2138
2139         sock->sk->allocation = GFP_NOFS;
2140
2141         /* Ensure this socket aborts active sends immediately when we close
2142          * it. */
2143
2144         linger.l_onoff = 0;
2145         linger.l_linger = 0;
2146
2147         set_fs (KERNEL_DS);
2148         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2149                               (char *)&linger, sizeof (linger));
2150         set_fs (oldmm);
2151         if (rc != 0) {
2152                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2153                 return (rc);
2154         }
2155
2156         option = -1;
2157         set_fs (KERNEL_DS);
2158         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2159                                     (char *)&option, sizeof (option));
2160         set_fs (oldmm);
2161         if (rc != 0) {
2162                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2163                 return (rc);
2164         }
2165
2166 #if SOCKNAL_USE_KEEPALIVES
2167         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2168          * second until the timeout elapses. */
2169
2170         option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
2171         set_fs (KERNEL_DS);
2172         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2173                                     (char *)&option, sizeof (option));
2174         set_fs (oldmm);
2175         if (rc != 0) {
2176                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2177                 return (rc);
2178         }
2179         
2180         option = 1;
2181         set_fs (KERNEL_DS);
2182         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2183                                     (char *)&option, sizeof (option));
2184         set_fs (oldmm);
2185         if (rc != 0) {
2186                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2187                 return (rc);
2188         }
2189         
2190         option = ksocknal_data.ksnd_io_timeout / 4;
2191         set_fs (KERNEL_DS);
2192         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2193                                     (char *)&option, sizeof (option));
2194         set_fs (oldmm);
2195         if (rc != 0) {
2196                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2197                 return (rc);
2198         }
2199
2200         option = 1;
2201         set_fs (KERNEL_DS);
2202         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2203                               (char *)&option, sizeof (option));
2204         set_fs (oldmm);
2205         if (rc != 0) {
2206                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2207                 return (rc);
2208         }
2209 #endif
2210         return (0);
2211 }
2212
2213 int
2214 ksocknal_connect_peer (ksock_route_t *route, int type)
2215 {
2216         struct sockaddr_in  peer_addr;
2217         mm_segment_t        oldmm = get_fs();
2218         struct timeval      tv;
2219         int                 fd;
2220         struct socket      *sock;
2221         int                 rc;
2222
2223         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2224         if (rc != 0) {
2225                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2226                 return (rc);
2227         }
2228
2229         /* Ugh; have to map_fd for compatibility with sockets passed in
2230          * from userspace.  And we actually need the sock->file refcounting
2231          * that this gives you :) */
2232
2233         fd = sock_map_fd (sock);
2234         if (fd < 0) {
2235                 sock_release (sock);
2236                 CERROR ("sock_map_fd error %d\n", fd);
2237                 return (fd);
2238         }
2239
2240         /* NB the fd now owns the ref on sock->file */
2241         LASSERT (sock->file != NULL);
2242         LASSERT (file_count(sock->file) == 1);
2243
2244         /* Set the socket timeouts, so our connection attempt completes in
2245          * finite time */
2246         tv.tv_sec = ksocknal_data.ksnd_io_timeout;
2247         tv.tv_usec = 0;
2248
2249         set_fs (KERNEL_DS);
2250         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2251                               (char *)&tv, sizeof (tv));
2252         set_fs (oldmm);
2253         if (rc != 0) {
2254                 CERROR ("Can't set send timeout %d: %d\n", 
2255                         ksocknal_data.ksnd_io_timeout, rc);
2256                 goto out;
2257         }
2258         
2259         set_fs (KERNEL_DS);
2260         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2261                               (char *)&tv, sizeof (tv));
2262         set_fs (oldmm);
2263         if (rc != 0) {
2264                 CERROR ("Can't set receive timeout %d: %d\n",
2265                         ksocknal_data.ksnd_io_timeout, rc);
2266                 goto out;
2267         }
2268
2269         if (route->ksnr_nonagel) {
2270                 int  option = 1;
2271                 
2272                 set_fs (KERNEL_DS);
2273                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2274                                             (char *)&option, sizeof (option));
2275                 set_fs (oldmm);
2276                 if (rc != 0) {
2277                         CERROR ("Can't disable nagel: %d\n", rc);
2278                         goto out;
2279                 }
2280         }
2281         
2282         if (route->ksnr_buffer_size != 0) {
2283                 int option = route->ksnr_buffer_size;
2284                 
2285                 set_fs (KERNEL_DS);
2286                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2287                                       (char *)&option, sizeof (option));
2288                 set_fs (oldmm);
2289                 if (rc != 0) {
2290                         CERROR ("Can't set send buffer %d: %d\n",
2291                                 route->ksnr_buffer_size, rc);
2292                         goto out;
2293                 }
2294
2295                 set_fs (KERNEL_DS);
2296                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2297                                       (char *)&option, sizeof (option));
2298                 set_fs (oldmm);
2299                 if (rc != 0) {
2300                         CERROR ("Can't set receive buffer %d: %d\n",
2301                                 route->ksnr_buffer_size, rc);
2302                         goto out;
2303                 }
2304         }
2305         
2306         memset (&peer_addr, 0, sizeof (peer_addr));
2307         peer_addr.sin_family = AF_INET;
2308         peer_addr.sin_port = htons (route->ksnr_port);
2309         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2310         
2311         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2312                                  sizeof (peer_addr), sock->file->f_flags);
2313         if (rc != 0) {
2314                 CERROR ("Error %d connecting to "LPX64"\n", rc,
2315                         route->ksnr_peer->ksnp_nid);
2316                 goto out;
2317         }
2318         
2319         rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2320         if (rc == 0) {
2321                 /* Take an extra ref on sock->file to compensate for the
2322                  * upcoming close which will lose fd's ref on it. */
2323                 get_file (sock->file);
2324         }
2325
2326  out:
2327         sys_close (fd);
2328         return (rc);
2329 }
2330
2331 void
2332 ksocknal_autoconnect (ksock_route_t *route)
2333 {
2334         LIST_HEAD        (zombies);
2335         ksock_tx_t       *tx;
2336         ksock_peer_t     *peer;
2337         unsigned long     flags;
2338         int               rc;
2339         int               type;
2340         
2341         for (;;) {
2342                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2343                         if ((route->ksnr_connecting & (1 << type)) != 0)
2344                                 break;
2345                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2346
2347                 rc = ksocknal_connect_peer (route, type);
2348
2349                 if (rc != 0)
2350                         break;
2351                 
2352                 /* successfully autoconnected: create_conn did the
2353                  * route/conn binding and scheduled any blocked packets */
2354
2355                 if (route->ksnr_connecting == 0) {
2356                         /* No more connections required */
2357                         return;
2358                 }
2359         }
2360
2361         /* Connection attempt failed */
2362
2363         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2364
2365         peer = route->ksnr_peer;
2366         route->ksnr_connecting = 0;
2367
2368         /* This is a retry rather than a new connection */
2369         LASSERT (route->ksnr_retry_interval != 0);
2370         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2371         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2372                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2373
2374         if (!list_empty (&peer->ksnp_tx_queue) &&
2375             ksocknal_find_connecting_route_locked (peer) == NULL) {
2376                 LASSERT (list_empty (&peer->ksnp_conns));
2377
2378                 /* None of the connections that the blocked packets are
2379                  * waiting for have been successful.  Complete them now... */
2380                 do {
2381                         tx = list_entry (peer->ksnp_tx_queue.next,
2382                                          ksock_tx_t, tx_list);
2383                         list_del (&tx->tx_list);
2384                         list_add_tail (&tx->tx_list, &zombies);
2385                 } while (!list_empty (&peer->ksnp_tx_queue));
2386         }
2387
2388         /* make this route least-favourite for re-selection */
2389         if (!route->ksnr_deleted) {
2390                 list_del(&route->ksnr_list);
2391                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2392         }
2393         
2394         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2395
2396         while (!list_empty (&zombies)) {
2397                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2398                 
2399                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
2400                         NTOH__u32 (tx->tx_hdr->type),
2401                         NTOH__u32 (tx->tx_hdr->payload_length),
2402                         NTOH__u64 (tx->tx_hdr->src_nid),
2403                         NTOH__u64 (tx->tx_hdr->dest_nid));
2404
2405                 list_del (&tx->tx_list);
2406                 /* complete now */
2407                 ksocknal_tx_done (tx, 0);
2408         }
2409 }
2410
2411 int
2412 ksocknal_autoconnectd (void *arg)
2413 {
2414         long               id = (long)arg;
2415         char               name[16];
2416         unsigned long      flags;
2417         ksock_route_t     *route;
2418         int                rc;
2419
2420         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2421         kportal_daemonize (name);
2422         kportal_blockallsigs ();
2423
2424         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2425
2426         while (!ksocknal_data.ksnd_shuttingdown) {
2427
2428                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2429                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2430                                             ksock_route_t, ksnr_connect_list);
2431                         
2432                         list_del (&route->ksnr_connect_list);
2433                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2434
2435                         ksocknal_autoconnect (route);
2436                         ksocknal_put_route (route);
2437
2438                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2439                         continue;
2440                 }
2441                 
2442                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2443
2444                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2445                                                ksocknal_data.ksnd_shuttingdown ||
2446                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2447
2448                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2449         }
2450
2451         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2452
2453         ksocknal_thread_fini ();
2454         return (0);
2455 }
2456
2457 ksock_conn_t *
2458 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2459 {
2460         /* We're called with a shared lock on ksnd_global_lock */
2461         ksock_conn_t      *conn;
2462         struct list_head  *ctmp;
2463         ksock_sched_t     *sched;
2464
2465         list_for_each (ctmp, &peer->ksnp_conns) {
2466                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2467                 sched = conn->ksnc_scheduler;
2468
2469                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2470                 LASSERT (!conn->ksnc_closing);
2471                 
2472                 if (conn->ksnc_rx_started &&
2473                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2474                         /* Timed out incomplete incoming message */
2475                         atomic_inc (&conn->ksnc_refcount);
2476                         CERROR ("Timed out RX from "LPX64" %p\n", 
2477                                 peer->ksnp_nid, conn);
2478                         return (conn);
2479                 }
2480                 
2481                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2482                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2483                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2484                         /* Timed out messages queued for sending, or
2485                          * messages buffered in the socket's send buffer */
2486                         atomic_inc (&conn->ksnc_refcount);
2487                         CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
2488                                 peer->ksnp_nid, 
2489                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2490                                 conn->ksnc_sock->sk->sk_wmem_queued, conn);
2491                         return (conn);
2492                 }
2493         }
2494
2495         return (NULL);
2496 }
2497
2498 void
2499 ksocknal_check_peer_timeouts (int idx)
2500 {
2501         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2502         struct list_head *ptmp;
2503         ksock_peer_t     *peer;
2504         ksock_conn_t     *conn;
2505
2506  again:
2507         /* NB. We expect to have a look at all the peers and not find any
2508          * connections to time out, so we just use a shared lock while we
2509          * take a look... */
2510         read_lock (&ksocknal_data.ksnd_global_lock);
2511
2512         list_for_each (ptmp, peers) {
2513                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2514                 conn = ksocknal_find_timed_out_conn (peer);
2515                 
2516                 if (conn != NULL) {
2517                         read_unlock (&ksocknal_data.ksnd_global_lock);
2518
2519                         CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
2520                                 peer->ksnp_nid, conn->ksnc_ipaddr,
2521                                 conn->ksnc_port);
2522                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2523                         
2524                         /* NB we won't find this one again, but we can't
2525                          * just proceed with the next peer, since we dropped
2526                          * ksnd_global_lock and it might be dead already! */
2527                         ksocknal_put_conn (conn);
2528                         goto again;
2529                 }
2530         }
2531
2532         read_unlock (&ksocknal_data.ksnd_global_lock);
2533 }
2534
2535 int
2536 ksocknal_reaper (void *arg)
2537 {
2538         wait_queue_t       wait;
2539         unsigned long      flags;
2540         ksock_conn_t      *conn;
2541         int                timeout;
2542         int                i;
2543         int                peer_index = 0;
2544         unsigned long      deadline = jiffies;
2545         
2546         kportal_daemonize ("ksocknal_reaper");
2547         kportal_blockallsigs ();
2548
2549         init_waitqueue_entry (&wait, current);
2550
2551         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2552
2553         while (!ksocknal_data.ksnd_shuttingdown) {
2554
2555                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2556                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2557                                            ksock_conn_t, ksnc_list);
2558                         list_del (&conn->ksnc_list);
2559                         
2560                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2561
2562                         ksocknal_terminate_conn (conn);
2563                         ksocknal_put_conn (conn);
2564
2565                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2566                         continue;
2567                 }
2568
2569                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2570                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2571                                            ksock_conn_t, ksnc_list);
2572                         list_del (&conn->ksnc_list);
2573                         
2574                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2575
2576                         ksocknal_destroy_conn (conn);
2577
2578                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2579                         continue;
2580                 }
2581                 
2582                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2583
2584                 /* careful with the jiffy wrap... */
2585                 while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
2586                         const int n = 4;
2587                         const int p = 1;
2588                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2589                         
2590                         /* Time to check for timeouts on a few more peers: I do
2591                          * checks every 'p' seconds on a proportion of the peer
2592                          * table and I need to check every connection 'n' times
2593                          * within a timeout interval, to ensure I detect a
2594                          * timeout on any connection within (n+1)/n times the
2595                          * timeout interval. */
2596
2597                         if (ksocknal_data.ksnd_io_timeout > n * p)
2598                                 chunk = (chunk * n * p) / 
2599                                         ksocknal_data.ksnd_io_timeout;
2600                         if (chunk == 0)
2601                                 chunk = 1;
2602
2603                         for (i = 0; i < chunk; i++) {
2604                                 ksocknal_check_peer_timeouts (peer_index);
2605                                 peer_index = (peer_index + 1) % 
2606                                              ksocknal_data.ksnd_peer_hash_size;
2607                         }
2608
2609                         deadline += p * HZ;
2610                 }
2611
2612                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2613                 set_current_state (TASK_INTERRUPTIBLE);
2614
2615                 if (!ksocknal_data.ksnd_shuttingdown &&
2616                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2617                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2618                         schedule_timeout (timeout);
2619
2620                 set_current_state (TASK_RUNNING);
2621                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2622
2623                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2624         }
2625
2626         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2627
2628         ksocknal_thread_fini ();
2629         return (0);
2630 }
2631
2632 nal_cb_t ksocknal_lib = {
2633         nal_data:       &ksocknal_data,                /* NAL private data */
2634         cb_send:         ksocknal_send,
2635         cb_send_pages:   ksocknal_send_pages,
2636         cb_recv:         ksocknal_recv,
2637         cb_recv_pages:   ksocknal_recv_pages,
2638         cb_read:         ksocknal_read,
2639         cb_write:        ksocknal_write,
2640         cb_callback:     ksocknal_callback,
2641         cb_malloc:       ksocknal_malloc,
2642         cb_free:         ksocknal_free,
2643         cb_printf:       ksocknal_printf,
2644         cb_cli:          ksocknal_cli,
2645         cb_sti:          ksocknal_sti,
2646         cb_dist:         ksocknal_dist
2647 };