Whamcloud - gitweb
* lctl set_route <nid> <up/down> enables or disables particular portals
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 /*
29  *  LIB functions follow
30  *
31  */
32 int
33 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
34               user_ptr src_addr, size_t len)
35 {
36         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
37                nal->ni.nid, (long)len, src_addr, dst_addr);
38
39         memcpy( dst_addr, src_addr, len );
40         return 0;
41 }
42
43 int
44 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
45                void *src_addr, size_t len)
46 {
47         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
48                nal->ni.nid, (long)len, src_addr, dst_addr);
49
50         memcpy( dst_addr, src_addr, len );
51         return 0;
52 }
53
54 int
55 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
56                          ptl_event_t *ev)
57 {
58         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
59                nal->ni.nid, eq, ev);
60
61         if (eq->event_callback != NULL)
62                 eq->event_callback(ev);
63
64         return 0;
65 }
66
67 void *
68 ksocknal_malloc(nal_cb_t *nal, size_t len)
69 {
70         void *buf;
71
72         PORTAL_ALLOC(buf, len);
73
74         if (buf != NULL)
75                 memset(buf, 0, len);
76
77         return (buf);
78 }
79
80 void
81 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
82 {
83         PORTAL_FREE(buf, len);
84 }
85
86 void
87 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
88 {
89         va_list ap;
90         char msg[256];
91
92         va_start (ap, fmt);
93         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
94         va_end (ap);
95
96         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
97
98         CDEBUG (D_NET, "%s", msg);
99 }
100
101 void
102 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
103 {
104         ksock_nal_data_t *data = nal->nal_data;
105
106         spin_lock(&data->ksnd_nal_cb_lock);
107 }
108
109 void
110 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data;
113         data = nal->nal_data;
114
115         spin_unlock(&data->ksnd_nal_cb_lock);
116 }
117
118 int
119 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
120 {
121         /* I would guess that if ksocknal_get_peer (nid) == NULL,
122            and we're not routing, then 'nid' is very distant :) */
123         if ( nal->ni.nid == nid ) {
124                 *dist = 0;
125         } else {
126                 *dist = 1;
127         }
128
129         return 0;
130 }
131
132 ksock_ltx_t *
133 ksocknal_get_ltx (int may_block)
134 {
135         unsigned long flags;
136         ksock_ltx_t *ltx = NULL;
137
138         for (;;) {
139                 spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
140
141                 if (!list_empty (&ksocknal_data.ksnd_idle_ltx_list)) {
142                         ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
143                                          ksock_ltx_t, ltx_tx.tx_list);
144                         list_del (&ltx->ltx_tx.tx_list);
145                         ksocknal_data.ksnd_active_ltxs++;
146                         break;
147                 }
148
149                 if (!may_block) {
150                         if (!list_empty(&ksocknal_data.ksnd_idle_nblk_ltx_list)) {
151                                 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
152                                                  ksock_ltx_t, ltx_tx.tx_list);
153                                 list_del (&ltx->ltx_tx.tx_list);
154                                 ksocknal_data.ksnd_active_ltxs++;
155                         }
156                         break;
157                 }
158
159                 spin_unlock_irqrestore(&ksocknal_data.ksnd_idle_ltx_lock,
160                                        flags);
161
162                 wait_event (ksocknal_data.ksnd_idle_ltx_waitq,
163                             !list_empty (&ksocknal_data.ksnd_idle_ltx_list));
164         }
165
166         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
167
168         return (ltx);
169 }
170
171 void
172 ksocknal_put_ltx (ksock_ltx_t *ltx)
173 {
174         unsigned long   flags;
175         
176         spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
177
178         ksocknal_data.ksnd_active_ltxs--;
179         list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
180
181         /* normal tx desc => wakeup anyone blocking for one */
182         if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
183             waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
184                 wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
185
186         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
187 }
188
189 #if SOCKNAL_ZC
190 struct page *
191 ksocknal_kvaddr_to_page (unsigned long vaddr)
192 {
193         struct page *page;
194
195         if (vaddr >= VMALLOC_START &&
196             vaddr < VMALLOC_END)
197                 page = vmalloc_to_page ((void *)vaddr);
198 #if CONFIG_HIGHMEM
199         else if (vaddr >= PKMAP_BASE &&
200                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
201                 page = vmalloc_to_page ((void *)vaddr);
202                 /* in 2.4 ^ just walks the page tables */
203 #endif
204         else
205                 page = virt_to_page (vaddr);
206
207         if (page == NULL ||
208             !VALID_PAGE (page))
209                 return (NULL);
210
211         return (page);
212 }
213 #endif
214
215 int
216 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
217 {
218         struct socket *sock = conn->ksnc_sock;
219         struct iovec  *iov = tx->tx_iov;
220         int            fragsize = iov->iov_len;
221         unsigned long  vaddr = (unsigned long)iov->iov_base;
222         int            more = (!list_empty (&conn->ksnc_tx_queue)) |
223                               (tx->tx_niov > 1) |
224                               (tx->tx_nkiov > 1);
225 #if SOCKNAL_ZC
226         int            offset = vaddr & (PAGE_SIZE - 1);
227         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
228         struct page   *page;
229 #endif
230         int            rc;
231
232         /* NB we can't trust socket ops to either consume our iovs
233          * or leave them alone, so we only send 1 frag at a time. */
234         LASSERT (fragsize <= tx->tx_resid);
235         LASSERT (tx->tx_niov > 0);
236         
237 #if SOCKNAL_ZC
238         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
239             (sock->sk->route_caps & NETIF_F_SG) &&
240             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
241             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
242                 
243                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
244                        (void *)vaddr, page, page_address(page), offset, zcsize);
245
246                 if (fragsize > zcsize) {
247                         more = 1;
248                         fragsize = zcsize;
249                 }
250
251                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
252                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
253                                        &tx->tx_zccd);
254         } else
255 #endif
256         {
257                 /* NB don't pass tx's iov; sendmsg may or may not update it */
258                 struct iovec fragiov = { .iov_base = (void *)vaddr,
259                                          .iov_len  = fragsize};
260                 struct msghdr msg = {
261                         .msg_name       = NULL,
262                         .msg_namelen    = 0,
263                         .msg_iov        = &fragiov,
264                         .msg_iovlen     = 1,
265                         .msg_control    = NULL,
266                         .msg_controllen = 0,
267                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
268                 };
269                 mm_segment_t oldmm = get_fs();
270                 
271                 set_fs (KERNEL_DS);
272                 rc = sock_sendmsg(sock, &msg, fragsize);
273                 set_fs (oldmm);
274         } 
275
276         if (rc <= 0)
277                 return (rc);
278
279         tx->tx_resid -= rc;
280
281         if (rc < iov->iov_len) {
282                 /* didn't send whole iov entry... */
283                 iov->iov_base = (void *)(vaddr + rc);
284                 iov->iov_len -= rc;
285                 /* ...but did we send everything we tried to send? */
286                 return ((rc == fragsize) ? 1 : -EAGAIN);
287         }
288
289         tx->tx_iov++;
290         tx->tx_niov--;
291         return (1);
292 }
293
294 int
295 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
296 {
297         struct socket *sock = conn->ksnc_sock;
298         ptl_kiov_t    *kiov = tx->tx_kiov;
299         int            fragsize = kiov->kiov_len;
300         struct page   *page = kiov->kiov_page;
301         int            offset = kiov->kiov_offset;
302         int            more = (!list_empty (&conn->ksnc_tx_queue)) |
303                               (tx->tx_nkiov > 1);
304         int            rc;
305
306         /* NB we can't trust socket ops to either consume our iovs
307          * or leave them alone, so we only send 1 frag at a time. */
308         LASSERT (fragsize <= tx->tx_resid);
309         LASSERT (offset + fragsize <= PAGE_SIZE);
310         LASSERT (tx->tx_niov == 0);
311         LASSERT (tx->tx_nkiov > 0);
312
313 #if SOCKNAL_ZC
314         if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
315             (sock->sk->route_caps & NETIF_F_SG) &&
316             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
317
318                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
319                                page, offset, fragsize);
320
321                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
322                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
323                                        &tx->tx_zccd);
324         } else
325 #endif
326         {
327                 char *addr = ((char *)kmap (page)) + offset;
328                 struct iovec fragiov = {.iov_base = addr,
329                                         .iov_len  = fragsize};
330                 struct msghdr msg = {
331                         .msg_name       = NULL,
332                         .msg_namelen    = 0,
333                         .msg_iov        = &fragiov,
334                         .msg_iovlen     = 1,
335                         .msg_control    = NULL,
336                         .msg_controllen = 0,
337                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
338                 };
339                 mm_segment_t  oldmm = get_fs();
340                 
341                 set_fs (KERNEL_DS);
342                 rc = sock_sendmsg(sock, &msg, fragsize);
343                 set_fs (oldmm);
344
345                 kunmap (page);
346         }
347
348         if (rc <= 0)
349                 return (rc);
350
351         tx->tx_resid -= rc;
352  
353         if (rc < fragsize) {
354                 /* didn't send whole frag */
355                 kiov->kiov_offset = offset + rc;
356                 kiov->kiov_len    = fragsize - rc;
357                 return (-EAGAIN);
358         }
359
360         /* everything went */
361         LASSERT (rc == fragsize);
362         tx->tx_kiov++;
363         tx->tx_nkiov--;
364         return (1);
365 }
366
367 int
368 ksocknal_sendmsg (ksock_conn_t *conn, ksock_tx_t *tx)
369 {
370         /* Return 0 on success, < 0 on error.
371          * caller checks tx_resid to determine progress/completion */
372         int      rc;
373         ENTRY;
374         
375         if (ksocknal_data.ksnd_stall_tx != 0) {
376                 set_current_state (TASK_UNINTERRUPTIBLE);
377                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
378         }
379
380         rc = ksocknal_getconnsock (conn);
381         if (rc != 0)
382                 return (rc);
383
384         for (;;) {
385                 LASSERT (tx->tx_resid != 0);
386
387                 if (conn->ksnc_closing) {
388                         rc = -ESHUTDOWN;
389                         break;
390                 }
391
392                 if (tx->tx_niov != 0)
393                         rc = ksocknal_send_iov (conn, tx);
394                 else
395                         rc = ksocknal_send_kiov (conn, tx);
396
397                 if (rc <= 0) {                  /* error or socket full? */
398                         /* NB: rc == 0 and rc == -EAGAIN both mean try
399                          * again later (linux stack returns -EAGAIN for
400                          * this, but Adaptech TOE returns 0) */
401                         if (rc == -EAGAIN)
402                                 rc = 0;
403                         break;
404                 }
405
406                 /* Consider the connection alive since we managed to chuck
407                  * more data into it.  Really, we'd like to consider it
408                  * alive only when the peer ACKs something, but
409                  * write_space() only gets called back while SOCK_NOSPACE
410                  * is set.  Instead, we presume peer death has occurred if
411                  * the socket doesn't drain within a timout */
412                 conn->ksnc_tx_deadline = jiffies + 
413                                          ksocknal_data.ksnd_io_timeout * HZ;
414                 conn->ksnc_peer->ksnp_last_alive = jiffies;
415
416                 if (tx->tx_resid == 0) {        /* sent everything */
417                         rc = 0;
418                         break;
419                 }
420         }
421
422         ksocknal_putconnsock (conn);
423         RETURN (rc);
424 }
425
426 void
427 ksocknal_eager_ack (ksock_conn_t *conn)
428 {
429         int            opt = 1;
430         mm_segment_t   oldmm = get_fs();
431         struct socket *sock = conn->ksnc_sock;
432         
433         /* Remind the socket to ACK eagerly.  If I don't, the socket might
434          * think I'm about to send something it could piggy-back the ACK
435          * on, introducing delay in completing zero-copy sends in my
436          * peer. */
437
438         set_fs(KERNEL_DS);
439         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
440                                (char *)&opt, sizeof (opt));
441         set_fs(oldmm);
442 }
443
444 int
445 ksocknal_recv_iov (ksock_conn_t *conn)
446 {
447         struct iovec *iov = conn->ksnc_rx_iov;
448         int           fragsize  = iov->iov_len;
449         unsigned long vaddr = (unsigned long)iov->iov_base;
450         struct iovec  fragiov = { .iov_base = (void *)vaddr,
451                                   .iov_len  = fragsize};
452         struct msghdr msg = {
453                 .msg_name       = NULL,
454                 .msg_namelen    = 0,
455                 .msg_iov        = &fragiov,
456                 .msg_iovlen     = 1,
457                 .msg_control    = NULL,
458                 .msg_controllen = 0,
459                 .msg_flags      = 0
460         };
461         mm_segment_t oldmm = get_fs();
462         int          rc;
463
464         /* NB we can't trust socket ops to either consume our iovs
465          * or leave them alone, so we only receive 1 frag at a time. */
466         LASSERT (conn->ksnc_rx_niov > 0);
467         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
468         
469         set_fs (KERNEL_DS);
470         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
471         /* NB this is just a boolean............................^ */
472         set_fs (oldmm);
473
474         if (rc <= 0)
475                 return (rc);
476
477         /* received something... */
478         conn->ksnc_peer->ksnp_last_alive = jiffies;
479         conn->ksnc_rx_deadline = jiffies + 
480                                  ksocknal_data.ksnd_io_timeout * HZ;
481         mb();                           /* order with setting rx_started */
482         conn->ksnc_rx_started = 1;
483
484         conn->ksnc_rx_nob_wanted -= rc;
485         conn->ksnc_rx_nob_left -= rc;
486                 
487         if (rc < fragsize) {
488                 iov->iov_base = (void *)(vaddr + rc);
489                 iov->iov_len = fragsize - rc;
490                 return (-EAGAIN);
491         }
492
493         conn->ksnc_rx_iov++;
494         conn->ksnc_rx_niov--;
495         return (1);
496 }
497
498 int
499 ksocknal_recv_kiov (ksock_conn_t *conn)
500 {
501         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
502         struct page  *page = kiov->kiov_page;
503         int           offset = kiov->kiov_offset;
504         int           fragsize = kiov->kiov_len;
505         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
506         struct iovec  fragiov = { .iov_base = (void *)vaddr,
507                                   .iov_len  = fragsize};
508         struct msghdr msg = {
509                 .msg_name       = NULL,
510                 .msg_namelen    = 0,
511                 .msg_iov        = &fragiov,
512                 .msg_iovlen     = 1,
513                 .msg_control    = NULL,
514                 .msg_controllen = 0,
515                 .msg_flags      = 0
516         };
517         mm_segment_t oldmm = get_fs();
518         int          rc;
519
520         /* NB we can't trust socket ops to either consume our iovs
521          * or leave them alone, so we only receive 1 frag at a time. */
522         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
523         LASSERT (conn->ksnc_rx_nkiov > 0);
524         LASSERT (offset + fragsize <= PAGE_SIZE);
525         
526         set_fs (KERNEL_DS);
527         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
528         /* NB this is just a boolean............................^ */
529         set_fs (oldmm);
530
531         kunmap (page);
532         
533         if (rc <= 0)
534                 return (rc);
535         
536         /* received something... */
537         conn->ksnc_peer->ksnp_last_alive = jiffies;
538         conn->ksnc_rx_deadline = jiffies + 
539                                  ksocknal_data.ksnd_io_timeout * HZ;
540         mb();                           /* order with setting rx_started */
541         conn->ksnc_rx_started = 1;
542
543         conn->ksnc_rx_nob_wanted -= rc;
544         conn->ksnc_rx_nob_left -= rc;
545                 
546         if (rc < fragsize) {
547                 kiov->kiov_offset = offset + rc;
548                 kiov->kiov_len = fragsize - rc;
549                 return (-EAGAIN);
550         }
551
552         conn->ksnc_rx_kiov++;
553         conn->ksnc_rx_nkiov--;
554         return (1);
555 }
556
557 int
558 ksocknal_recvmsg (ksock_conn_t *conn) 
559 {
560         /* Return 1 on success, 0 on EOF, < 0 on error.
561          * Caller checks ksnc_rx_nob_wanted to determine
562          * progress/completion. */
563         int     rc;
564         ENTRY;
565         
566         if (ksocknal_data.ksnd_stall_rx != 0) {
567                 set_current_state (TASK_UNINTERRUPTIBLE);
568                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
569         }
570
571         rc = ksocknal_getconnsock (conn);
572         if (rc != 0)
573                 return (rc);
574
575         for (;;) {
576                 if (conn->ksnc_closing) {
577                         rc = -ESHUTDOWN;
578                         break;
579                 }
580
581                 if (conn->ksnc_rx_niov != 0)
582                         rc = ksocknal_recv_iov (conn);
583                 else
584                         rc = ksocknal_recv_kiov (conn);
585
586                 if (rc <= 0) {
587                         /* error/EOF or partial receive */
588                         if (rc == -EAGAIN) {
589                                 rc = 1;
590                         } else if (rc == 0 && conn->ksnc_rx_started) {
591                                 /* EOF in the middle of a message */
592                                 rc = -EPROTO;
593                         }
594                         break;
595                 }
596
597                 /* Completed a fragment */
598
599                 if (conn->ksnc_rx_nob_wanted == 0) {
600                         /* Completed a message segment (header or payload) */
601                         if (ksocknal_data.ksnd_eager_ack &&
602                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
603                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
604                                 /* Remind the socket to ack eagerly... */
605                                 ksocknal_eager_ack(conn);
606                         }
607                         rc = 1;
608                         break;
609                 }
610         }
611
612         ksocknal_putconnsock (conn);
613         RETURN (rc);
614 }
615
616 #if SOCKNAL_ZC
617 void
618 ksocknal_zc_callback (zccd_t *zcd)
619 {
620         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
621         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
622         unsigned long  flags;
623         ENTRY;
624
625         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
626
627         spin_lock_irqsave (&sched->kss_lock, flags);
628
629         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
630         if (waitqueue_active (&sched->kss_waitq))
631                 wake_up (&sched->kss_waitq);
632
633         spin_unlock_irqrestore (&sched->kss_lock, flags);
634         EXIT;
635 }
636 #endif
637
638 void
639 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
640 {
641         ksock_ltx_t   *ltx;
642         ENTRY;
643
644         if (tx->tx_conn != NULL) {
645                 /* This tx got queued on a conn; do the accounting... */
646                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
647 #if SOCKNAL_ZC
648                 /* zero copy completion isn't always from
649                  * process_transmit() so it needs to keep a ref on
650                  * tx_conn... */
651                 if (asynch)
652                         ksocknal_put_conn (tx->tx_conn);
653 #else
654                 LASSERT (!asynch);
655 #endif
656         }
657
658         if (tx->tx_isfwd) {             /* was a forwarded packet? */
659                 kpr_fwd_done (&ksocknal_data.ksnd_router,
660                               KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
661                 EXIT;
662                 return;
663         }
664
665         /* local send */
666         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
667
668         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
669
670         ksocknal_put_ltx (ltx);
671         EXIT;
672 }
673
674 void
675 ksocknal_tx_launched (ksock_tx_t *tx) 
676 {
677 #if SOCKNAL_ZC
678         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
679                 ksock_conn_t  *conn = tx->tx_conn;
680                 
681                 /* zccd skbufs are still in-flight.  First take a ref on
682                  * conn, so it hangs about for ksocknal_tx_done... */
683                 atomic_inc (&conn->ksnc_refcount);
684
685                 /* ...then drop the initial ref on zccd, so the zero copy
686                  * callback can occur */
687                 zccd_put (&tx->tx_zccd);
688                 return;
689         }
690 #endif
691         /* Any zero-copy-ness (if any) has completed; I can complete the
692          * transmit now, avoiding an extra schedule */
693         ksocknal_tx_done (tx, 0);
694 }
695
696 void
697 ksocknal_process_transmit (ksock_sched_t *sched, unsigned long *irq_flags)
698 {
699         ksock_conn_t  *conn;
700         ksock_tx_t    *tx;
701         int            rc;
702         
703         LASSERT (!list_empty (&sched->kss_tx_conns));
704         conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
705         list_del (&conn->ksnc_tx_list);
706
707         LASSERT (conn->ksnc_tx_scheduled);
708         LASSERT (conn->ksnc_tx_ready);
709         LASSERT (!list_empty (&conn->ksnc_tx_queue));
710         tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
711         /* assume transmit will complete now, so dequeue while I've got lock */
712         list_del (&tx->tx_list);
713
714         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
715
716         LASSERT (tx->tx_resid > 0);
717
718         conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
719         mb();                   /* => clear BEFORE trying to write */
720
721         rc = ksocknal_sendmsg (conn, tx);
722
723         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
724
725         if (rc != 0) {
726                 if (ksocknal_close_conn_unlocked (conn, rc)) {
727                         /* I'm the first to close */
728                         CERROR ("[%p] Error %d on write to "LPX64" ip %08x:%d\n",
729                                 conn, rc, conn->ksnc_peer->ksnp_nid,
730                                 conn->ksnc_ipaddr, conn->ksnc_port);
731                 }
732                 ksocknal_tx_launched (tx);
733                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
734
735         } else if (tx->tx_resid == 0) {  
736                 /* everything went; assume more can go, and avoid
737                  * write_space locking */
738                 conn->ksnc_tx_ready = 1;
739
740                 ksocknal_tx_launched (tx);
741                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
742         } else {
743                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
744
745                 /* back onto HEAD of tx_queue */
746                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
747         }
748
749         /* no space to write, or nothing to write? */
750         if (!conn->ksnc_tx_ready ||
751             list_empty (&conn->ksnc_tx_queue)) {
752                 /* mark not scheduled */
753                 conn->ksnc_tx_scheduled = 0;
754                 /* drop scheduler's ref */
755                 ksocknal_put_conn (conn);
756         } else {
757                 /* stay scheduled */
758                 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
759         }
760 }
761
762 void
763 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
764 {
765         unsigned long     flags;
766
767         /* called holding write lock on ksnd_global_lock */
768
769         LASSERT (route->ksnr_conn == NULL);
770         LASSERT (!route->ksnr_deleted && !route->ksnr_connecting);
771         
772         route->ksnr_connecting = 1;
773         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
774         
775         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
776         
777         list_add_tail (&route->ksnr_connect_list,
778                        &ksocknal_data.ksnd_autoconnectd_routes);
779
780         if (waitqueue_active (&ksocknal_data.ksnd_autoconnectd_waitq))
781                 wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
782         
783         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
784 }
785
786 ksock_peer_t *
787 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
788 {
789         ptl_nid_t     target_nid;
790         int           rc;
791         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
792         
793         if (peer != NULL)
794                 return (peer);
795         
796         if (tx->tx_isfwd) {
797                 CERROR ("Can't send packet to "LPX64
798                         ": routed target is not a peer\n", nid);
799                 return (NULL);
800         }
801         
802         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
803                          &target_nid);
804         if (rc != 0) {
805                 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
806                 return (NULL);
807         }
808
809         peer = ksocknal_find_peer_locked (target_nid);
810         if (peer != NULL)
811                 return (peer);
812
813         CERROR ("Can't send packet to "LPX64": no peer entry\n", target_nid);
814         return (NULL);
815 }
816
817 ksock_conn_t *
818 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
819 {
820         struct list_head *tmp;
821         ksock_conn_t     *conn = NULL;
822
823         /* Find the conn with the shortest tx queue */
824         list_for_each (tmp, &peer->ksnp_conns) {
825                 ksock_conn_t *c = list_entry (tmp, ksock_conn_t, ksnc_list);
826
827                 LASSERT (!c->ksnc_closing);
828                 
829                 if (conn == NULL ||
830                     atomic_read (&conn->ksnc_tx_nob) >
831                     atomic_read (&c->ksnc_tx_nob))
832                         conn = c;
833         }
834
835         return (conn);
836 }
837
838 void
839 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
840 {
841         unsigned long  flags;
842         ksock_sched_t *sched = conn->ksnc_scheduler;
843
844         /* called holding global lock (read or irq-write) */
845
846         CDEBUG (D_NET, "Sending to "LPX64" on port %d\n", 
847                 conn->ksnc_peer->ksnp_nid, conn->ksnc_port);
848
849         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
850         tx->tx_resid = tx->tx_nob;
851         tx->tx_conn = conn;
852
853 #if SOCKNAL_ZC
854         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
855         /* NB this sets 1 ref on zccd, so the callback can only occur after
856          * I've released this ref. */
857 #endif
858
859         spin_lock_irqsave (&sched->kss_lock, flags);
860
861         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
862                 
863         if (conn->ksnc_tx_ready &&      /* able to send */
864             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
865                 /* +1 ref for scheduler */
866                 atomic_inc (&conn->ksnc_refcount);
867                 list_add_tail (&conn->ksnc_tx_list, 
868                                &sched->kss_tx_conns);
869                 conn->ksnc_tx_scheduled = 1;
870                 if (waitqueue_active (&sched->kss_waitq))
871                         wake_up (&sched->kss_waitq);
872         }
873
874         spin_unlock_irqrestore (&sched->kss_lock, flags);
875 }
876
877 ksock_route_t *
878 ksocknal_find_connectable_route_locked (ksock_peer_t *peer, int eager_only)
879 {
880         struct list_head  *tmp;
881         ksock_route_t     *route;
882         
883         list_for_each (tmp, &peer->ksnp_routes) {
884                 route = list_entry (tmp, ksock_route_t, ksnr_list);
885                 
886                 if (route->ksnr_conn == NULL && /* not connected */
887                     !route->ksnr_connecting &&  /* not connecting */
888                     (!eager_only || route->ksnr_eager) && /* wants to connect */
889                     time_after_eq (jiffies, route->ksnr_timeout)) /* OK to retry */
890                         return (route);
891         }
892         
893         return (NULL);
894 }
895
896 ksock_route_t *
897 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
898 {
899         struct list_head  *tmp;
900         ksock_route_t     *route;
901
902         list_for_each (tmp, &peer->ksnp_routes) {
903                 route = list_entry (tmp, ksock_route_t, ksnr_list);
904                 
905                 if (route->ksnr_connecting)
906                         return (route);
907         }
908         
909         return (NULL);
910 }
911
912 int
913 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
914 {
915         unsigned long     flags;
916         ksock_peer_t     *peer;
917         ksock_conn_t     *conn;
918         ksock_route_t    *route;
919         rwlock_t         *g_lock;
920
921         /* Ensure the frags we've been given EXACTLY match the number of
922          * bytes we want to send.  Many TCP/IP stacks disregard any total
923          * size parameters passed to them and just look at the frags. 
924          *
925          * We always expect at least 1 mapped fragment containing the
926          * complete portals header. */
927         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
928                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
929         LASSERT (tx->tx_niov >= 1);
930         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
931
932         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
933                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
934                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
935
936         tx->tx_conn = NULL;                     /* only set when assigned a conn */
937
938         g_lock = &ksocknal_data.ksnd_global_lock;
939         read_lock (g_lock);
940         
941         peer = ksocknal_find_target_peer_locked (tx, nid);
942         if (peer == NULL) {
943                 read_unlock (g_lock);
944                 return (PTL_FAIL);
945         }
946
947         if (ksocknal_find_connectable_route_locked(peer, 1) == NULL) {
948                 conn = ksocknal_find_conn_locked (tx, peer);
949                 if (conn != NULL) {
950                         /* I've got no unconnected autoconnect routes that
951                          * need to be connected, and I do have an actual
952                          * connection... */
953                         ksocknal_queue_tx_locked (tx, conn);
954                         read_unlock (g_lock);
955                         return (PTL_OK);
956                 }
957         }
958         
959         /* Making one or more connections; I'll need a write lock... */
960
961         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
962         read_unlock (g_lock);
963         write_lock_irqsave (g_lock, flags);
964         
965         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
966                 write_unlock_irqrestore (g_lock, flags);
967                 ksocknal_put_peer (peer);
968                 return (PTL_FAIL);
969         }
970         ksocknal_put_peer (peer);               /* drop ref I got above */
971
972
973         for (;;) {
974                 /* launch all eager autoconnections */
975                 route = ksocknal_find_connectable_route_locked (peer, 1);
976                 if (route == NULL)
977                         break;
978
979                 ksocknal_launch_autoconnect_locked (route);
980         }
981
982         conn = ksocknal_find_conn_locked (tx, peer);
983         if (conn != NULL) {
984                 /* Connection exists; queue message on it */
985                 ksocknal_queue_tx_locked (tx, conn);
986                 write_unlock_irqrestore (g_lock, flags);
987                 return (PTL_OK);
988         }
989
990         if (ksocknal_find_connecting_route_locked (peer) == NULL) {
991                 /* no autoconnect routes actually connecting now.  Scrape
992                  * the barrel for non-eager autoconnects */
993                 route = ksocknal_find_connectable_route_locked (peer, 0);
994                 if (route != NULL) {
995                         ksocknal_launch_autoconnect_locked (route);
996                 } else {
997                         write_unlock_irqrestore (g_lock, flags);
998                         return (PTL_FAIL);
999                 }
1000         }
1001
1002         /* At least 1 connection is being established; queue the message... */
1003         list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1004
1005         write_unlock_irqrestore (g_lock, flags);
1006         return (PTL_OK);
1007 }
1008
1009 ksock_ltx_t *
1010 ksocknal_setup_hdr (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1011                     ptl_hdr_t *hdr, int type)
1012 {
1013         ksock_ltx_t  *ltx;
1014
1015         /* I may not block for a transmit descriptor if I might block the
1016          * receiver, or an interrupt handler. */
1017         ltx = ksocknal_get_ltx (!(type == PTL_MSG_ACK ||
1018                                   type == PTL_MSG_REPLY ||
1019                                   in_interrupt ()));
1020         if (ltx == NULL) {
1021                 CERROR ("Can't allocate tx desc\n");
1022                 return (NULL);
1023         }
1024
1025         /* Init local send packet (storage for hdr, finalize() args) */
1026         ltx->ltx_hdr = *hdr;
1027         ltx->ltx_private = private;
1028         ltx->ltx_cookie = cookie;
1029         
1030         /* Init common ltx_tx */
1031         ltx->ltx_tx.tx_isfwd = 0;
1032         ltx->ltx_tx.tx_nob = sizeof (*hdr);
1033
1034         /* We always have 1 mapped frag for the header */
1035         ltx->ltx_tx.tx_niov = 1;
1036         ltx->ltx_tx.tx_iov = &ltx->ltx_iov_space.hdr;
1037         ltx->ltx_tx.tx_iov[0].iov_base = &ltx->ltx_hdr;
1038         ltx->ltx_tx.tx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
1039
1040         ltx->ltx_tx.tx_kiov  = NULL;
1041         ltx->ltx_tx.tx_nkiov = 0;
1042
1043         return (ltx);
1044 }
1045
1046 int
1047 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1048                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1049                unsigned int payload_niov, struct iovec *payload_iov,
1050                size_t payload_len)
1051 {
1052         ksock_ltx_t  *ltx;
1053         int           rc;
1054
1055         /* NB 'private' is different depending on what we're sending.
1056          * Just ignore it until we can rely on it
1057          */
1058
1059         CDEBUG(D_NET,
1060                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
1061                " pid %d\n", payload_len, payload_niov, nid, pid);
1062
1063         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1064         if (ltx == NULL)
1065                 return (PTL_FAIL);
1066
1067         /* append the payload_iovs to the one pointing at the header */
1068         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1069         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1070
1071         memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
1072                 payload_niov * sizeof (*payload_iov));
1073         ltx->ltx_tx.tx_niov = 1 + payload_niov;
1074         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1075
1076         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1077         if (rc != PTL_OK)
1078                 ksocknal_put_ltx (ltx);
1079         
1080         return (rc);
1081 }
1082
1083 int
1084 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1085                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1086                      unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
1087 {
1088         ksock_ltx_t *ltx;
1089         int          rc;
1090
1091         /* NB 'private' is different depending on what we're sending.
1092          * Just ignore it until we can rely on it */
1093
1094         CDEBUG(D_NET,
1095                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
1096                payload_len, payload_niov, nid, pid);
1097
1098         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
1099         if (ltx == NULL)
1100                 return (PTL_FAIL);
1101
1102         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
1103         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1104         
1105         ltx->ltx_tx.tx_kiov = ltx->ltx_iov_space.payload.kiov;
1106         memcpy (ltx->ltx_tx.tx_kiov, payload_iov, 
1107                 payload_niov * sizeof (*payload_iov));
1108         ltx->ltx_tx.tx_nkiov = payload_niov;
1109         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
1110
1111         rc = ksocknal_launch_packet (&ltx->ltx_tx, nid);
1112         if (rc != PTL_OK) 
1113                 ksocknal_put_ltx (ltx);
1114                 
1115         return (rc);
1116 }
1117
1118 void
1119 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1120 {
1121         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1122         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
1123         int           rc;
1124         
1125         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1126                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1127
1128         /* I'm the gateway; must be the last hop */
1129         if (nid == ksocknal_lib.ni.nid)
1130                 nid = fwd->kprfd_target_nid;
1131
1132         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
1133         tx->tx_nob   = fwd->kprfd_nob;
1134         tx->tx_niov  = fwd->kprfd_niov;
1135         tx->tx_iov   = fwd->kprfd_iov;
1136         tx->tx_nkiov = 0;
1137         tx->tx_kiov  = NULL;
1138         tx->tx_hdr   = (ptl_hdr_t *)fwd->kprfd_iov[0].iov_base;
1139
1140         rc = ksocknal_launch_packet (tx, nid);
1141         if (rc != 0) {
1142                 /* FIXME, could pass a better completion error */
1143                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
1144         }
1145 }
1146
1147 int
1148 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1149 {
1150         long    pid = kernel_thread (fn, arg, 0);
1151
1152         if (pid < 0)
1153                 return ((int)pid);
1154
1155         atomic_inc (&ksocknal_data.ksnd_nthreads);
1156         return (0);
1157 }
1158
1159 void
1160 ksocknal_thread_fini (void)
1161 {
1162         atomic_dec (&ksocknal_data.ksnd_nthreads);
1163 }
1164
1165 void
1166 ksocknal_fmb_callback (void *arg, int error)
1167 {
1168         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1169         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1170         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1171         ksock_conn_t      *conn = NULL;
1172         ksock_sched_t     *sched;
1173         unsigned long      flags;
1174
1175         if (error != 0)
1176                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1177                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
1178                        error);
1179         else
1180                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1181                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1182
1183         spin_lock_irqsave (&fmp->fmp_lock, flags);
1184
1185         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1186
1187         if (!list_empty (&fmp->fmp_blocked_conns)) {
1188                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1189                                    ksock_conn_t, ksnc_rx_list);
1190                 list_del (&conn->ksnc_rx_list);
1191         }
1192
1193         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1194
1195         /* drop peer ref taken on init */
1196         ksocknal_put_peer (fmb->fmb_peer);
1197         
1198         if (conn == NULL)
1199                 return;
1200
1201         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1202         LASSERT (conn->ksnc_rx_scheduled);
1203         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1204
1205         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1206
1207         sched = conn->ksnc_scheduler;
1208
1209         spin_lock_irqsave (&sched->kss_lock, flags);
1210
1211         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1212
1213         if (waitqueue_active (&sched->kss_waitq))
1214                 wake_up (&sched->kss_waitq);
1215
1216         spin_unlock_irqrestore (&sched->kss_lock, flags);
1217 }
1218
1219 ksock_fmb_t *
1220 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1221 {
1222         int               payload_nob = conn->ksnc_rx_nob_left;
1223         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1224         unsigned long     flags;
1225         ksock_fmb_pool_t *pool;
1226         ksock_fmb_t      *fmb;
1227
1228         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1229         LASSERT (ksocknal_data.ksnd_fmbs != NULL);
1230
1231         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1232                 pool = &ksocknal_data.ksnd_small_fmp;
1233         else
1234                 pool = &ksocknal_data.ksnd_large_fmp;
1235
1236         spin_lock_irqsave (&pool->fmp_lock, flags);
1237
1238         if (!list_empty (&pool->fmp_idle_fmbs)) {
1239                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1240                                  ksock_fmb_t, fmb_list);
1241                 list_del (&fmb->fmb_list);
1242                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1243
1244                 return (fmb);
1245         }
1246
1247         /* deschedule until fmb free */
1248
1249         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1250
1251         list_add_tail (&conn->ksnc_rx_list,
1252                        &pool->fmp_blocked_conns);
1253
1254         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1255         return (NULL);
1256 }
1257
1258 int
1259 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1260 {
1261         int payload_nob = conn->ksnc_rx_nob_left;
1262         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1263         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1264         int niov;                               /* at least the header */
1265         int nob;
1266
1267         LASSERT (conn->ksnc_rx_scheduled);
1268         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1269         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1270         LASSERT (payload_nob >= 0);
1271         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1272         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1273
1274         /* Got a forwarding buffer; copy the header we just read into the
1275          * forwarding buffer.  If there's payload, start reading reading it
1276          * into the buffer, otherwise the forwarding buffer can be kicked
1277          * off immediately.
1278          *
1279          * NB fmb->fmb_iov spans the WHOLE packet.
1280          *    conn->ksnc_rx_iov spans just the payload.
1281          */
1282         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1283
1284         /* copy header */
1285         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1286
1287         /* Take a ref on the conn's peer to prevent module unload before
1288          * forwarding completes.  NB we ref peer and not conn since because
1289          * all refs on conn after it has been closed must remove themselves
1290          * in finite time */
1291         fmb->fmb_peer = conn->ksnc_peer;
1292         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1293
1294         if (payload_nob == 0) {         /* got complete packet already */
1295                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1296                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1297                         dest_nid, packet_nob);
1298
1299                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1300
1301                 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1302                               packet_nob, 1, fmb->fmb_iov,
1303                               ksocknal_fmb_callback, fmb);
1304
1305                 /* forward it now */
1306                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1307
1308                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1309                 return (1);
1310         }
1311
1312         niov = 1;
1313         if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
1314                 fmb->fmb_iov[0].iov_len = packet_nob;
1315         } else {
1316                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1317                 nob = packet_nob - PAGE_SIZE;
1318
1319                 do {
1320                         LASSERT (niov < fmb->fmb_npages);
1321                         fmb->fmb_iov[niov].iov_base =
1322                                 page_address (fmb->fmb_pages[niov]);
1323                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1324                         nob -= PAGE_SIZE;
1325                         niov++;
1326                 } while (nob > 0);
1327         }
1328
1329         kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1330                       packet_nob, niov, fmb->fmb_iov,
1331                       ksocknal_fmb_callback, fmb);
1332
1333         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1334         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1335         
1336         /* payload is desc's iov-ed buffer, but skipping the hdr */
1337         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1338                  sizeof (struct iovec));
1339
1340         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1341         conn->ksnc_rx_iov[0].iov_base =
1342                 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1343                          sizeof (ptl_hdr_t));
1344         conn->ksnc_rx_iov[0].iov_len =
1345                 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1346
1347         if (niov > 1)
1348                 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1349                        (niov - 1) * sizeof (struct iovec));
1350
1351         conn->ksnc_rx_niov = niov;
1352
1353         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1354                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1355         return (0);
1356 }
1357
1358 void
1359 ksocknal_fwd_parse (ksock_conn_t *conn)
1360 {
1361         ksock_peer_t *peer;
1362         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1363         int           body_len = NTOH__u32 (PTL_HDR_LENGTH(&conn->ksnc_hdr));
1364
1365         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1366                 NTOH__u64 (conn->ksnc_hdr.src_nid),
1367                 dest_nid, conn->ksnc_rx_nob_left);
1368
1369         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1370         LASSERT (conn->ksnc_rx_scheduled);
1371
1372         if (body_len < 0) {                 /* length corrupt (overflow) */
1373                 CERROR("dropping packet from "LPX64" for "LPX64": packet "
1374                        "size %d illegal\n", NTOH__u64 (conn->ksnc_hdr.src_nid),
1375                        dest_nid, body_len);
1376
1377                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1378                 ksocknal_close_conn_unlocked (conn, -EINVAL); /* give up on conn */
1379                 return;
1380         }
1381
1382         if (ksocknal_data.ksnd_fmbs == NULL) {        /* not forwarding */
1383                 CERROR("dropping packet from "LPX64" for "LPX64": not "
1384                        "forwarding\n", conn->ksnc_hdr.src_nid,
1385                        conn->ksnc_hdr.dest_nid);
1386                 /* on to new packet (skip this one's body) */
1387                 ksocknal_new_packet (conn, body_len);
1388                 return;
1389         }
1390
1391         if (body_len > SOCKNAL_MAX_FWD_PAYLOAD) {      /* too big to forward */
1392                 CERROR ("dropping packet from "LPX64" for "LPX64
1393                         ": packet size %d too big\n", conn->ksnc_hdr.src_nid,
1394                         conn->ksnc_hdr.dest_nid, body_len);
1395                 /* on to new packet (skip this one's body) */
1396                 ksocknal_new_packet (conn, body_len);
1397                 return;
1398         }
1399
1400         /* should have gone direct */
1401         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1402         if (peer != NULL) {
1403                 CERROR ("dropping packet from "LPX64" for "LPX64
1404                         ": target is a peer\n", conn->ksnc_hdr.src_nid,
1405                         conn->ksnc_hdr.dest_nid);
1406                 ksocknal_put_peer (peer);  /* drop ref from get above */
1407
1408                 /* on to next packet (skip this one's body) */
1409                 ksocknal_new_packet (conn, body_len);
1410                 return;
1411         }
1412
1413         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1414         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1415         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1416 }
1417
1418 int
1419 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1420 {
1421         static char ksocknal_slop_buffer[4096];
1422
1423         int   nob;
1424         int   niov;
1425         int   skipped;
1426
1427         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1428                 conn->ksnc_rx_started = 0;
1429                 mb ();                          /* racing with timeout thread */
1430                 
1431                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1432                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1433                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1434
1435                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1436                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1437                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1438                 conn->ksnc_rx_niov = 1;
1439
1440                 conn->ksnc_rx_kiov = NULL;
1441                 conn->ksnc_rx_nkiov = 0;
1442                 return (1);
1443         }
1444
1445         /* Set up to skip as much a possible now.  If there's more left
1446          * (ran out of iov entries) we'll get called again */
1447
1448         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1449         conn->ksnc_rx_nob_left = nob_to_skip;
1450         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1451         skipped = 0;
1452         niov = 0;
1453
1454         do {
1455                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1456
1457                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1458                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1459                 niov++;
1460                 skipped += nob;
1461                 nob_to_skip -=nob;
1462
1463         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1464                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1465
1466         conn->ksnc_rx_niov = niov;
1467         conn->ksnc_rx_kiov = NULL;
1468         conn->ksnc_rx_nkiov = 0;
1469         conn->ksnc_rx_nob_wanted = skipped;
1470         return (0);
1471 }
1472
1473 void
1474 ksocknal_process_receive (ksock_sched_t *sched, unsigned long *irq_flags)
1475 {
1476         ksock_conn_t *conn;
1477         ksock_fmb_t  *fmb;
1478         int           rc;
1479
1480         /* NB: sched->ksnc_lock lock held */
1481
1482         LASSERT (!list_empty (&sched->kss_rx_conns));
1483         conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
1484         list_del (&conn->ksnc_rx_list);
1485
1486         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
1487
1488         CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
1489         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1490         LASSERT (conn->ksnc_rx_scheduled);
1491         LASSERT (conn->ksnc_rx_ready);
1492
1493         /* doesn't need a forwarding buffer */
1494         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1495                 goto try_read;
1496
1497  get_fmb:
1498         fmb = ksocknal_get_idle_fmb (conn);
1499         if (fmb == NULL) {      /* conn descheduled waiting for idle fmb */
1500                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1501                 return;
1502         }
1503
1504         if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
1505                 goto out;               /* come back later for next packet */
1506
1507  try_read:
1508         /* NB: sched lock NOT held */
1509         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1510                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1511                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1512                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1513
1514         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1515
1516         conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
1517         mb();                   /* => clear BEFORE trying to read */
1518
1519         rc = ksocknal_recvmsg(conn);
1520
1521         if (rc <= 0) {
1522                 if (ksocknal_close_conn_unlocked (conn, rc)) {
1523                         /* I'm the first to close */
1524                         if (rc < 0)
1525                                 CERROR ("[%p] Error %d on read from "LPX64" ip %08x:%d\n",
1526                                         conn, rc, conn->ksnc_peer->ksnp_nid,
1527                                         conn->ksnc_ipaddr, conn->ksnc_port);
1528                         else
1529                                 CERROR ("[%p] EOF from "LPX64" ip %08x:%d\n",
1530                                         conn, conn->ksnc_peer->ksnp_nid,
1531                                         conn->ksnc_ipaddr, conn->ksnc_port);
1532                 }
1533                 goto out;
1534         }
1535
1536         
1537         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
1538                 goto out;                       /* try again later */
1539
1540         /* got all I wanted, assume there's more - prevent data_ready locking */
1541         conn->ksnc_rx_ready = 1;
1542
1543         switch (conn->ksnc_rx_state) {
1544         case SOCKNAL_RX_HEADER:
1545                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1546                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1547                         /* This packet isn't for me */
1548                         ksocknal_fwd_parse (conn);
1549                         switch (conn->ksnc_rx_state) {
1550                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1551                                 goto out;       /* => come back later */
1552                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1553                                 goto try_read;  /* => go read it */
1554                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1555                                 goto get_fmb;   /* => go get a fwd msg buffer */
1556                         default:
1557                                 LBUG ();
1558                         }
1559                         /* Not Reached */
1560                 }
1561
1562                 /* sets wanted_len, iovs etc */
1563                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1564
1565                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1566                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1567                         goto try_read;          /* go read the payload */
1568                 }
1569                 /* Fall through (completed packet for me) */
1570
1571         case SOCKNAL_RX_BODY:
1572                 /* payload all received */
1573                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1574                 /* Fall through */
1575
1576         case SOCKNAL_RX_SLOP:
1577                 /* starting new packet? */
1578                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1579                         goto out;       /* come back later */
1580                 goto try_read;          /* try to finish reading slop now */
1581
1582         case SOCKNAL_RX_BODY_FWD:
1583                 /* payload all received */
1584                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1585                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1586                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1587                         conn->ksnc_rx_nob_left);
1588
1589                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1590                  * conn->ksnc_cookie */
1591                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1592                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1593
1594                 /* no slop in forwarded packets */
1595                 LASSERT (conn->ksnc_rx_nob_left == 0);
1596
1597                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1598                 goto out;                       /* (later) */
1599
1600         default:
1601                 break;
1602         }
1603
1604         /* Not Reached */
1605         LBUG ();
1606
1607  out:
1608         spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1609
1610         /* no data there to read? */
1611         if (!conn->ksnc_rx_ready) {
1612                 /* let socket callback schedule again */
1613                 conn->ksnc_rx_scheduled = 0;
1614                 /* drop scheduler's ref */
1615                 ksocknal_put_conn (conn);   
1616         } else {
1617                 /* stay scheduled */
1618                 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1619         }
1620 }
1621
1622 int
1623 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1624                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1625 {
1626         ksock_conn_t *conn = (ksock_conn_t *)private;
1627
1628         LASSERT (mlen <= rlen);
1629         LASSERT (niov <= PTL_MD_MAX_IOV);
1630         
1631         conn->ksnc_cookie = msg;
1632         conn->ksnc_rx_nob_wanted = mlen;
1633         conn->ksnc_rx_nob_left   = rlen;
1634
1635         conn->ksnc_rx_nkiov = 0;
1636         conn->ksnc_rx_kiov = NULL;
1637         conn->ksnc_rx_niov = niov;
1638         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1639         memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1640
1641         LASSERT (mlen == 
1642                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1643                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1644
1645         return (rlen);
1646 }
1647
1648 int
1649 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1650                      unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1651 {
1652         ksock_conn_t *conn = (ksock_conn_t *)private;
1653
1654         LASSERT (mlen <= rlen);
1655         LASSERT (niov <= PTL_MD_MAX_IOV);
1656         
1657         conn->ksnc_cookie = msg;
1658         conn->ksnc_rx_nob_wanted = mlen;
1659         conn->ksnc_rx_nob_left   = rlen;
1660
1661         conn->ksnc_rx_niov = 0;
1662         conn->ksnc_rx_iov  = NULL;
1663         conn->ksnc_rx_nkiov = niov;
1664         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1665         memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1666
1667         LASSERT (mlen == 
1668                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1669                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1670
1671         return (rlen);
1672 }
1673
1674 int ksocknal_scheduler (void *arg)
1675 {
1676         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1677         unsigned long      flags;
1678         int                rc;
1679         int                nloops = 0;
1680         int                id = sched - ksocknal_data.ksnd_schedulers;
1681         char               name[16];
1682
1683         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1684         kportal_daemonize (name);
1685         kportal_blockallsigs ();
1686
1687 #if (CONFIG_SMP && CPU_AFFINITY)
1688         if ((cpu_online_map & (1 << id)) != 0) {
1689 #if 0
1690                 current->cpus_allowed = (1 << id);
1691 #else
1692                 set_cpus_allowed (current, 1<<id);
1693 #endif
1694         } else {
1695                 CERROR ("Can't set CPU affinity for %s\n", name);
1696         }
1697 #endif /* CONFIG_SMP && CPU_AFFINITY */
1698         
1699         spin_lock_irqsave (&sched->kss_lock, flags);
1700
1701         while (!ksocknal_data.ksnd_shuttingdown) {
1702                 int did_something = 0;
1703
1704                 /* Ensure I progress everything semi-fairly */
1705
1706                 if (!list_empty (&sched->kss_rx_conns)) {
1707                         did_something = 1;
1708                         /* drops & regains kss_lock */
1709                         ksocknal_process_receive (sched, &flags);
1710                 }
1711
1712                 if (!list_empty (&sched->kss_tx_conns)) {
1713                         did_something = 1;
1714                         /* drops and regains kss_lock */
1715                         ksocknal_process_transmit (sched, &flags);
1716                 }
1717 #if SOCKNAL_ZC
1718                 if (!list_empty (&sched->kss_zctxdone_list)) {
1719                         ksock_tx_t *tx =
1720                                 list_entry(sched->kss_zctxdone_list.next,
1721                                            ksock_tx_t, tx_list);
1722                         did_something = 1;
1723
1724                         list_del (&tx->tx_list);
1725                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1726
1727                         ksocknal_tx_done (tx, 1);
1728
1729                         spin_lock_irqsave (&sched->kss_lock, flags);
1730                 }
1731 #endif
1732                 if (!did_something ||           /* nothing to do */
1733                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1734                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1735
1736                         nloops = 0;
1737
1738                         if (!did_something) {   /* wait for something to do */
1739 #if SOCKNAL_ZC
1740                                 rc = wait_event_interruptible (sched->kss_waitq,
1741                                                                ksocknal_data.ksnd_shuttingdown ||
1742                                                                !list_empty(&sched->kss_rx_conns) ||
1743                                                                !list_empty(&sched->kss_tx_conns) ||
1744                                                                !list_empty(&sched->kss_zctxdone_list));
1745 #else
1746                                 rc = wait_event_interruptible (sched->kss_waitq,
1747                                                                ksocknal_data.ksnd_shuttingdown ||
1748                                                                !list_empty(&sched->kss_rx_conns) ||
1749                                                                !list_empty(&sched->kss_tx_conns));
1750 #endif
1751                                 LASSERT (rc == 0);
1752                         } else
1753                                our_cond_resched();
1754
1755                         spin_lock_irqsave (&sched->kss_lock, flags);
1756                 }
1757         }
1758
1759         spin_unlock_irqrestore (&sched->kss_lock, flags);
1760         ksocknal_thread_fini ();
1761         return (0);
1762 }
1763
1764 void
1765 ksocknal_data_ready (struct sock *sk, int n)
1766 {
1767         unsigned long  flags;
1768         ksock_conn_t  *conn;
1769         ksock_sched_t *sched;
1770         ENTRY;
1771
1772         /* interleave correctly with closing sockets... */
1773         read_lock (&ksocknal_data.ksnd_global_lock);
1774
1775         conn = sk->sk_user_data;
1776         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1777                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1778                 sk->sk_data_ready (sk, n);
1779                 goto out;
1780         }
1781
1782         if (!conn->ksnc_rx_ready) {        /* new news */
1783                 /* Set ASAP in case of concurrent calls to me */
1784                 conn->ksnc_rx_ready = 1;
1785
1786                 sched = conn->ksnc_scheduler;
1787
1788                 spin_lock_irqsave (&sched->kss_lock, flags);
1789
1790                 /* Set again (process_receive may have cleared while I blocked for the lock) */
1791                 conn->ksnc_rx_ready = 1;
1792
1793                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1794                         list_add_tail(&conn->ksnc_rx_list,
1795                                       &sched->kss_rx_conns);
1796                         conn->ksnc_rx_scheduled = 1;
1797                         /* extra ref for scheduler */
1798                         atomic_inc (&conn->ksnc_refcount);
1799
1800                         if (waitqueue_active (&sched->kss_waitq))
1801                                 wake_up (&sched->kss_waitq);
1802                 }
1803
1804                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1805         }
1806
1807  out:
1808         read_unlock (&ksocknal_data.ksnd_global_lock);
1809
1810         EXIT;
1811 }
1812
1813 void
1814 ksocknal_write_space (struct sock *sk)
1815 {
1816         unsigned long  flags;
1817         ksock_conn_t  *conn;
1818         ksock_sched_t *sched;
1819
1820         /* interleave correctly with closing sockets... */
1821         read_lock (&ksocknal_data.ksnd_global_lock);
1822
1823         conn = sk->sk_user_data;
1824
1825         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1826                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1827                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1828                                       " ready" : " blocked"),
1829                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1830                                       " scheduled" : " idle"),
1831                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1832                                       " empty" : " queued"));
1833
1834         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1835                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1836                 sk->sk_write_space (sk);
1837
1838                 read_unlock (&ksocknal_data.ksnd_global_lock);
1839                 return;
1840         }
1841
1842         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1843                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1844
1845                 if (!conn->ksnc_tx_ready) {      /* new news */
1846                         /* Set ASAP in case of concurrent calls to me */
1847                         conn->ksnc_tx_ready = 1;
1848
1849                         sched = conn->ksnc_scheduler;
1850
1851                         spin_lock_irqsave (&sched->kss_lock, flags);
1852
1853                         /* Set again (process_transmit may have
1854                            cleared while I blocked for the lock) */
1855                         conn->ksnc_tx_ready = 1;
1856
1857                         if (!conn->ksnc_tx_scheduled && // not being progressed
1858                             !list_empty(&conn->ksnc_tx_queue)){//packets to send
1859                                 list_add_tail (&conn->ksnc_tx_list,
1860                                                &sched->kss_tx_conns);
1861                                 conn->ksnc_tx_scheduled = 1;
1862                                 /* extra ref for scheduler */
1863                                 atomic_inc (&conn->ksnc_refcount);
1864
1865                                 if (waitqueue_active (&sched->kss_waitq))
1866                                         wake_up (&sched->kss_waitq);
1867                         }
1868
1869                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1870                 }
1871         }
1872
1873         read_unlock (&ksocknal_data.ksnd_global_lock);
1874 }
1875
1876 int
1877 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1878 {
1879         int           rc;
1880         mm_segment_t  oldmm = get_fs();
1881
1882         while (nob > 0) {
1883                 struct iovec  iov = {
1884                         .iov_base = buffer,
1885                         .iov_len  = nob
1886                 };
1887                 struct msghdr msg = {
1888                         .msg_name       = NULL,
1889                         .msg_namelen    = 0,
1890                         .msg_iov        = &iov,
1891                         .msg_iovlen     = 1,
1892                         .msg_control    = NULL,
1893                         .msg_controllen = 0,
1894                         .msg_flags      = 0
1895                 };
1896
1897                 set_fs (KERNEL_DS);
1898                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1899                 set_fs (oldmm);
1900                 
1901                 if (rc < 0)
1902                         return (rc);
1903
1904                 if (rc == 0) {
1905                         CERROR ("Unexpected zero rc\n");
1906                         return (-ECONNABORTED);
1907                 }
1908
1909                 buffer = ((char *)buffer) + rc;
1910                 nob -= rc;
1911         }
1912         
1913         return (0);
1914 }
1915
1916 int
1917 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1918 {
1919         int           rc;
1920         mm_segment_t  oldmm = get_fs();
1921         
1922         while (nob > 0) {
1923                 struct iovec  iov = {
1924                         .iov_base = buffer,
1925                         .iov_len  = nob
1926                 };
1927                 struct msghdr msg = {
1928                         .msg_name       = NULL,
1929                         .msg_namelen    = 0,
1930                         .msg_iov        = &iov,
1931                         .msg_iovlen     = 1,
1932                         .msg_control    = NULL,
1933                         .msg_controllen = 0,
1934                         .msg_flags      = 0
1935                 };
1936
1937                 set_fs (KERNEL_DS);
1938                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1939                 set_fs (oldmm);
1940                 
1941                 if (rc < 0)
1942                         return (rc);
1943
1944                 if (rc == 0)
1945                         return (-ECONNABORTED);
1946
1947                 buffer = ((char *)buffer) + rc;
1948                 nob -= rc;
1949         }
1950         
1951         return (0);
1952 }
1953
1954 int
1955 ksocknal_exchange_nids (struct socket *sock, ptl_nid_t nid)
1956 {
1957         int                 rc;
1958         ptl_hdr_t           hdr;
1959         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
1960
1961         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
1962
1963         memset (&hdr, 0, sizeof (hdr));
1964         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
1965         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
1966         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
1967         
1968         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
1969         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
1970         
1971         /* Assume sufficient socket buffering for this message */
1972         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
1973         if (rc != 0) {
1974                 CERROR ("Error %d sending HELLO to "LPX64"\n", rc, nid);
1975                 return (rc);
1976         }
1977
1978         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
1979         if (rc != 0) {
1980                 CERROR ("Error %d reading HELLO from "LPX64"\n", rc, nid);
1981                 return (rc);
1982         }
1983         
1984         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
1985                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64"\n",
1986                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, nid);
1987                 return (-EINVAL);
1988         }
1989
1990         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
1991             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
1992                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
1993                         " from "LPX64"\n",
1994                         __le16_to_cpu (hmv->version_major),
1995                         __le16_to_cpu (hmv->version_minor),
1996                         PORTALS_PROTO_VERSION_MAJOR,
1997                         PORTALS_PROTO_VERSION_MINOR,
1998                         nid);
1999                 return (-EINVAL);
2000         }
2001
2002         LASSERT (PORTALS_PROTO_VERSION_MAJOR == 0);
2003         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
2004          * so read the rest of it in now... */
2005
2006         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2007         if (rc != 0) {
2008                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64"\n",
2009                         rc, nid);
2010                 return (rc);
2011         }
2012
2013         /* ...and check we got what we expected */
2014         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2015             PTL_HDR_LENGTH (&hdr) != __cpu_to_le32 (0)) {
2016                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2017                         " but got type %d with %d payload from "LPX64"\n",
2018                         __le32_to_cpu (hdr.type),
2019                         __le32_to_cpu (PTL_HDR_LENGTH (&hdr)), nid);
2020                 return (-EINVAL);
2021         }
2022         
2023         if (__le64_to_cpu (hdr.src_nid) != nid) {
2024                 CERROR ("Connected to nid "LPX64", but expecting "LPX64"\n",
2025                         __le64_to_cpu (hdr.src_nid), nid);
2026                 return (-EINVAL);
2027         }
2028
2029         return (0);
2030 }
2031
2032 int
2033 ksocknal_setup_sock (struct socket *sock) 
2034 {
2035         mm_segment_t    oldmm = get_fs ();
2036         int             rc;
2037         int             option;
2038         struct linger   linger;
2039
2040         /* Ensure this socket aborts active sends immediately when we close
2041          * it. */
2042         
2043         linger.l_onoff = 0;
2044         linger.l_linger = 0;
2045
2046         set_fs (KERNEL_DS);
2047         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER, 
2048                               (char *)&linger, sizeof (linger));
2049         set_fs (oldmm);
2050         if (rc != 0) {
2051                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2052                 return (rc);
2053         }
2054         
2055         option = -1;
2056         set_fs (KERNEL_DS);
2057         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2058                                     (char *)&option, sizeof (option));
2059         set_fs (oldmm);
2060         if (rc != 0) {
2061                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2062                 return (rc);
2063         }
2064
2065 #if SOCKNAL_USE_KEEPALIVES
2066         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2067          * second until the timeout elapses. */
2068
2069         option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
2070         set_fs (KERNEL_DS);
2071         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2072                                     (char *)&option, sizeof (option));
2073         set_fs (oldmm);
2074         if (rc != 0) {
2075                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2076                 return (rc);
2077         }
2078         
2079         option = 1;
2080         set_fs (KERNEL_DS);
2081         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2082                                     (char *)&option, sizeof (option));
2083         set_fs (oldmm);
2084         if (rc != 0) {
2085                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2086                 return (rc);
2087         }
2088         
2089         option = ksocknal_data.ksnd_io_timeout / 4;
2090         set_fs (KERNEL_DS);
2091         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2092                                     (char *)&option, sizeof (option));
2093         set_fs (oldmm);
2094         if (rc != 0) {
2095                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2096                 return (rc);
2097         }
2098
2099         option = 1;
2100         set_fs (KERNEL_DS);
2101         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2102                               (char *)&option, sizeof (option));
2103         set_fs (oldmm);
2104         if (rc != 0) {
2105                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2106                 return (rc);
2107         }
2108 #endif
2109         return (0);
2110 }
2111
2112 int
2113 ksocknal_connect_peer (ksock_route_t *route)
2114 {
2115         struct sockaddr_in  peer_addr;
2116         mm_segment_t        oldmm = get_fs();
2117         struct timeval      tv;
2118         int                 fd;
2119         struct socket      *sock;
2120         int                 rc;
2121
2122         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2123         if (rc != 0) {
2124                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2125                 return (rc);
2126         }
2127
2128         /* Ugh; have to map_fd for compatibility with sockets passed in
2129          * from userspace.  And we actually need the sock->file refcounting
2130          * that this gives you :) */
2131
2132         fd = sock_map_fd (sock);
2133         if (fd < 0) {
2134                 sock_release (sock);
2135                 CERROR ("sock_map_fd error %d\n", fd);
2136                 return (fd);
2137         }
2138
2139         /* NB the fd now owns the ref on sock->file */
2140         LASSERT (sock->file != NULL);
2141         LASSERT (file_count(sock->file) == 1);
2142
2143         /* Set the socket timeouts, so our connection attempt completes in
2144          * finite time */
2145         tv.tv_sec = ksocknal_data.ksnd_io_timeout;
2146         tv.tv_usec = 0;
2147
2148         set_fs (KERNEL_DS);
2149         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2150                               (char *)&tv, sizeof (tv));
2151         set_fs (oldmm);
2152         if (rc != 0) {
2153                 CERROR ("Can't set send timeout %d: %d\n", 
2154                         ksocknal_data.ksnd_io_timeout, rc);
2155                 goto out;
2156         }
2157         
2158         set_fs (KERNEL_DS);
2159         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2160                               (char *)&tv, sizeof (tv));
2161         set_fs (oldmm);
2162         if (rc != 0) {
2163                 CERROR ("Can't set receive timeout %d: %d\n",
2164                         ksocknal_data.ksnd_io_timeout, rc);
2165                 goto out;
2166         }
2167
2168         if (route->ksnr_nonagel) {
2169                 int  option = 1;
2170                 
2171                 set_fs (KERNEL_DS);
2172                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2173                                             (char *)&option, sizeof (option));
2174                 set_fs (oldmm);
2175                 if (rc != 0) {
2176                         CERROR ("Can't disable nagel: %d\n", rc);
2177                         goto out;
2178                 }
2179         }
2180         
2181         if (route->ksnr_buffer_size != 0) {
2182                 int option = route->ksnr_buffer_size;
2183                 
2184                 set_fs (KERNEL_DS);
2185                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2186                                       (char *)&option, sizeof (option));
2187                 set_fs (oldmm);
2188                 if (rc != 0) {
2189                         CERROR ("Can't set send buffer %d: %d\n",
2190                                 route->ksnr_buffer_size, rc);
2191                         goto out;
2192                 }
2193
2194                 set_fs (KERNEL_DS);
2195                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2196                                       (char *)&option, sizeof (option));
2197                 set_fs (oldmm);
2198                 if (rc != 0) {
2199                         CERROR ("Can't set receive buffer %d: %d\n",
2200                                 route->ksnr_buffer_size, rc);
2201                         goto out;
2202                 }
2203         }
2204         
2205         memset (&peer_addr, 0, sizeof (peer_addr));
2206         peer_addr.sin_family = AF_INET;
2207         peer_addr.sin_port = htons (route->ksnr_port);
2208         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2209         
2210         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2211                                  sizeof (peer_addr), sock->file->f_flags);
2212         if (rc != 0) {
2213                 CERROR ("Error %d connecting to "LPX64"\n", rc,
2214                         route->ksnr_peer->ksnp_nid);
2215                 goto out;
2216         }
2217         
2218         if (route->ksnr_xchange_nids) {
2219                 rc = ksocknal_exchange_nids (sock, route->ksnr_peer->ksnp_nid);
2220                 if (rc != 0)
2221                         goto out;
2222         }
2223
2224         rc = ksocknal_create_conn (route->ksnr_peer->ksnp_nid,
2225                                    route, sock, route->ksnr_irq_affinity);
2226         if (rc == 0) {
2227                 /* Take an extra ref on sock->file to compensate for the
2228                  * upcoming close which will lose fd's ref on it. */
2229                 get_file (sock->file);
2230         }
2231
2232  out:
2233         sys_close (fd);
2234         return (rc);
2235 }
2236
2237 void
2238 ksocknal_autoconnect (ksock_route_t *route)
2239 {
2240         LIST_HEAD        (zombies);
2241         ksock_tx_t       *tx;
2242         ksock_peer_t     *peer;
2243         unsigned long     flags;
2244         int               rc;
2245         
2246         rc = ksocknal_connect_peer (route);
2247         if (rc == 0) {
2248                 /* successfully autoconnected: create_conn did the
2249                  * route/conn binding and scheduled any blocked packets, 
2250                  * so there's nothing left to do now. */
2251                 return;
2252         }
2253
2254         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2255
2256         peer = route->ksnr_peer;
2257         route->ksnr_connecting = 0;
2258
2259         LASSERT (route->ksnr_retry_interval != 0);
2260         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2261         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2262                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2263
2264         if (!list_empty (&peer->ksnp_tx_queue) &&
2265             ksocknal_find_connecting_route_locked (peer) == NULL) {
2266                 LASSERT (list_empty (&peer->ksnp_conns));
2267
2268                 /* None of the connections that the blocked packets are
2269                  * waiting for have been successful.  Complete them now... */
2270                 do {
2271                         tx = list_entry (peer->ksnp_tx_queue.next,
2272                                          ksock_tx_t, tx_list);
2273                         list_del (&tx->tx_list);
2274                         list_add_tail (&tx->tx_list, &zombies);
2275                 } while (!list_empty (&peer->ksnp_tx_queue));
2276         }
2277
2278         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2279
2280         while (!list_empty (&zombies)) {
2281                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2282                 
2283                 CERROR ("Deleting packet type %d len %d ("LPX64"->"LPX64")\n",
2284                         NTOH__u32 (tx->tx_hdr->type),
2285                         NTOH__u32 (PTL_HDR_LENGTH(tx->tx_hdr)),
2286                         NTOH__u64 (tx->tx_hdr->src_nid),
2287                         NTOH__u64 (tx->tx_hdr->dest_nid));
2288
2289                 list_del (&tx->tx_list);
2290                 /* complete now */
2291                 ksocknal_tx_done (tx, 0);
2292         }
2293 }
2294
2295 int
2296 ksocknal_autoconnectd (void *arg)
2297 {
2298         long               id = (long)arg;
2299         char               name[16];
2300         unsigned long      flags;
2301         ksock_route_t     *route;
2302         int                rc;
2303
2304         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2305         kportal_daemonize (name);
2306         kportal_blockallsigs ();
2307
2308         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2309
2310         while (!ksocknal_data.ksnd_shuttingdown) {
2311
2312                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2313                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2314                                             ksock_route_t, ksnr_connect_list);
2315                         
2316                         list_del (&route->ksnr_connect_list);
2317                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2318
2319                         ksocknal_autoconnect (route);
2320                         ksocknal_put_route (route);
2321
2322                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2323                         continue;
2324                 }
2325                 
2326                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2327
2328                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2329                                                ksocknal_data.ksnd_shuttingdown ||
2330                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2331
2332                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2333         }
2334
2335         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2336
2337         ksocknal_thread_fini ();
2338         return (0);
2339 }
2340
2341 ksock_conn_t *
2342 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2343 {
2344         /* We're called with a shared lock on ksnd_global_lock */
2345         ksock_conn_t      *conn;
2346         struct list_head  *ctmp;
2347         ksock_sched_t     *sched;
2348
2349         list_for_each (ctmp, &peer->ksnp_conns) {
2350                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2351                 sched = conn->ksnc_scheduler;
2352
2353                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2354                 LASSERT (!conn->ksnc_closing);
2355                 
2356                 if (conn->ksnc_rx_started &&
2357                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2358                         /* Timed out incomplete incoming message */
2359                         atomic_inc (&conn->ksnc_refcount);
2360                         CERROR ("Timed out RX from "LPX64" %p\n", 
2361                                 peer->ksnp_nid, conn);
2362                         return (conn);
2363                 }
2364                 
2365                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2366                      conn->ksnc_sock->sk->wmem_queued != 0) &&
2367                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2368                         /* Timed out messages queued for sending, or
2369                          * messages buffered in the socket's send buffer */
2370                         atomic_inc (&conn->ksnc_refcount);
2371                         CERROR ("Timed out TX to "LPX64" %s%d %p\n", 
2372                                 peer->ksnp_nid, 
2373                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2374                                 conn->ksnc_sock->sk->wmem_queued, conn);
2375                         return (conn);
2376                 }
2377         }
2378
2379         return (NULL);
2380 }
2381
2382 void
2383 ksocknal_check_peer_timeouts (int idx)
2384 {
2385         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2386         struct list_head *ptmp;
2387         ksock_peer_t     *peer;
2388         ksock_conn_t     *conn;
2389
2390  again:
2391         /* NB. We expect to have a look at all the peers and not find any
2392          * connections to time out, so we just use a shared lock while we
2393          * take a look... */
2394         read_lock (&ksocknal_data.ksnd_global_lock);
2395
2396         list_for_each (ptmp, peers) {
2397                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2398                 conn = ksocknal_find_timed_out_conn (peer);
2399                 
2400                 if (conn != NULL) {
2401                         read_unlock (&ksocknal_data.ksnd_global_lock);
2402
2403                         if (ksocknal_close_conn_unlocked (conn, -ETIMEDOUT)) {
2404                                 /* I actually closed... */
2405                                 CERROR ("Timeout out conn->"LPX64" ip %x:%d\n",
2406                                         peer->ksnp_nid, conn->ksnc_ipaddr,
2407                                         conn->ksnc_port);
2408                         }
2409                 
2410                         /* NB we won't find this one again, but we can't
2411                          * just proceed with the next peer, since we dropped
2412                          * ksnd_global_lock and it might be dead already! */
2413                         ksocknal_put_conn (conn);
2414                         goto again;
2415                 }
2416         }
2417
2418         read_unlock (&ksocknal_data.ksnd_global_lock);
2419 }
2420
2421 int
2422 ksocknal_reaper (void *arg)
2423 {
2424         wait_queue_t       wait;
2425         unsigned long      flags;
2426         ksock_conn_t      *conn;
2427         int                timeout;
2428         int                i;
2429         int                peer_index = 0;
2430         unsigned long      deadline = jiffies;
2431         
2432         kportal_daemonize ("ksocknal_reaper");
2433         kportal_blockallsigs ();
2434
2435         init_waitqueue_entry (&wait, current);
2436
2437         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2438
2439         while (!ksocknal_data.ksnd_shuttingdown) {
2440
2441                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2442                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2443                                            ksock_conn_t, ksnc_list);
2444                         list_del (&conn->ksnc_list);
2445                         
2446                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2447
2448                         ksocknal_terminate_conn (conn);
2449                         ksocknal_put_conn (conn);
2450
2451                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2452                         continue;
2453                 }
2454
2455                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2456                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2457                                            ksock_conn_t, ksnc_list);
2458                         list_del (&conn->ksnc_list);
2459                         
2460                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2461
2462                         ksocknal_destroy_conn (conn);
2463
2464                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2465                         continue;
2466                 }
2467                 
2468                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2469
2470                 /* careful with the jiffy wrap... */
2471                 while ((timeout = ((int)deadline - (int)jiffies)) <= 0) {
2472                         const int n = 4;
2473                         const int p = 1;
2474                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2475                         
2476                         /* Time to check for timeouts on a few more peers: I do
2477                          * checks every 'p' seconds on a proportion of the peer
2478                          * table and I need to check every connection 'n' times
2479                          * within a timeout interval, to ensure I detect a
2480                          * timeout on any connection within (n+1)/n times the
2481                          * timeout interval. */
2482
2483                         if (ksocknal_data.ksnd_io_timeout > n * p)
2484                                 chunk = (chunk * n * p) / 
2485                                         ksocknal_data.ksnd_io_timeout;
2486                         if (chunk == 0)
2487                                 chunk = 1;
2488
2489                         for (i = 0; i < chunk; i++) {
2490                                 ksocknal_check_peer_timeouts (peer_index);
2491                                 peer_index = (peer_index + 1) % 
2492                                              ksocknal_data.ksnd_peer_hash_size;
2493                         }
2494
2495                         deadline += p * HZ;
2496                 }
2497
2498                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2499                 set_current_state (TASK_INTERRUPTIBLE);
2500
2501                 if (!ksocknal_data.ksnd_shuttingdown &&
2502                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2503                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2504                         schedule_timeout (timeout);
2505
2506                 set_current_state (TASK_RUNNING);
2507                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2508
2509                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2510         }
2511
2512         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2513
2514         ksocknal_thread_fini ();
2515         return (0);
2516 }
2517
2518 nal_cb_t ksocknal_lib = {
2519         nal_data:       &ksocknal_data,                /* NAL private data */
2520         cb_send:         ksocknal_send,
2521         cb_send_pages:   ksocknal_send_pages,
2522         cb_recv:         ksocknal_recv,
2523         cb_recv_pages:   ksocknal_recv_pages,
2524         cb_read:         ksocknal_read,
2525         cb_write:        ksocknal_write,
2526         cb_callback:     ksocknal_callback,
2527         cb_malloc:       ksocknal_malloc,
2528         cb_free:         ksocknal_free,
2529         cb_printf:       ksocknal_printf,
2530         cb_cli:          ksocknal_cli,
2531         cb_sti:          ksocknal_sti,
2532         cb_dist:         ksocknal_dist
2533 };