Whamcloud - gitweb
Print out dotted-quad IP addresses in the socknal (from 1.0.4)
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 /*
29  *  LIB functions follow
30  *
31  */
32 int
33 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
34               user_ptr src_addr, size_t len)
35 {
36         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
37                nal->ni.nid, (long)len, src_addr, dst_addr);
38
39         memcpy( dst_addr, src_addr, len );
40         return 0;
41 }
42
43 int
44 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
45                void *src_addr, size_t len)
46 {
47         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
48                nal->ni.nid, (long)len, src_addr, dst_addr);
49
50         memcpy( dst_addr, src_addr, len );
51         return 0;
52 }
53
54 int
55 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
56                          ptl_event_t *ev)
57 {
58         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
59                nal->ni.nid, eq, ev);
60
61         if (eq->event_callback != NULL)
62                 eq->event_callback(ev);
63
64         return 0;
65 }
66
67 void *
68 ksocknal_malloc(nal_cb_t *nal, size_t len)
69 {
70         void *buf;
71
72         PORTAL_ALLOC(buf, len);
73
74         if (buf != NULL)
75                 memset(buf, 0, len);
76
77         return (buf);
78 }
79
80 void
81 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
82 {
83         PORTAL_FREE(buf, len);
84 }
85
86 void
87 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
88 {
89         va_list ap;
90         char msg[256];
91
92         va_start (ap, fmt);
93         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
94         va_end (ap);
95
96         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
97
98         CDEBUG (D_NET, "%s", msg);
99 }
100
101 void
102 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
103 {
104         ksock_nal_data_t *data = nal->nal_data;
105
106         spin_lock(&data->ksnd_nal_cb_lock);
107 }
108
109 void
110 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data;
113         data = nal->nal_data;
114
115         spin_unlock(&data->ksnd_nal_cb_lock);
116 }
117
118 int
119 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
120 {
121         /* I would guess that if ksocknal_get_peer (nid) == NULL,
122            and we're not routing, then 'nid' is very distant :) */
123         if ( nal->ni.nid == nid ) {
124                 *dist = 0;
125         } else {
126                 *dist = 1;
127         }
128
129         return 0;
130 }
131
132 void
133 ksocknal_free_ltx (ksock_ltx_t *ltx)
134 {
135         atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
136         PORTAL_FREE(ltx, ltx->ltx_desc_size);
137 }
138
139 #if SOCKNAL_ZC
140 struct page *
141 ksocknal_kvaddr_to_page (unsigned long vaddr)
142 {
143         struct page *page;
144
145         if (vaddr >= VMALLOC_START &&
146             vaddr < VMALLOC_END)
147                 page = vmalloc_to_page ((void *)vaddr);
148 #if CONFIG_HIGHMEM
149         else if (vaddr >= PKMAP_BASE &&
150                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
151                 page = vmalloc_to_page ((void *)vaddr);
152                 /* in 2.4 ^ just walks the page tables */
153 #endif
154         else
155                 page = virt_to_page (vaddr);
156
157         if (page == NULL ||
158             !VALID_PAGE (page))
159                 return (NULL);
160
161         return (page);
162 }
163 #endif
164
165 int
166 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
167 {
168         struct socket *sock = conn->ksnc_sock;
169         struct iovec  *iov = tx->tx_iov;
170         int            fragsize = iov->iov_len;
171         unsigned long  vaddr = (unsigned long)iov->iov_base;
172         int            more = (tx->tx_niov > 1) || 
173                               (tx->tx_nkiov > 0) ||
174                               (!list_empty (&conn->ksnc_tx_queue));
175 #if SOCKNAL_ZC
176         int            offset = vaddr & (PAGE_SIZE - 1);
177         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
178         struct page   *page;
179 #endif
180         int            rc;
181
182         /* NB we can't trust socket ops to either consume our iovs
183          * or leave them alone, so we only send 1 frag at a time. */
184         LASSERT (fragsize <= tx->tx_resid);
185         LASSERT (tx->tx_niov > 0);
186         
187 #if SOCKNAL_ZC
188         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
189             (sock->sk->route_caps & NETIF_F_SG) &&
190             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
191             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
192                 
193                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
194                        (void *)vaddr, page, page_address(page), offset, zcsize);
195
196                 if (fragsize > zcsize) {
197                         more = 1;
198                         fragsize = zcsize;
199                 }
200
201                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
202                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
203                                        &tx->tx_zccd);
204         } else
205 #endif
206         {
207                 /* NB don't pass tx's iov; sendmsg may or may not update it */
208                 struct iovec fragiov = { .iov_base = (void *)vaddr,
209                                          .iov_len  = fragsize};
210                 struct msghdr msg = {
211                         .msg_name       = NULL,
212                         .msg_namelen    = 0,
213                         .msg_iov        = &fragiov,
214                         .msg_iovlen     = 1,
215                         .msg_control    = NULL,
216                         .msg_controllen = 0,
217                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
218                 };
219                 mm_segment_t oldmm = get_fs();
220
221                 set_fs (KERNEL_DS);
222                 rc = sock_sendmsg(sock, &msg, fragsize);
223                 set_fs (oldmm);
224         } 
225
226         if (rc > 0) {
227                 tx->tx_resid -= rc;
228
229                 if (rc < iov->iov_len) {
230                         /* didn't send whole iov entry... */
231                         iov->iov_base = (void *)(vaddr + rc);
232                         iov->iov_len -= rc;
233                 } else {
234                         tx->tx_iov++;
235                         tx->tx_niov--;
236                 }
237         }
238         
239         return (rc);
240 }
241
242 int
243 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
244 {
245         struct socket *sock = conn->ksnc_sock;
246         ptl_kiov_t    *kiov = tx->tx_kiov;
247         int            fragsize = kiov->kiov_len;
248         struct page   *page = kiov->kiov_page;
249         int            offset = kiov->kiov_offset;
250         int            more = (tx->tx_nkiov > 1) ||
251                               (!list_empty (&conn->ksnc_tx_queue));
252         int            rc;
253
254         /* NB we can't trust socket ops to either consume our iovs
255          * or leave them alone, so we only send 1 frag at a time. */
256         LASSERT (fragsize <= tx->tx_resid);
257         LASSERT (offset + fragsize <= PAGE_SIZE);
258         LASSERT (tx->tx_niov == 0);
259         LASSERT (tx->tx_nkiov > 0);
260
261 #if SOCKNAL_ZC
262         if (fragsize >= ksocknal_data.ksnd_zc_min_frag &&
263             (sock->sk->route_caps & NETIF_F_SG) &&
264             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
265
266                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
267                                page, offset, fragsize);
268
269                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
270                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
271                                        &tx->tx_zccd);
272         } else
273 #endif
274         {
275                 char *addr = ((char *)kmap (page)) + offset;
276                 struct iovec fragiov = {.iov_base = addr,
277                                         .iov_len  = fragsize};
278                 struct msghdr msg = {
279                         .msg_name       = NULL,
280                         .msg_namelen    = 0,
281                         .msg_iov        = &fragiov,
282                         .msg_iovlen     = 1,
283                         .msg_control    = NULL,
284                         .msg_controllen = 0,
285                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
286                 };
287                 mm_segment_t  oldmm = get_fs();
288                 
289                 set_fs (KERNEL_DS);
290                 rc = sock_sendmsg(sock, &msg, fragsize);
291                 set_fs (oldmm);
292
293                 kunmap (page);
294         }
295
296         if (rc > 0) {
297                 tx->tx_resid -= rc;
298  
299                 if (rc < fragsize) {
300                         kiov->kiov_offset = offset + rc;
301                         kiov->kiov_len    = fragsize - rc;
302                 } else {
303                         tx->tx_kiov++;
304                         tx->tx_nkiov--;
305                 }
306         }
307
308         return (rc);
309 }
310
311 int
312 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
313 {
314         int      rc;
315         
316         if (ksocknal_data.ksnd_stall_tx != 0) {
317                 set_current_state (TASK_UNINTERRUPTIBLE);
318                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
319         }
320
321         LASSERT (tx->tx_resid != 0);
322
323         rc = ksocknal_getconnsock (conn);
324         if (rc != 0) {
325                 LASSERT (conn->ksnc_closing);
326                 return (-ESHUTDOWN);
327         }
328
329         do {
330                 if (ksocknal_data.ksnd_enomem_tx > 0) {
331                         /* testing... */
332                         ksocknal_data.ksnd_enomem_tx--;
333                         rc = -EAGAIN;
334                 } else if (tx->tx_niov != 0) {
335                         rc = ksocknal_send_iov (conn, tx);
336                 } else {
337                         rc = ksocknal_send_kiov (conn, tx);
338                 }
339
340                 if (rc <= 0) {
341                         /* Didn't write anything.
342                          *
343                          * NB: rc == 0 and rc == -EAGAIN both mean try
344                          * again later (linux stack returns -EAGAIN for
345                          * this, but Adaptech TOE returns 0).
346                          *
347                          * Also, sends never fail with -ENOMEM, just
348                          * -EAGAIN, but with the added bonus that we can't
349                          * expect write_space() to call us back to tell us
350                          * when to try sending again.  We use the
351                          * SOCK_NOSPACE flag to diagnose...  */
352
353                         LASSERT(rc != -ENOMEM);
354
355                         if (rc == 0 || rc == -EAGAIN) {
356                                 if (test_bit(SOCK_NOSPACE, 
357                                              &conn->ksnc_sock->flags)) {
358                                         rc = -EAGAIN;
359                                 } else {
360                                         static int counter;
361                          
362                                         counter++;
363                                         if ((counter & (-counter)) == counter)
364                                                 CWARN("%d ENOMEM tx %p\n", 
365                                                       counter, conn);
366                                         rc = -ENOMEM;
367                                 }
368                         }
369                         break;
370                 }
371
372                 rc = 0;
373
374                 /* Consider the connection alive since we managed to chuck
375                  * more data into it.  Really, we'd like to consider it
376                  * alive only when the peer ACKs something, but
377                  * write_space() only gets called back while SOCK_NOSPACE
378                  * is set.  Instead, we presume peer death has occurred if
379                  * the socket doesn't drain within a timout */
380                 conn->ksnc_tx_deadline = jiffies + 
381                                          ksocknal_data.ksnd_io_timeout * HZ;
382                 conn->ksnc_peer->ksnp_last_alive = jiffies;
383
384         } while (tx->tx_resid != 0);
385
386         ksocknal_putconnsock (conn);
387         return (rc);
388 }
389
390 void
391 ksocknal_eager_ack (ksock_conn_t *conn)
392 {
393         int            opt = 1;
394         mm_segment_t   oldmm = get_fs();
395         struct socket *sock = conn->ksnc_sock;
396         
397         /* Remind the socket to ACK eagerly.  If I don't, the socket might
398          * think I'm about to send something it could piggy-back the ACK
399          * on, introducing delay in completing zero-copy sends in my
400          * peer. */
401
402         set_fs(KERNEL_DS);
403         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
404                                (char *)&opt, sizeof (opt));
405         set_fs(oldmm);
406 }
407
408 int
409 ksocknal_recv_iov (ksock_conn_t *conn)
410 {
411         struct iovec *iov = conn->ksnc_rx_iov;
412         int           fragsize  = iov->iov_len;
413         unsigned long vaddr = (unsigned long)iov->iov_base;
414         struct iovec  fragiov = { .iov_base = (void *)vaddr,
415                                   .iov_len  = fragsize};
416         struct msghdr msg = {
417                 .msg_name       = NULL,
418                 .msg_namelen    = 0,
419                 .msg_iov        = &fragiov,
420                 .msg_iovlen     = 1,
421                 .msg_control    = NULL,
422                 .msg_controllen = 0,
423                 .msg_flags      = 0
424         };
425         mm_segment_t oldmm = get_fs();
426         int          rc;
427
428         /* NB we can't trust socket ops to either consume our iovs
429          * or leave them alone, so we only receive 1 frag at a time. */
430         LASSERT (conn->ksnc_rx_niov > 0);
431         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
432
433         set_fs (KERNEL_DS);
434         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
435         /* NB this is just a boolean............................^ */
436         set_fs (oldmm);
437
438         if (rc <= 0)
439                 return (rc);
440
441         /* received something... */
442         conn->ksnc_peer->ksnp_last_alive = jiffies;
443         conn->ksnc_rx_deadline = jiffies + 
444                                  ksocknal_data.ksnd_io_timeout * HZ;
445         mb();                           /* order with setting rx_started */
446         conn->ksnc_rx_started = 1;
447
448         conn->ksnc_rx_nob_wanted -= rc;
449         conn->ksnc_rx_nob_left -= rc;
450                 
451         if (rc < fragsize) {
452                 iov->iov_base = (void *)(vaddr + rc);
453                 iov->iov_len = fragsize - rc;
454                 return (-EAGAIN);
455         }
456
457         conn->ksnc_rx_iov++;
458         conn->ksnc_rx_niov--;
459         return (1);
460 }
461
462 int
463 ksocknal_recv_kiov (ksock_conn_t *conn)
464 {
465         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
466         struct page  *page = kiov->kiov_page;
467         int           offset = kiov->kiov_offset;
468         int           fragsize = kiov->kiov_len;
469         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
470         struct iovec  fragiov = { .iov_base = (void *)vaddr,
471                                   .iov_len  = fragsize};
472         struct msghdr msg = {
473                 .msg_name       = NULL,
474                 .msg_namelen    = 0,
475                 .msg_iov        = &fragiov,
476                 .msg_iovlen     = 1,
477                 .msg_control    = NULL,
478                 .msg_controllen = 0,
479                 .msg_flags      = 0
480         };
481         mm_segment_t oldmm = get_fs();
482         int          rc;
483
484         /* NB we can't trust socket ops to either consume our iovs
485          * or leave them alone, so we only receive 1 frag at a time. */
486         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
487         LASSERT (conn->ksnc_rx_nkiov > 0);
488         LASSERT (offset + fragsize <= PAGE_SIZE);
489
490         set_fs (KERNEL_DS);
491         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
492         /* NB this is just a boolean............................^ */
493         set_fs (oldmm);
494
495         kunmap (page);
496         
497         if (rc <= 0)
498                 return (rc);
499         
500         /* received something... */
501         conn->ksnc_peer->ksnp_last_alive = jiffies;
502         conn->ksnc_rx_deadline = jiffies + 
503                                  ksocknal_data.ksnd_io_timeout * HZ;
504         mb();                           /* order with setting rx_started */
505         conn->ksnc_rx_started = 1;
506
507         conn->ksnc_rx_nob_wanted -= rc;
508         conn->ksnc_rx_nob_left -= rc;
509                 
510         if (rc < fragsize) {
511                 kiov->kiov_offset = offset + rc;
512                 kiov->kiov_len = fragsize - rc;
513                 return (-EAGAIN);
514         }
515
516         conn->ksnc_rx_kiov++;
517         conn->ksnc_rx_nkiov--;
518         return (1);
519 }
520
521 int
522 ksocknal_receive (ksock_conn_t *conn) 
523 {
524         /* Return 1 on success, 0 on EOF, < 0 on error.
525          * Caller checks ksnc_rx_nob_wanted to determine
526          * progress/completion. */
527         int     rc;
528         ENTRY;
529         
530         if (ksocknal_data.ksnd_stall_rx != 0) {
531                 set_current_state (TASK_UNINTERRUPTIBLE);
532                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
533         }
534
535         rc = ksocknal_getconnsock (conn);
536         if (rc != 0) {
537                 LASSERT (conn->ksnc_closing);
538                 return (-ESHUTDOWN);
539         }
540
541         for (;;) {
542                 if (conn->ksnc_rx_niov != 0)
543                         rc = ksocknal_recv_iov (conn);
544                 else
545                         rc = ksocknal_recv_kiov (conn);
546
547                 if (rc <= 0) {
548                         /* error/EOF or partial receive */
549                         if (rc == -EAGAIN) {
550                                 rc = 1;
551                         } else if (rc == 0 && conn->ksnc_rx_started) {
552                                 /* EOF in the middle of a message */
553                                 rc = -EPROTO;
554                         }
555                         break;
556                 }
557
558                 /* Completed a fragment */
559
560                 if (conn->ksnc_rx_nob_wanted == 0) {
561                         /* Completed a message segment (header or payload) */
562                         if ((ksocknal_data.ksnd_eager_ack & conn->ksnc_type) != 0 &&
563                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
564                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
565                                 /* Remind the socket to ack eagerly... */
566                                 ksocknal_eager_ack(conn);
567                         }
568                         rc = 1;
569                         break;
570                 }
571         }
572
573         ksocknal_putconnsock (conn);
574         RETURN (rc);
575 }
576
577 #if SOCKNAL_ZC
578 void
579 ksocknal_zc_callback (zccd_t *zcd)
580 {
581         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
582         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
583         unsigned long  flags;
584         ENTRY;
585
586         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
587
588         spin_lock_irqsave (&sched->kss_lock, flags);
589
590         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
591         wake_up (&sched->kss_waitq);
592
593         spin_unlock_irqrestore (&sched->kss_lock, flags);
594         EXIT;
595 }
596 #endif
597
598 void
599 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
600 {
601         ksock_ltx_t   *ltx;
602         ENTRY;
603
604         if (tx->tx_conn != NULL) {
605                 /* This tx got queued on a conn; do the accounting... */
606                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
607 #if SOCKNAL_ZC
608                 /* zero copy completion isn't always from
609                  * process_transmit() so it needs to keep a ref on
610                  * tx_conn... */
611                 if (asynch)
612                         ksocknal_put_conn (tx->tx_conn);
613 #else
614                 LASSERT (!asynch);
615 #endif
616         }
617
618         if (tx->tx_isfwd) {             /* was a forwarded packet? */
619                 kpr_fwd_done (&ksocknal_data.ksnd_router,
620                               KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
621                 EXIT;
622                 return;
623         }
624
625         /* local send */
626         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
627
628         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
629
630         ksocknal_free_ltx (ltx);
631         EXIT;
632 }
633
634 void
635 ksocknal_tx_launched (ksock_tx_t *tx) 
636 {
637 #if SOCKNAL_ZC
638         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
639                 ksock_conn_t  *conn = tx->tx_conn;
640                 
641                 /* zccd skbufs are still in-flight.  First take a ref on
642                  * conn, so it hangs about for ksocknal_tx_done... */
643                 atomic_inc (&conn->ksnc_refcount);
644
645                 /* ...then drop the initial ref on zccd, so the zero copy
646                  * callback can occur */
647                 zccd_put (&tx->tx_zccd);
648                 return;
649         }
650 #endif
651         /* Any zero-copy-ness (if any) has completed; I can complete the
652          * transmit now, avoiding an extra schedule */
653         ksocknal_tx_done (tx, 0);
654 }
655
656 int
657 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
658 {
659         unsigned long  flags;
660         int            rc;
661        
662         rc = ksocknal_transmit (conn, tx);
663
664         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
665
666         if (tx->tx_resid == 0) {
667                 /* Sent everything OK */
668                 LASSERT (rc == 0);
669
670                 ksocknal_tx_launched (tx);
671                 return (0);
672         }
673
674         if (rc == -EAGAIN)
675                 return (rc);
676
677         if (rc == -ENOMEM) {
678                 /* Queue on ksnd_enomem_conns for retry after a timeout */
679                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
680
681                 /* enomem list takes over scheduler's ref... */
682                 LASSERT (conn->ksnc_tx_scheduled);
683                 list_add_tail(&conn->ksnc_tx_list,
684                               &ksocknal_data.ksnd_enomem_conns);
685                 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
686                                    ksocknal_data.ksnd_reaper_waketime))
687                         wake_up (&ksocknal_data.ksnd_reaper_waitq);
688                 
689                 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
690                 return (rc);
691         }
692
693         /* Actual error */
694         LASSERT (rc < 0);
695
696         if (!conn->ksnc_closing)
697                 CERROR ("[%p] Error %d on write to "LPX64
698                         " ip %d.%d.%d.%d:%d\n",conn, rc, 
699                         conn->ksnc_peer->ksnp_nid,
700                         HIPQUAD(conn->ksnc_ipaddr),
701                         conn->ksnc_port);
702
703         ksocknal_close_conn_and_siblings (conn, rc);
704         ksocknal_tx_launched (tx);
705         
706         return (rc);
707
708
709 void
710 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
711 {
712         unsigned long     flags;
713
714         /* called holding write lock on ksnd_global_lock */
715
716         LASSERT (!route->ksnr_deleted);
717         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
718         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
719         LASSERT (!route->ksnr_connecting);
720         
721         if (ksocknal_data.ksnd_typed_conns)
722                 route->ksnr_connecting = 
723                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
724         else
725                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
726
727         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
728         
729         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
730         
731         list_add_tail (&route->ksnr_connect_list,
732                        &ksocknal_data.ksnd_autoconnectd_routes);
733         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
734         
735         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
736 }
737
738 ksock_peer_t *
739 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
740 {
741         char          ipbuf[PTL_NALFMT_SIZE];
742         ptl_nid_t     target_nid;
743         int           rc;
744         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
745         
746         if (peer != NULL)
747                 return (peer);
748         
749         if (tx->tx_isfwd) {
750                 CERROR ("Can't send packet to "LPX64
751                         " %s: routed target is not a peer\n", 
752                         nid, portals_nid2str(SOCKNAL, nid, ipbuf));
753                 return (NULL);
754         }
755         
756         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
757                          &target_nid);
758         if (rc != 0) {
759                 CERROR ("Can't route to "LPX64" %s: router error %d\n", 
760                         nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
761                 return (NULL);
762         }
763
764         peer = ksocknal_find_peer_locked (target_nid);
765         if (peer != NULL)
766                 return (peer);
767
768         CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
769                 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
770         return (NULL);
771 }
772
773 ksock_conn_t *
774 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer) 
775 {
776         struct list_head *tmp;
777         ksock_conn_t     *typed = NULL;
778         int               tnob  = 0;
779         ksock_conn_t     *fallback = NULL;
780         int               fnob     = 0;
781         
782         /* Find the conn with the shortest tx queue */
783         list_for_each (tmp, &peer->ksnp_conns) {
784                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
785                 int           nob = atomic_read(&c->ksnc_tx_nob);
786
787                 LASSERT (!c->ksnc_closing);
788
789                 if (fallback == NULL || nob < fnob) {
790                         fallback = c;
791                         fnob     = nob;
792                 }
793
794                 if (!ksocknal_data.ksnd_typed_conns)
795                         continue;
796
797                 switch (c->ksnc_type) {
798                 default:
799                         LBUG();
800                 case SOCKNAL_CONN_ANY:
801                         break;
802                 case SOCKNAL_CONN_BULK_IN:
803                         continue;
804                 case SOCKNAL_CONN_BULK_OUT:
805                         if (tx->tx_nob < ksocknal_data.ksnd_min_bulk)
806                                 continue;
807                         break;
808                 case SOCKNAL_CONN_CONTROL:
809                         if (tx->tx_nob >= ksocknal_data.ksnd_min_bulk)
810                                 continue;
811                         break;
812                 }
813
814                 if (typed == NULL || nob < tnob) {
815                         typed = c;
816                         tnob  = nob;
817                 }
818         }
819
820         /* prefer the typed selection */
821         return ((typed != NULL) ? typed : fallback);
822 }
823
824 void
825 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
826 {
827         unsigned long  flags;
828         ksock_sched_t *sched = conn->ksnc_scheduler;
829
830         /* called holding global lock (read or irq-write) and caller may
831          * not have dropped this lock between finding conn and calling me,
832          * so we don't need the {get,put}connsock dance to deref
833          * ksnc_sock... */
834         LASSERT(!conn->ksnc_closing);
835         LASSERT(tx->tx_resid == tx->tx_nob);
836         
837         CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", 
838                 conn->ksnc_peer->ksnp_nid,
839                 HIPQUAD(conn->ksnc_ipaddr),
840                 conn->ksnc_port);
841
842         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
843         tx->tx_conn = conn;
844
845 #if SOCKNAL_ZC
846         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
847         /* NB this sets 1 ref on zccd, so the callback can only occur after
848          * I've released this ref. */
849 #endif
850         spin_lock_irqsave (&sched->kss_lock, flags);
851
852         conn->ksnc_tx_deadline = jiffies + 
853                                  ksocknal_data.ksnd_io_timeout * HZ;
854         mb();                                   /* order with list_add_tail */
855
856         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
857                 
858         if (conn->ksnc_tx_ready &&      /* able to send */
859             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
860                 /* +1 ref for scheduler */
861                 atomic_inc (&conn->ksnc_refcount);
862                 list_add_tail (&conn->ksnc_tx_list, 
863                                &sched->kss_tx_conns);
864                 conn->ksnc_tx_scheduled = 1;
865                 wake_up (&sched->kss_waitq);
866         }
867
868         spin_unlock_irqrestore (&sched->kss_lock, flags);
869 }
870
871 ksock_route_t *
872 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
873 {
874         struct list_head  *tmp;
875         ksock_route_t     *route;
876         ksock_route_t     *candidate = NULL;
877         int                found = 0;
878         int                bits;
879         
880         list_for_each (tmp, &peer->ksnp_routes) {
881                 route = list_entry (tmp, ksock_route_t, ksnr_list);
882                 bits  = route->ksnr_connected;
883                 
884                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
885                     (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
886                     route->ksnr_connecting != 0) {
887                         /* All typed connections have been established, or
888                          * an untyped connection has been established, or
889                          * connections are currently being established */
890                         found = 1;
891                         continue;
892                 }
893
894                 /* too soon to retry this guy? */
895                 if (!time_after_eq (jiffies, route->ksnr_timeout))
896                         continue;
897                 
898                 /* always do eager routes */
899                 if (route->ksnr_eager)
900                         return (route);
901
902                 if (candidate == NULL) {
903                         /* If we don't find any other route that is fully
904                          * connected or connecting, the first connectable
905                          * route is returned.  If it fails to connect, it
906                          * will get placed at the end of the list */
907                         candidate = route;
908                 }
909         }
910  
911         return (found ? NULL : candidate);
912 }
913
914 ksock_route_t *
915 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
916 {
917         struct list_head  *tmp;
918         ksock_route_t     *route;
919
920         list_for_each (tmp, &peer->ksnp_routes) {
921                 route = list_entry (tmp, ksock_route_t, ksnr_list);
922                 
923                 if (route->ksnr_connecting != 0)
924                         return (route);
925         }
926         
927         return (NULL);
928 }
929
930 int
931 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
932 {
933         unsigned long     flags;
934         ksock_peer_t     *peer;
935         ksock_conn_t     *conn;
936         ksock_route_t    *route;
937         rwlock_t         *g_lock;
938         
939         /* Ensure the frags we've been given EXACTLY match the number of
940          * bytes we want to send.  Many TCP/IP stacks disregard any total
941          * size parameters passed to them and just look at the frags. 
942          *
943          * We always expect at least 1 mapped fragment containing the
944          * complete portals header. */
945         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
946                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
947         LASSERT (tx->tx_niov >= 1);
948         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
949
950         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
951                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
952                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
953
954         tx->tx_conn = NULL;                     /* only set when assigned a conn */
955         tx->tx_resid = tx->tx_nob;
956         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
957
958         g_lock = &ksocknal_data.ksnd_global_lock;
959         read_lock (g_lock);
960         
961         peer = ksocknal_find_target_peer_locked (tx, nid);
962         if (peer == NULL) {
963                 read_unlock (g_lock);
964                 return (-EHOSTUNREACH);
965         }
966
967         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
968                 conn = ksocknal_find_conn_locked (tx, peer);
969                 if (conn != NULL) {
970                         /* I've got no autoconnect routes that need to be
971                          * connecting and I do have an actual connection... */
972                         ksocknal_queue_tx_locked (tx, conn);
973                         read_unlock (g_lock);
974                         return (0);
975                 }
976         }
977         
978         /* Making one or more connections; I'll need a write lock... */
979
980         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
981         read_unlock (g_lock);
982         write_lock_irqsave (g_lock, flags);
983         
984         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
985                 write_unlock_irqrestore (g_lock, flags);
986                 ksocknal_put_peer (peer);
987                 return (-EHOSTUNREACH);
988         }
989         ksocknal_put_peer (peer);               /* drop ref I got above */
990
991         for (;;) {
992                 /* launch any/all autoconnections that need it */
993                 route = ksocknal_find_connectable_route_locked (peer);
994                 if (route == NULL)
995                         break;
996
997                 ksocknal_launch_autoconnect_locked (route);
998         }
999
1000         conn = ksocknal_find_conn_locked (tx, peer);
1001         if (conn != NULL) {
1002                 /* Connection exists; queue message on it */
1003                 ksocknal_queue_tx_locked (tx, conn);
1004                 write_unlock_irqrestore (g_lock, flags);
1005                 return (0);
1006         }
1007
1008         route = ksocknal_find_connecting_route_locked (peer);
1009         if (route != NULL) {
1010                 /* At least 1 connection is being established; queue the
1011                  * message... */
1012                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1013                 write_unlock_irqrestore (g_lock, flags);
1014                 return (0);
1015         }
1016         
1017         write_unlock_irqrestore (g_lock, flags);
1018         return (-EHOSTUNREACH);
1019 }
1020
1021 int
1022 ksocknal_sendmsg(nal_cb_t     *nal, 
1023                  void         *private, 
1024                  lib_msg_t    *cookie,
1025                  ptl_hdr_t    *hdr, 
1026                  int           type, 
1027                  ptl_nid_t     nid, 
1028                  ptl_pid_t     pid,
1029                  unsigned int  payload_niov, 
1030                  struct iovec *payload_iov, 
1031                  ptl_kiov_t   *payload_kiov,
1032                  size_t        payload_nob)
1033 {
1034         ksock_ltx_t  *ltx;
1035         int           desc_size;
1036         int           rc;
1037
1038         /* NB 'private' is different depending on what we're sending.
1039          * Just ignore it... */
1040
1041         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1042                " pid %d\n", payload_nob, payload_niov, nid , pid);
1043
1044         LASSERT (payload_nob == 0 || payload_niov > 0);
1045         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1046
1047         /* It must be OK to kmap() if required */
1048         LASSERT (payload_kiov == NULL || !in_interrupt ());
1049         /* payload is either all vaddrs or all pages */
1050         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1051         
1052         if (payload_iov != NULL)
1053                 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1054         else
1055                 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1056         
1057         if (in_interrupt() ||
1058             type == PTL_MSG_ACK ||
1059             type == PTL_MSG_REPLY) {
1060                 /* Can't block if in interrupt or responding to an incoming
1061                  * message */
1062                 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1063         } else {
1064                 PORTAL_ALLOC(ltx, desc_size);
1065         }
1066         
1067         if (ltx == NULL) {
1068                 CERROR("Can't allocate tx desc type %d size %d %s\n",
1069                        type, desc_size, in_interrupt() ? "(intr)" : "");
1070                 return (PTL_NOSPACE);
1071         }
1072
1073         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1074
1075         ltx->ltx_desc_size = desc_size;
1076         
1077         /* We always have 1 mapped frag for the header */
1078         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1079         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1080         ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1081         ltx->ltx_hdr = *hdr;
1082         
1083         ltx->ltx_private = private;
1084         ltx->ltx_cookie = cookie;
1085         
1086         ltx->ltx_tx.tx_isfwd = 0;
1087         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1088
1089         if (payload_iov != NULL) {
1090                 /* payload is all mapped */
1091                 ltx->ltx_tx.tx_kiov  = NULL;
1092                 ltx->ltx_tx.tx_nkiov = 0;
1093
1094                 ltx->ltx_tx.tx_niov = 1 + payload_niov;
1095
1096                 memcpy(ltx->ltx_iov + 1, payload_iov,
1097                        payload_niov * sizeof (*payload_iov));
1098
1099         } else {
1100                 /* payload is all pages */
1101                 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1102                 ltx->ltx_tx.tx_nkiov = payload_niov;
1103
1104                 ltx->ltx_tx.tx_niov = 1;
1105
1106                 memcpy(ltx->ltx_kiov, payload_kiov, 
1107                        payload_niov * sizeof (*payload_kiov));
1108         }
1109
1110         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1111         if (rc == 0)
1112                 return (PTL_OK);
1113         
1114         ksocknal_free_ltx(ltx);
1115         return (PTL_FAIL);
1116 }
1117
1118 int
1119 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1120                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1121                unsigned int payload_niov, struct iovec *payload_iov,
1122                size_t payload_len)
1123 {
1124         return (ksocknal_sendmsg(nal, private, cookie,
1125                                  hdr, type, nid, pid,
1126                                  payload_niov, payload_iov, NULL,
1127                                  payload_len));
1128 }
1129
1130 int
1131 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1132                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1133                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1134                      size_t payload_len)
1135 {
1136         return (ksocknal_sendmsg(nal, private, cookie,
1137                                  hdr, type, nid, pid,
1138                                  payload_niov, NULL, payload_kiov,
1139                                  payload_len));
1140 }
1141
1142 void
1143 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1144 {
1145         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1146         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
1147         int           rc;
1148         
1149         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1150                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1151
1152         /* I'm the gateway; must be the last hop */
1153         if (nid == ksocknal_lib.ni.nid)
1154                 nid = fwd->kprfd_target_nid;
1155
1156         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
1157         tx->tx_nob   = fwd->kprfd_nob;
1158         tx->tx_niov  = fwd->kprfd_niov;
1159         tx->tx_iov   = fwd->kprfd_iov;
1160         tx->tx_nkiov = 0;
1161         tx->tx_kiov  = NULL;
1162
1163         rc = ksocknal_launch_packet (tx, nid);
1164         if (rc != 0)
1165                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1166 }
1167
1168 int
1169 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1170 {
1171         long    pid = kernel_thread (fn, arg, 0);
1172
1173         if (pid < 0)
1174                 return ((int)pid);
1175
1176         atomic_inc (&ksocknal_data.ksnd_nthreads);
1177         return (0);
1178 }
1179
1180 void
1181 ksocknal_thread_fini (void)
1182 {
1183         atomic_dec (&ksocknal_data.ksnd_nthreads);
1184 }
1185
1186 void
1187 ksocknal_fmb_callback (void *arg, int error)
1188 {
1189         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1190         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1191         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
1192         ksock_conn_t      *conn = NULL;
1193         ksock_sched_t     *sched;
1194         unsigned long      flags;
1195         char               ipbuf[PTL_NALFMT_SIZE];
1196
1197         if (error != 0)
1198                 CERROR("Failed to route packet from "
1199                        LPX64" %s to "LPX64" %s: %d\n",
1200                        NTOH__u64(hdr->src_nid),
1201                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf),
1202                        NTOH__u64(hdr->dest_nid),
1203                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf),
1204                        error);
1205         else
1206                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1207                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1208
1209         /* drop peer ref taken on init */
1210         ksocknal_put_peer (fmb->fmb_peer);
1211         
1212         spin_lock_irqsave (&fmp->fmp_lock, flags);
1213
1214         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1215         fmp->fmp_nactive_fmbs--;
1216
1217         if (!list_empty (&fmp->fmp_blocked_conns)) {
1218                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1219                                    ksock_conn_t, ksnc_rx_list);
1220                 list_del (&conn->ksnc_rx_list);
1221         }
1222
1223         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1224
1225         if (conn == NULL)
1226                 return;
1227
1228         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1229         LASSERT (conn->ksnc_rx_scheduled);
1230         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1231
1232         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1233
1234         sched = conn->ksnc_scheduler;
1235
1236         spin_lock_irqsave (&sched->kss_lock, flags);
1237
1238         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1239         wake_up (&sched->kss_waitq);
1240
1241         spin_unlock_irqrestore (&sched->kss_lock, flags);
1242 }
1243
1244 ksock_fmb_t *
1245 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1246 {
1247         int               payload_nob = conn->ksnc_rx_nob_left;
1248         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1249         unsigned long     flags;
1250         ksock_fmb_pool_t *pool;
1251         ksock_fmb_t      *fmb;
1252
1253         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1254         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1255
1256         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1257                 pool = &ksocknal_data.ksnd_small_fmp;
1258         else
1259                 pool = &ksocknal_data.ksnd_large_fmp;
1260
1261         spin_lock_irqsave (&pool->fmp_lock, flags);
1262
1263         if (!list_empty (&pool->fmp_idle_fmbs)) {
1264                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1265                                  ksock_fmb_t, fmb_list);
1266                 list_del (&fmb->fmb_list);
1267                 pool->fmp_nactive_fmbs++;
1268                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1269
1270                 return (fmb);
1271         }
1272
1273         /* deschedule until fmb free */
1274
1275         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1276
1277         list_add_tail (&conn->ksnc_rx_list,
1278                        &pool->fmp_blocked_conns);
1279
1280         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1281         return (NULL);
1282 }
1283
1284 int
1285 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1286 {
1287         int payload_nob = conn->ksnc_rx_nob_left;
1288         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
1289         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1290         int niov;                               /* at least the header */
1291         int nob;
1292
1293         LASSERT (conn->ksnc_rx_scheduled);
1294         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1295         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1296         LASSERT (payload_nob >= 0);
1297         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
1298         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1299
1300         /* Got a forwarding buffer; copy the header we just read into the
1301          * forwarding buffer.  If there's payload, start reading reading it
1302          * into the buffer, otherwise the forwarding buffer can be kicked
1303          * off immediately.
1304          *
1305          * NB fmb->fmb_iov spans the WHOLE packet.
1306          *    conn->ksnc_rx_iov spans just the payload.
1307          */
1308         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
1309
1310         /* copy header */
1311         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
1312
1313         /* Take a ref on the conn's peer to prevent module unload before
1314          * forwarding completes.  NB we ref peer and not conn since because
1315          * all refs on conn after it has been closed must remove themselves
1316          * in finite time */
1317         fmb->fmb_peer = conn->ksnc_peer;
1318         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1319
1320         if (payload_nob == 0) {         /* got complete packet already */
1321                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1322                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1323                         dest_nid, packet_nob);
1324
1325                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1326
1327                 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1328                               packet_nob, 1, fmb->fmb_iov,
1329                               ksocknal_fmb_callback, fmb);
1330
1331                 /* forward it now */
1332                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1333
1334                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1335                 return (1);
1336         }
1337
1338         niov = 1;
1339         if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
1340                 fmb->fmb_iov[0].iov_len = packet_nob;
1341         } else {
1342                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1343                 nob = packet_nob - PAGE_SIZE;
1344
1345                 do {
1346                         LASSERT (niov < fmb->fmb_npages);
1347                         fmb->fmb_iov[niov].iov_base =
1348                                 page_address (fmb->fmb_pages[niov]);
1349                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1350                         nob -= PAGE_SIZE;
1351                         niov++;
1352                 } while (nob > 0);
1353         }
1354
1355         kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1356                       packet_nob, niov, fmb->fmb_iov,
1357                       ksocknal_fmb_callback, fmb);
1358
1359         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1360         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1361         
1362         /* payload is desc's iov-ed buffer, but skipping the hdr */
1363         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1364                  sizeof (struct iovec));
1365
1366         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1367         conn->ksnc_rx_iov[0].iov_base =
1368                 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1369                          sizeof (ptl_hdr_t));
1370         conn->ksnc_rx_iov[0].iov_len =
1371                 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1372
1373         if (niov > 1)
1374                 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1375                        (niov - 1) * sizeof (struct iovec));
1376
1377         conn->ksnc_rx_niov = niov;
1378
1379         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1380                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1381         return (0);
1382 }
1383
1384 void
1385 ksocknal_fwd_parse (ksock_conn_t *conn)
1386 {
1387         ksock_peer_t *peer;
1388         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1389         ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1390         int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1391         char str[PTL_NALFMT_SIZE];
1392
1393         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1394                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1395
1396         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1397         LASSERT (conn->ksnc_rx_scheduled);
1398
1399         if (body_len < 0) {                 /* length corrupt (overflow) */
1400                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1401                        "packet size %d illegal\n",
1402                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1403                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1404                        body_len);
1405
1406                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1407                 return;
1408         }
1409
1410         if (!kpr_routing(&ksocknal_data.ksnd_router)) {    /* not forwarding */
1411                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1412                        " (%s): not forwarding\n",
1413                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1414                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1415                 /* on to new packet (skip this one's body) */
1416                 ksocknal_new_packet (conn, body_len);
1417                 return;
1418         }
1419
1420         if (body_len > PTL_MTU) {      /* too big to forward */
1421                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1422                         "(%s): packet size %d too big\n",
1423                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1424                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str),
1425                         body_len);
1426                 /* on to new packet (skip this one's body) */
1427                 ksocknal_new_packet (conn, body_len);
1428                 return;
1429         }
1430
1431         /* should have gone direct */
1432         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1433         if (peer != NULL) {
1434                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1435                         "(%s): target is a peer\n",
1436                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1437                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str));
1438                 ksocknal_put_peer (peer);  /* drop ref from get above */
1439
1440                 /* on to next packet (skip this one's body) */
1441                 ksocknal_new_packet (conn, body_len);
1442                 return;
1443         }
1444
1445         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1446         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1447         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1448 }
1449
1450 int
1451 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1452 {
1453         static char ksocknal_slop_buffer[4096];
1454
1455         int   nob;
1456         int   niov;
1457         int   skipped;
1458
1459         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1460                 conn->ksnc_rx_started = 0;
1461                 mb ();                          /* racing with timeout thread */
1462                 
1463                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1464                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1465                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1466
1467                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1468                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1469                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1470                 conn->ksnc_rx_niov = 1;
1471
1472                 conn->ksnc_rx_kiov = NULL;
1473                 conn->ksnc_rx_nkiov = 0;
1474                 return (1);
1475         }
1476
1477         /* Set up to skip as much a possible now.  If there's more left
1478          * (ran out of iov entries) we'll get called again */
1479
1480         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1481         conn->ksnc_rx_nob_left = nob_to_skip;
1482         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1483         skipped = 0;
1484         niov = 0;
1485
1486         do {
1487                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1488
1489                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1490                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1491                 niov++;
1492                 skipped += nob;
1493                 nob_to_skip -=nob;
1494
1495         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1496                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1497
1498         conn->ksnc_rx_niov = niov;
1499         conn->ksnc_rx_kiov = NULL;
1500         conn->ksnc_rx_nkiov = 0;
1501         conn->ksnc_rx_nob_wanted = skipped;
1502         return (0);
1503 }
1504
1505 int
1506 ksocknal_process_receive (ksock_conn_t *conn)
1507 {
1508         ksock_fmb_t  *fmb;
1509         int           rc;
1510         
1511         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1512
1513         /* doesn't need a forwarding buffer */
1514         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1515                 goto try_read;
1516
1517  get_fmb:
1518         fmb = ksocknal_get_idle_fmb (conn);
1519         if (fmb == NULL) {
1520                 /* conn descheduled waiting for idle fmb */
1521                 return (0);
1522         }
1523
1524         if (ksocknal_init_fmb (conn, fmb)) {
1525                 /* packet forwarded */
1526                 return (0);
1527         }
1528
1529  try_read:
1530         /* NB: sched lock NOT held */
1531         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1532                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1533                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1534                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1535
1536         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1537
1538         rc = ksocknal_receive(conn);
1539
1540         if (rc <= 0) {
1541                 LASSERT (rc != -EAGAIN);
1542
1543                 if (rc == 0)
1544                         CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1545                                conn, conn->ksnc_peer->ksnp_nid,
1546                                HIPQUAD(conn->ksnc_ipaddr),
1547                                conn->ksnc_port);
1548                 else if (!conn->ksnc_closing)
1549                         CERROR ("[%p] Error %d on read from "LPX64
1550                                 " ip %d.%d.%d.%d:%d\n",
1551                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1552                                 HIPQUAD(conn->ksnc_ipaddr),
1553                                 conn->ksnc_port);
1554
1555                 ksocknal_close_conn_and_siblings (conn, rc);
1556                 return (rc == 0 ? -ESHUTDOWN : rc);
1557         }
1558
1559         if (conn->ksnc_rx_nob_wanted != 0) {
1560                 /* short read */
1561                 return (-EAGAIN);
1562         }
1563         
1564         switch (conn->ksnc_rx_state) {
1565         case SOCKNAL_RX_HEADER:
1566                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1567                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1568                         /* This packet isn't for me */
1569                         ksocknal_fwd_parse (conn);
1570                         switch (conn->ksnc_rx_state) {
1571                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1572                                 return (0);     /* => come back later */
1573                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1574                                 goto try_read;  /* => go read it */
1575                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1576                                 goto get_fmb;   /* => go get a fwd msg buffer */
1577                         default:
1578                                 LBUG ();
1579                         }
1580                         /* Not Reached */
1581                 }
1582
1583                 /* sets wanted_len, iovs etc */
1584                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1585
1586                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1587                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1588                         goto try_read;          /* go read the payload */
1589                 }
1590                 /* Fall through (completed packet for me) */
1591
1592         case SOCKNAL_RX_BODY:
1593                 /* payload all received */
1594                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1595                 /* Fall through */
1596
1597         case SOCKNAL_RX_SLOP:
1598                 /* starting new packet? */
1599                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1600                         return (0);     /* come back later */
1601                 goto try_read;          /* try to finish reading slop now */
1602
1603         case SOCKNAL_RX_BODY_FWD:
1604                 /* payload all received */
1605                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1606                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1607                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1608                         conn->ksnc_rx_nob_left);
1609
1610                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1611                  * conn->ksnc_cookie */
1612                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1613                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1614
1615                 /* no slop in forwarded packets */
1616                 LASSERT (conn->ksnc_rx_nob_left == 0);
1617
1618                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1619                 return (0);                     /* (later) */
1620
1621         default:
1622                 break;
1623         }
1624
1625         /* Not Reached */
1626         LBUG ();
1627         return (-EINVAL);                       /* keep gcc happy */
1628 }
1629
1630 int
1631 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1632                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1633 {
1634         ksock_conn_t *conn = (ksock_conn_t *)private;
1635
1636         LASSERT (mlen <= rlen);
1637         LASSERT (niov <= PTL_MD_MAX_IOV);
1638         
1639         conn->ksnc_cookie = msg;
1640         conn->ksnc_rx_nob_wanted = mlen;
1641         conn->ksnc_rx_nob_left   = rlen;
1642
1643         conn->ksnc_rx_nkiov = 0;
1644         conn->ksnc_rx_kiov = NULL;
1645         conn->ksnc_rx_niov = niov;
1646         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1647         memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1648
1649         LASSERT (mlen == 
1650                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1651                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1652
1653         return (rlen);
1654 }
1655
1656 int
1657 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1658                      unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1659 {
1660         ksock_conn_t *conn = (ksock_conn_t *)private;
1661
1662         LASSERT (mlen <= rlen);
1663         LASSERT (niov <= PTL_MD_MAX_IOV);
1664         
1665         conn->ksnc_cookie = msg;
1666         conn->ksnc_rx_nob_wanted = mlen;
1667         conn->ksnc_rx_nob_left   = rlen;
1668
1669         conn->ksnc_rx_niov = 0;
1670         conn->ksnc_rx_iov  = NULL;
1671         conn->ksnc_rx_nkiov = niov;
1672         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1673         memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1674
1675         LASSERT (mlen == 
1676                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1677                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1678
1679         return (rlen);
1680 }
1681
1682 int ksocknal_scheduler (void *arg)
1683 {
1684         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1685         ksock_conn_t      *conn;
1686         ksock_tx_t        *tx;
1687         unsigned long      flags;
1688         int                rc;
1689         int                nloops = 0;
1690         int                id = sched - ksocknal_data.ksnd_schedulers;
1691         char               name[16];
1692
1693         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1694         kportal_daemonize (name);
1695         kportal_blockallsigs ();
1696
1697 #if (CONFIG_SMP && CPU_AFFINITY)
1698         if ((cpu_online_map & (1 << id)) != 0) {
1699 #if 1
1700                 current->cpus_allowed = (1 << id);
1701 #else
1702                 set_cpus_allowed (current, 1<<id);
1703 #endif
1704         } else {
1705                 CERROR ("Can't set CPU affinity for %s\n", name);
1706         }
1707 #endif /* CONFIG_SMP && CPU_AFFINITY */
1708         
1709         spin_lock_irqsave (&sched->kss_lock, flags);
1710
1711         while (!ksocknal_data.ksnd_shuttingdown) {
1712                 int did_something = 0;
1713
1714                 /* Ensure I progress everything semi-fairly */
1715
1716                 if (!list_empty (&sched->kss_rx_conns)) {
1717                         conn = list_entry(sched->kss_rx_conns.next,
1718                                           ksock_conn_t, ksnc_rx_list);
1719                         list_del(&conn->ksnc_rx_list);
1720
1721                         LASSERT(conn->ksnc_rx_scheduled);
1722                         LASSERT(conn->ksnc_rx_ready);
1723
1724                         /* clear rx_ready in case receive isn't complete.
1725                          * Do it BEFORE we call process_recv, since
1726                          * data_ready can set it any time after we release
1727                          * kss_lock. */
1728                         conn->ksnc_rx_ready = 0;
1729                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1730                         
1731                         rc = ksocknal_process_receive(conn);
1732                         
1733                         spin_lock_irqsave(&sched->kss_lock, flags);
1734
1735                         /* I'm the only one that can clear this flag */
1736                         LASSERT(conn->ksnc_rx_scheduled);
1737
1738                         /* Did process_receive get everything it wanted? */
1739                         if (rc == 0)
1740                                 conn->ksnc_rx_ready = 1;
1741                         
1742                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1743                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1744                                 /* Conn blocked for a forwarding buffer.
1745                                  * It will get queued for my attention when
1746                                  * one becomes available (and it might just
1747                                  * already have been!).  Meanwhile my ref
1748                                  * on it stays put. */
1749                         } else if (conn->ksnc_rx_ready) {
1750                                 /* reschedule for rx */
1751                                 list_add_tail (&conn->ksnc_rx_list,
1752                                                &sched->kss_rx_conns);
1753                         } else {
1754                                 conn->ksnc_rx_scheduled = 0;
1755                                 /* drop my ref */
1756                                 ksocknal_put_conn(conn);
1757                         }
1758
1759                         did_something = 1;
1760                 }
1761
1762                 if (!list_empty (&sched->kss_tx_conns)) {
1763                         conn = list_entry(sched->kss_tx_conns.next,
1764                                           ksock_conn_t, ksnc_tx_list);
1765                         list_del (&conn->ksnc_tx_list);
1766                         
1767                         LASSERT(conn->ksnc_tx_scheduled);
1768                         LASSERT(conn->ksnc_tx_ready);
1769                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1770                         
1771                         tx = list_entry(conn->ksnc_tx_queue.next,
1772                                         ksock_tx_t, tx_list);
1773                         /* dequeue now so empty list => more to send */
1774                         list_del(&tx->tx_list);
1775                         
1776                         /* Clear tx_ready in case send isn't complete.  Do
1777                          * it BEFORE we call process_transmit, since
1778                          * write_space can set it any time after we release
1779                          * kss_lock. */
1780                         conn->ksnc_tx_ready = 0;
1781                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1782
1783                         rc = ksocknal_process_transmit(conn, tx);
1784
1785                         spin_lock_irqsave (&sched->kss_lock, flags);
1786
1787                         if (rc == -ENOMEM || rc == -EAGAIN) {
1788                                 /* Incomplete send: replace tx on HEAD of tx_queue */
1789                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1790                         } else {
1791                                 /* Complete send; assume space for more */
1792                                 conn->ksnc_tx_ready = 1;
1793                         }
1794
1795                         if (rc == -ENOMEM) {
1796                                 /* Do nothing; after a short timeout, this
1797                                  * conn will be reposted on kss_tx_conns. */
1798                         } else if (conn->ksnc_tx_ready &&
1799                                    !list_empty (&conn->ksnc_tx_queue)) {
1800                                 /* reschedule for tx */
1801                                 list_add_tail (&conn->ksnc_tx_list, 
1802                                                &sched->kss_tx_conns);
1803                         } else {
1804                                 conn->ksnc_tx_scheduled = 0;
1805                                 /* drop my ref */
1806                                 ksocknal_put_conn (conn);
1807                         }
1808                                 
1809                         did_something = 1;
1810                 }
1811 #if SOCKNAL_ZC
1812                 if (!list_empty (&sched->kss_zctxdone_list)) {
1813                         ksock_tx_t *tx =
1814                                 list_entry(sched->kss_zctxdone_list.next,
1815                                            ksock_tx_t, tx_list);
1816                         did_something = 1;
1817
1818                         list_del (&tx->tx_list);
1819                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1820
1821                         ksocknal_tx_done (tx, 1);
1822
1823                         spin_lock_irqsave (&sched->kss_lock, flags);
1824                 }
1825 #endif
1826                 if (!did_something ||           /* nothing to do */
1827                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1828                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1829
1830                         nloops = 0;
1831
1832                         if (!did_something) {   /* wait for something to do */
1833 #if SOCKNAL_ZC
1834                                 rc = wait_event_interruptible (sched->kss_waitq,
1835                                                                ksocknal_data.ksnd_shuttingdown ||
1836                                                                !list_empty(&sched->kss_rx_conns) ||
1837                                                                !list_empty(&sched->kss_tx_conns) ||
1838                                                                !list_empty(&sched->kss_zctxdone_list));
1839 #else
1840                                 rc = wait_event_interruptible (sched->kss_waitq,
1841                                                                ksocknal_data.ksnd_shuttingdown ||
1842                                                                !list_empty(&sched->kss_rx_conns) ||
1843                                                                !list_empty(&sched->kss_tx_conns));
1844 #endif
1845                                 LASSERT (rc == 0);
1846                         } else
1847                                our_cond_resched();
1848
1849                         spin_lock_irqsave (&sched->kss_lock, flags);
1850                 }
1851         }
1852
1853         spin_unlock_irqrestore (&sched->kss_lock, flags);
1854         ksocknal_thread_fini ();
1855         return (0);
1856 }
1857
1858 void
1859 ksocknal_data_ready (struct sock *sk, int n)
1860 {
1861         unsigned long  flags;
1862         ksock_conn_t  *conn;
1863         ksock_sched_t *sched;
1864         ENTRY;
1865
1866         /* interleave correctly with closing sockets... */
1867         read_lock (&ksocknal_data.ksnd_global_lock);
1868
1869         conn = sk->sk_user_data;
1870         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1871                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1872                 sk->sk_data_ready (sk, n);
1873         } else {
1874                 sched = conn->ksnc_scheduler;
1875
1876                 spin_lock_irqsave (&sched->kss_lock, flags);
1877
1878                 conn->ksnc_rx_ready = 1;
1879
1880                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1881                         list_add_tail(&conn->ksnc_rx_list,
1882                                       &sched->kss_rx_conns);
1883                         conn->ksnc_rx_scheduled = 1;
1884                         /* extra ref for scheduler */
1885                         atomic_inc (&conn->ksnc_refcount);
1886
1887                         wake_up (&sched->kss_waitq);
1888                 }
1889
1890                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1891         }
1892
1893         read_unlock (&ksocknal_data.ksnd_global_lock);
1894
1895         EXIT;
1896 }
1897
1898 void
1899 ksocknal_write_space (struct sock *sk)
1900 {
1901         unsigned long  flags;
1902         ksock_conn_t  *conn;
1903         ksock_sched_t *sched;
1904
1905         /* interleave correctly with closing sockets... */
1906         read_lock (&ksocknal_data.ksnd_global_lock);
1907
1908         conn = sk->sk_user_data;
1909
1910         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1911                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1912                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1913                                       " ready" : " blocked"),
1914                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1915                                       " scheduled" : " idle"),
1916                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1917                                       " empty" : " queued"));
1918
1919         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1920                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1921                 sk->sk_write_space (sk);
1922
1923                 read_unlock (&ksocknal_data.ksnd_global_lock);
1924                 return;
1925         }
1926
1927         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1928                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1929
1930                 sched = conn->ksnc_scheduler;
1931
1932                 spin_lock_irqsave (&sched->kss_lock, flags);
1933
1934                 conn->ksnc_tx_ready = 1;
1935
1936                 if (!conn->ksnc_tx_scheduled && // not being progressed
1937                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1938                         list_add_tail (&conn->ksnc_tx_list,
1939                                        &sched->kss_tx_conns);
1940                         conn->ksnc_tx_scheduled = 1;
1941                         /* extra ref for scheduler */
1942                         atomic_inc (&conn->ksnc_refcount);
1943
1944                         wake_up (&sched->kss_waitq);
1945                 }
1946
1947                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1948         }
1949
1950         read_unlock (&ksocknal_data.ksnd_global_lock);
1951 }
1952
1953 int
1954 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1955 {
1956         int           rc;
1957         mm_segment_t  oldmm = get_fs();
1958
1959         while (nob > 0) {
1960                 struct iovec  iov = {
1961                         .iov_base = buffer,
1962                         .iov_len  = nob
1963                 };
1964                 struct msghdr msg = {
1965                         .msg_name       = NULL,
1966                         .msg_namelen    = 0,
1967                         .msg_iov        = &iov,
1968                         .msg_iovlen     = 1,
1969                         .msg_control    = NULL,
1970                         .msg_controllen = 0,
1971                         .msg_flags      = 0
1972                 };
1973
1974                 set_fs (KERNEL_DS);
1975                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1976                 set_fs (oldmm);
1977                 
1978                 if (rc < 0)
1979                         return (rc);
1980
1981                 if (rc == 0) {
1982                         CERROR ("Unexpected zero rc\n");
1983                         return (-ECONNABORTED);
1984                 }
1985
1986                 buffer = ((char *)buffer) + rc;
1987                 nob -= rc;
1988         }
1989         
1990         return (0);
1991 }
1992
1993 int
1994 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1995 {
1996         int           rc;
1997         mm_segment_t  oldmm = get_fs();
1998         
1999         while (nob > 0) {
2000                 struct iovec  iov = {
2001                         .iov_base = buffer,
2002                         .iov_len  = nob
2003                 };
2004                 struct msghdr msg = {
2005                         .msg_name       = NULL,
2006                         .msg_namelen    = 0,
2007                         .msg_iov        = &iov,
2008                         .msg_iovlen     = 1,
2009                         .msg_control    = NULL,
2010                         .msg_controllen = 0,
2011                         .msg_flags      = 0
2012                 };
2013
2014                 set_fs (KERNEL_DS);
2015                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
2016                 set_fs (oldmm);
2017                 
2018                 if (rc < 0)
2019                         return (rc);
2020
2021                 if (rc == 0)
2022                         return (-ECONNABORTED);
2023
2024                 buffer = ((char *)buffer) + rc;
2025                 nob -= rc;
2026         }
2027         
2028         return (0);
2029 }
2030
2031 int
2032 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type, __u64 *incarnation)
2033 {
2034         int                 rc;
2035         ptl_hdr_t           hdr;
2036         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2037         char                ipbuf[PTL_NALFMT_SIZE];
2038
2039         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2040
2041         memset (&hdr, 0, sizeof (hdr));
2042         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
2043         hmv->version_major = __cpu_to_le32 (PORTALS_PROTO_VERSION_MAJOR);
2044         hmv->version_minor = __cpu_to_le32 (PORTALS_PROTO_VERSION_MINOR);
2045         
2046         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
2047         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
2048
2049         hdr.msg.hello.type = __cpu_to_le32 (*type);
2050         hdr.msg.hello.incarnation = 
2051                 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2052
2053         /* Assume sufficient socket buffering for this message */
2054         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
2055         if (rc != 0) {
2056                 CERROR ("Error %d sending HELLO to "LPX64" %s\n",
2057                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2058                 return (rc);
2059         }
2060
2061         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2062         if (rc != 0) {
2063                 CERROR ("Error %d reading HELLO from "LPX64" %s\n",
2064                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2065                 return (rc);
2066         }
2067         
2068         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2069                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
2070                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
2071                         portals_nid2str(SOCKNAL, *nid, ipbuf));
2072                 return (-EPROTO);
2073         }
2074
2075         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2076             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2077                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2078                         " from "LPX64" %s\n",
2079                         __le16_to_cpu (hmv->version_major),
2080                         __le16_to_cpu (hmv->version_minor),
2081                         PORTALS_PROTO_VERSION_MAJOR,
2082                         PORTALS_PROTO_VERSION_MINOR,
2083                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2084                 return (-EPROTO);
2085         }
2086
2087 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
2088 # error "This code only understands protocol version 0.x"
2089 #endif
2090         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
2091          * so read the rest of it in now... */
2092
2093         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2094         if (rc != 0) {
2095                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n",
2096                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2097                 return (rc);
2098         }
2099
2100         /* ...and check we got what we expected */
2101         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2102             hdr.payload_length != __cpu_to_le32 (0)) {
2103                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2104                         " but got type %d with %d payload from "LPX64" %s\n",
2105                         __le32_to_cpu (hdr.type),
2106                         __le32_to_cpu (hdr.payload_length), *nid,
2107                         portals_nid2str(SOCKNAL, *nid, ipbuf));
2108                 return (-EPROTO);
2109         }
2110
2111         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2112                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2113                 return (-EPROTO);
2114         }
2115
2116         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2117                 *nid = __le64_to_cpu(hdr.src_nid);
2118         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2119                 CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
2120                         __le64_to_cpu (hdr.src_nid),
2121                         portals_nid2str(SOCKNAL, 
2122                                         __le64_to_cpu(hdr.src_nid),
2123                                         ipbuf),
2124                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2125                 return (-EPROTO);
2126         }
2127
2128         if (*type == SOCKNAL_CONN_NONE) {
2129                 /* I've accepted this connection; peer determines type */
2130                 *type = __le32_to_cpu(hdr.msg.hello.type);
2131                 switch (*type) {
2132                 case SOCKNAL_CONN_ANY:
2133                 case SOCKNAL_CONN_CONTROL:
2134                         break;
2135                 case SOCKNAL_CONN_BULK_IN:
2136                         *type = SOCKNAL_CONN_BULK_OUT;
2137                         break;
2138                 case SOCKNAL_CONN_BULK_OUT:
2139                         *type = SOCKNAL_CONN_BULK_IN;
2140                         break;
2141                 default:
2142                         CERROR ("Unexpected type %d from "LPX64" %s\n", 
2143                                 *type, *nid,
2144                                 portals_nid2str(SOCKNAL, *nid, ipbuf));
2145                         return (-EPROTO);
2146                 }
2147         } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2148                 CERROR ("Mismatched types: me %d "LPX64" %s %d\n",
2149                         *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf),
2150                         __le32_to_cpu(hdr.msg.hello.type));
2151                 return (-EPROTO);
2152         }
2153
2154         *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
2155
2156         return (0);
2157 }
2158
2159 int
2160 ksocknal_setup_sock (struct socket *sock)
2161 {
2162         mm_segment_t    oldmm = get_fs ();
2163         int             rc;
2164         int             option;
2165         struct linger   linger;
2166
2167 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2168         sock->sk->sk_allocation = GFP_NOFS;
2169 #else
2170         sock->sk->allocation = GFP_NOFS;
2171 #endif
2172
2173         /* Ensure this socket aborts active sends immediately when we close
2174          * it. */
2175
2176         linger.l_onoff = 0;
2177         linger.l_linger = 0;
2178
2179         set_fs (KERNEL_DS);
2180         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2181                               (char *)&linger, sizeof (linger));
2182         set_fs (oldmm);
2183         if (rc != 0) {
2184                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2185                 return (rc);
2186         }
2187
2188         option = -1;
2189         set_fs (KERNEL_DS);
2190         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2191                                     (char *)&option, sizeof (option));
2192         set_fs (oldmm);
2193         if (rc != 0) {
2194                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2195                 return (rc);
2196         }
2197
2198 #if SOCKNAL_USE_KEEPALIVES
2199         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2200          * second until the timeout elapses. */
2201
2202         option = (ksocknal_data.ksnd_io_timeout * 3) / 4;
2203         set_fs (KERNEL_DS);
2204         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2205                                     (char *)&option, sizeof (option));
2206         set_fs (oldmm);
2207         if (rc != 0) {
2208                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2209                 return (rc);
2210         }
2211         
2212         option = 1;
2213         set_fs (KERNEL_DS);
2214         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2215                                     (char *)&option, sizeof (option));
2216         set_fs (oldmm);
2217         if (rc != 0) {
2218                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2219                 return (rc);
2220         }
2221         
2222         option = ksocknal_data.ksnd_io_timeout / 4;
2223         set_fs (KERNEL_DS);
2224         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2225                                     (char *)&option, sizeof (option));
2226         set_fs (oldmm);
2227         if (rc != 0) {
2228                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2229                 return (rc);
2230         }
2231
2232         option = 1;
2233         set_fs (KERNEL_DS);
2234         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2235                               (char *)&option, sizeof (option));
2236         set_fs (oldmm);
2237         if (rc != 0) {
2238                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2239                 return (rc);
2240         }
2241 #endif
2242         return (0);
2243 }
2244
2245 int
2246 ksocknal_connect_peer (ksock_route_t *route, int type)
2247 {
2248         struct sockaddr_in  peer_addr;
2249         mm_segment_t        oldmm = get_fs();
2250         struct timeval      tv;
2251         int                 fd;
2252         struct socket      *sock;
2253         int                 rc;
2254         char                ipbuf[PTL_NALFMT_SIZE];
2255
2256         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2257         if (rc != 0) {
2258                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2259                 return (rc);
2260         }
2261
2262         /* Ugh; have to map_fd for compatibility with sockets passed in
2263          * from userspace.  And we actually need the sock->file refcounting
2264          * that this gives you :) */
2265
2266         fd = sock_map_fd (sock);
2267         if (fd < 0) {
2268                 sock_release (sock);
2269                 CERROR ("sock_map_fd error %d\n", fd);
2270                 return (fd);
2271         }
2272
2273         /* NB the fd now owns the ref on sock->file */
2274         LASSERT (sock->file != NULL);
2275         LASSERT (file_count(sock->file) == 1);
2276
2277         /* Set the socket timeouts, so our connection attempt completes in
2278          * finite time */
2279         tv.tv_sec = ksocknal_data.ksnd_io_timeout;
2280         tv.tv_usec = 0;
2281
2282         set_fs (KERNEL_DS);
2283         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2284                               (char *)&tv, sizeof (tv));
2285         set_fs (oldmm);
2286         if (rc != 0) {
2287                 CERROR ("Can't set send timeout %d: %d\n", 
2288                         ksocknal_data.ksnd_io_timeout, rc);
2289                 goto out;
2290         }
2291         
2292         set_fs (KERNEL_DS);
2293         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2294                               (char *)&tv, sizeof (tv));
2295         set_fs (oldmm);
2296         if (rc != 0) {
2297                 CERROR ("Can't set receive timeout %d: %d\n",
2298                         ksocknal_data.ksnd_io_timeout, rc);
2299                 goto out;
2300         }
2301
2302         {
2303                 int  option = 1;
2304                 
2305                 set_fs (KERNEL_DS);
2306                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2307                                             (char *)&option, sizeof (option));
2308                 set_fs (oldmm);
2309                 if (rc != 0) {
2310                         CERROR ("Can't disable nagle: %d\n", rc);
2311                         goto out;
2312                 }
2313         }
2314         
2315         if (route->ksnr_buffer_size != 0) {
2316                 int option = route->ksnr_buffer_size;
2317                 
2318                 set_fs (KERNEL_DS);
2319                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2320                                       (char *)&option, sizeof (option));
2321                 set_fs (oldmm);
2322                 if (rc != 0) {
2323                         CERROR ("Can't set send buffer %d: %d\n",
2324                                 route->ksnr_buffer_size, rc);
2325                         goto out;
2326                 }
2327
2328                 set_fs (KERNEL_DS);
2329                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2330                                       (char *)&option, sizeof (option));
2331                 set_fs (oldmm);
2332                 if (rc != 0) {
2333                         CERROR ("Can't set receive buffer %d: %d\n",
2334                                 route->ksnr_buffer_size, rc);
2335                         goto out;
2336                 }
2337         }
2338         
2339         memset (&peer_addr, 0, sizeof (peer_addr));
2340         peer_addr.sin_family = AF_INET;
2341         peer_addr.sin_port = htons (route->ksnr_port);
2342         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2343         
2344         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2345                                  sizeof (peer_addr), sock->file->f_flags);
2346         if (rc != 0) {
2347                 CERROR ("Error %d connecting to "LPX64" %s\n", rc,
2348                         route->ksnr_peer->ksnp_nid,
2349                         portals_nid2str(SOCKNAL, 
2350                                         route->ksnr_peer->ksnp_nid, 
2351                                         ipbuf));
2352                 goto out;
2353         }
2354         
2355         rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2356         if (rc == 0) {
2357                 /* Take an extra ref on sock->file to compensate for the
2358                  * upcoming close which will lose fd's ref on it. */
2359                 get_file (sock->file);
2360         }
2361
2362  out:
2363         sys_close (fd);
2364         return (rc);
2365 }
2366
2367 void
2368 ksocknal_autoconnect (ksock_route_t *route)
2369 {
2370         LIST_HEAD        (zombies);
2371         ksock_tx_t       *tx;
2372         ksock_peer_t     *peer;
2373         unsigned long     flags;
2374         int               rc;
2375         int               type;
2376         
2377         for (;;) {
2378                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2379                         if ((route->ksnr_connecting & (1 << type)) != 0)
2380                                 break;
2381                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2382
2383                 rc = ksocknal_connect_peer (route, type);
2384
2385                 if (rc != 0)
2386                         break;
2387                 
2388                 /* successfully autoconnected: create_conn did the
2389                  * route/conn binding and scheduled any blocked packets */
2390
2391                 if (route->ksnr_connecting == 0) {
2392                         /* No more connections required */
2393                         return;
2394                 }
2395         }
2396
2397         /* Connection attempt failed */
2398
2399         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2400
2401         peer = route->ksnr_peer;
2402         route->ksnr_connecting = 0;
2403
2404         /* This is a retry rather than a new connection */
2405         LASSERT (route->ksnr_retry_interval != 0);
2406         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2407         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2408                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2409
2410         if (!list_empty (&peer->ksnp_tx_queue) &&
2411             ksocknal_find_connecting_route_locked (peer) == NULL) {
2412                 LASSERT (list_empty (&peer->ksnp_conns));
2413
2414                 /* None of the connections that the blocked packets are
2415                  * waiting for have been successful.  Complete them now... */
2416                 do {
2417                         tx = list_entry (peer->ksnp_tx_queue.next,
2418                                          ksock_tx_t, tx_list);
2419                         list_del (&tx->tx_list);
2420                         list_add_tail (&tx->tx_list, &zombies);
2421                 } while (!list_empty (&peer->ksnp_tx_queue));
2422         }
2423
2424         /* make this route least-favourite for re-selection */
2425         if (!route->ksnr_deleted) {
2426                 list_del(&route->ksnr_list);
2427                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2428         }
2429         
2430         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2431
2432         while (!list_empty (&zombies)) {
2433                 char ipbuf[PTL_NALFMT_SIZE];
2434                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2435                 
2436                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2437                         NTOH__u32 (tx->tx_hdr->type),
2438                         NTOH__u32 (tx->tx_hdr->payload_length),
2439                         NTOH__u64 (tx->tx_hdr->src_nid),
2440                         portals_nid2str(SOCKNAL,
2441                                         NTOH__u64(tx->tx_hdr->src_nid),
2442                                         ipbuf),
2443                         NTOH__u64 (tx->tx_hdr->dest_nid),
2444                         portals_nid2str(SOCKNAL,
2445                                         NTOH__u64(tx->tx_hdr->src_nid),
2446                                         ipbuf));
2447
2448                 list_del (&tx->tx_list);
2449                 /* complete now */
2450                 ksocknal_tx_done (tx, 0);
2451         }
2452 }
2453
2454 int
2455 ksocknal_autoconnectd (void *arg)
2456 {
2457         long               id = (long)arg;
2458         char               name[16];
2459         unsigned long      flags;
2460         ksock_route_t     *route;
2461         int                rc;
2462
2463         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2464         kportal_daemonize (name);
2465         kportal_blockallsigs ();
2466
2467         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2468
2469         while (!ksocknal_data.ksnd_shuttingdown) {
2470
2471                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2472                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2473                                             ksock_route_t, ksnr_connect_list);
2474                         
2475                         list_del (&route->ksnr_connect_list);
2476                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2477
2478                         ksocknal_autoconnect (route);
2479                         ksocknal_put_route (route);
2480
2481                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2482                         continue;
2483                 }
2484                 
2485                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2486
2487                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2488                                                ksocknal_data.ksnd_shuttingdown ||
2489                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2490
2491                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2492         }
2493
2494         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2495
2496         ksocknal_thread_fini ();
2497         return (0);
2498 }
2499
2500 ksock_conn_t *
2501 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2502 {
2503         /* We're called with a shared lock on ksnd_global_lock */
2504         ksock_conn_t      *conn;
2505         struct list_head  *ctmp;
2506         ksock_sched_t     *sched;
2507
2508         list_for_each (ctmp, &peer->ksnp_conns) {
2509                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2510                 sched = conn->ksnc_scheduler;
2511
2512                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2513                 LASSERT (!conn->ksnc_closing);
2514                 
2515                 if (conn->ksnc_rx_started &&
2516                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2517                         /* Timed out incomplete incoming message */
2518                         atomic_inc (&conn->ksnc_refcount);
2519                         CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2520                                 peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr));
2521                         return (conn);
2522                 }
2523                 
2524                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2525                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2526                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2527                         /* Timed out messages queued for sending, or
2528                          * messages buffered in the socket's send buffer */
2529                         atomic_inc (&conn->ksnc_refcount);
2530                         CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n", 
2531                                 peer->ksnp_nid, 
2532                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2533                                 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2534                                 HIPQUAD(conn->ksnc_ipaddr));
2535                         return (conn);
2536                 }
2537         }
2538
2539         return (NULL);
2540 }
2541
2542 void
2543 ksocknal_check_peer_timeouts (int idx)
2544 {
2545         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2546         struct list_head *ptmp;
2547         ksock_peer_t     *peer;
2548         ksock_conn_t     *conn;
2549
2550  again:
2551         /* NB. We expect to have a look at all the peers and not find any
2552          * connections to time out, so we just use a shared lock while we
2553          * take a look... */
2554         read_lock (&ksocknal_data.ksnd_global_lock);
2555
2556         list_for_each (ptmp, peers) {
2557                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2558                 conn = ksocknal_find_timed_out_conn (peer);
2559                 
2560                 if (conn != NULL) {
2561                         read_unlock (&ksocknal_data.ksnd_global_lock);
2562
2563                         CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2564                                 peer->ksnp_nid,
2565                                 HIPQUAD(conn->ksnc_ipaddr),
2566                                 conn->ksnc_port);
2567                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2568                         
2569                         /* NB we won't find this one again, but we can't
2570                          * just proceed with the next peer, since we dropped
2571                          * ksnd_global_lock and it might be dead already! */
2572                         ksocknal_put_conn (conn);
2573                         goto again;
2574                 }
2575         }
2576
2577         read_unlock (&ksocknal_data.ksnd_global_lock);
2578 }
2579
2580 int
2581 ksocknal_reaper (void *arg)
2582 {
2583         wait_queue_t       wait;
2584         unsigned long      flags;
2585         ksock_conn_t      *conn;
2586         ksock_sched_t     *sched;
2587         struct list_head   enomem_conns;
2588         int                nenomem_conns;
2589         int                timeout;
2590         int                i;
2591         int                peer_index = 0;
2592         unsigned long      deadline = jiffies;
2593         
2594         kportal_daemonize ("ksocknal_reaper");
2595         kportal_blockallsigs ();
2596
2597         INIT_LIST_HEAD(&enomem_conns);
2598         init_waitqueue_entry (&wait, current);
2599
2600         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2601
2602         while (!ksocknal_data.ksnd_shuttingdown) {
2603
2604                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2605                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2606                                            ksock_conn_t, ksnc_list);
2607                         list_del (&conn->ksnc_list);
2608                         
2609                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2610
2611                         ksocknal_terminate_conn (conn);
2612                         ksocknal_put_conn (conn);
2613
2614                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2615                         continue;
2616                 }
2617
2618                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2619                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2620                                            ksock_conn_t, ksnc_list);
2621                         list_del (&conn->ksnc_list);
2622                         
2623                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2624
2625                         ksocknal_destroy_conn (conn);
2626
2627                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2628                         continue;
2629                 }
2630
2631                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2632                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2633                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2634                 }
2635
2636                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2637
2638                 /* reschedule all the connections that stalled with ENOMEM... */
2639                 nenomem_conns = 0;
2640                 while (!list_empty (&enomem_conns)) {
2641                         conn = list_entry (enomem_conns.next,
2642                                            ksock_conn_t, ksnc_tx_list);
2643                         list_del (&conn->ksnc_tx_list);
2644
2645                         sched = conn->ksnc_scheduler;
2646
2647                         spin_lock_irqsave (&sched->kss_lock, flags);
2648
2649                         LASSERT (conn->ksnc_tx_scheduled);
2650                         conn->ksnc_tx_ready = 1;
2651                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2652                         wake_up (&sched->kss_waitq);
2653
2654                         spin_unlock_irqrestore (&sched->kss_lock, flags);
2655                         nenomem_conns++;
2656                 }
2657                 
2658                 /* careful with the jiffy wrap... */
2659                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2660                         const int n = 4;
2661                         const int p = 1;
2662                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2663                         
2664                         /* Time to check for timeouts on a few more peers: I do
2665                          * checks every 'p' seconds on a proportion of the peer
2666                          * table and I need to check every connection 'n' times
2667                          * within a timeout interval, to ensure I detect a
2668                          * timeout on any connection within (n+1)/n times the
2669                          * timeout interval. */
2670
2671                         if (ksocknal_data.ksnd_io_timeout > n * p)
2672                                 chunk = (chunk * n * p) / 
2673                                         ksocknal_data.ksnd_io_timeout;
2674                         if (chunk == 0)
2675                                 chunk = 1;
2676
2677                         for (i = 0; i < chunk; i++) {
2678                                 ksocknal_check_peer_timeouts (peer_index);
2679                                 peer_index = (peer_index + 1) % 
2680                                              ksocknal_data.ksnd_peer_hash_size;
2681                         }
2682
2683                         deadline += p * HZ;
2684                 }
2685
2686                 if (nenomem_conns != 0) {
2687                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2688                          * This also prevents me getting woken immediately
2689                          * if any go back on my enomem list. */
2690                         timeout = SOCKNAL_ENOMEM_RETRY;
2691                 }
2692                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2693
2694                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2695                 set_current_state (TASK_INTERRUPTIBLE);
2696
2697                 if (!ksocknal_data.ksnd_shuttingdown &&
2698                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2699                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2700                         schedule_timeout (timeout);
2701
2702                 set_current_state (TASK_RUNNING);
2703                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2704
2705                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2706         }
2707
2708         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2709
2710         ksocknal_thread_fini ();
2711         return (0);
2712 }
2713
2714 nal_cb_t ksocknal_lib = {
2715         nal_data:       &ksocknal_data,                /* NAL private data */
2716         cb_send:         ksocknal_send,
2717         cb_send_pages:   ksocknal_send_pages,
2718         cb_recv:         ksocknal_recv,
2719         cb_recv_pages:   ksocknal_recv_pages,
2720         cb_read:         ksocknal_read,
2721         cb_write:        ksocknal_write,
2722         cb_callback:     ksocknal_callback,
2723         cb_malloc:       ksocknal_malloc,
2724         cb_free:         ksocknal_free,
2725         cb_printf:       ksocknal_printf,
2726         cb_cli:          ksocknal_cli,
2727         cb_sti:          ksocknal_sti,
2728         cb_dist:         ksocknal_dist
2729 };