Whamcloud - gitweb
b=2776
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 /*
29  *  LIB functions follow
30  *
31  */
32 ptl_err_t
33 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
34               user_ptr src_addr, size_t len)
35 {
36         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
37                nal->ni.nid, (long)len, src_addr, dst_addr);
38
39         memcpy( dst_addr, src_addr, len );
40         return PTL_OK;
41 }
42
43 ptl_err_t
44 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
45                void *src_addr, size_t len)
46 {
47         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
48                nal->ni.nid, (long)len, src_addr, dst_addr);
49
50         memcpy( dst_addr, src_addr, len );
51         return PTL_OK;
52 }
53
54 void *
55 ksocknal_malloc(nal_cb_t *nal, size_t len)
56 {
57         void *buf;
58
59         PORTAL_ALLOC(buf, len);
60
61         if (buf != NULL)
62                 memset(buf, 0, len);
63
64         return (buf);
65 }
66
67 void
68 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
69 {
70         PORTAL_FREE(buf, len);
71 }
72
73 void
74 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
75 {
76         va_list ap;
77         char msg[256];
78
79         va_start (ap, fmt);
80         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
81         va_end (ap);
82
83         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
84
85         CDEBUG (D_NET, "%s", msg);
86 }
87
88 void
89 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
90 {
91         ksock_nal_data_t *data = nal->nal_data;
92
93         /* OK to ignore 'flags'; we're only ever serialise threads and
94          * never need to lock out interrupts */
95         spin_lock(&data->ksnd_nal_cb_lock);
96 }
97
98 void
99 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
100 {
101         ksock_nal_data_t *data;
102         data = nal->nal_data;
103
104         /* OK to ignore 'flags'; we're only ever serialise threads and
105          * never need to lock out interrupts */
106         spin_unlock(&data->ksnd_nal_cb_lock);
107 }
108
109 void
110 ksocknal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
111 {
112         /* holding ksnd_nal_cb_lock */
113
114         if (eq->event_callback != NULL)
115                 eq->event_callback(ev);
116         
117         if (waitqueue_active(&ksocknal_data.ksnd_yield_waitq))
118                 wake_up_all(&ksocknal_data.ksnd_yield_waitq);
119 }
120
121 int
122 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
123 {
124         /* I would guess that if ksocknal_get_peer (nid) == NULL,
125            and we're not routing, then 'nid' is very distant :) */
126         if ( nal->ni.nid == nid ) {
127                 *dist = 0;
128         } else {
129                 *dist = 1;
130         }
131
132         return 0;
133 }
134
135 void
136 ksocknal_free_ltx (ksock_ltx_t *ltx)
137 {
138         atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
139         PORTAL_FREE(ltx, ltx->ltx_desc_size);
140 }
141
142 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
143 struct page *
144 ksocknal_kvaddr_to_page (unsigned long vaddr)
145 {
146         struct page *page;
147
148         if (vaddr >= VMALLOC_START &&
149             vaddr < VMALLOC_END)
150                 page = vmalloc_to_page ((void *)vaddr);
151 #if CONFIG_HIGHMEM
152         else if (vaddr >= PKMAP_BASE &&
153                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
154                 page = vmalloc_to_page ((void *)vaddr);
155                 /* in 2.4 ^ just walks the page tables */
156 #endif
157         else
158                 page = virt_to_page (vaddr);
159
160         if (page == NULL ||
161             !VALID_PAGE (page))
162                 return (NULL);
163
164         return (page);
165 }
166 #endif
167
168 int
169 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
170 {
171         struct socket *sock = conn->ksnc_sock;
172         struct iovec  *iov = tx->tx_iov;
173         int            fragsize = iov->iov_len;
174         unsigned long  vaddr = (unsigned long)iov->iov_base;
175         int            more = (tx->tx_niov > 1) || 
176                               (tx->tx_nkiov > 0) ||
177                               (!list_empty (&conn->ksnc_tx_queue));
178 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
179         int            offset = vaddr & (PAGE_SIZE - 1);
180         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
181         struct page   *page;
182 #endif
183         int            rc;
184
185         /* NB we can't trust socket ops to either consume our iovs
186          * or leave them alone, so we only send 1 frag at a time. */
187         LASSERT (fragsize <= tx->tx_resid);
188         LASSERT (tx->tx_niov > 0);
189         
190 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
191         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
192             (sock->sk->route_caps & NETIF_F_SG) &&
193             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
194             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
195                 
196                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
197                        (void *)vaddr, page, page_address(page), offset, zcsize);
198
199                 if (fragsize > zcsize) {
200                         more = 1;
201                         fragsize = zcsize;
202                 }
203
204                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
205                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
206                                        &tx->tx_zccd);
207         } else
208 #endif
209         {
210                 /* NB don't pass tx's iov; sendmsg may or may not update it */
211                 struct iovec fragiov = { .iov_base = (void *)vaddr,
212                                          .iov_len  = fragsize};
213                 struct msghdr msg = {
214                         .msg_name       = NULL,
215                         .msg_namelen    = 0,
216                         .msg_iov        = &fragiov,
217                         .msg_iovlen     = 1,
218                         .msg_control    = NULL,
219                         .msg_controllen = 0,
220                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
221                 };
222                 mm_segment_t oldmm = get_fs();
223
224                 set_fs (KERNEL_DS);
225                 rc = sock_sendmsg(sock, &msg, fragsize);
226                 set_fs (oldmm);
227         } 
228
229         if (rc > 0) {
230                 tx->tx_resid -= rc;
231
232                 if (rc < iov->iov_len) {
233                         /* didn't send whole iov entry... */
234                         iov->iov_base = (void *)(vaddr + rc);
235                         iov->iov_len -= rc;
236                 } else {
237                         tx->tx_iov++;
238                         tx->tx_niov--;
239                 }
240         }
241         
242         return (rc);
243 }
244
245 int
246 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
247 {
248         struct socket *sock = conn->ksnc_sock;
249         ptl_kiov_t    *kiov = tx->tx_kiov;
250         int            fragsize = kiov->kiov_len;
251         struct page   *page = kiov->kiov_page;
252         int            offset = kiov->kiov_offset;
253         int            more = (tx->tx_nkiov > 1) ||
254                               (!list_empty (&conn->ksnc_tx_queue));
255         int            rc;
256
257         /* NB we can't trust socket ops to either consume our iovs
258          * or leave them alone, so we only send 1 frag at a time. */
259         LASSERT (fragsize <= tx->tx_resid);
260         LASSERT (offset + fragsize <= PAGE_SIZE);
261         LASSERT (tx->tx_niov == 0);
262         LASSERT (tx->tx_nkiov > 0);
263
264 #if SOCKNAL_ZC
265         if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
266             (sock->sk->route_caps & NETIF_F_SG) &&
267             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
268
269                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
270                                page, offset, fragsize);
271
272                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
273                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
274                                        &tx->tx_zccd);
275         } else
276 #endif
277         {
278                 char *addr = ((char *)kmap (page)) + offset;
279                 struct iovec fragiov = {.iov_base = addr,
280                                         .iov_len  = fragsize};
281                 struct msghdr msg = {
282                         .msg_name       = NULL,
283                         .msg_namelen    = 0,
284                         .msg_iov        = &fragiov,
285                         .msg_iovlen     = 1,
286                         .msg_control    = NULL,
287                         .msg_controllen = 0,
288                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
289                 };
290                 mm_segment_t  oldmm = get_fs();
291                 
292                 set_fs (KERNEL_DS);
293                 rc = sock_sendmsg(sock, &msg, fragsize);
294                 set_fs (oldmm);
295
296                 kunmap (page);
297         }
298
299         if (rc > 0) {
300                 tx->tx_resid -= rc;
301  
302                 if (rc < fragsize) {
303                         kiov->kiov_offset = offset + rc;
304                         kiov->kiov_len    = fragsize - rc;
305                 } else {
306                         tx->tx_kiov++;
307                         tx->tx_nkiov--;
308                 }
309         }
310
311         return (rc);
312 }
313
314 int
315 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
316 {
317         int      rc;
318         
319         if (ksocknal_data.ksnd_stall_tx != 0) {
320                 set_current_state (TASK_UNINTERRUPTIBLE);
321                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
322         }
323
324         LASSERT (tx->tx_resid != 0);
325
326         rc = ksocknal_getconnsock (conn);
327         if (rc != 0) {
328                 LASSERT (conn->ksnc_closing);
329                 return (-ESHUTDOWN);
330         }
331
332         do {
333                 if (ksocknal_data.ksnd_enomem_tx > 0) {
334                         /* testing... */
335                         ksocknal_data.ksnd_enomem_tx--;
336                         rc = -EAGAIN;
337                 } else if (tx->tx_niov != 0) {
338                         rc = ksocknal_send_iov (conn, tx);
339                 } else {
340                         rc = ksocknal_send_kiov (conn, tx);
341                 }
342
343                 if (rc <= 0) {
344                         /* Didn't write anything.
345                          *
346                          * NB: rc == 0 and rc == -EAGAIN both mean try
347                          * again later (linux stack returns -EAGAIN for
348                          * this, but Adaptech TOE returns 0).
349                          *
350                          * Also, sends never fail with -ENOMEM, just
351                          * -EAGAIN, but with the added bonus that we can't
352                          * expect write_space() to call us back to tell us
353                          * when to try sending again.  We use the
354                          * SOCK_NOSPACE flag to diagnose...  */
355
356                         LASSERT(rc != -ENOMEM);
357
358                         if (rc == 0 || rc == -EAGAIN) {
359                                 if (test_bit(SOCK_NOSPACE, 
360                                              &conn->ksnc_sock->flags)) {
361                                         rc = -EAGAIN;
362                                 } else {
363                                         static int counter;
364                          
365                                         counter++;
366                                         if ((counter & (-counter)) == counter)
367                                                 CWARN("%d ENOMEM tx %p\n", 
368                                                       counter, conn);
369                                         rc = -ENOMEM;
370                                 }
371                         }
372                         break;
373                 }
374
375                 rc = 0;
376
377                 /* Consider the connection alive since we managed to chuck
378                  * more data into it.  Really, we'd like to consider it
379                  * alive only when the peer ACKs something, but
380                  * write_space() only gets called back while SOCK_NOSPACE
381                  * is set.  Instead, we presume peer death has occurred if
382                  * the socket doesn't drain within a timout */
383                 conn->ksnc_tx_deadline = jiffies + 
384                                          ksocknal_data.ksnd_io_timeout * HZ;
385                 conn->ksnc_peer->ksnp_last_alive = jiffies;
386
387         } while (tx->tx_resid != 0);
388
389         ksocknal_putconnsock (conn);
390         return (rc);
391 }
392
393 void
394 ksocknal_eager_ack (ksock_conn_t *conn)
395 {
396         int            opt = 1;
397         mm_segment_t   oldmm = get_fs();
398         struct socket *sock = conn->ksnc_sock;
399         
400         /* Remind the socket to ACK eagerly.  If I don't, the socket might
401          * think I'm about to send something it could piggy-back the ACK
402          * on, introducing delay in completing zero-copy sends in my
403          * peer. */
404
405         set_fs(KERNEL_DS);
406         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
407                                (char *)&opt, sizeof (opt));
408         set_fs(oldmm);
409 }
410
411 int
412 ksocknal_recv_iov (ksock_conn_t *conn)
413 {
414         struct iovec *iov = conn->ksnc_rx_iov;
415         int           fragsize  = iov->iov_len;
416         unsigned long vaddr = (unsigned long)iov->iov_base;
417         struct iovec  fragiov = { .iov_base = (void *)vaddr,
418                                   .iov_len  = fragsize};
419         struct msghdr msg = {
420                 .msg_name       = NULL,
421                 .msg_namelen    = 0,
422                 .msg_iov        = &fragiov,
423                 .msg_iovlen     = 1,
424                 .msg_control    = NULL,
425                 .msg_controllen = 0,
426                 .msg_flags      = 0
427         };
428         mm_segment_t oldmm = get_fs();
429         int          rc;
430
431         /* NB we can't trust socket ops to either consume our iovs
432          * or leave them alone, so we only receive 1 frag at a time. */
433         LASSERT (conn->ksnc_rx_niov > 0);
434         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
435
436         set_fs (KERNEL_DS);
437         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
438         /* NB this is just a boolean............................^ */
439         set_fs (oldmm);
440
441         if (rc <= 0)
442                 return (rc);
443
444         /* received something... */
445         conn->ksnc_peer->ksnp_last_alive = jiffies;
446         conn->ksnc_rx_deadline = jiffies + 
447                                  ksocknal_data.ksnd_io_timeout * HZ;
448         mb();                           /* order with setting rx_started */
449         conn->ksnc_rx_started = 1;
450
451         conn->ksnc_rx_nob_wanted -= rc;
452         conn->ksnc_rx_nob_left -= rc;
453                 
454         if (rc < fragsize) {
455                 iov->iov_base = (void *)(vaddr + rc);
456                 iov->iov_len = fragsize - rc;
457                 return (-EAGAIN);
458         }
459
460         conn->ksnc_rx_iov++;
461         conn->ksnc_rx_niov--;
462         return (1);
463 }
464
465 int
466 ksocknal_recv_kiov (ksock_conn_t *conn)
467 {
468         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
469         struct page  *page = kiov->kiov_page;
470         int           offset = kiov->kiov_offset;
471         int           fragsize = kiov->kiov_len;
472         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
473         struct iovec  fragiov = { .iov_base = (void *)vaddr,
474                                   .iov_len  = fragsize};
475         struct msghdr msg = {
476                 .msg_name       = NULL,
477                 .msg_namelen    = 0,
478                 .msg_iov        = &fragiov,
479                 .msg_iovlen     = 1,
480                 .msg_control    = NULL,
481                 .msg_controllen = 0,
482                 .msg_flags      = 0
483         };
484         mm_segment_t oldmm = get_fs();
485         int          rc;
486
487         /* NB we can't trust socket ops to either consume our iovs
488          * or leave them alone, so we only receive 1 frag at a time. */
489         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
490         LASSERT (conn->ksnc_rx_nkiov > 0);
491         LASSERT (offset + fragsize <= PAGE_SIZE);
492
493         set_fs (KERNEL_DS);
494         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
495         /* NB this is just a boolean............................^ */
496         set_fs (oldmm);
497
498         kunmap (page);
499         
500         if (rc <= 0)
501                 return (rc);
502         
503         /* received something... */
504         conn->ksnc_peer->ksnp_last_alive = jiffies;
505         conn->ksnc_rx_deadline = jiffies + 
506                                  ksocknal_data.ksnd_io_timeout * HZ;
507         mb();                           /* order with setting rx_started */
508         conn->ksnc_rx_started = 1;
509
510         conn->ksnc_rx_nob_wanted -= rc;
511         conn->ksnc_rx_nob_left -= rc;
512                 
513         if (rc < fragsize) {
514                 kiov->kiov_offset = offset + rc;
515                 kiov->kiov_len = fragsize - rc;
516                 return (-EAGAIN);
517         }
518
519         conn->ksnc_rx_kiov++;
520         conn->ksnc_rx_nkiov--;
521         return (1);
522 }
523
524 int
525 ksocknal_receive (ksock_conn_t *conn) 
526 {
527         /* Return 1 on success, 0 on EOF, < 0 on error.
528          * Caller checks ksnc_rx_nob_wanted to determine
529          * progress/completion. */
530         int     rc;
531         ENTRY;
532         
533         if (ksocknal_data.ksnd_stall_rx != 0) {
534                 set_current_state (TASK_UNINTERRUPTIBLE);
535                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
536         }
537
538         rc = ksocknal_getconnsock (conn);
539         if (rc != 0) {
540                 LASSERT (conn->ksnc_closing);
541                 return (-ESHUTDOWN);
542         }
543
544         for (;;) {
545                 if (conn->ksnc_rx_niov != 0)
546                         rc = ksocknal_recv_iov (conn);
547                 else
548                         rc = ksocknal_recv_kiov (conn);
549
550                 if (rc <= 0) {
551                         /* error/EOF or partial receive */
552                         if (rc == -EAGAIN) {
553                                 rc = 1;
554                         } else if (rc == 0 && conn->ksnc_rx_started) {
555                                 /* EOF in the middle of a message */
556                                 rc = -EPROTO;
557                         }
558                         break;
559                 }
560
561                 /* Completed a fragment */
562
563                 if (conn->ksnc_rx_nob_wanted == 0) {
564                         /* Completed a message segment (header or payload) */
565                         if ((ksocknal_data.ksnd_eager_ack & conn->ksnc_type) != 0 &&
566                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
567                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
568                                 /* Remind the socket to ack eagerly... */
569                                 ksocknal_eager_ack(conn);
570                         }
571                         rc = 1;
572                         break;
573                 }
574         }
575
576         ksocknal_putconnsock (conn);
577         RETURN (rc);
578 }
579
580 #if SOCKNAL_ZC
581 void
582 ksocknal_zc_callback (zccd_t *zcd)
583 {
584         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
585         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
586         unsigned long  flags;
587         ENTRY;
588
589         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
590
591         spin_lock_irqsave (&sched->kss_lock, flags);
592
593         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
594         wake_up (&sched->kss_waitq);
595
596         spin_unlock_irqrestore (&sched->kss_lock, flags);
597         EXIT;
598 }
599 #endif
600
601 void
602 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
603 {
604         ksock_ltx_t   *ltx;
605         ENTRY;
606
607         if (tx->tx_conn != NULL) {
608                 /* This tx got queued on a conn; do the accounting... */
609                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
610 #if SOCKNAL_ZC
611                 /* zero copy completion isn't always from
612                  * process_transmit() so it needs to keep a ref on
613                  * tx_conn... */
614                 if (asynch)
615                         ksocknal_put_conn (tx->tx_conn);
616 #else
617                 LASSERT (!asynch);
618 #endif
619         }
620
621         if (tx->tx_isfwd) {             /* was a forwarded packet? */
622                 kpr_fwd_done (&ksocknal_data.ksnd_router,
623                               KSOCK_TX_2_KPR_FWD_DESC (tx), 
624                               (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
625                 EXIT;
626                 return;
627         }
628
629         /* local send */
630         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
631
632         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
633                       (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
634
635         ksocknal_free_ltx (ltx);
636         EXIT;
637 }
638
639 void
640 ksocknal_tx_launched (ksock_tx_t *tx) 
641 {
642 #if SOCKNAL_ZC
643         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
644                 ksock_conn_t  *conn = tx->tx_conn;
645                 
646                 /* zccd skbufs are still in-flight.  First take a ref on
647                  * conn, so it hangs about for ksocknal_tx_done... */
648                 atomic_inc (&conn->ksnc_refcount);
649
650                 /* ...then drop the initial ref on zccd, so the zero copy
651                  * callback can occur */
652                 zccd_put (&tx->tx_zccd);
653                 return;
654         }
655 #endif
656         /* Any zero-copy-ness (if any) has completed; I can complete the
657          * transmit now, avoiding an extra schedule */
658         ksocknal_tx_done (tx, 0);
659 }
660
661 int
662 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
663 {
664         unsigned long  flags;
665         int            rc;
666        
667         rc = ksocknal_transmit (conn, tx);
668
669         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
670
671         if (tx->tx_resid == 0) {
672                 /* Sent everything OK */
673                 LASSERT (rc == 0);
674
675                 ksocknal_tx_launched (tx);
676                 return (0);
677         }
678
679         if (rc == -EAGAIN)
680                 return (rc);
681
682         if (rc == -ENOMEM) {
683                 /* Queue on ksnd_enomem_conns for retry after a timeout */
684                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
685
686                 /* enomem list takes over scheduler's ref... */
687                 LASSERT (conn->ksnc_tx_scheduled);
688                 list_add_tail(&conn->ksnc_tx_list,
689                               &ksocknal_data.ksnd_enomem_conns);
690                 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
691                                    ksocknal_data.ksnd_reaper_waketime))
692                         wake_up (&ksocknal_data.ksnd_reaper_waitq);
693                 
694                 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
695                 return (rc);
696         }
697
698         /* Actual error */
699         LASSERT (rc < 0);
700
701         if (!conn->ksnc_closing)
702                 CERROR("[%p] Error %d on write to "LPX64
703                        " ip %d.%d.%d.%d:%d\n", conn, rc,
704                        conn->ksnc_peer->ksnp_nid,
705                        HIPQUAD(conn->ksnc_ipaddr),
706                        conn->ksnc_port);
707
708         ksocknal_close_conn_and_siblings (conn, rc);
709         ksocknal_tx_launched (tx);
710
711         return (rc);
712 }
713
714 void
715 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
716 {
717         unsigned long     flags;
718
719         /* called holding write lock on ksnd_global_lock */
720
721         LASSERT (!route->ksnr_deleted);
722         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
723         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
724         LASSERT (!route->ksnr_connecting);
725         
726         if (ksocknal_data.ksnd_typed_conns)
727                 route->ksnr_connecting = 
728                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
729         else
730                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
731
732         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
733         
734         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
735         
736         list_add_tail (&route->ksnr_connect_list,
737                        &ksocknal_data.ksnd_autoconnectd_routes);
738         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
739         
740         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
741 }
742
743 ksock_peer_t *
744 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
745 {
746         char          ipbuf[PTL_NALFMT_SIZE];
747         ptl_nid_t     target_nid;
748         int           rc;
749         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
750
751         if (peer != NULL)
752                 return (peer);
753
754         if (tx->tx_isfwd) {
755                 CERROR ("Can't send packet to "LPX64
756                        " %s: routed target is not a peer\n",
757                         nid, portals_nid2str(SOCKNAL, nid, ipbuf));
758                 return (NULL);
759         }
760
761         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
762                          &target_nid);
763         if (rc != 0) {
764                 CERROR ("Can't route to "LPX64" %s: router error %d\n",
765                         nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
766                 return (NULL);
767         }
768
769         peer = ksocknal_find_peer_locked (target_nid);
770         if (peer != NULL)
771                 return (peer);
772
773         CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
774                 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
775         return (NULL);
776 }
777
778 ksock_conn_t *
779 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
780 {
781         struct list_head *tmp;
782         ksock_conn_t     *typed = NULL;
783         int               tnob  = 0;
784         ksock_conn_t     *fallback = NULL;
785         int               fnob     = 0;
786
787         /* Find the conn with the shortest tx queue */
788         list_for_each (tmp, &peer->ksnp_conns) {
789                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
790                 int           nob = atomic_read(&c->ksnc_tx_nob) +
791                                         c->ksnc_sock->sk->sk_wmem_queued;
792
793                 LASSERT (!c->ksnc_closing);
794
795                 if (fallback == NULL || nob < fnob) {
796                         fallback = c;
797                         fnob     = nob;
798                 }
799
800                 if (!ksocknal_data.ksnd_typed_conns)
801                         continue;
802
803                 switch (c->ksnc_type) {
804                 default:
805                         LBUG();
806                 case SOCKNAL_CONN_ANY:
807                         break;
808                 case SOCKNAL_CONN_BULK_IN:
809                         continue;
810                 case SOCKNAL_CONN_BULK_OUT:
811                         if (tx->tx_nob < ksocknal_data.ksnd_min_bulk)
812                                 continue;
813                         break;
814                 case SOCKNAL_CONN_CONTROL:
815                         if (tx->tx_nob >= ksocknal_data.ksnd_min_bulk)
816                                 continue;
817                         break;
818                 }
819
820                 if (typed == NULL || nob < tnob) {
821                         typed = c;
822                         tnob  = nob;
823                 }
824         }
825
826         /* prefer the typed selection */
827         return ((typed != NULL) ? typed : fallback);
828 }
829
830 void
831 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
832 {
833         unsigned long  flags;
834         ksock_sched_t *sched = conn->ksnc_scheduler;
835
836         /* called holding global lock (read or irq-write) and caller may
837          * not have dropped this lock between finding conn and calling me,
838          * so we don't need the {get,put}connsock dance to deref
839          * ksnc_sock... */
840         LASSERT(!conn->ksnc_closing);
841         LASSERT(tx->tx_resid == tx->tx_nob);
842         
843         CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", 
844                 conn->ksnc_peer->ksnp_nid,
845                 HIPQUAD(conn->ksnc_ipaddr),
846                 conn->ksnc_port);
847
848         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
849         tx->tx_conn = conn;
850
851 #if SOCKNAL_ZC
852         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
853         /* NB this sets 1 ref on zccd, so the callback can only occur after
854          * I've released this ref. */
855 #endif
856         spin_lock_irqsave (&sched->kss_lock, flags);
857
858         conn->ksnc_tx_deadline = jiffies + 
859                                  ksocknal_data.ksnd_io_timeout * HZ;
860         mb();                                   /* order with list_add_tail */
861
862         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
863                 
864         if (conn->ksnc_tx_ready &&      /* able to send */
865             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
866                 /* +1 ref for scheduler */
867                 atomic_inc (&conn->ksnc_refcount);
868                 list_add_tail (&conn->ksnc_tx_list, 
869                                &sched->kss_tx_conns);
870                 conn->ksnc_tx_scheduled = 1;
871                 wake_up (&sched->kss_waitq);
872         }
873
874         spin_unlock_irqrestore (&sched->kss_lock, flags);
875 }
876
877 ksock_route_t *
878 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
879 {
880         struct list_head  *tmp;
881         ksock_route_t     *route;
882         ksock_route_t     *candidate = NULL;
883         int                found = 0;
884         int                bits;
885         
886         list_for_each (tmp, &peer->ksnp_routes) {
887                 route = list_entry (tmp, ksock_route_t, ksnr_list);
888                 bits  = route->ksnr_connected;
889                 
890                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
891                     (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
892                     route->ksnr_connecting != 0) {
893                         /* All typed connections have been established, or
894                          * an untyped connection has been established, or
895                          * connections are currently being established */
896                         found = 1;
897                         continue;
898                 }
899
900                 /* too soon to retry this guy? */
901                 if (!time_after_eq (jiffies, route->ksnr_timeout))
902                         continue;
903                 
904                 /* always do eager routes */
905                 if (route->ksnr_eager)
906                         return (route);
907
908                 if (candidate == NULL) {
909                         /* If we don't find any other route that is fully
910                          * connected or connecting, the first connectable
911                          * route is returned.  If it fails to connect, it
912                          * will get placed at the end of the list */
913                         candidate = route;
914                 }
915         }
916  
917         return (found ? NULL : candidate);
918 }
919
920 ksock_route_t *
921 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
922 {
923         struct list_head  *tmp;
924         ksock_route_t     *route;
925
926         list_for_each (tmp, &peer->ksnp_routes) {
927                 route = list_entry (tmp, ksock_route_t, ksnr_list);
928                 
929                 if (route->ksnr_connecting != 0)
930                         return (route);
931         }
932         
933         return (NULL);
934 }
935
936 int
937 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
938 {
939         unsigned long     flags;
940         ksock_peer_t     *peer;
941         ksock_conn_t     *conn;
942         ksock_route_t    *route;
943         rwlock_t         *g_lock;
944         
945         /* Ensure the frags we've been given EXACTLY match the number of
946          * bytes we want to send.  Many TCP/IP stacks disregard any total
947          * size parameters passed to them and just look at the frags. 
948          *
949          * We always expect at least 1 mapped fragment containing the
950          * complete portals header. */
951         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
952                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
953         LASSERT (tx->tx_niov >= 1);
954         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
955
956         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
957                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
958                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
959
960         tx->tx_conn = NULL;                     /* only set when assigned a conn */
961         tx->tx_resid = tx->tx_nob;
962         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
963
964         g_lock = &ksocknal_data.ksnd_global_lock;
965         read_lock (g_lock);
966         
967         peer = ksocknal_find_target_peer_locked (tx, nid);
968         if (peer == NULL) {
969                 read_unlock (g_lock);
970                 return (-EHOSTUNREACH);
971         }
972
973         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
974                 conn = ksocknal_find_conn_locked (tx, peer);
975                 if (conn != NULL) {
976                         /* I've got no autoconnect routes that need to be
977                          * connecting and I do have an actual connection... */
978                         ksocknal_queue_tx_locked (tx, conn);
979                         read_unlock (g_lock);
980                         return (0);
981                 }
982         }
983         
984         /* Making one or more connections; I'll need a write lock... */
985
986         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
987         read_unlock (g_lock);
988         write_lock_irqsave (g_lock, flags);
989         
990         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
991                 write_unlock_irqrestore (g_lock, flags);
992                 ksocknal_put_peer (peer);
993                 return (-EHOSTUNREACH);
994         }
995         ksocknal_put_peer (peer);               /* drop ref I got above */
996
997         for (;;) {
998                 /* launch any/all autoconnections that need it */
999                 route = ksocknal_find_connectable_route_locked (peer);
1000                 if (route == NULL)
1001                         break;
1002
1003                 ksocknal_launch_autoconnect_locked (route);
1004         }
1005
1006         conn = ksocknal_find_conn_locked (tx, peer);
1007         if (conn != NULL) {
1008                 /* Connection exists; queue message on it */
1009                 ksocknal_queue_tx_locked (tx, conn);
1010                 write_unlock_irqrestore (g_lock, flags);
1011                 return (0);
1012         }
1013
1014         route = ksocknal_find_connecting_route_locked (peer);
1015         if (route != NULL) {
1016                 /* At least 1 connection is being established; queue the
1017                  * message... */
1018                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1019                 write_unlock_irqrestore (g_lock, flags);
1020                 return (0);
1021         }
1022         
1023         write_unlock_irqrestore (g_lock, flags);
1024         return (-EHOSTUNREACH);
1025 }
1026
1027 ptl_err_t
1028 ksocknal_sendmsg(nal_cb_t     *nal, 
1029                  void         *private, 
1030                  lib_msg_t    *cookie,
1031                  ptl_hdr_t    *hdr, 
1032                  int           type, 
1033                  ptl_nid_t     nid, 
1034                  ptl_pid_t     pid,
1035                  unsigned int  payload_niov, 
1036                  struct iovec *payload_iov, 
1037                  ptl_kiov_t   *payload_kiov,
1038                  size_t        payload_offset,
1039                  size_t        payload_nob)
1040 {
1041         ksock_ltx_t  *ltx;
1042         int           desc_size;
1043         int           rc;
1044
1045         /* NB 'private' is different depending on what we're sending.
1046          * Just ignore it... */
1047
1048         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1049                " pid %d\n", payload_nob, payload_niov, nid , pid);
1050
1051         LASSERT (payload_nob == 0 || payload_niov > 0);
1052         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1053
1054         /* It must be OK to kmap() if required */
1055         LASSERT (payload_kiov == NULL || !in_interrupt ());
1056         /* payload is either all vaddrs or all pages */
1057         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1058         
1059         if (payload_iov != NULL)
1060                 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1061         else
1062                 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1063         
1064         if (in_interrupt() ||
1065             type == PTL_MSG_ACK ||
1066             type == PTL_MSG_REPLY) {
1067                 /* Can't block if in interrupt or responding to an incoming
1068                  * message */
1069                 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1070         } else {
1071                 PORTAL_ALLOC(ltx, desc_size);
1072         }
1073         
1074         if (ltx == NULL) {
1075                 CERROR("Can't allocate tx desc type %d size %d %s\n",
1076                        type, desc_size, in_interrupt() ? "(intr)" : "");
1077                 return (PTL_NO_SPACE);
1078         }
1079
1080         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1081
1082         ltx->ltx_desc_size = desc_size;
1083         
1084         /* We always have 1 mapped frag for the header */
1085         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1086         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1087         ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1088         ltx->ltx_hdr = *hdr;
1089         
1090         ltx->ltx_private = private;
1091         ltx->ltx_cookie = cookie;
1092         
1093         ltx->ltx_tx.tx_isfwd = 0;
1094         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1095
1096         if (payload_iov != NULL) {
1097                 /* payload is all mapped */
1098                 ltx->ltx_tx.tx_kiov  = NULL;
1099                 ltx->ltx_tx.tx_nkiov = 0;
1100
1101                 ltx->ltx_tx.tx_niov = 
1102                         1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1103                                             payload_niov, payload_iov,
1104                                             payload_offset, payload_nob);
1105         } else {
1106                 /* payload is all pages */
1107                 ltx->ltx_tx.tx_niov = 1;
1108
1109                 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1110                 ltx->ltx_tx.tx_nkiov =
1111                         lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1112                                          payload_niov, payload_kiov,
1113                                          payload_offset, payload_nob);
1114         }
1115
1116         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1117         if (rc == 0)
1118                 return (PTL_OK);
1119         
1120         ksocknal_free_ltx(ltx);
1121         return (PTL_FAIL);
1122 }
1123
1124 ptl_err_t
1125 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1126                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1127                unsigned int payload_niov, struct iovec *payload_iov,
1128                size_t payload_offset, size_t payload_len)
1129 {
1130         return (ksocknal_sendmsg(nal, private, cookie,
1131                                  hdr, type, nid, pid,
1132                                  payload_niov, payload_iov, NULL,
1133                                  payload_offset, payload_len));
1134 }
1135
1136 ptl_err_t
1137 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1138                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1139                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1140                      size_t payload_offset, size_t payload_len)
1141 {
1142         return (ksocknal_sendmsg(nal, private, cookie,
1143                                  hdr, type, nid, pid,
1144                                  payload_niov, NULL, payload_kiov,
1145                                  payload_offset, payload_len));
1146 }
1147
1148 void
1149 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1150 {
1151         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1152         ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1153         int           rc;
1154         
1155         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1156                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1157
1158         /* I'm the gateway; must be the last hop */
1159         if (nid == ksocknal_lib.ni.nid)
1160                 nid = fwd->kprfd_target_nid;
1161
1162         /* setup iov for hdr */
1163         ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1164         ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1165
1166         ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
1167         ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1168         ftx->ftx_tx.tx_niov  = 1;
1169         ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
1170         ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1171         ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
1172
1173         rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1174         if (rc != 0)
1175                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1176 }
1177
1178 int
1179 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1180 {
1181         long    pid = kernel_thread (fn, arg, 0);
1182
1183         if (pid < 0)
1184                 return ((int)pid);
1185
1186         atomic_inc (&ksocknal_data.ksnd_nthreads);
1187         return (0);
1188 }
1189
1190 void
1191 ksocknal_thread_fini (void)
1192 {
1193         atomic_dec (&ksocknal_data.ksnd_nthreads);
1194 }
1195
1196 void
1197 ksocknal_fmb_callback (void *arg, int error)
1198 {
1199         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1200         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1201         ptl_hdr_t         *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
1202         ksock_conn_t      *conn = NULL;
1203         ksock_sched_t     *sched;
1204         unsigned long      flags;
1205         char               ipbuf[PTL_NALFMT_SIZE];
1206         char               ipbuf2[PTL_NALFMT_SIZE];
1207
1208         if (error != 0)
1209                 CERROR("Failed to route packet from "
1210                        LPX64" %s to "LPX64" %s: %d\n",
1211                        NTOH__u64(hdr->src_nid),
1212                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf),
1213                        NTOH__u64(hdr->dest_nid),
1214                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf2),
1215                        error);
1216         else
1217                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1218                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1219
1220         /* drop peer ref taken on init */
1221         ksocknal_put_peer (fmb->fmb_peer);
1222
1223         spin_lock_irqsave (&fmp->fmp_lock, flags);
1224
1225         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1226         fmp->fmp_nactive_fmbs--;
1227
1228         if (!list_empty (&fmp->fmp_blocked_conns)) {
1229                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1230                                    ksock_conn_t, ksnc_rx_list);
1231                 list_del (&conn->ksnc_rx_list);
1232         }
1233
1234         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1235
1236         if (conn == NULL)
1237                 return;
1238
1239         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1240         LASSERT (conn->ksnc_rx_scheduled);
1241         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1242
1243         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1244
1245         sched = conn->ksnc_scheduler;
1246
1247         spin_lock_irqsave (&sched->kss_lock, flags);
1248
1249         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1250         wake_up (&sched->kss_waitq);
1251
1252         spin_unlock_irqrestore (&sched->kss_lock, flags);
1253 }
1254
1255 ksock_fmb_t *
1256 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1257 {
1258         int               payload_nob = conn->ksnc_rx_nob_left;
1259         unsigned long     flags;
1260         ksock_fmb_pool_t *pool;
1261         ksock_fmb_t      *fmb;
1262
1263         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1264         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1265
1266         if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1267                 pool = &ksocknal_data.ksnd_small_fmp;
1268         else
1269                 pool = &ksocknal_data.ksnd_large_fmp;
1270
1271         spin_lock_irqsave (&pool->fmp_lock, flags);
1272
1273         if (!list_empty (&pool->fmp_idle_fmbs)) {
1274                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1275                                  ksock_fmb_t, fmb_list);
1276                 list_del (&fmb->fmb_list);
1277                 pool->fmp_nactive_fmbs++;
1278                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1279
1280                 return (fmb);
1281         }
1282
1283         /* deschedule until fmb free */
1284
1285         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1286
1287         list_add_tail (&conn->ksnc_rx_list,
1288                        &pool->fmp_blocked_conns);
1289
1290         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1291         return (NULL);
1292 }
1293
1294 int
1295 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1296 {
1297         int       payload_nob = conn->ksnc_rx_nob_left;
1298         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1299         int       niov = 0;
1300         int       nob = payload_nob;
1301
1302         LASSERT (conn->ksnc_rx_scheduled);
1303         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1304         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1305         LASSERT (payload_nob >= 0);
1306         LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1307         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1308         LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1309
1310         /* Take a ref on the conn's peer to prevent module unload before
1311          * forwarding completes. */
1312         fmb->fmb_peer = conn->ksnc_peer;
1313         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1314
1315         /* Copy the header we just read into the forwarding buffer.  If
1316          * there's payload, start reading reading it into the buffer,
1317          * otherwise the forwarding buffer can be kicked off
1318          * immediately. */
1319         fmb->fmb_hdr = conn->ksnc_hdr;
1320
1321         while (nob > 0) {
1322                 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1323                 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1324                 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1325                 nob -= PAGE_SIZE;
1326                 niov++;
1327         }
1328
1329         kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1330                      payload_nob, niov, fmb->fmb_kiov,
1331                      ksocknal_fmb_callback, fmb);
1332
1333         if (payload_nob == 0) {         /* got complete packet already */
1334                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1335                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
1336
1337                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1338
1339                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1340                 return (1);
1341         }
1342
1343         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1344         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1345         
1346         /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1347          * buffer */
1348         LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1349
1350         conn->ksnc_rx_niov = 0;
1351         conn->ksnc_rx_nkiov = niov;
1352         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1353         memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1354         
1355         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1356                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1357         return (0);
1358 }
1359
1360 void
1361 ksocknal_fwd_parse (ksock_conn_t *conn)
1362 {
1363         ksock_peer_t *peer;
1364         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1365         ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1366         int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1367         char str[PTL_NALFMT_SIZE];
1368         char str2[PTL_NALFMT_SIZE];
1369
1370         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1371                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1372
1373         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1374         LASSERT (conn->ksnc_rx_scheduled);
1375
1376         if (body_len < 0) {                 /* length corrupt (overflow) */
1377                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1378                        "packet size %d illegal\n",
1379                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1380                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1381                        body_len);
1382
1383                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1384                 return;
1385         }
1386
1387         if (!kpr_routing(&ksocknal_data.ksnd_router)) {    /* not forwarding */
1388                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1389                        " (%s): not forwarding\n",
1390                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1391                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1392                 /* on to new packet (skip this one's body) */
1393                 ksocknal_new_packet (conn, body_len);
1394                 return;
1395         }
1396
1397         if (body_len > PTL_MTU) {      /* too big to forward */
1398                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1399                         "(%s): packet size %d too big\n",
1400                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1401                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1402                         body_len);
1403                 /* on to new packet (skip this one's body) */
1404                 ksocknal_new_packet (conn, body_len);
1405                 return;
1406         }
1407
1408         /* should have gone direct */
1409         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1410         if (peer != NULL) {
1411                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1412                         "(%s): target is a peer\n",
1413                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1414                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1415                 ksocknal_put_peer (peer);  /* drop ref from get above */
1416
1417                 /* on to next packet (skip this one's body) */
1418                 ksocknal_new_packet (conn, body_len);
1419                 return;
1420         }
1421
1422         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1423         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1424         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1425 }
1426
1427 int
1428 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1429 {
1430         static char ksocknal_slop_buffer[4096];
1431
1432         int   nob;
1433         int   niov;
1434         int   skipped;
1435
1436         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1437                 conn->ksnc_rx_started = 0;
1438                 mb ();                          /* racing with timeout thread */
1439                 
1440                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1441                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1442                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1443
1444                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1445                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1446                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1447                 conn->ksnc_rx_niov = 1;
1448
1449                 conn->ksnc_rx_kiov = NULL;
1450                 conn->ksnc_rx_nkiov = 0;
1451                 return (1);
1452         }
1453
1454         /* Set up to skip as much a possible now.  If there's more left
1455          * (ran out of iov entries) we'll get called again */
1456
1457         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1458         conn->ksnc_rx_nob_left = nob_to_skip;
1459         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1460         skipped = 0;
1461         niov = 0;
1462
1463         do {
1464                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1465
1466                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1467                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1468                 niov++;
1469                 skipped += nob;
1470                 nob_to_skip -=nob;
1471
1472         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1473                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1474
1475         conn->ksnc_rx_niov = niov;
1476         conn->ksnc_rx_kiov = NULL;
1477         conn->ksnc_rx_nkiov = 0;
1478         conn->ksnc_rx_nob_wanted = skipped;
1479         return (0);
1480 }
1481
1482 int
1483 ksocknal_process_receive (ksock_conn_t *conn)
1484 {
1485         ksock_fmb_t  *fmb;
1486         int           rc;
1487         
1488         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1489
1490         /* doesn't need a forwarding buffer */
1491         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1492                 goto try_read;
1493
1494  get_fmb:
1495         fmb = ksocknal_get_idle_fmb (conn);
1496         if (fmb == NULL) {
1497                 /* conn descheduled waiting for idle fmb */
1498                 return (0);
1499         }
1500
1501         if (ksocknal_init_fmb (conn, fmb)) {
1502                 /* packet forwarded */
1503                 return (0);
1504         }
1505
1506  try_read:
1507         /* NB: sched lock NOT held */
1508         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1509                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1510                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1511                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1512
1513         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1514
1515         rc = ksocknal_receive(conn);
1516
1517         if (rc <= 0) {
1518                 LASSERT (rc != -EAGAIN);
1519
1520                 if (rc == 0)
1521                         CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1522                                conn, conn->ksnc_peer->ksnp_nid,
1523                                HIPQUAD(conn->ksnc_ipaddr),
1524                                conn->ksnc_port);
1525                 else if (!conn->ksnc_closing)
1526                         CERROR ("[%p] Error %d on read from "LPX64
1527                                 " ip %d.%d.%d.%d:%d\n",
1528                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1529                                 HIPQUAD(conn->ksnc_ipaddr),
1530                                 conn->ksnc_port);
1531
1532                 ksocknal_close_conn_and_siblings (conn, rc);
1533                 return (rc == 0 ? -ESHUTDOWN : rc);
1534         }
1535
1536         if (conn->ksnc_rx_nob_wanted != 0) {
1537                 /* short read */
1538                 return (-EAGAIN);
1539         }
1540         
1541         switch (conn->ksnc_rx_state) {
1542         case SOCKNAL_RX_HEADER:
1543                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1544                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1545                         /* This packet isn't for me */
1546                         ksocknal_fwd_parse (conn);
1547                         switch (conn->ksnc_rx_state) {
1548                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1549                                 return (0);     /* => come back later */
1550                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1551                                 goto try_read;  /* => go read it */
1552                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1553                                 goto get_fmb;   /* => go get a fwd msg buffer */
1554                         default:
1555                                 LBUG ();
1556                         }
1557                         /* Not Reached */
1558                 }
1559
1560                 /* sets wanted_len, iovs etc */
1561                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1562
1563                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1564                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1565                         goto try_read;          /* go read the payload */
1566                 }
1567                 /* Fall through (completed packet for me) */
1568
1569         case SOCKNAL_RX_BODY:
1570                 /* payload all received */
1571                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
1572                 /* Fall through */
1573
1574         case SOCKNAL_RX_SLOP:
1575                 /* starting new packet? */
1576                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1577                         return (0);     /* come back later */
1578                 goto try_read;          /* try to finish reading slop now */
1579
1580         case SOCKNAL_RX_BODY_FWD:
1581                 /* payload all received */
1582                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1583                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1584                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1585                         conn->ksnc_rx_nob_left);
1586
1587                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1588                  * conn->ksnc_cookie */
1589                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1590                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1591
1592                 /* no slop in forwarded packets */
1593                 LASSERT (conn->ksnc_rx_nob_left == 0);
1594
1595                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1596                 return (0);                     /* (later) */
1597
1598         default:
1599                 break;
1600         }
1601
1602         /* Not Reached */
1603         LBUG ();
1604         return (-EINVAL);                       /* keep gcc happy */
1605 }
1606
1607 ptl_err_t
1608 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1609                unsigned int niov, struct iovec *iov, 
1610                size_t offset, size_t mlen, size_t rlen)
1611 {
1612         ksock_conn_t *conn = (ksock_conn_t *)private;
1613
1614         LASSERT (mlen <= rlen);
1615         LASSERT (niov <= PTL_MD_MAX_IOV);
1616         
1617         conn->ksnc_cookie = msg;
1618         conn->ksnc_rx_nob_wanted = mlen;
1619         conn->ksnc_rx_nob_left   = rlen;
1620
1621         conn->ksnc_rx_nkiov = 0;
1622         conn->ksnc_rx_kiov = NULL;
1623         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1624         conn->ksnc_rx_niov =
1625                 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1626                                 niov, iov, offset, mlen);
1627
1628         LASSERT (mlen == 
1629                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1630                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1631
1632         return (PTL_OK);
1633 }
1634
1635 ptl_err_t
1636 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1637                      unsigned int niov, ptl_kiov_t *kiov, 
1638                      size_t offset, size_t mlen, size_t rlen)
1639 {
1640         ksock_conn_t *conn = (ksock_conn_t *)private;
1641
1642         LASSERT (mlen <= rlen);
1643         LASSERT (niov <= PTL_MD_MAX_IOV);
1644         
1645         conn->ksnc_cookie = msg;
1646         conn->ksnc_rx_nob_wanted = mlen;
1647         conn->ksnc_rx_nob_left   = rlen;
1648
1649         conn->ksnc_rx_niov = 0;
1650         conn->ksnc_rx_iov  = NULL;
1651         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1652         conn->ksnc_rx_nkiov = 
1653                 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1654                                  niov, kiov, offset, mlen);
1655
1656         LASSERT (mlen == 
1657                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1658                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1659
1660         return (PTL_OK);
1661 }
1662
1663 int ksocknal_scheduler (void *arg)
1664 {
1665         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1666         ksock_conn_t      *conn;
1667         ksock_tx_t        *tx;
1668         unsigned long      flags;
1669         int                rc;
1670         int                nloops = 0;
1671         int                id = sched - ksocknal_data.ksnd_schedulers;
1672         char               name[16];
1673
1674         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1675         kportal_daemonize (name);
1676         kportal_blockallsigs ();
1677
1678 #if (CONFIG_SMP && CPU_AFFINITY)
1679         if ((cpu_online_map & (1 << id)) != 0) {
1680 #if 1
1681                 current->cpus_allowed = (1 << id);
1682 #else
1683                 set_cpus_allowed (current, 1<<id);
1684 #endif
1685         } else {
1686                 CERROR ("Can't set CPU affinity for %s\n", name);
1687         }
1688 #endif /* CONFIG_SMP && CPU_AFFINITY */
1689         
1690         spin_lock_irqsave (&sched->kss_lock, flags);
1691
1692         while (!ksocknal_data.ksnd_shuttingdown) {
1693                 int did_something = 0;
1694
1695                 /* Ensure I progress everything semi-fairly */
1696
1697                 if (!list_empty (&sched->kss_rx_conns)) {
1698                         conn = list_entry(sched->kss_rx_conns.next,
1699                                           ksock_conn_t, ksnc_rx_list);
1700                         list_del(&conn->ksnc_rx_list);
1701
1702                         LASSERT(conn->ksnc_rx_scheduled);
1703                         LASSERT(conn->ksnc_rx_ready);
1704
1705                         /* clear rx_ready in case receive isn't complete.
1706                          * Do it BEFORE we call process_recv, since
1707                          * data_ready can set it any time after we release
1708                          * kss_lock. */
1709                         conn->ksnc_rx_ready = 0;
1710                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1711                         
1712                         rc = ksocknal_process_receive(conn);
1713                         
1714                         spin_lock_irqsave(&sched->kss_lock, flags);
1715
1716                         /* I'm the only one that can clear this flag */
1717                         LASSERT(conn->ksnc_rx_scheduled);
1718
1719                         /* Did process_receive get everything it wanted? */
1720                         if (rc == 0)
1721                                 conn->ksnc_rx_ready = 1;
1722                         
1723                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1724                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1725                                 /* Conn blocked for a forwarding buffer.
1726                                  * It will get queued for my attention when
1727                                  * one becomes available (and it might just
1728                                  * already have been!).  Meanwhile my ref
1729                                  * on it stays put. */
1730                         } else if (conn->ksnc_rx_ready) {
1731                                 /* reschedule for rx */
1732                                 list_add_tail (&conn->ksnc_rx_list,
1733                                                &sched->kss_rx_conns);
1734                         } else {
1735                                 conn->ksnc_rx_scheduled = 0;
1736                                 /* drop my ref */
1737                                 ksocknal_put_conn(conn);
1738                         }
1739
1740                         did_something = 1;
1741                 }
1742
1743                 if (!list_empty (&sched->kss_tx_conns)) {
1744                         conn = list_entry(sched->kss_tx_conns.next,
1745                                           ksock_conn_t, ksnc_tx_list);
1746                         list_del (&conn->ksnc_tx_list);
1747                         
1748                         LASSERT(conn->ksnc_tx_scheduled);
1749                         LASSERT(conn->ksnc_tx_ready);
1750                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1751                         
1752                         tx = list_entry(conn->ksnc_tx_queue.next,
1753                                         ksock_tx_t, tx_list);
1754                         /* dequeue now so empty list => more to send */
1755                         list_del(&tx->tx_list);
1756                         
1757                         /* Clear tx_ready in case send isn't complete.  Do
1758                          * it BEFORE we call process_transmit, since
1759                          * write_space can set it any time after we release
1760                          * kss_lock. */
1761                         conn->ksnc_tx_ready = 0;
1762                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1763
1764                         rc = ksocknal_process_transmit(conn, tx);
1765
1766                         spin_lock_irqsave (&sched->kss_lock, flags);
1767
1768                         if (rc == -ENOMEM || rc == -EAGAIN) {
1769                                 /* Incomplete send: replace tx on HEAD of tx_queue */
1770                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1771                         } else {
1772                                 /* Complete send; assume space for more */
1773                                 conn->ksnc_tx_ready = 1;
1774                         }
1775
1776                         if (rc == -ENOMEM) {
1777                                 /* Do nothing; after a short timeout, this
1778                                  * conn will be reposted on kss_tx_conns. */
1779                         } else if (conn->ksnc_tx_ready &&
1780                                    !list_empty (&conn->ksnc_tx_queue)) {
1781                                 /* reschedule for tx */
1782                                 list_add_tail (&conn->ksnc_tx_list, 
1783                                                &sched->kss_tx_conns);
1784                         } else {
1785                                 conn->ksnc_tx_scheduled = 0;
1786                                 /* drop my ref */
1787                                 ksocknal_put_conn (conn);
1788                         }
1789                                 
1790                         did_something = 1;
1791                 }
1792 #if SOCKNAL_ZC
1793                 if (!list_empty (&sched->kss_zctxdone_list)) {
1794                         ksock_tx_t *tx =
1795                                 list_entry(sched->kss_zctxdone_list.next,
1796                                            ksock_tx_t, tx_list);
1797                         did_something = 1;
1798
1799                         list_del (&tx->tx_list);
1800                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1801
1802                         ksocknal_tx_done (tx, 1);
1803
1804                         spin_lock_irqsave (&sched->kss_lock, flags);
1805                 }
1806 #endif
1807                 if (!did_something ||           /* nothing to do */
1808                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1809                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1810
1811                         nloops = 0;
1812
1813                         if (!did_something) {   /* wait for something to do */
1814 #if SOCKNAL_ZC
1815                                 rc = wait_event_interruptible (sched->kss_waitq,
1816                                                                ksocknal_data.ksnd_shuttingdown ||
1817                                                                !list_empty(&sched->kss_rx_conns) ||
1818                                                                !list_empty(&sched->kss_tx_conns) ||
1819                                                                !list_empty(&sched->kss_zctxdone_list));
1820 #else
1821                                 rc = wait_event_interruptible (sched->kss_waitq,
1822                                                                ksocknal_data.ksnd_shuttingdown ||
1823                                                                !list_empty(&sched->kss_rx_conns) ||
1824                                                                !list_empty(&sched->kss_tx_conns));
1825 #endif
1826                                 LASSERT (rc == 0);
1827                         } else
1828                                our_cond_resched();
1829
1830                         spin_lock_irqsave (&sched->kss_lock, flags);
1831                 }
1832         }
1833
1834         spin_unlock_irqrestore (&sched->kss_lock, flags);
1835         ksocknal_thread_fini ();
1836         return (0);
1837 }
1838
1839 void
1840 ksocknal_data_ready (struct sock *sk, int n)
1841 {
1842         unsigned long  flags;
1843         ksock_conn_t  *conn;
1844         ksock_sched_t *sched;
1845         ENTRY;
1846
1847         /* interleave correctly with closing sockets... */
1848         read_lock (&ksocknal_data.ksnd_global_lock);
1849
1850         conn = sk->sk_user_data;
1851         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1852                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1853                 sk->sk_data_ready (sk, n);
1854         } else {
1855                 sched = conn->ksnc_scheduler;
1856
1857                 spin_lock_irqsave (&sched->kss_lock, flags);
1858
1859                 conn->ksnc_rx_ready = 1;
1860
1861                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1862                         list_add_tail(&conn->ksnc_rx_list,
1863                                       &sched->kss_rx_conns);
1864                         conn->ksnc_rx_scheduled = 1;
1865                         /* extra ref for scheduler */
1866                         atomic_inc (&conn->ksnc_refcount);
1867
1868                         wake_up (&sched->kss_waitq);
1869                 }
1870
1871                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1872         }
1873
1874         read_unlock (&ksocknal_data.ksnd_global_lock);
1875
1876         EXIT;
1877 }
1878
1879 void
1880 ksocknal_write_space (struct sock *sk)
1881 {
1882         unsigned long  flags;
1883         ksock_conn_t  *conn;
1884         ksock_sched_t *sched;
1885
1886         /* interleave correctly with closing sockets... */
1887         read_lock (&ksocknal_data.ksnd_global_lock);
1888
1889         conn = sk->sk_user_data;
1890
1891         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1892                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1893                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1894                                       " ready" : " blocked"),
1895                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1896                                       " scheduled" : " idle"),
1897                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1898                                       " empty" : " queued"));
1899
1900         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1901                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1902                 sk->sk_write_space (sk);
1903
1904                 read_unlock (&ksocknal_data.ksnd_global_lock);
1905                 return;
1906         }
1907
1908         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1909                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1910
1911                 sched = conn->ksnc_scheduler;
1912
1913                 spin_lock_irqsave (&sched->kss_lock, flags);
1914
1915                 conn->ksnc_tx_ready = 1;
1916
1917                 if (!conn->ksnc_tx_scheduled && // not being progressed
1918                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1919                         list_add_tail (&conn->ksnc_tx_list,
1920                                        &sched->kss_tx_conns);
1921                         conn->ksnc_tx_scheduled = 1;
1922                         /* extra ref for scheduler */
1923                         atomic_inc (&conn->ksnc_refcount);
1924
1925                         wake_up (&sched->kss_waitq);
1926                 }
1927
1928                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1929         }
1930
1931         read_unlock (&ksocknal_data.ksnd_global_lock);
1932 }
1933
1934 int
1935 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1936 {
1937         int           rc;
1938         mm_segment_t  oldmm = get_fs();
1939
1940         while (nob > 0) {
1941                 struct iovec  iov = {
1942                         .iov_base = buffer,
1943                         .iov_len  = nob
1944                 };
1945                 struct msghdr msg = {
1946                         .msg_name       = NULL,
1947                         .msg_namelen    = 0,
1948                         .msg_iov        = &iov,
1949                         .msg_iovlen     = 1,
1950                         .msg_control    = NULL,
1951                         .msg_controllen = 0,
1952                         .msg_flags      = 0
1953                 };
1954
1955                 set_fs (KERNEL_DS);
1956                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1957                 set_fs (oldmm);
1958                 
1959                 if (rc < 0)
1960                         return (rc);
1961
1962                 if (rc == 0) {
1963                         CERROR ("Unexpected zero rc\n");
1964                         return (-ECONNABORTED);
1965                 }
1966
1967                 buffer = ((char *)buffer) + rc;
1968                 nob -= rc;
1969         }
1970         
1971         return (0);
1972 }
1973
1974 int
1975 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1976 {
1977         int           rc;
1978         mm_segment_t  oldmm = get_fs();
1979         
1980         while (nob > 0) {
1981                 struct iovec  iov = {
1982                         .iov_base = buffer,
1983                         .iov_len  = nob
1984                 };
1985                 struct msghdr msg = {
1986                         .msg_name       = NULL,
1987                         .msg_namelen    = 0,
1988                         .msg_iov        = &iov,
1989                         .msg_iovlen     = 1,
1990                         .msg_control    = NULL,
1991                         .msg_controllen = 0,
1992                         .msg_flags      = 0
1993                 };
1994
1995                 set_fs (KERNEL_DS);
1996                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
1997                 set_fs (oldmm);
1998                 
1999                 if (rc < 0)
2000                         return (rc);
2001
2002                 if (rc == 0)
2003                         return (-ECONNABORTED);
2004
2005                 buffer = ((char *)buffer) + rc;
2006                 nob -= rc;
2007         }
2008         
2009         return (0);
2010 }
2011
2012 int
2013 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type,
2014                 __u64 *incarnation)
2015 {
2016         int                 rc;
2017         ptl_hdr_t           hdr;
2018         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2019         char                ipbuf[PTL_NALFMT_SIZE];
2020         char                ipbuf2[PTL_NALFMT_SIZE];
2021
2022         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2023
2024         memset (&hdr, 0, sizeof (hdr));
2025         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
2026         hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
2027         hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
2028
2029         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
2030         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
2031
2032         hdr.msg.hello.type = __cpu_to_le32 (*type);
2033         hdr.msg.hello.incarnation =
2034                 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2035
2036         /* Assume sufficient socket buffering for this message */
2037         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
2038         if (rc != 0) {
2039                 CERROR ("Error %d sending HELLO to "LPX64" %s\n",
2040                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2041                 return (rc);
2042         }
2043
2044         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2045         if (rc != 0) {
2046                 CERROR ("Error %d reading HELLO from "LPX64" %s\n",
2047                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2048                 return (rc);
2049         }
2050
2051         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2052                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
2053                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
2054                         portals_nid2str(SOCKNAL, *nid, ipbuf));
2055                 return (-EPROTO);
2056         }
2057
2058         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2059             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2060                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2061                         " from "LPX64" %s\n",
2062                         __le16_to_cpu (hmv->version_major),
2063                         __le16_to_cpu (hmv->version_minor),
2064                         PORTALS_PROTO_VERSION_MAJOR,
2065                         PORTALS_PROTO_VERSION_MINOR,
2066                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2067                 return (-EPROTO);
2068         }
2069
2070 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
2071 # error "This code only understands protocol version 0.x"
2072 #endif
2073         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
2074          * so read the rest of it in now... */
2075
2076         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2077         if (rc != 0) {
2078                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n",
2079                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2080                 return (rc);
2081         }
2082
2083         /* ...and check we got what we expected */
2084         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2085             hdr.payload_length != __cpu_to_le32 (0)) {
2086                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2087                         " but got type %d with %d payload from "LPX64" %s\n",
2088                         __le32_to_cpu (hdr.type),
2089                         __le32_to_cpu (hdr.payload_length), *nid,
2090                         portals_nid2str(SOCKNAL, *nid, ipbuf));
2091                 return (-EPROTO);
2092         }
2093
2094         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2095                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2096                 return (-EPROTO);
2097         }
2098
2099         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2100                 *nid = __le64_to_cpu(hdr.src_nid);
2101         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2102                 CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
2103                         __le64_to_cpu (hdr.src_nid),
2104                         portals_nid2str(SOCKNAL,
2105                                         __le64_to_cpu(hdr.src_nid),
2106                                         ipbuf),
2107                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf2));
2108                 return (-EPROTO);
2109         }
2110
2111         if (*type == SOCKNAL_CONN_NONE) {
2112                 /* I've accepted this connection; peer determines type */
2113                 *type = __le32_to_cpu(hdr.msg.hello.type);
2114                 switch (*type) {
2115                 case SOCKNAL_CONN_ANY:
2116                 case SOCKNAL_CONN_CONTROL:
2117                         break;
2118                 case SOCKNAL_CONN_BULK_IN:
2119                         *type = SOCKNAL_CONN_BULK_OUT;
2120                         break;
2121                 case SOCKNAL_CONN_BULK_OUT:
2122                         *type = SOCKNAL_CONN_BULK_IN;
2123                         break;
2124                 default:
2125                         CERROR ("Unexpected type %d from "LPX64" %s\n",
2126                                 *type, *nid,
2127                                 portals_nid2str(SOCKNAL, *nid, ipbuf));
2128                         return (-EPROTO);
2129                 }
2130         } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2131                 CERROR ("Mismatched types: me %d "LPX64" %s %d\n",
2132                         *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf),
2133                         __le32_to_cpu(hdr.msg.hello.type));
2134                 return (-EPROTO);
2135         }
2136
2137         *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
2138
2139         return (0);
2140 }
2141
2142 int
2143 ksocknal_setup_sock (struct socket *sock)
2144 {
2145         mm_segment_t    oldmm = get_fs ();
2146         int             rc;
2147         int             option;
2148         struct linger   linger;
2149
2150 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2151         sock->sk->sk_allocation = GFP_NOFS;
2152 #else
2153         sock->sk->allocation = GFP_NOFS;
2154 #endif
2155
2156         /* Ensure this socket aborts active sends immediately when we close
2157          * it. */
2158
2159         linger.l_onoff = 0;
2160         linger.l_linger = 0;
2161
2162         set_fs (KERNEL_DS);
2163         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2164                               (char *)&linger, sizeof (linger));
2165         set_fs (oldmm);
2166         if (rc != 0) {
2167                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2168                 return (rc);
2169         }
2170
2171         option = -1;
2172         set_fs (KERNEL_DS);
2173         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2174                                     (char *)&option, sizeof (option));
2175         set_fs (oldmm);
2176         if (rc != 0) {
2177                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2178                 return (rc);
2179         }
2180
2181 #if SOCKNAL_USE_KEEPALIVES
2182         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2183          * second until the timeout elapses. */
2184
2185         option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
2186         set_fs (KERNEL_DS);
2187         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2188                                     (char *)&option, sizeof (option));
2189         set_fs (oldmm);
2190         if (rc != 0) {
2191                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2192                 return (rc);
2193         }
2194         
2195         option = 1;
2196         set_fs (KERNEL_DS);
2197         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2198                                     (char *)&option, sizeof (option));
2199         set_fs (oldmm);
2200         if (rc != 0) {
2201                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2202                 return (rc);
2203         }
2204         
2205         option = ksocknal_data.ksnd_io_timeout / 4;
2206         set_fs (KERNEL_DS);
2207         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2208                                     (char *)&option, sizeof (option));
2209         set_fs (oldmm);
2210         if (rc != 0) {
2211                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2212                 return (rc);
2213         }
2214
2215         option = 1;
2216         set_fs (KERNEL_DS);
2217         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2218                               (char *)&option, sizeof (option));
2219         set_fs (oldmm);
2220         if (rc != 0) {
2221                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2222                 return (rc);
2223         }
2224 #endif
2225         return (0);
2226 }
2227
2228 int
2229 ksocknal_connect_peer (ksock_route_t *route, int type)
2230 {
2231         struct sockaddr_in  peer_addr;
2232         mm_segment_t        oldmm = get_fs();
2233         struct timeval      tv;
2234         int                 fd;
2235         struct socket      *sock;
2236         int                 rc;
2237         char                ipbuf[PTL_NALFMT_SIZE];
2238
2239         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2240         if (rc != 0) {
2241                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2242                 return (rc);
2243         }
2244
2245         /* Ugh; have to map_fd for compatibility with sockets passed in
2246          * from userspace.  And we actually need the sock->file refcounting
2247          * that this gives you :) */
2248
2249         fd = sock_map_fd (sock);
2250         if (fd < 0) {
2251                 sock_release (sock);
2252                 CERROR ("sock_map_fd error %d\n", fd);
2253                 return (fd);
2254         }
2255
2256         /* NB the fd now owns the ref on sock->file */
2257         LASSERT (sock->file != NULL);
2258         LASSERT (file_count(sock->file) == 1);
2259
2260         /* Set the socket timeouts, so our connection attempt completes in
2261          * finite time */
2262         tv.tv_sec = ksocknal_data.ksnd_io_timeout;
2263         tv.tv_usec = 0;
2264
2265         set_fs (KERNEL_DS);
2266         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2267                               (char *)&tv, sizeof (tv));
2268         set_fs (oldmm);
2269         if (rc != 0) {
2270                 CERROR ("Can't set send timeout %d: %d\n", 
2271                         ksocknal_data.ksnd_io_timeout, rc);
2272                 goto out;
2273         }
2274         
2275         set_fs (KERNEL_DS);
2276         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2277                               (char *)&tv, sizeof (tv));
2278         set_fs (oldmm);
2279         if (rc != 0) {
2280                 CERROR ("Can't set receive timeout %d: %d\n",
2281                         ksocknal_data.ksnd_io_timeout, rc);
2282                 goto out;
2283         }
2284
2285         {
2286                 int  option = 1;
2287                 
2288                 set_fs (KERNEL_DS);
2289                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2290                                             (char *)&option, sizeof (option));
2291                 set_fs (oldmm);
2292                 if (rc != 0) {
2293                         CERROR ("Can't disable nagle: %d\n", rc);
2294                         goto out;
2295                 }
2296         }
2297         
2298         if (route->ksnr_buffer_size != 0) {
2299                 int option = route->ksnr_buffer_size;
2300                 
2301                 set_fs (KERNEL_DS);
2302                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2303                                       (char *)&option, sizeof (option));
2304                 set_fs (oldmm);
2305                 if (rc != 0) {
2306                         CERROR ("Can't set send buffer %d: %d\n",
2307                                 route->ksnr_buffer_size, rc);
2308                         goto out;
2309                 }
2310
2311                 set_fs (KERNEL_DS);
2312                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2313                                       (char *)&option, sizeof (option));
2314                 set_fs (oldmm);
2315                 if (rc != 0) {
2316                         CERROR ("Can't set receive buffer %d: %d\n",
2317                                 route->ksnr_buffer_size, rc);
2318                         goto out;
2319                 }
2320         }
2321         
2322         memset (&peer_addr, 0, sizeof (peer_addr));
2323         peer_addr.sin_family = AF_INET;
2324         peer_addr.sin_port = htons (route->ksnr_port);
2325         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2326         
2327         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2328                                  sizeof (peer_addr), sock->file->f_flags);
2329         if (rc != 0) {
2330                 CERROR ("Error %d connecting to "LPX64" %s\n", rc,
2331                         route->ksnr_peer->ksnp_nid,
2332                         portals_nid2str(SOCKNAL,
2333                                         route->ksnr_peer->ksnp_nid,
2334                                         ipbuf));
2335                 goto out;
2336         }
2337         
2338         rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2339         if (rc == 0) {
2340                 /* Take an extra ref on sock->file to compensate for the
2341                  * upcoming close which will lose fd's ref on it. */
2342                 get_file (sock->file);
2343         }
2344
2345  out:
2346         sys_close (fd);
2347         return (rc);
2348 }
2349
2350 void
2351 ksocknal_autoconnect (ksock_route_t *route)
2352 {
2353         LIST_HEAD        (zombies);
2354         ksock_tx_t       *tx;
2355         ksock_peer_t     *peer;
2356         unsigned long     flags;
2357         int               rc;
2358         int               type;
2359         
2360         for (;;) {
2361                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2362                         if ((route->ksnr_connecting & (1 << type)) != 0)
2363                                 break;
2364                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2365
2366                 rc = ksocknal_connect_peer (route, type);
2367
2368                 if (rc != 0)
2369                         break;
2370                 
2371                 /* successfully autoconnected: create_conn did the
2372                  * route/conn binding and scheduled any blocked packets */
2373
2374                 if (route->ksnr_connecting == 0) {
2375                         /* No more connections required */
2376                         return;
2377                 }
2378         }
2379
2380         /* Connection attempt failed */
2381
2382         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2383
2384         peer = route->ksnr_peer;
2385         route->ksnr_connecting = 0;
2386
2387         /* This is a retry rather than a new connection */
2388         LASSERT (route->ksnr_retry_interval != 0);
2389         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2390         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2391                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2392
2393         if (!list_empty (&peer->ksnp_tx_queue) &&
2394             ksocknal_find_connecting_route_locked (peer) == NULL) {
2395                 LASSERT (list_empty (&peer->ksnp_conns));
2396
2397                 /* None of the connections that the blocked packets are
2398                  * waiting for have been successful.  Complete them now... */
2399                 do {
2400                         tx = list_entry (peer->ksnp_tx_queue.next,
2401                                          ksock_tx_t, tx_list);
2402                         list_del (&tx->tx_list);
2403                         list_add_tail (&tx->tx_list, &zombies);
2404                 } while (!list_empty (&peer->ksnp_tx_queue));
2405         }
2406
2407         /* make this route least-favourite for re-selection */
2408         if (!route->ksnr_deleted) {
2409                 list_del(&route->ksnr_list);
2410                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2411         }
2412         
2413         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2414
2415         while (!list_empty (&zombies)) {
2416                 char ipbuf[PTL_NALFMT_SIZE];
2417                 char ipbuf2[PTL_NALFMT_SIZE];
2418                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2419
2420                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2421                         NTOH__u32 (tx->tx_hdr->type),
2422                         NTOH__u32 (tx->tx_hdr->payload_length),
2423                         NTOH__u64 (tx->tx_hdr->src_nid),
2424                         portals_nid2str(SOCKNAL,
2425                                         NTOH__u64(tx->tx_hdr->src_nid),
2426                                         ipbuf),
2427                         NTOH__u64 (tx->tx_hdr->dest_nid),
2428                         portals_nid2str(SOCKNAL,
2429                                         NTOH__u64(tx->tx_hdr->src_nid),
2430                                         ipbuf2));
2431
2432                 list_del (&tx->tx_list);
2433                 /* complete now */
2434                 ksocknal_tx_done (tx, 0);
2435         }
2436 }
2437
2438 int
2439 ksocknal_autoconnectd (void *arg)
2440 {
2441         long               id = (long)arg;
2442         char               name[16];
2443         unsigned long      flags;
2444         ksock_route_t     *route;
2445         int                rc;
2446
2447         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2448         kportal_daemonize (name);
2449         kportal_blockallsigs ();
2450
2451         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2452
2453         while (!ksocknal_data.ksnd_shuttingdown) {
2454
2455                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2456                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2457                                             ksock_route_t, ksnr_connect_list);
2458                         
2459                         list_del (&route->ksnr_connect_list);
2460                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2461
2462                         ksocknal_autoconnect (route);
2463                         ksocknal_put_route (route);
2464
2465                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2466                         continue;
2467                 }
2468                 
2469                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2470
2471                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2472                                                ksocknal_data.ksnd_shuttingdown ||
2473                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2474
2475                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2476         }
2477
2478         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2479
2480         ksocknal_thread_fini ();
2481         return (0);
2482 }
2483
2484 ksock_conn_t *
2485 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2486 {
2487         /* We're called with a shared lock on ksnd_global_lock */
2488         ksock_conn_t      *conn;
2489         struct list_head  *ctmp;
2490         ksock_sched_t     *sched;
2491
2492         list_for_each (ctmp, &peer->ksnp_conns) {
2493                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2494                 sched = conn->ksnc_scheduler;
2495
2496                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2497                 LASSERT (!conn->ksnc_closing);
2498                 
2499                 if (conn->ksnc_rx_started &&
2500                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2501                         /* Timed out incomplete incoming message */
2502                         atomic_inc (&conn->ksnc_refcount);
2503                         CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2504                                 peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr));
2505                         return (conn);
2506                 }
2507                 
2508                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2509                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2510                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2511                         /* Timed out messages queued for sending, or
2512                          * messages buffered in the socket's send buffer */
2513                         atomic_inc (&conn->ksnc_refcount);
2514                         CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n", 
2515                                 peer->ksnp_nid, 
2516                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2517                                 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2518                                 HIPQUAD(conn->ksnc_ipaddr));
2519                         return (conn);
2520                 }
2521         }
2522
2523         return (NULL);
2524 }
2525
2526 void
2527 ksocknal_check_peer_timeouts (int idx)
2528 {
2529         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2530         struct list_head *ptmp;
2531         ksock_peer_t     *peer;
2532         ksock_conn_t     *conn;
2533
2534  again:
2535         /* NB. We expect to have a look at all the peers and not find any
2536          * connections to time out, so we just use a shared lock while we
2537          * take a look... */
2538         read_lock (&ksocknal_data.ksnd_global_lock);
2539
2540         list_for_each (ptmp, peers) {
2541                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2542                 conn = ksocknal_find_timed_out_conn (peer);
2543                 
2544                 if (conn != NULL) {
2545                         read_unlock (&ksocknal_data.ksnd_global_lock);
2546
2547                         CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2548                                 peer->ksnp_nid,
2549                                 HIPQUAD(conn->ksnc_ipaddr),
2550                                 conn->ksnc_port);
2551                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2552                         
2553                         /* NB we won't find this one again, but we can't
2554                          * just proceed with the next peer, since we dropped
2555                          * ksnd_global_lock and it might be dead already! */
2556                         ksocknal_put_conn (conn);
2557                         goto again;
2558                 }
2559         }
2560
2561         read_unlock (&ksocknal_data.ksnd_global_lock);
2562 }
2563
2564 int
2565 ksocknal_reaper (void *arg)
2566 {
2567         wait_queue_t       wait;
2568         unsigned long      flags;
2569         ksock_conn_t      *conn;
2570         ksock_sched_t     *sched;
2571         struct list_head   enomem_conns;
2572         int                nenomem_conns;
2573         int                timeout;
2574         int                i;
2575         int                peer_index = 0;
2576         unsigned long      deadline = jiffies;
2577         
2578         kportal_daemonize ("ksocknal_reaper");
2579         kportal_blockallsigs ();
2580
2581         INIT_LIST_HEAD(&enomem_conns);
2582         init_waitqueue_entry (&wait, current);
2583
2584         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2585
2586         while (!ksocknal_data.ksnd_shuttingdown) {
2587
2588                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2589                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2590                                            ksock_conn_t, ksnc_list);
2591                         list_del (&conn->ksnc_list);
2592                         
2593                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2594
2595                         ksocknal_terminate_conn (conn);
2596                         ksocknal_put_conn (conn);
2597
2598                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2599                         continue;
2600                 }
2601
2602                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2603                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2604                                            ksock_conn_t, ksnc_list);
2605                         list_del (&conn->ksnc_list);
2606                         
2607                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2608
2609                         ksocknal_destroy_conn (conn);
2610
2611                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2612                         continue;
2613                 }
2614
2615                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2616                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2617                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2618                 }
2619
2620                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2621
2622                 /* reschedule all the connections that stalled with ENOMEM... */
2623                 nenomem_conns = 0;
2624                 while (!list_empty (&enomem_conns)) {
2625                         conn = list_entry (enomem_conns.next,
2626                                            ksock_conn_t, ksnc_tx_list);
2627                         list_del (&conn->ksnc_tx_list);
2628
2629                         sched = conn->ksnc_scheduler;
2630
2631                         spin_lock_irqsave (&sched->kss_lock, flags);
2632
2633                         LASSERT (conn->ksnc_tx_scheduled);
2634                         conn->ksnc_tx_ready = 1;
2635                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2636                         wake_up (&sched->kss_waitq);
2637
2638                         spin_unlock_irqrestore (&sched->kss_lock, flags);
2639                         nenomem_conns++;
2640                 }
2641                 
2642                 /* careful with the jiffy wrap... */
2643                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2644                         const int n = 4;
2645                         const int p = 1;
2646                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2647                         
2648                         /* Time to check for timeouts on a few more peers: I do
2649                          * checks every 'p' seconds on a proportion of the peer
2650                          * table and I need to check every connection 'n' times
2651                          * within a timeout interval, to ensure I detect a
2652                          * timeout on any connection within (n+1)/n times the
2653                          * timeout interval. */
2654
2655                         if (ksocknal_data.ksnd_io_timeout > n * p)
2656                                 chunk = (chunk * n * p) / 
2657                                         ksocknal_data.ksnd_io_timeout;
2658                         if (chunk == 0)
2659                                 chunk = 1;
2660
2661                         for (i = 0; i < chunk; i++) {
2662                                 ksocknal_check_peer_timeouts (peer_index);
2663                                 peer_index = (peer_index + 1) % 
2664                                              ksocknal_data.ksnd_peer_hash_size;
2665                         }
2666
2667                         deadline += p * HZ;
2668                 }
2669
2670                 if (nenomem_conns != 0) {
2671                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2672                          * This also prevents me getting woken immediately
2673                          * if any go back on my enomem list. */
2674                         timeout = SOCKNAL_ENOMEM_RETRY;
2675                 }
2676                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2677
2678                 set_current_state (TASK_INTERRUPTIBLE);
2679                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2680
2681                 if (!ksocknal_data.ksnd_shuttingdown &&
2682                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2683                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2684                         schedule_timeout (timeout);
2685
2686                 set_current_state (TASK_RUNNING);
2687                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2688
2689                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2690         }
2691
2692         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2693
2694         ksocknal_thread_fini ();
2695         return (0);
2696 }
2697
2698 nal_cb_t ksocknal_lib = {
2699         nal_data:       &ksocknal_data,                /* NAL private data */
2700         cb_send:         ksocknal_send,
2701         cb_send_pages:   ksocknal_send_pages,
2702         cb_recv:         ksocknal_recv,
2703         cb_recv_pages:   ksocknal_recv_pages,
2704         cb_read:         ksocknal_read,
2705         cb_write:        ksocknal_write,
2706         cb_malloc:       ksocknal_malloc,
2707         cb_free:         ksocknal_free,
2708         cb_printf:       ksocknal_printf,
2709         cb_cli:          ksocknal_cli,
2710         cb_sti:          ksocknal_sti,
2711         cb_callback:     ksocknal_callback,
2712         cb_dist:         ksocknal_dist
2713 };