Whamcloud - gitweb
* Fixed bug 2119
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 /*
29  *  LIB functions follow
30  *
31  */
32 int
33 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
34               user_ptr src_addr, size_t len)
35 {
36         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
37                nal->ni.nid, (long)len, src_addr, dst_addr);
38
39         memcpy( dst_addr, src_addr, len );
40         return 0;
41 }
42
43 int
44 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
45                void *src_addr, size_t len)
46 {
47         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
48                nal->ni.nid, (long)len, src_addr, dst_addr);
49
50         memcpy( dst_addr, src_addr, len );
51         return 0;
52 }
53
54 int
55 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
56                          ptl_event_t *ev)
57 {
58         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
59                nal->ni.nid, eq, ev);
60
61         if (eq->event_callback != NULL)
62                 eq->event_callback(ev);
63
64         return 0;
65 }
66
67 void *
68 ksocknal_malloc(nal_cb_t *nal, size_t len)
69 {
70         void *buf;
71
72         PORTAL_ALLOC(buf, len);
73
74         if (buf != NULL)
75                 memset(buf, 0, len);
76
77         return (buf);
78 }
79
80 void
81 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
82 {
83         PORTAL_FREE(buf, len);
84 }
85
86 void
87 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
88 {
89         va_list ap;
90         char msg[256];
91
92         va_start (ap, fmt);
93         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
94         va_end (ap);
95
96         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
97
98         CDEBUG (D_NET, "%s", msg);
99 }
100
101 void
102 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
103 {
104         ksock_nal_data_t *data = nal->nal_data;
105
106         spin_lock(&data->ksnd_nal_cb_lock);
107 }
108
109 void
110 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data;
113         data = nal->nal_data;
114
115         spin_unlock(&data->ksnd_nal_cb_lock);
116 }
117
118 int
119 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
120 {
121         /* I would guess that if ksocknal_get_peer (nid) == NULL,
122            and we're not routing, then 'nid' is very distant :) */
123         if ( nal->ni.nid == nid ) {
124                 *dist = 0;
125         } else {
126                 *dist = 1;
127         }
128
129         return 0;
130 }
131
132 ksock_ltx_t *
133 ksocknal_get_ltx (int may_block)
134 {
135         unsigned long flags;
136         ksock_ltx_t *ltx = NULL;
137
138         for (;;) {
139                 spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
140
141                 if (!list_empty (&ksocknal_data.ksnd_idle_ltx_list)) {
142                         ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
143                                          ksock_ltx_t, ltx_tx.tx_list);
144                         list_del (&ltx->ltx_tx.tx_list);
145                         ksocknal_data.ksnd_active_ltxs++;
146                         break;
147                 }
148
149                 if (!may_block) {
150                         if (!list_empty(&ksocknal_data.ksnd_idle_nblk_ltx_list)) {
151                                 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
152                                                  ksock_ltx_t, ltx_tx.tx_list);
153                                 list_del (&ltx->ltx_tx.tx_list);
154                                 ksocknal_data.ksnd_active_ltxs++;
155                         }
156                         break;
157                 }
158
159                 spin_unlock_irqrestore(&ksocknal_data.ksnd_idle_ltx_lock,
160                                        flags);
161
162                 wait_event (ksocknal_data.ksnd_idle_ltx_waitq,
163                             !list_empty (&ksocknal_data.ksnd_idle_ltx_list));
164         }
165
166         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
167
168         return (ltx);
169 }
170
171 void
172 ksocknal_put_ltx (ksock_ltx_t *ltx)
173 {
174         unsigned long   flags;
175         
176         spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
177
178         ksocknal_data.ksnd_active_ltxs--;
179         list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
180
181         /* normal tx desc => wakeup anyone blocking for one */
182         if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list)
183                 wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
184
185         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
186 }
187
188 #if SOCKNAL_ZC
189 struct page *
190 ksocknal_kvaddr_to_page (unsigned long vaddr)
191 {
192         struct page *page;
193
194         if (vaddr >= VMALLOC_START &&
195             vaddr < VMALLOC_END)
196                 page = vmalloc_to_page ((void *)vaddr);
197 #if CONFIG_HIGHMEM
198         else if (vaddr >= PKMAP_BASE &&
199                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
200                 page = vmalloc_to_page ((void *)vaddr);
201                 /* in 2.4 ^ just walks the page tables */
202 #endif
203         else
204                 page = virt_to_page (vaddr);
205
206         if (page == NULL ||
207             !VALID_PAGE (page))
208                 return (NULL);
209
210         return (page);
211 }
212 #endif
213
214 int
215 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
216 {
217         struct socket *sock = conn->ksnc_sock;
218         struct iovec  *iov = tx->tx_iov;
219         int            fragsize = iov->iov_len;
220         unsigned long  vaddr = (unsigned long)iov->iov_base;
221         int            more = (!list_empty (&conn->ksnc_tx_queue)) |
222                               (tx->tx_niov > 1) |
223                               (tx->tx_nkiov > 1);
224 #if SOCKNAL_ZC
225         int            offset = vaddr & (PAGE_SIZE - 1);
226         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
227         struct page   *page;
228 #endif
229         int            rc;
230
231         /* NB we can't trust socket ops to either consume our iovs
232          * or leave them alone, so we only send 1 frag at a time. */
233         LASSERT (fragsize <= tx->tx_resid);
234         LASSERT (tx->tx_niov > 0);
235         
236 #if SOCKNAL_ZC
237         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
238             (sock->sk->route_caps & NETIF_F_SG) &&
239             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
240             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
241                 
242                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
243                        (void *)vaddr, page, page_address(page), offset, zcsize);
244
245                 if (fragsize > zcsize) {
246                         more = 1;
247                         fragsize = zcsize;
248                 }
249
250                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
251                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
252                                        &tx->tx_zccd);
253         } else
254 #endif
255         {
256                 /* NB don't pass tx's iov; sendmsg may or may not update it */
257                 struct iovec fragiov = { .iov_base = (void *)vaddr,
258                                          .iov_len  = fragsize};
259                 struct msghdr msg = {
260                         .msg_name       = NULL,
261                         .msg_namelen    = 0,
262                         .msg_iov        = &fragiov,
263                         .msg_iovlen     = 1,
264                         .msg_control    = NULL,
265                         .msg_controllen = 0,
266                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
267                 };
268                 mm_segment_t oldmm = get_fs();
269                 
270                 set_fs (KERNEL_DS);
271                 rc = sock_sendmsg(sock, &msg, fragsize);
272                 set_fs (oldmm);
273         } 
274
275         if (rc <= 0)
276                 return (rc);
277
278         tx->tx_resid -= rc;
279
280         if (rc < iov->iov_len) {
281                 /* didn't send whole iov entry... */
282                 iov->iov_base = (void *)(vaddr + rc);
283                 iov->iov_len -= rc;
284                 /* ...but did we send everything we tried to send? */
285                 return ((rc == fragsize) ? 1 : -EAGAIN);
286         }
287
288         tx->tx_iov++;
289         tx->tx_niov--;
290         return (1);
291 }
292
293 int
294 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
295 {
296         struct socket *sock = conn->ksnc_sock;
297         ptl_kiov_t    *kiov = tx->tx_kiov;
298         int            fragsize = kiov->kiov_len;
299         struct page   *page = kiov->kiov_page;
300         int            offset = kiov->kiov_offset;
301         int            more = (!list_empty (&conn->ksnc_tx_queue)) |
302                               (tx->tx_nkiov > 1);
303         int            rc;
304
305         /* NB we can't trust socket ops to either consume our iovs
306          * or leave them alone, so we only send 1 frag at a time. */
307         LASSERT (fragsize <= tx->tx_resid);
308         LASSERT (offset + fragsize <= PAGE_SIZE);
309         LASSERT (tx->tx_niov == 0);
310         LASSERT (tx->tx_nkiov > 0);
311
312 #if SOCKNAL_ZC
313         if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
314             (sock->sk->route_caps & NETIF_F_SG) &&
315             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
316
317                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
318                                page, offset, fragsize);
319
320                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
321                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
322                                        &tx->tx_zccd);
323         } else
324 #endif
325         {
326                 char *addr = ((char *)kmap (page)) + offset;
327                 struct iovec fragiov = {.iov_base = addr,
328                                         .iov_len  = fragsize};
329                 struct msghdr msg = {
330                         .msg_name       = NULL,
331                         .msg_namelen    = 0,
332                         .msg_iov        = &fragiov,
333                         .msg_iovlen     = 1,
334                         .msg_control    = NULL,
335                         .msg_controllen = 0,
336                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
337                 };
338                 mm_segment_t  oldmm = get_fs();
339                 
340                 set_fs (KERNEL_DS);
341                 rc = sock_sendmsg(sock, &msg, fragsize);
342                 set_fs (oldmm);
343
344                 kunmap (page);
345         }
346
347         if (rc <= 0)
348                 return (rc);
349
350         tx->tx_resid -= rc;
351  
352         if (rc < fragsize) {
353                 /* didn't send whole frag */
354                 kiov->kiov_offset = offset + rc;
355                 kiov->kiov_len    = fragsize - rc;
356                 return (-EAGAIN);
357         }
358
359         /* everything went */
360         LASSERT (rc == fragsize);
361         tx->tx_kiov++;
362         tx->tx_nkiov--;
363         return (1);
364 }
365
366 int
367 ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
368 {
369         /* Return 0 on success, < 0 on error.
370          * caller checks tx_resid to determine progress/completion */
371         int      rc;
372         ENTRY;
373         
374         if (ksocknal_data.ksnd_stall_tx != 0) {
375                 set_current_state (TASK_UNINTERRUPTIBLE);
376                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
377         }
378
379         rc = ksocknal_getconnsock (conn);
380         if (rc != 0)
381                 return (rc);
382
383         for (;;) {
384                 LASSERT (tx->tx_resid != 0);
385
386                 if (conn->ksnc_closing) {
387                         rc = -ESHUTDOWN;
388                         break;
389                 }
390
391                 if (tx->tx_niov != 0)
392                         rc = ksocknal_send_iov (conn, tx);
393                 else
394                         rc = ksocknal_send_kiov (conn, tx);
395
396                 if (rc <= 0) {                  /* error or socket full? */
397                         /* NB: rc == 0 and rc == -EAGAIN both mean try
398                          * again later (linux stack returns -EAGAIN for
399                          * this, but Adaptech TOE returns 0) */
400                         if (rc == -EAGAIN)
401                                 rc = 0;
402                         break;
403                 }
404
405                 /* Consider the connection alive since we managed to chuck
406                  * more data into it.  Really, we'd like to consider it
407                  * alive only when the peer ACKs something, but
408                  * write_space() only gets called back while SOCK_NOSPACE
409                  * is set.  Instead, we presume peer death has occurred if
410                  * the socket doesn't drain within a timout */
411                 conn->ksnc_tx_deadline = jiffies + 
412                                          ksocknal_data.ksnd_io_timeout * HZ;
413                 conn->ksnc_peer->ksnp_last_alive = jiffies;
414
415                 if (tx->tx_resid == 0) {        /* sent everything */
416                         rc = 0;
417                         break;
418                 }
419         }
420
421         ksocknal_putconnsock (conn);
422         RETURN (rc);
423 }
424
425 void
426 ksocknal_eager_ack (ksock_conn_t *conn)
427 {
428         int            opt = 1;
429         mm_segment_t   oldmm = get_fs();
430         struct socket *sock = conn->ksnc_sock;
431         
432         /* Remind the socket to ACK eagerly.  If I don't, the socket might
433          * think I'm about to send something it could piggy-back the ACK
434          * on, introducing delay in completing zero-copy sends in my
435          * peer. */
436
437         set_fs(KERNEL_DS);
438         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
439                                (char *)&opt, sizeof (opt));
440         set_fs(oldmm);
441 }
442
443 int
444 ksocknal_recv_iov (ksock_conn_t *conn)
445 {
446         struct iovec *iov = conn->ksnc_rx_iov;
447         int           fragsize  = iov->iov_len;
448         unsigned long vaddr = (unsigned long)iov->iov_base;
449         struct iovec  fragiov = { .iov_base = (void *)vaddr,
450                                   .iov_len  = fragsize};
451         struct msghdr msg = {
452                 .msg_name       = NULL,
453                 .msg_namelen    = 0,
454                 .msg_iov        = &fragiov,
455                 .msg_iovlen     = 1,
456                 .msg_control    = NULL,
457                 .msg_controllen = 0,
458                 .msg_flags      = 0
459         };
460         mm_segment_t oldmm = get_fs();
461         int          rc;
462
463         /* NB we can't trust socket ops to either consume our iovs
464          * or leave them alone, so we only receive 1 frag at a time. */
465         LASSERT (conn->ksnc_rx_niov > 0);
466         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
467         
468         set_fs (KERNEL_DS);
469         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
470         /* NB this is just a boolean............................^ */
471         set_fs (oldmm);
472
473         if (rc <= 0)
474                 return (rc);
475
476         /* received something... */
477         conn->ksnc_peer->ksnp_last_alive = jiffies;
478         conn->ksnc_rx_deadline = jiffies + 
479                                  ksocknal_data.ksnd_io_timeout * HZ;
480         mb();                           /* order with setting rx_started */
481         conn->ksnc_rx_started = 1;
482
483         conn->ksnc_rx_nob_wanted -= rc;
484         conn->ksnc_rx_nob_left -= rc;
485                 
486         if (rc < fragsize) {
487                 iov->iov_base = (void *)(vaddr + rc);
488                 iov->iov_len = fragsize - rc;
489                 return (-EAGAIN);
490         }
491
492         conn->ksnc_rx_iov++;
493         conn->ksnc_rx_niov--;
494         return (1);
495 }
496
497 int
498 ksocknal_recv_kiov (ksock_conn_t *conn)
499 {
500         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
501         struct page  *page = kiov->kiov_page;
502         int           offset = kiov->kiov_offset;
503         int           fragsize = kiov->kiov_len;
504         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
505         struct iovec  fragiov = { .iov_base = (void *)vaddr,
506                                   .iov_len  = fragsize};
507         struct msghdr msg = {
508                 .msg_name       = NULL,
509                 .msg_namelen    = 0,
510                 .msg_iov        = &fragiov,
511                 .msg_iovlen     = 1,
512                 .msg_control    = NULL,
513                 .msg_controllen = 0,
514                 .msg_flags      = 0
515         };
516         mm_segment_t oldmm = get_fs();
517         int          rc;
518
519         /* NB we can't trust socket ops to either consume our iovs
520          * or leave them alone, so we only receive 1 frag at a time. */
521         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
522         LASSERT (conn->ksnc_rx_nkiov > 0);
523         LASSERT (offset + fragsize <= PAGE_SIZE);
524         
525         set_fs (KERNEL_DS);
526         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
527         /* NB this is just a boolean............................^ */
528         set_fs (oldmm);
529
530         kunmap (page);
531         
532         if (rc <= 0)
533                 return (rc);
534         
535         /* received something... */
536         conn->ksnc_peer->ksnp_last_alive = jiffies;
537         conn->ksnc_rx_deadline = jiffies + 
538                                  ksocknal_data.ksnd_io_timeout * HZ;
539         mb();                           /* order with setting rx_started */
540         conn->ksnc_rx_started = 1;
541
542         conn->ksnc_rx_nob_wanted -= rc;
543         conn->ksnc_rx_nob_left -= rc;
544                 
545         if (rc < fragsize) {
546                 kiov->kiov_offset = offset + rc;
547                 kiov->kiov_len = fragsize - rc;
548                 return (-EAGAIN);
549         }
550
551         conn->ksnc_rx_kiov++;
552         conn->ksnc_rx_nkiov--;
553         return (1);
554 }
555
556 int
557 ksocknal_recvmsg (ksock_conn_t *conn) 
558 {
559         /* Return 1 on success, 0 on EOF, < 0 on error.
560          * Caller checks ksnc_rx_nob_wanted to determine
561          * progress/completion. */
562         int     rc;
563         ENTRY;
564         
565         if (ksocknal_data.ksnd_stall_rx != 0) {
566                 set_current_state (TASK_UNINTERRUPTIBLE);
567                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
568         }
569
570         rc = ksocknal_getconnsock (conn);
571         if (rc != 0)
572                 return (rc);
573
574         for (;;) {
575                 if (conn->ksnc_closing) {
576                         rc = -ESHUTDOWN;
577                         break;
578                 }
579
580                 if (conn->ksnc_rx_niov != 0)
581                         rc = ksocknal_recv_iov (conn);
582                 else
583                         rc = ksocknal_recv_kiov (conn);
584
585                 if (rc <= 0) {
586                         /* error/EOF or partial receive */
587                         if (rc == -EAGAIN) {
588                                 rc = 1;
589                         } else if (rc == 0 && conn->ksnc_rx_started) {
590                                 /* EOF in the middle of a message */
591                                 rc = -EPROTO;
592                         }
593                         break;
594                 }
595
596                 /* Completed a fragment */
597
598                 if (conn->ksnc_rx_nob_wanted == 0) {
599                         /* Completed a message segment (header or payload) */
600                         if (ksocknal_data.ksnd_eager_ack &&
601                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
602                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
603                                 /* Remind the socket to ack eagerly... */
604                                 ksocknal_eager_ack(conn);
605                         }
606                         rc = 1;
607                         break;
608                 }
609         }
610
611         ksocknal_putconnsock (conn);
612         RETURN (rc);
613 }
614
615 #if SOCKNAL_ZC
616 void
617 ksocknal_zc_callback (zccd_t *zcd)
618 {
619         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
620         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
621         unsigned long  flags;
622         ENTRY;
623
624         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
625
626         spin_lock_irqsave (&sched->kss_lock, flags);
627
628         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
629         wake_up (&sched->kss_waitq);
630
631         spin_unlock_irqrestore (&sched->kss_lock, flags);
632         EXIT;
633 }
634 #endif
635
636 void
637 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
638 {
639         ksock_ltx_t   *ltx;
640         ENTRY;
641
642         if (tx->tx_conn != NULL) {
643                 /* This tx got queued on a conn; do the accounting... */
644                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
645 #if SOCKNAL_ZC
646                 /* zero copy completion isn't always from
647                  * process_transmit() so it needs to keep a ref on
648                  * tx_conn... */
649                 if (asynch)
650                         ksocknal_put_conn (tx->tx_conn);
651 #else
652                 LASSERT (!asynch);
653 #endif
654         }
655
656         if (tx->tx_isfwd) {             /* was a forwarded packet? */
657                 kpr_fwd_done (&ksocknal_data.ksnd_router,
658                               KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
659                 EXIT;
660                 return;
661         }
662
663         /* local send */
664         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
665
666         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
667
668         ksocknal_put_ltx (ltx);
669         EXIT;
670 }
671
672 void
673 ksocknal_tx_launched (ksock_tx_t *tx) 
674 {
675 #if SOCKNAL_ZC
676         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
677                 ksock_conn_t  *conn = tx->tx_conn;
678                 
679                 /* zccd skbufs are still in-flight.  First take a ref on
680                  * conn, so it hangs about for ksocknal_tx_done... */
681                 atomic_inc (&conn->ksnc_refcount);
682
683                 /* ...then drop the initial ref on zccd, so the zero copy
684                  * callback can occur */
685                 zccd_put (&tx->tx_zccd);
686                 return;
687         }
688 #endif
689         /* Any zero-copy-ness (if any) has completed; I can complete the
690          * transmit now, avoiding an extra schedule */
691         ksocknal_tx_done (tx, 0);
692 }
693
694 void
695 ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
696 {
697         ksock_conn_t  *conn;
698         ksock_tx_t    *tx;
699         int            rc;
700         
701         LASSERT (!list_empty (&sched->kss_tx_conns));
702         conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
703         list_del (&conn->ksnc_tx_list);
704
705         LASSERT (conn->ksnc_tx_scheduled);
706         LASSERT (conn->ksnc_tx_ready);
707         LASSERT (!list_empty (&conn->ksnc_tx_queue));
708         tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
709         /* assume transmit will complete now, so dequeue while I've got lock */
710         list_del (&tx->tx_list);
711
712         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
713
714         LASSERT (tx->tx_resid > 0);
715
716         conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
717         mb();                   /* => clear BEFORE trying to write */
718
719         rc = ksocknal_sendmsg (conn, tx);
720
721         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
722
723         if (rc != 0) {
724                 if (ksocknal_close_conn_unlocked (conn, rc)) {
725                         /* I'm the first to close */
726                         CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
727                                 conn, rc, conn->ksnc_peer->ksnp_nid,
728                                 conn->ksnc_ipaddr, conn->ksnc_port);
729                 }
730                 ksocknal_tx_launched (tx);
731                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
732
733         } else if (tx->tx_resid == 0) {  
734                 /* everything went; assume more can go, and avoid
735                  * write_space locking */
736                 conn->ksnc_tx_ready = 1;
737
738                 ksocknal_tx_launched (tx);
739                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
740         } else {
741                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
742
743                 /* back onto HEAD of tx_queue */
744                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
745         }
746
747         /* no space to write, or nothing to write? */
748         if (!conn->ksnc_tx_ready ||
749             list_empty (&conn->ksnc_tx_queue)) {
750                 /* mark not scheduled */
751                 conn->ksnc_tx_scheduled = 0;
752                 /* drop scheduler's ref */
753                 ksocknal_put_conn (conn);
754         } else {
755                 /* stay scheduled */
756                 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
757         }
758 }
759
760 void
761 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
762 {
763         unsigned long     flags;
764
765         /* called holding write lock on ksnd_global_lock */
766
767         LASSERT (route->ksnr_conn == NULL);
768         LASSERT (!route->ksnr_deleted && !route->ksnr_connecting);
769         
770         route->ksnr_connecting = 1;
771         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
772         
773         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
774         
775         list_add_tail (&route->ksnr_connect_list,
776                        &ksocknal_data.ksnd_autoconnectd_routes);
777         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
778         
779         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
780 }
781
782 ksock_peer_t *
783 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
784 {
785         ptl_nid_t     target_nid;
786         int           rc;
787         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
788         
789         if (peer != NULL)
790                 return (peer);
791         
792         if (tx->tx_isfwd) {
793                 CERROR ("Can't send packet to "LPX64
794                         ": routed target is not a peer\n", nid);
795                 return (NULL);
796         }
797         
798         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
799                          &target_nid);
800         if (rc != 0) {
801                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
802                 return (NULL);
803         }
804
805         peer = ksocknal_find_peer_locked (target_nid);
806         if (peer != NULL)
807                 return (peer);
808
809         CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
810         return (NULL);
811 }
812
813 ksock_conn_t *
814 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
815 {
816         struct list_head *tmp;
817         ksock_conn_t     *conn = NULL;
818
819         /* Find the conn with the shortest tx queue */
820         list_for_each (tmp, &peer->ksnp_conns) {
821                 ksock_conn_t *c = list_entry (tmp, ksock_conn_t, ksnc_list);
822
823                 LASSERT (!c->ksnc_closing);
824                 
825                 if (conn == NULL ||
826                     atomic_read (&conn->ksnc_tx_nob) >
827                     atomic_read (&c->ksnc_tx_nob))
828                         conn = c;
829         }
830
831         return (conn);
832 }
833
834 void
835 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
836 {
837         unsigned long  flags;
838         ksock_sched_t *sched = conn->ksnc_scheduler;
839
840         /* called holding global lock (read or irq-write) */
841
842         CDEBUG (D_NET, "Sending to "LPX64" on port %d\n", 
843                 conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
844
845         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
846         tx->tx_resid = tx->tx_nob;
847         tx->tx_conn = conn;
848
849 #if SOCKNAL_ZC
850         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
851         /* NB this sets 1 ref on zccd, so the callback can only occur after
852          * I've released this ref. */
853 #endif
854
855         spin_lock_irqsave (&sched->kss_lock, flags);
856
857         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
858                 
859         if (conn->ksnc_tx_ready &&      /* able to send */
860             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
861                 /* +1 ref for scheduler */
862                 atomic_inc (&conn->ksnc_refcount);
863                 list_add_tail (&conn->ksnc_tx_list, 
864                                &sched->kss_tx_conns);
865                 conn->ksnc_tx_scheduled = 1;
866                 wake_up (&sched->kss_waitq);
867         }
868
869         spin_unlock_irqrestore (&sched->kss_lock, flags);
870 }
871
872 ksock_route_t *
873 ksocknal_find_connectable_route_locked (ksock_peer_t *peer, int eager_only)
874 {
875         struct list_head  *tmp;
876         ksock_route_t     *route;
877         
878         list_for_each (tmp, &peer->ksnp_routes) {
879                 route = list_entry (tmp, ksock_route_t, ksnr_list);
880                 
881                 if (route->ksnr_conn == NULL && /* not connected */
882                     !route->ksnr_connecting &&  /* not connecting */
883                     (!eager_only || route->ksnr_eager) && /* wants to connect */
884                     time_after_eq (jiffies, route->ksnr_timeout)) /* OK to retry */
885                         return (route);
886         }
887         
888         return (NULL);
889 }
890
891 ksock_route_t *
892 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
893 {
894         struct list_head  *tmp;
895         ksock_route_t     *route;
896
897         list_for_each (tmp, &peer->ksnp_routes) {
898                 route = list_entry (tmp, ksock_route_t, ksnr_list);
899                 
900                 if (route->ksnr_connecting)
901                         return (route);
902         }
903         
904         return (NULL);
905 }
906
907 int
908 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
909 {
910         unsigned long     flags;
911         ksock_peer_t     *peer;
912         ksock_conn_t     *conn;
913         ksock_route_t    *route;
914         rwlock_t         *g_lock;
915
916         /* Ensure the frags we've been given EXACTLY match the number of
917          * bytes we want to send.  Many TCP/IP stacks disregard any total
918          * size parameters passed to them and just look at the frags. 
919          *
920          * We always expect at least 1 mapped fragment containing the
921          * complete portals header. */
922         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
923                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
924         LASSERT (tx->tx_niov >= 1);
925         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
926
927         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
928                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
929                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
930
931         tx->tx_conn = NULL;                     /* only set when assigned a conn */
932
933         g_lock = &ksocknal_data.ksnd_global_lock;
934         read_lock (g_lock);
935         
936         peer = ksocknal_find_target_peer_locked (tx, nid);
937         if (peer == NULL) {
938                 read_unlock (g_lock);
939                 return (PTL_FAIL);
940         }
941
942         if (ksocknal_find_connectable_route_locked(peer, 1) == NULL) {
943                 conn = ksocknal_find_conn_locked (tx, peer);
944                 if (conn != NULL) {
945                         /* I've got no unconnected autoconnect routes that
946                          * need to be connected, and I do have an actual
947                          * connection... */
948                         ksocknal_queue_tx_locked (tx, conn);
949                         read_unlock (g_lock);
950                         return (PTL_OK);
951                 }
952         }
953         
954         /* Making one or more connections; I'll need a write lock... */
955
956         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
957         read_unlock (g_lock);
958         write_lock_irqsave (g_lock, flags);
959         
960         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
961                 write_unlock_irqrestore (g_lock, flags);
962                 ksocknal_put_peer (peer);
963                 return (PTL_FAIL);
964         }
965         ksocknal_put_peer (peer);               /* drop ref I got above */
966
967
968         for (;;) {
969                 /* launch all eager autoconnections */
970                 route = ksocknal_find_connectable_route_locked (peer, 1);
971                 if (route == NULL)
972                         break;
973
974                 ksocknal_launch_autoconnect_locked (route);
975         }
976
977         conn = ksocknal_find_conn_locked (tx, peer);
978         if (conn != NULL) {
979                 /* Connection exists; queue message on it */
980                 ksocknal_queue_tx_locked (tx, conn);
981                 write_unlock_irqrestore (g_lock, flags);
982                 return (PTL_OK);
983         }
984
985         if (ksocknal_find_connecting_route_locked (peer) == NULL) {
986                 /* no autoconnect routes actually connecting now.  Scrape
987                  * the barrel for non-eager autoconnects */
988                 route = ksocknal_find_connectable_route_locked (peer, 0);
989                 if (route != NULL) {
990                         ksocknal_launch_autoconnect_locked (route);
991                 } else {
992                         write_unlock_irqrestore (g_lock, flags);
993                         return (PTL_FAIL);
994                 }
995         }
996
997         /* At least 1 connection is being established; queue the message... */
998         list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
999
1000         write_unlock_irqrestore (g_lock, flags);
1001         return (PTL_OK);
1002 }
1003
1004 ksock_ltx_t *
1005 ksocknal_setup_hdr (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1006                     ptl_hdr_t *hdr, int type)
1007 {
1008         ksock_ltx_t  *ltx;
1009
1010         /* I may not block for a transmit descriptor if I might block the
1011          * receiver, or an interrupt handler. */
1012         ltx = ksocknal_get_ltx (!(type == PTL_MSG_ACK ||
1013                                   type == PTL_MSG_REPLY ||
1014                                   in_interrupt ()));
1015         if (ltx == NULL) {
1016                 CERROR ("Can't allocate tx desc\n");
1017                 return (NULL);
1018         }
1019
1020         /* Init local send packet (storage for hdr, finalize() args) */
1021         ltx->ltx_hdr = *hdr;
1022         ltx->ltx_private = private;
1023         ltx->ltx_cookie = cookie;
1024         
1025         /* Init common ltx_tx */
1026         ltx->ltx_tx.tx_isfwd = 0;
1027         ltx->ltx_tx.tx_nob = sizeof (*hdr);
1028
1029         /* We always have 1 mapped frag for the header */
1030         ltx->ltx_tx.tx_niov = 1;
1031         ltx->ltx_tx.tx_iov = &ltx->ltx_iov_space.hdr;
1032         ltx->ltx_tx.tx_iov[0].iov_base = &ltx->ltx_hdr;
1033         ltx->ltx_tx.tx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
1034
1035         ltx->ltx_tx.tx_kiov  = NULL;
1036         ltx->ltx_tx.tx_nkiov = 0;
1037
1038         return (ltx);
1039 }
1040
1041 int
1042 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1043                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1044                unsigned int payload_niov, struct iovec *payload_iov,
1045                size_t payload_len)
1046 {
1047         ksock_ltx_t  *ltx;
1048         int           rc;
1049
1050         /* NB 'private' is different depending on what we're sending.
1051          * Just ignore it until we can rely on it
1052          */
1053
1054         CDEBUG(D_NET,
1055                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
1056                " pid %d\n", payload_len, payload_niov, nid, pid);
1057
1058         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1059         if (ltx == NULL)
1060                 return (PTL_FAIL);
1061
1062         /* append the payload_iovs to the one pointing at the header */
1063         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1064         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1065
1066         memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
1067                 payload_niov * sizeof (*payload_iov));
1068         ltx->ltx_tx.tx_niov = 1 + payload_niov;
1069         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1070
1071         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1072         if (rc != PTL_OK)
1073                 ksocknal_put_ltx (ltx);
1074         
1075         return (rc);
1076 }
1077
1078 int
1079 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1080                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1081                      unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
1082 {
1083         ksock_ltx_t *ltx;
1084         int          rc;
1085
1086         /* NB 'private' is different depending on what we're sending.
1087          * Just ignore it until we can rely on it */
1088
1089         CDEBUG(D_NET,
1090                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
1091                payload_len, payload_niov, nid, pid);
1092
1093         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1094         if (ltx == NULL)
1095                 return (PTL_FAIL);
1096
1097         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1098         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1099         
1100         ltx->ltx_tx.tx_kiov = ltx->ltx_iov_space.payload.kiov;
1101         memcpy (ltx->ltx_tx.tx_kiov, payload_iov, 
1102                 payload_niov * sizeof (*payload_iov));
1103         ltx->ltx_tx.tx_nkiov = payload_niov;
1104         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1105
1106         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1107         if (rc != PTL_OK) 
1108                 ksocknal_put_ltx (ltx);
1109                 
1110         return (rc);
1111 }
1112
1113 void
1114 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1115 {
1116         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1117         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
1118         int           rc;
1119         
1120         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1121                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1122
1123         /* I'm the gateway; must be the last hop */
1124         if (nid == ksocknal_lib.ni.nid)
1125                 nid = fwd->kprfd_target_nid;
1126
1127         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
1128         tx->tx_nob   = fwd->kprfd_nob;
1129         tx->tx_niov  = fwd->kprfd_niov;
1130         tx->tx_iov   = fwd->kprfd_iov;
1131         tx->tx_nkiov = 0;
1132         tx->tx_kiov  = NULL;
1133         tx->tx_hdr   = (ptl_hdr_t *)fwd->kprfd_iov[0].iov_base;
1134
1135         rc = ksocknal_launch_packet (tx, nid);
1136         if (rc != 0) {
1137                 /* FIXME, could pass a better completion error */
1138                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
1139         }
1140 }
1141
1142 int
1143 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1144 {
1145         long    pid = kernel_thread (fn, arg, 0);
1146
1147         if (pid < 0)
1148                 return ((int)pid);
1149
1150         atomic_inc (&ksocknal_data.ksnd_nthreads);
1151         return (0);
1152 }
1153
1154 void
1155 ksocknal_thread_fini (void)
1156 {
1157         atomic_dec (&ksocknal_data.ksnd_nthreads);
1158 }
1159
1160 void
1161 ksocknal_fmb_callback (void *arg, int error)
1162 {
1163         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1164         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1165         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1166         ksock_conn_t      *conn = NULL;
1167         ksock_sched_t     *sched;
1168         unsigned long      flags;
1169
1170         if (error != 0)
1171                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1172                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
1173                        error);
1174         else
1175                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1176                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1177
1178         /* drop peer ref taken on init */
1179         ksocknal_put_peer (fmb->fmb_peer);
1180         
1181         spin_lock_irqsave (&fmp->fmp_lock, flags);
1182
1183         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1184
1185         if (!list_empty (&fmp->fmp_blocked_conns)) {
1186                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1187                                    ksock_conn_t, ksnc_rx_list);
1188                 list_del (&conn->ksnc_rx_list);
1189         }
1190
1191         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1192
1193         if (conn == NULL)
1194                 return;
1195
1196         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1197         LASSERT (conn->ksnc_rx_scheduled);
1198         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1199
1200         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1201
1202         sched = conn->ksnc_scheduler;
1203
1204         spin_lock_irqsave (&sched->kss_lock, flags);
1205
1206         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1207         wake_up (&sched->kss_waitq);
1208
1209         spin_unlock_irqrestore (&sched->kss_lock, flags);
1210 }
1211
1212 ksock_fmb_t *
1213 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1214 {
1215         int               payload_nob = conn->ksnc_rx_nob_left;
1216         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1217         unsigned long     flags;
1218         ksock_fmb_pool_t *pool;
1219         ksock_fmb_t      *fmb;
1220
1221         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1222         LASSERT (ksocknal_data.ksnd_fmbs != NULL);
1223
1224         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1225                 pool = &ksocknal_data.ksnd_small_fmp;
1226         else
1227                 pool = &ksocknal_data.ksnd_large_fmp;
1228
1229         spin_lock_irqsave (&pool->fmp_lock, flags);
1230
1231         if (!list_empty (&pool->fmp_idle_fmbs)) {
1232                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1233                                  ksock_fmb_t, fmb_list);
1234                 list_del (&fmb->fmb_list);
1235                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1236
1237                 return (fmb);
1238         }
1239
1240         /* deschedule until fmb free */
1241
1242         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1243
1244         list_add_tail (&conn->ksnc_rx_list,
1245                        &pool->fmp_blocked_conns);
1246
1247         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1248         return (NULL);
1249 }
1250
1251 int
1252 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1253 {
1254         int payload_nob = conn->ksnc_rx_nob_left;
1255         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1256         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1257         int niov;                               /* at least the header */
1258         int nob;
1259
1260         LASSERT (conn->ksnc_rx_scheduled);
1261         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1262         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1263         LASSERT (payload_nob >= 0);
1264         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1265         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1266
1267         /* Got a forwarding buffer; copy the header we just read into the
1268          * forwarding buffer.  If there's payload, start reading reading it
1269          * into the buffer, otherwise the forwarding buffer can be kicked
1270          * off immediately.
1271          *
1272          * NB fmb->fmb_iov spans the WHOLE packet.
1273          *    conn->ksnc_rx_iov spans just the payload.
1274          */
1275         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1276
1277         /* copy header */
1278         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1279
1280         /* Take a ref on the conn's peer to prevent module unload before
1281          * forwarding completes.  NB we ref peer and not conn since because
1282          * all refs on conn after it has been closed must remove themselves
1283          * in finite time */
1284         fmb->fmb_peer = conn->ksnc_peer;
1285         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1286
1287         if (payload_nob == 0) {         /* got complete packet already */
1288                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1289                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1290                         dest_nid, packet_nob);
1291
1292                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1293
1294                 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1295                               packet_nob, 1, fmb->fmb_iov,
1296                               ksocknal_fmb_callback, fmb);
1297
1298                 /* forward it now */
1299                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1300
1301                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1302                 return (1);
1303         }
1304
1305         niov = 1;
1306         if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
1307                 fmb->fmb_iov[0].iov_len = packet_nob;
1308         } else {
1309                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1310                 nob = packet_nob - PAGE_SIZE;
1311
1312                 do {
1313                         LASSERT (niov < fmb->fmb_npages);
1314                         fmb->fmb_iov[niov].iov_base =
1315                                 page_address (fmb->fmb_pages[niov]);
1316                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1317                         nob -= PAGE_SIZE;
1318                         niov++;
1319                 } while (nob > 0);
1320         }
1321
1322         kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1323                       packet_nob, niov, fmb->fmb_iov,
1324                       ksocknal_fmb_callback, fmb);
1325
1326         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1327         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1328         
1329         /* payload is desc's iov-ed buffer, but skipping the hdr */
1330         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1331                  sizeof (struct iovec));
1332
1333         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1334         conn->ksnc_rx_iov[0].iov_base =
1335                 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1336                          sizeof (ptl_hdr_t));
1337         conn->ksnc_rx_iov[0].iov_len =
1338                 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1339
1340         if (niov > 1)
1341                 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1342                        (niov - 1) * sizeof (struct iovec));
1343
1344         conn->ksnc_rx_niov = niov;
1345
1346         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1347                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1348         return (0);
1349 }
1350
1351 void
1352 ksocknal_fwd_parse (ksock_conn_t *conn)
1353 {
1354         ksock_peer_t *peer;
1355         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1356         int           body_len = NTOH__u32 (PTL_HDR_LENGTH(&conn->ksnc_hdr));
1357
1358         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1359                 NTOH__u64 (conn->ksnc_hdr.src_nid),
1360                 dest_nid, conn->ksnc_rx_nob_left);
1361
1362         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1363         LASSERT (conn->ksnc_rx_scheduled);
1364
1365         if (body_len < 0) {                 /* length corrupt (overflow) */
1366                 CERROR("dropping packet from "LPX64" for "LPX64": packet "
1367                        "size %d illegal\n", NTOH__u64 (conn->ksnc_hdr.src_nid),
1368                        dest_nid, body_len);
1369
1370                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1371                 ksocknal_close_conn_unlocked (conn, -EINVAL); /* give up on conn */
1372                 return;
1373         }
1374
1375         if (ksocknal_data.ksnd_fmbs == NULL) {        /* not forwarding */
1376                 CERROR("dropping packet from "LPX64" for "LPX64": not "
1377                        "forwarding\n", conn->ksnc_hdr.src_nid,
1378                        conn->ksnc_hdr.dest_nid);
1379                 /* on to new packet (skip this one's body) */
1380                 ksocknal_new_packet (conn, body_len);
1381                 return;
1382         }
1383
1384         if (body_len > SOCKNAL_MAX_FWD_PAYLOAD) {      /* too big to forward */
1385                 CERROR ("dropping packet from "LPX64" for "LPX64
1386                         ": packet size %d too big\n", conn->ksnc_hdr.src_nid,
1387                         conn->ksnc_hdr.dest_nid, body_len);
1388                 /* on to new packet (skip this one's body) */
1389                 ksocknal_new_packet (conn, body_len);
1390                 return;
1391         }
1392
1393         /* should have gone direct */
1394         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1395         if (peer != NULL) {
1396                 CERROR ("dropping packet from "LPX64" for "LPX64
1397                         ": target is a peer\n", conn->ksnc_hdr.src_nid,
1398                         conn->ksnc_hdr.dest_nid);
1399                 ksocknal_put_peer (peer);  /* drop ref from get above */
1400
1401                 /* on to next packet (skip this one's body) */
1402                 ksocknal_new_packet (conn, body_len);
1403                 return;
1404         }
1405
1406         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1407         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1408         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1409 }
1410
1411 int
1412 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1413 {
1414         static char ksocknal_slop_buffer[4096];
1415
1416         int   nob;
1417         int   niov;
1418         int   skipped;
1419
1420         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1421                 conn->ksnc_rx_started = 0;
1422                 mb ();                          /* racing with timeout thread */
1423                 
1424                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1425                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1426                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1427
1428                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1429                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1430                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1431                 conn->ksnc_rx_niov = 1;
1432
1433                 conn->ksnc_rx_kiov = NULL;
1434                 conn->ksnc_rx_nkiov = 0;
1435                 return (1);
1436         }
1437
1438         /* Set up to skip as much a possible now.  If there's more left
1439          * (ran out of iov entries) we'll get called again */
1440
1441         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1442         conn->ksnc_rx_nob_left = nob_to_skip;
1443         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1444         skipped = 0;
1445         niov = 0;
1446
1447         do {
1448                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1449
1450                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1451                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1452                 niov++;
1453                 skipped += nob;
1454                 nob_to_skip -=nob;
1455
1456         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1457                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1458
1459         conn->ksnc_rx_niov = niov;
1460         conn->ksnc_rx_kiov = NULL;
1461         conn->ksnc_rx_nkiov = 0;
1462         conn->ksnc_rx_nob_wanted = skipped;
1463         return (0);
1464 }
1465
1466 void
1467 ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
1468 {
1469         ksock_conn_t *conn;
1470         ksock_fmb_t  *fmb;
1471         int           rc;
1472
1473         /* NB: sched->ksnc_lock lock held */
1474
1475         LASSERT (!list_empty (&sched->kss_rx_conns));
1476         conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
1477         list_del (&conn->ksnc_rx_list);
1478
1479         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
1480
1481         CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
1482         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1483         LASSERT (conn->ksnc_rx_scheduled);
1484         LASSERT (conn->ksnc_rx_ready);
1485
1486         /* doesn't need a forwarding buffer */
1487         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1488                 goto try_read;
1489
1490  get_fmb:
1491         fmb = ksocknal_get_idle_fmb (conn);
1492         if (fmb == NULL) {      /* conn descheduled waiting for idle fmb */
1493                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1494                 return;
1495         }
1496
1497         if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
1498                 goto out;               /* come back later for next packet */
1499
1500  try_read:
1501         /* NB: sched lock NOT held */
1502         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1503                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1504                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1505                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1506
1507         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1508
1509         conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
1510         mb();                   /* => clear BEFORE trying to read */
1511
1512         rc = ksocknal_recvmsg(conn);
1513
1514         if (rc <= 0) {
1515                 if (ksocknal_close_conn_unlocked (conn, rc)) {
1516                         /* I'm the first to close */
1517                         if (rc < 0)
1518                                 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
1519                                         conn, rc, conn->ksnc_peer->ksnp_nid,
1520                                         conn->ksnc_ipaddr, conn->ksnc_port);
1521                         else
1522                                 CWARN ("[%p] EOF from "LPX64" ip %08x:%d\n",
1523                                        conn, conn->ksnc_peer->ksnp_nid,
1524                                        conn->ksnc_ipaddr, conn->ksnc_port);
1525                 }
1526                 goto out;
1527         }
1528
1529         
1530         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
1531                 goto out;                       /* try again later */
1532
1533         /* got all I wanted, assume there's more - prevent data_ready locking */
1534         conn->ksnc_rx_ready = 1;
1535
1536         switch (conn->ksnc_rx_state) {
1537         case SOCKNAL_RX_HEADER:
1538                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1539                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1540                         /* This packet isn't for me */
1541                         ksocknal_fwd_parse (conn);
1542                         switch (conn->ksnc_rx_state) {
1543                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1544                                 goto out;       /* => come back later */
1545                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1546                                 goto try_read;  /* => go read it */
1547                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1548                                 goto get_fmb;   /* => go get a fwd msg buffer */
1549                         default:
1550                                 LBUG ();
1551                         }
1552                         /* Not Reached */
1553                 }
1554
1555                 /* sets wanted_len, iovs etc */
1556                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1557
1558                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1559                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1560                         goto try_read;          /* go read the payload */
1561                 }
1562                 /* Fall through (completed packet for me) */
1563
1564         case SOCKNAL_RX_BODY:
1565                 /* payload all received */
1566                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1567                 /* Fall through */
1568
1569         case SOCKNAL_RX_SLOP:
1570                 /* starting new packet? */
1571                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1572                         goto out;       /* come back later */
1573                 goto try_read;          /* try to finish reading slop now */
1574
1575         case SOCKNAL_RX_BODY_FWD:
1576                 /* payload all received */
1577                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1578                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1579                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1580                         conn->ksnc_rx_nob_left);
1581
1582                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1583                  * conn->ksnc_cookie */
1584                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1585                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1586
1587                 /* no slop in forwarded packets */
1588                 LASSERT (conn->ksnc_rx_nob_left == 0);
1589
1590                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1591                 goto out;                       /* (later) */
1592
1593         default:
1594                 break;
1595         }
1596
1597         /* Not Reached */
1598         LBUG ();
1599
1600  out:
1601         spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1602
1603         /* no data there to read? */
1604         if (!conn->ksnc_rx_ready) {
1605                 /* let socket callback schedule again */
1606                 conn->ksnc_rx_scheduled = 0;
1607                 /* drop scheduler's ref */
1608                 ksocknal_put_conn (conn);   
1609         } else {
1610                 /* stay scheduled */
1611                 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1612         }
1613 }
1614
1615 int
1616 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1617                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1618 {
1619         ksock_conn_t *conn = (ksock_conn_t *)private;
1620
1621         LASSERT (mlen <= rlen);
1622         LASSERT (niov <= PTL_MD_MAX_IOV);
1623         
1624         conn->ksnc_cookie = msg;
1625         conn->ksnc_rx_nob_wanted = mlen;
1626         conn->ksnc_rx_nob_left   = rlen;
1627
1628         conn->ksnc_rx_nkiov = 0;
1629         conn->ksnc_rx_kiov = NULL;
1630         conn->ksnc_rx_niov = niov;
1631         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1632         memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1633
1634         LASSERT (mlen == 
1635                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1636                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1637
1638         return (rlen);
1639 }
1640
1641 int
1642 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1643                      unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1644 {
1645         ksock_conn_t *conn = (ksock_conn_t *)private;
1646
1647         LASSERT (mlen <= rlen);
1648         LASSERT (niov <= PTL_MD_MAX_IOV);
1649         
1650         conn->ksnc_cookie = msg;
1651         conn->ksnc_rx_nob_wanted = mlen;
1652         conn->ksnc_rx_nob_left   = rlen;
1653
1654         conn->ksnc_rx_niov = 0;
1655         conn->ksnc_rx_iov  = NULL;
1656         conn->ksnc_rx_nkiov = niov;
1657         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1658         memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1659
1660         LASSERT (mlen == 
1661                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1662                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1663
1664         return (rlen);
1665 }
1666
1667 int ksocknal_scheduler (void *arg)
1668 {
1669         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1670         unsigned long      flags;
1671         int                rc;
1672         int                nloops = 0;
1673         int                id = sched - ksocknal_data.ksnd_schedulers;
1674         char               name[16];
1675
1676         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1677         kportal_daemonize (name);
1678         kportal_blockallsigs ();
1679
1680 #if (CONFIG_SMP && CPU_AFFINITY)
1681         if ((cpu_online_map & (1 << id)) != 0) {
1682 #if 1
1683                 current->cpus_allowed = (1 << id);
1684 #else
1685                 set_cpus_allowed (current, 1<<id);
1686 #endif
1687         } else {
1688                 CERROR ("Can't set CPU affinity for %s\n", name);
1689         }
1690 #endif /* CONFIG_SMP && CPU_AFFINITY */
1691         
1692         spin_lock_irqsave (&sched->kss_lock, flags);
1693
1694         while (!ksocknal_data.ksnd_shuttingdown) {
1695                 int did_something = 0;
1696
1697                 /* Ensure I progress everything semi-fairly */
1698
1699                 if (!list_empty (&sched->kss_rx_conns)) {
1700                         did_something = 1;
1701                         /* drops & regains kss_lock */
1702                         ksocknal_process_receive (sched, &flags);
1703                 }
1704
1705                 if (!list_empty (&sched->kss_tx_conns)) {
1706                         did_something = 1;
1707                         /* drops and regains kss_lock */
1708                         ksocknal_process_transmit (sched, &flags);
1709                 }
1710 #if SOCKNAL_ZC
1711                 if (!list_empty (&sched->kss_zctxdone_list)) {
1712                         ksock_tx_t *tx =
1713                                 list_entry(sched->kss_zctxdone_list.next,
1714                                            ksock_tx_t, tx_list);
1715                         did_something = 1;
1716
1717                         list_del (&tx->tx_list);
1718                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1719
1720                         ksocknal_tx_done (tx, 1);
1721
1722                         spin_lock_irqsave (&sched->kss_lock, flags);
1723                 }
1724 #endif
1725                 if (!did_something ||           /* nothing to do */
1726                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1727                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1728
1729                         nloops = 0;
1730
1731                         if (!did_something) {   /* wait for something to do */
1732 #if SOCKNAL_ZC
1733                                 rc = wait_event_interruptible (sched->kss_waitq,
1734                                                                ksocknal_data.ksnd_shuttingdown ||
1735                                                                !list_empty(&sched->kss_rx_conns) ||
1736                                                                !list_empty(&sched->kss_tx_conns) ||
1737                                                                !list_empty(&sched->kss_zctxdone_list));
1738 #else
1739                                 rc = wait_event_interruptible (sched->kss_waitq,
1740                                                                ksocknal_data.ksnd_shuttingdown ||
1741                                                                !list_empty(&sched->kss_rx_conns) ||
1742                                                                !list_empty(&sched->kss_tx_conns));
1743 #endif
1744                                 LASSERT (rc == 0);
1745                         } else
1746                                our_cond_resched();
1747
1748                         spin_lock_irqsave (&sched->kss_lock, flags);
1749                 }
1750         }
1751
1752         spin_unlock_irqrestore (&sched->kss_lock, flags);
1753         ksocknal_thread_fini ();
1754         return (0);
1755 }
1756
1757 void
1758 ksocknal_data_ready (struct sock *sk, int n)
1759 {
1760         unsigned long  flags;
1761         ksock_conn_t  *conn;
1762         ksock_sched_t *sched;
1763         ENTRY;
1764
1765         /* interleave correctly with closing sockets... */
1766         read_lock (&ksocknal_data.ksnd_global_lock);
1767
1768         conn = sk->sk_user_data;
1769         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1770                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1771                 sk->sk_data_ready (sk, n);
1772                 goto out;
1773         }
1774
1775         if (!conn->ksnc_rx_ready) {        /* new news */
1776                 /* Set ASAP in case of concurrent calls to me */
1777                 conn->ksnc_rx_ready = 1;
1778
1779                 sched = conn->ksnc_scheduler;
1780
1781                 spin_lock_irqsave (&sched->kss_lock, flags);
1782
1783                 /* Set again (process_receive may have cleared while I blocked for the lock) */
1784                 conn->ksnc_rx_ready = 1;
1785
1786                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1787                         list_add_tail(&conn->ksnc_rx_list,
1788                                       &sched->kss_rx_conns);
1789                         conn->ksnc_rx_scheduled = 1;
1790                         /* extra ref for scheduler */
1791                         atomic_inc (&conn->ksnc_refcount);
1792
1793                         wake_up (&sched->kss_waitq);
1794                 }
1795
1796                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1797         }
1798
1799  out:
1800         read_unlock (&ksocknal_data.ksnd_global_lock);
1801
1802         EXIT;
1803 }
1804
1805 void
1806 ksocknal_write_space (struct sock *sk)
1807 {
1808         unsigned long  flags;
1809         ksock_conn_t  *conn;
1810         ksock_sched_t *sched;
1811
1812         /* interleave correctly with closing sockets... */
1813         read_lock (&ksocknal_data.ksnd_global_lock);
1814
1815         conn = sk->sk_user_data;
1816
1817         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1818                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1819                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1820                                       " ready" : " blocked"),
1821                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1822                                       " scheduled" : " idle"),
1823                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1824                                       " empty" : " queued"));
1825
1826         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1827                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1828                 sk->sk_write_space (sk);
1829
1830                 read_unlock (&ksocknal_data.ksnd_global_lock);
1831                 return;
1832         }
1833
1834         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1835                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1836
1837                 if (!conn->ksnc_tx_ready) {      /* new news */
1838                         /* Set ASAP in case of concurrent calls to me */
1839                         conn->ksnc_tx_ready = 1;
1840
1841                         sched = conn->ksnc_scheduler;
1842
1843                         spin_lock_irqsave (&sched->kss_lock, flags);
1844
1845                         /* Set again (process_transmit may have
1846                            cleared while I blocked for the lock) */
1847                         conn->ksnc_tx_ready = 1;
1848
1849                         if (!conn->ksnc_tx_scheduled && // not being progressed
1850                             !list_empty(&conn->ksnc_tx_queue)){//packets to send
1851                                 list_add_tail (&conn->ksnc_tx_list,
1852                                                &sched->kss_tx_conns);
1853                                 conn->ksnc_tx_scheduled = 1;
1854                                 /* extra ref for scheduler */
1855                                 atomic_inc (&conn->ksnc_refcount);
1856
1857                                 wake_up (&sched->kss_waitq);
1858                         }
1859
1860                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1861                 }
1862         }
1863
1864         read_unlock (&ksocknal_data.ksnd_global_lock);
1865 }
1866
1867 int
1868 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1869 {
1870         int           rc;
1871         mm_segment_t  oldmm = get_fs();
1872
1873         while (nob > 0) {
1874                 struct iovec  iov = {
1875                         .iov_base = buffer,
1876                         .iov_len  = nob
1877                 };
1878                 struct msghdr msg = {
1879                         .msg_name       = NULL,
1880                         .msg_namelen    = 0,
1881                         .msg_iov        = &iov,
1882                         .msg_iovlen     = 1,
1883                         .msg_control    = NULL,
1884                         .msg_controllen = 0,
1885                         .msg_flags      = 0
1886                 };
1887
1888                 set_fs (KERNEL_DS);
1889                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1890                 set_fs (oldmm);
1891                 
1892                 if (rc < 0)
1893                         return (rc);
1894
1895                 if (rc == 0) {
1896                         CERROR ("Unexpected zero rc\n");
1897                         return (-ECONNABORTED);
1898                 }
1899
1900                 buffer = ((char *)buffer) + rc;
1901                 nob -= rc;
1902         }
1903         
1904         return (0);
1905 }
1906
1907 int
1908 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1909 {
1910         int           rc;
1911         mm_segment_t  oldmm = get_fs();
1912         
1913         while (nob > 0) {
1914                 struct iovec  iov = {
1915                         .iov_base = buffer,
1916                         .iov_len  = nob
1917                 };
1918                 struct msghdr msg = {
1919                         .msg_name       = NULL,
1920                         .msg_namelen    = 0,
1921                         .msg_iov        = &iov,
1922                         .msg_iovlen     = 1,
1923                         .msg_control    = NULL,
1924                         .msg_controllen = 0,
1925                         .msg_flags      = 0
1926                 };
1927
1928                 set_fs (KERNEL_DS);
1929                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1930                 set_fs (oldmm);
1931                 
1932                 if (rc < 0)
1933                         return (rc);
1934
1935                 if (rc == 0)
1936                         return (-ECONNABORTED);
1937
1938                 buffer = ((char *)buffer) + rc;
1939                 nob -= rc;
1940         }
1941         
1942         return (0);
1943 }
1944
1945 int
1946 ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
1947 {
1948         int                 rc;
1949         ptl_hdr_t           hdr;
1950         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1951
1952         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1953
1954         memset (&hdr, 0, sizeof (hdr));
1955         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1956         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
1957         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
1958         
1959         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
1960         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
1961         
1962         /* Assume sufficient socket buffering for this message */
1963         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
1964         if (rc != 0) {
1965                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, nid);
1966                 return (rc);
1967         }
1968
1969         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
1970         if (rc != 0) {
1971                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, nid);
1972                 return (rc);
1973         }
1974         
1975         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
1976                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
1977                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, nid);
1978                 return (-EINVAL);
1979         }
1980
1981         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
1982             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
1983                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
1984                         " from "LPX64"\n",
1985                         __le16_to_cpu (hmv->version_major),
1986                         __le16_to_cpu (hmv->version_minor),
1987                         PORTALS_PROTO_VERSION_MAJOR,
1988                         PORTALS_PROTO_VERSION_MINOR,
1989                         nid);
1990                 return (-EINVAL);
1991         }
1992
1993         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
1994         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
1995          * so read the rest of it in now... */
1996
1997         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
1998         if (rc != 0) {
1999                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
2000                         rc, nid);
2001                 return (rc);
2002         }
2003
2004         /* ...and check we got what we expected */
2005         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2006             PTL_HDR_LENGTH (&hdr) != __cpu_to_le32 (0)) {
2007                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2008                         " but got type %d with %d payload from "LPX64"\n",
2009                         __le32_to_cpu (hdr.type),
2010                         __le32_to_cpu (PTL_HDR_LENGTH (&hdr)), nid);
2011                 return (-EINVAL);
2012         }
2013         
2014         if (__le64_to_cpu (hdr.src_nid) != nid) {
2015                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
2016                         __le64_to_cpu (hdr.src_nid), nid);
2017                 return (-EINVAL);
2018         }
2019
2020         return (0);
2021 }
2022
2023 int
2024 ksocknal_setup_sock (struct socket *sock) 
2025 {
2026         mm_segment_t    oldmm = get_fs ();
2027         int             rc;
2028         int             option;
2029         struct linger   linger;
2030
2031         /* Ensure this socket aborts active sends immediately when we close
2032          * it. */
2033         
2034         linger.l_onoff = 0;
2035         linger.l_linger = 0;
2036
2037         set_fs (KERNEL_DS);
2038         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER, 
2039                               (char *)&linger, sizeof (linger));
2040         set_fs (oldmm);
2041         if (rc != 0) {
2042                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2043                 return (rc);
2044         }
2045         
2046         option = -1;
2047         set_fs (KERNEL_DS);
2048         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2049                                     (char *)&option, sizeof (option));
2050         set_fs (oldmm);
2051         if (rc != 0) {
2052                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2053                 return (rc);
2054         }
2055
2056 #if SOCKNAL_USE_KEEPALIVES
2057         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2058          * second until the timeout elapses. */
2059
2060         option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
2061         set_fs (KERNEL_DS);
2062         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2063                                     (char *)&option, sizeof (option));
2064         set_fs (oldmm);
2065         if (rc != 0) {
2066                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2067                 return (rc);
2068         }
2069         
2070         option = 1;
2071         set_fs (KERNEL_DS);
2072         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2073                                     (char *)&option, sizeof (option));
2074         set_fs (oldmm);
2075         if (rc != 0) {
2076                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2077                 return (rc);
2078         }
2079         
2080         option = ksocknal_data.ksnd_io_timeout / 4;
2081         set_fs (KERNEL_DS);
2082         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2083                                     (char *)&option, sizeof (option));
2084         set_fs (oldmm);
2085         if (rc != 0) {
2086                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2087                 return (rc);
2088         }
2089
2090         option = 1;
2091         set_fs (KERNEL_DS);
2092         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2093                               (char *)&option, sizeof (option));
2094         set_fs (oldmm);
2095         if (rc != 0) {
2096                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2097                 return (rc);
2098         }
2099 #endif
2100         return (0);
2101 }
2102
2103 int
2104 ksocknal_connect_peer (ksock_route_t *route)
2105 {
2106         struct sockaddr_in  peer_addr;
2107         mm_segment_t        oldmm = get_fs();
2108         struct timeval      tv;
2109         int                 fd;
2110         struct socket      *sock;
2111         int                 rc;
2112
2113         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2114         if (rc != 0) {
2115                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2116                 return (rc);
2117         }
2118
2119         /* Ugh; have to map_fd for compatibility with sockets passed in
2120          * from userspace.  And we actually need the sock->file refcounting
2121          * that this gives you :) */
2122
2123         fd = sock_map_fd (sock);
2124         if (fd < 0) {
2125                 sock_release (sock);
2126                 CERROR ("sock_map_fd error %d\n", fd);
2127                 return (fd);
2128         }
2129
2130         /* NB the fd now owns the ref on sock->file */
2131         LASSERT (sock->file != NULL);
2132         LASSERT (file_count(sock->file) == 1);
2133
2134         /* Set the socket timeouts, so our connection attempt completes in
2135          * finite time */
2136         tv.tv_sec = ksocknal_data.ksnd_io_timeout;
2137         tv.tv_usec = 0;
2138
2139         set_fs (KERNEL_DS);
2140         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2141                               (char *)&tv, sizeof (tv));
2142         set_fs (oldmm);
2143         if (rc != 0) {
2144                 CERROR ("Can't set send timeout %d: %d\n", 
2145                         ksocknal_data.ksnd_io_timeout, rc);
2146                 goto out;
2147         }
2148         
2149         set_fs (KERNEL_DS);
2150         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2151                               (char *)&tv, sizeof (tv));
2152         set_fs (oldmm);
2153         if (rc != 0) {
2154                 CERROR ("Can't set receive timeout %d: %d\n",
2155                         ksocknal_data.ksnd_io_timeout, rc);
2156                 goto out;
2157         }
2158
2159         if (route->ksnr_nonagel) {
2160                 int  option = 1;
2161                 
2162                 set_fs (KERNEL_DS);
2163                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2164                                             (char *)&option, sizeof (option));
2165                 set_fs (oldmm);
2166                 if (rc != 0) {
2167                         CERROR ("Can't disable nagel: %d\n", rc);
2168                         goto out;
2169                 }
2170         }
2171         
2172         if (route->ksnr_buffer_size != 0) {
2173                 int option = route->ksnr_buffer_size;
2174                 
2175                 set_fs (KERNEL_DS);
2176                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2177                                       (char *)&option, sizeof (option));
2178                 set_fs (oldmm);
2179                 if (rc != 0) {
2180                         CERROR ("Can't set send buffer %d: %d\n",
2181                                 route->ksnr_buffer_size, rc);
2182                         goto out;
2183                 }
2184
2185                 set_fs (KERNEL_DS);
2186                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2187                                       (char *)&option, sizeof (option));
2188                 set_fs (oldmm);
2189                 if (rc != 0) {
2190                         CERROR ("Can't set receive buffer %d: %d\n",
2191                                 route->ksnr_buffer_size, rc);
2192                         goto out;
2193                 }
2194         }
2195         
2196         memset (&peer_addr, 0, sizeof (peer_addr));
2197         peer_addr.sin_family = AF_INET;
2198         peer_addr.sin_port = htons (route->ksnr_port);
2199         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2200         
2201         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2202                                  sizeof (peer_addr), sock->file->f_flags);
2203         if (rc != 0) {
2204                 CERROR ("Error %d connecting to "LPX64"\n", rc,
2205                         route->ksnr_peer->ksnp_nid);
2206                 goto out;
2207         }
2208         
2209         if (route->ksnr_xchange_nids) {
2210                 rc = ksocknal_exchange_nids (sock, route->ksnr_peer->ksnp_nid);
2211                 if (rc != 0)
2212                         goto out;
2213         }
2214
2215         rc = ksocknal_create_conn (route->ksnr_peer->ksnp_nid,
2216                                    route, sock, route->ksnr_irq_affinity);
2217         if (rc == 0) {
2218                 /* Take an extra ref on sock->file to compensate for the
2219                  * upcoming close which will lose fd's ref on it. */
2220                 get_file (sock->file);
2221         }
2222
2223  out:
2224         sys_close (fd);
2225         return (rc);
2226 }
2227
2228 void
2229 ksocknal_autoconnect (ksock_route_t *route)
2230 {
2231         LIST_HEAD        (zombies);
2232         ksock_tx_t       *tx;
2233         ksock_peer_t     *peer;
2234         unsigned long     flags;
2235         int               rc;
2236         
2237         rc = ksocknal_connect_peer (route);
2238         if (rc == 0) {
2239                 /* successfully autoconnected: create_conn did the
2240                  * route/conn binding and scheduled any blocked packets, 
2241                  * so there's nothing left to do now. */
2242                 return;
2243         }
2244
2245         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2246
2247         peer = route->ksnr_peer;
2248         route->ksnr_connecting = 0;
2249
2250         LASSERT (route->ksnr_retry_interval != 0);
2251         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2252         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2253                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2254
2255         if (!list_empty (&peer->ksnp_tx_queue) &&
2256             ksocknal_find_connecting_route_locked (peer) == NULL) {
2257                 LASSERT (list_empty (&peer->ksnp_conns));
2258
2259                 /* None of the connections that the blocked packets are
2260                  * waiting for have been successful.  Complete them now... */
2261                 do {
2262                         tx = list_entry (peer->ksnp_tx_queue.next,
2263                                          ksock_tx_t, tx_list);
2264                         list_del (&tx->tx_list);
2265                         list_add_tail (&tx->tx_list, &zombies);
2266                 } while (!list_empty (&peer->ksnp_tx_queue));
2267         }
2268
2269         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2270
2271         while (!list_empty (&zombies)) {
2272                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2273                 
2274                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
2275                         NTOH__u32 (tx->tx_hdr->type),
2276                         NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
2277                         NTOH__u64 (tx->tx_hdr->src_nid),
2278                         NTOH__u64 (tx->tx_hdr->dest_nid));
2279
2280                 list_del (&tx->tx_list);
2281                 /* complete now */
2282                 ksocknal_tx_done (tx, 0);
2283         }
2284 }
2285
2286 int
2287 ksocknal_autoconnectd (void *arg)
2288 {
2289         long               id = (long)arg;
2290         char               name[16];
2291         unsigned long      flags;
2292         ksock_route_t     *route;
2293         int                rc;
2294
2295         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2296         kportal_daemonize (name);
2297         kportal_blockallsigs ();
2298
2299         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2300
2301         while (!ksocknal_data.ksnd_shuttingdown) {
2302
2303                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2304                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2305                                             ksock_route_t, ksnr_connect_list);
2306                         
2307                         list_del (&route->ksnr_connect_list);
2308                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2309
2310                         ksocknal_autoconnect (route);
2311                         ksocknal_put_route (route);
2312
2313                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2314                         continue;
2315                 }
2316                 
2317                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2318
2319                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2320                                                ksocknal_data.ksnd_shuttingdown ||
2321                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2322
2323                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2324         }
2325
2326         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2327
2328         ksocknal_thread_fini ();
2329         return (0);
2330 }
2331
2332 ksock_conn_t *
2333 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2334 {
2335         /* We're called with a shared lock on ksnd_global_lock */
2336         ksock_conn_t      *conn;
2337         struct list_head  *ctmp;
2338         ksock_sched_t     *sched;
2339
2340         list_for_each (ctmp, &peer->ksnp_conns) {
2341                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2342                 sched = conn->ksnc_scheduler;
2343
2344                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2345                 LASSERT (!conn->ksnc_closing);
2346                 
2347                 if (conn->ksnc_rx_started &&
2348                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2349                         /* Timed out incomplete incoming message */
2350                         atomic_inc (&conn->ksnc_refcount);
2351                         CERROR ("Timed out RX from "LPX64" %p\n", 
2352                                 peer->ksnp_nid, conn);
2353                         return (conn);
2354                 }
2355                 
2356                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2357                      conn->ksnc_sock->sk->wmem_queued != 0) &&
2358                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2359                         /* Timed out messages queued for sending, or
2360                          * messages buffered in the socket's send buffer */
2361                         atomic_inc (&conn->ksnc_refcount);
2362                         CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
2363                                 peer->ksnp_nid, 
2364                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2365                                 conn->ksnc_sock->sk->wmem_queued, conn);
2366                         return (conn);
2367                 }
2368         }
2369
2370         return (NULL);
2371 }
2372
2373 void
2374 ksocknal_check_peer_timeouts (int idx)
2375 {
2376         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2377         struct list_head *ptmp;
2378         ksock_peer_t     *peer;
2379         ksock_conn_t     *conn;
2380
2381  again:
2382         /* NB. We expect to have a look at all the peers and not find any
2383          * connections to time out, so we just use a shared lock while we
2384          * take a look... */
2385         read_lock (&ksocknal_data.ksnd_global_lock);
2386
2387         list_for_each (ptmp, peers) {
2388                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2389                 conn = ksocknal_find_timed_out_conn (peer);
2390                 
2391                 if (conn != NULL) {
2392                         read_unlock (&ksocknal_data.ksnd_global_lock);
2393
2394                         if (ksocknal_close_conn_unlocked (conn, -ETIMEDOUT)) {
2395                                 /* I actually closed... */
2396                                 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
2397                                         peer->ksnp_nid, conn->ksnc_ipaddr,
2398                                         conn->ksnc_port);
2399                         }
2400                 
2401                         /* NB we won't find this one again, but we can't
2402                          * just proceed with the next peer, since we dropped
2403                          * ksnd_global_lock and it might be dead already! */
2404                         ksocknal_put_conn (conn);
2405                         goto again;
2406                 }
2407         }
2408
2409         read_unlock (&ksocknal_data.ksnd_global_lock);
2410 }
2411
2412 int
2413 ksocknal_reaper (void *arg)
2414 {
2415         wait_queue_t       wait;
2416         unsigned long      flags;
2417         ksock_conn_t      *conn;
2418         int                timeout;
2419         int                i;
2420         int                peer_index = 0;
2421         unsigned long      deadline = jiffies;
2422         
2423         kportal_daemonize ("ksocknal_reaper");
2424         kportal_blockallsigs ();
2425
2426         init_waitqueue_entry (&wait, current);
2427
2428         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2429
2430         while (!ksocknal_data.ksnd_shuttingdown) {
2431
2432                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2433                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2434                                            ksock_conn_t, ksnc_list);
2435                         list_del (&conn->ksnc_list);
2436                         
2437                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2438
2439                         ksocknal_terminate_conn (conn);
2440                         ksocknal_put_conn (conn);
2441
2442                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2443                         continue;
2444                 }
2445
2446                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2447                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2448                                            ksock_conn_t, ksnc_list);
2449                         list_del (&conn->ksnc_list);
2450                         
2451                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2452
2453                         ksocknal_destroy_conn (conn);
2454
2455                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2456                         continue;
2457                 }
2458                 
2459                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2460
2461                 /* careful with the jiffy wrap... */
2462                 while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
2463                         const int n = 4;
2464                         const int p = 1;
2465                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2466                         
2467                         /* Time to check for timeouts on a few more peers: I do
2468                          * checks every 'p' seconds on a proportion of the peer
2469                          * table and I need to check every connection 'n' times
2470                          * within a timeout interval, to ensure I detect a
2471                          * timeout on any connection within (n+1)/n times the
2472                          * timeout interval. */
2473
2474                         if (ksocknal_data.ksnd_io_timeout > n * p)
2475                                 chunk = (chunk * n * p) / 
2476                                         ksocknal_data.ksnd_io_timeout;
2477                         if (chunk == 0)
2478                                 chunk = 1;
2479
2480                         for (i = 0; i < chunk; i++) {
2481                                 ksocknal_check_peer_timeouts (peer_index);
2482                                 peer_index = (peer_index + 1) % 
2483                                              ksocknal_data.ksnd_peer_hash_size;
2484                         }
2485
2486                         deadline += p * HZ;
2487                 }
2488
2489                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2490                 set_current_state (TASK_INTERRUPTIBLE);
2491
2492                 if (!ksocknal_data.ksnd_shuttingdown &&
2493                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2494                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2495                         schedule_timeout (timeout);
2496
2497                 set_current_state (TASK_RUNNING);
2498                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2499
2500                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2501         }
2502
2503         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2504
2505         ksocknal_thread_fini ();
2506         return (0);
2507 }
2508
2509 nal_cb_t ksocknal_lib = {
2510         nal_data:       &ksocknal_data,                /* NAL private data */
2511         cb_send:         ksocknal_send,
2512         cb_send_pages:   ksocknal_send_pages,
2513         cb_recv:         ksocknal_recv,
2514         cb_recv_pages:   ksocknal_recv_pages,
2515         cb_read:         ksocknal_read,
2516         cb_write:        ksocknal_write,
2517         cb_callback:     ksocknal_callback,
2518         cb_malloc:       ksocknal_malloc,
2519         cb_free:         ksocknal_free,
2520         cb_printf:       ksocknal_printf,
2521         cb_cli:          ksocknal_cli,
2522         cb_sti:          ksocknal_sti,
2523         cb_dist:         ksocknal_dist
2524 };