Whamcloud - gitweb
Land b_smallfix onto HEAD (20040414_1359)
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
28 # include <linux/syscalls.h>
29 #endif
30
31 /*
32  *  LIB functions follow
33  *
34  */
35 ptl_err_t
36 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
37               user_ptr src_addr, size_t len)
38 {
39         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
40                nal->ni.nid, (long)len, src_addr, dst_addr);
41
42         memcpy( dst_addr, src_addr, len );
43         return PTL_OK;
44 }
45
46 ptl_err_t
47 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
48                void *src_addr, size_t len)
49 {
50         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
51                nal->ni.nid, (long)len, src_addr, dst_addr);
52
53         memcpy( dst_addr, src_addr, len );
54         return PTL_OK;
55 }
56
57 void *
58 ksocknal_malloc(nal_cb_t *nal, size_t len)
59 {
60         void *buf;
61
62         PORTAL_ALLOC(buf, len);
63
64         if (buf != NULL)
65                 memset(buf, 0, len);
66
67         return (buf);
68 }
69
70 void
71 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
72 {
73         PORTAL_FREE(buf, len);
74 }
75
76 void
77 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
78 {
79         va_list ap;
80         char msg[256];
81
82         va_start (ap, fmt);
83         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
84         va_end (ap);
85
86         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
87
88         CDEBUG (D_NET, "%s", msg);
89 }
90
91 void
92 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
93 {
94         ksock_nal_data_t *data = nal->nal_data;
95
96         /* OK to ignore 'flags'; we're only ever serialise threads and
97          * never need to lock out interrupts */
98         spin_lock(&data->ksnd_nal_cb_lock);
99 }
100
101 void
102 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
103 {
104         ksock_nal_data_t *data;
105         data = nal->nal_data;
106
107         /* OK to ignore 'flags'; we're only ever serialise threads and
108          * never need to lock out interrupts */
109         spin_unlock(&data->ksnd_nal_cb_lock);
110 }
111
112 void
113 ksocknal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
114 {
115         /* holding ksnd_nal_cb_lock */
116
117         if (eq->event_callback != NULL)
118                 eq->event_callback(ev);
119         
120         if (waitqueue_active(&ksocknal_data.ksnd_yield_waitq))
121                 wake_up_all(&ksocknal_data.ksnd_yield_waitq);
122 }
123
124 int
125 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
126 {
127         /* I would guess that if ksocknal_get_peer (nid) == NULL,
128            and we're not routing, then 'nid' is very distant :) */
129         if ( nal->ni.nid == nid ) {
130                 *dist = 0;
131         } else {
132                 *dist = 1;
133         }
134
135         return 0;
136 }
137
138 void
139 ksocknal_free_ltx (ksock_ltx_t *ltx)
140 {
141         atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
142         PORTAL_FREE(ltx, ltx->ltx_desc_size);
143 }
144
145 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
146 struct page *
147 ksocknal_kvaddr_to_page (unsigned long vaddr)
148 {
149         struct page *page;
150
151         if (vaddr >= VMALLOC_START &&
152             vaddr < VMALLOC_END)
153                 page = vmalloc_to_page ((void *)vaddr);
154 #if CONFIG_HIGHMEM
155         else if (vaddr >= PKMAP_BASE &&
156                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
157                 page = vmalloc_to_page ((void *)vaddr);
158                 /* in 2.4 ^ just walks the page tables */
159 #endif
160         else
161                 page = virt_to_page (vaddr);
162
163         if (page == NULL ||
164             !VALID_PAGE (page))
165                 return (NULL);
166
167         return (page);
168 }
169 #endif
170
171 int
172 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
173 {
174         struct socket *sock = conn->ksnc_sock;
175         struct iovec  *iov = tx->tx_iov;
176         int            fragsize = iov->iov_len;
177         unsigned long  vaddr = (unsigned long)iov->iov_base;
178         int            more = (tx->tx_niov > 1) || 
179                               (tx->tx_nkiov > 0) ||
180                               (!list_empty (&conn->ksnc_tx_queue));
181 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
182         int            offset = vaddr & (PAGE_SIZE - 1);
183         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
184         struct page   *page;
185 #endif
186         int            rc;
187
188         /* NB we can't trust socket ops to either consume our iovs
189          * or leave them alone, so we only send 1 frag at a time. */
190         LASSERT (fragsize <= tx->tx_resid);
191         LASSERT (tx->tx_niov > 0);
192         
193 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
194         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
195             (sock->sk->route_caps & NETIF_F_SG) &&
196             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
197             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
198                 
199                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
200                        (void *)vaddr, page, page_address(page), offset, zcsize);
201
202                 if (fragsize > zcsize) {
203                         more = 1;
204                         fragsize = zcsize;
205                 }
206
207                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
208                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
209                                        &tx->tx_zccd);
210         } else
211 #endif
212         {
213                 /* NB don't pass tx's iov; sendmsg may or may not update it */
214                 struct iovec fragiov = { .iov_base = (void *)vaddr,
215                                          .iov_len  = fragsize};
216                 struct msghdr msg = {
217                         .msg_name       = NULL,
218                         .msg_namelen    = 0,
219                         .msg_iov        = &fragiov,
220                         .msg_iovlen     = 1,
221                         .msg_control    = NULL,
222                         .msg_controllen = 0,
223                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
224                 };
225                 mm_segment_t oldmm = get_fs();
226
227                 set_fs (KERNEL_DS);
228                 rc = sock_sendmsg(sock, &msg, fragsize);
229                 set_fs (oldmm);
230         } 
231
232         if (rc > 0) {
233                 tx->tx_resid -= rc;
234
235                 if (rc < iov->iov_len) {
236                         /* didn't send whole iov entry... */
237                         iov->iov_base = (void *)(vaddr + rc);
238                         iov->iov_len -= rc;
239                 } else {
240                         tx->tx_iov++;
241                         tx->tx_niov--;
242                 }
243         }
244         
245         return (rc);
246 }
247
248 int
249 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
250 {
251         struct socket *sock = conn->ksnc_sock;
252         ptl_kiov_t    *kiov = tx->tx_kiov;
253         int            fragsize = kiov->kiov_len;
254         struct page   *page = kiov->kiov_page;
255         int            offset = kiov->kiov_offset;
256         int            more = (tx->tx_nkiov > 1) ||
257                               (!list_empty (&conn->ksnc_tx_queue));
258         int            rc;
259
260         /* NB we can't trust socket ops to either consume our iovs
261          * or leave them alone, so we only send 1 frag at a time. */
262         LASSERT (fragsize <= tx->tx_resid);
263         LASSERT (offset + fragsize <= PAGE_SIZE);
264         LASSERT (tx->tx_niov == 0);
265         LASSERT (tx->tx_nkiov > 0);
266
267 #if SOCKNAL_ZC
268         if (fragsize >= ksocknal_tunables.ksnd_zc_min_frag &&
269             (sock->sk->route_caps & NETIF_F_SG) &&
270             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
271
272                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
273                                page, offset, fragsize);
274
275                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
276                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
277                                        &tx->tx_zccd);
278         } else
279 #endif
280         {
281                 char *addr = ((char *)kmap (page)) + offset;
282                 struct iovec fragiov = {.iov_base = addr,
283                                         .iov_len  = fragsize};
284                 struct msghdr msg = {
285                         .msg_name       = NULL,
286                         .msg_namelen    = 0,
287                         .msg_iov        = &fragiov,
288                         .msg_iovlen     = 1,
289                         .msg_control    = NULL,
290                         .msg_controllen = 0,
291                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
292                 };
293                 mm_segment_t  oldmm = get_fs();
294                 
295                 set_fs (KERNEL_DS);
296                 rc = sock_sendmsg(sock, &msg, fragsize);
297                 set_fs (oldmm);
298
299                 kunmap (page);
300         }
301
302         if (rc > 0) {
303                 tx->tx_resid -= rc;
304  
305                 if (rc < fragsize) {
306                         kiov->kiov_offset = offset + rc;
307                         kiov->kiov_len    = fragsize - rc;
308                 } else {
309                         tx->tx_kiov++;
310                         tx->tx_nkiov--;
311                 }
312         }
313
314         return (rc);
315 }
316
317 int
318 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
319 {
320         int      rc;
321         
322         if (ksocknal_data.ksnd_stall_tx != 0) {
323                 set_current_state (TASK_UNINTERRUPTIBLE);
324                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
325         }
326
327         LASSERT (tx->tx_resid != 0);
328
329         rc = ksocknal_getconnsock (conn);
330         if (rc != 0) {
331                 LASSERT (conn->ksnc_closing);
332                 return (-ESHUTDOWN);
333         }
334
335         do {
336                 if (ksocknal_data.ksnd_enomem_tx > 0) {
337                         /* testing... */
338                         ksocknal_data.ksnd_enomem_tx--;
339                         rc = -EAGAIN;
340                 } else if (tx->tx_niov != 0) {
341                         rc = ksocknal_send_iov (conn, tx);
342                 } else {
343                         rc = ksocknal_send_kiov (conn, tx);
344                 }
345
346                 if (rc <= 0) {
347                         /* Didn't write anything.
348                          *
349                          * NB: rc == 0 and rc == -EAGAIN both mean try
350                          * again later (linux stack returns -EAGAIN for
351                          * this, but Adaptech TOE returns 0).
352                          *
353                          * Also, sends never fail with -ENOMEM, just
354                          * -EAGAIN, but with the added bonus that we can't
355                          * expect write_space() to call us back to tell us
356                          * when to try sending again.  We use the
357                          * SOCK_NOSPACE flag to diagnose...  */
358
359                         LASSERT(rc != -ENOMEM);
360
361                         if (rc == 0 || rc == -EAGAIN) {
362                                 if (test_bit(SOCK_NOSPACE, 
363                                              &conn->ksnc_sock->flags)) {
364                                         rc = -EAGAIN;
365                                 } else {
366                                         static int counter;
367                          
368                                         counter++;
369                                         if ((counter & (-counter)) == counter)
370                                                 CWARN("%d ENOMEM tx %p\n", 
371                                                       counter, conn);
372                                         rc = -ENOMEM;
373                                 }
374                         }
375                         break;
376                 }
377
378                 rc = 0;
379
380                 /* Consider the connection alive since we managed to chuck
381                  * more data into it.  Really, we'd like to consider it
382                  * alive only when the peer ACKs something, but
383                  * write_space() only gets called back while SOCK_NOSPACE
384                  * is set.  Instead, we presume peer death has occurred if
385                  * the socket doesn't drain within a timout */
386                 conn->ksnc_tx_deadline = jiffies + 
387                                          ksocknal_tunables.ksnd_io_timeout * HZ;
388                 conn->ksnc_peer->ksnp_last_alive = jiffies;
389
390         } while (tx->tx_resid != 0);
391
392         ksocknal_putconnsock (conn);
393         return (rc);
394 }
395
396 void
397 ksocknal_eager_ack (ksock_conn_t *conn)
398 {
399         int            opt = 1;
400         mm_segment_t   oldmm = get_fs();
401         struct socket *sock = conn->ksnc_sock;
402         
403         /* Remind the socket to ACK eagerly.  If I don't, the socket might
404          * think I'm about to send something it could piggy-back the ACK
405          * on, introducing delay in completing zero-copy sends in my
406          * peer. */
407
408         set_fs(KERNEL_DS);
409         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
410                                (char *)&opt, sizeof (opt));
411         set_fs(oldmm);
412 }
413
414 int
415 ksocknal_recv_iov (ksock_conn_t *conn)
416 {
417         struct iovec *iov = conn->ksnc_rx_iov;
418         int           fragsize  = iov->iov_len;
419         unsigned long vaddr = (unsigned long)iov->iov_base;
420         struct iovec  fragiov = { .iov_base = (void *)vaddr,
421                                   .iov_len  = fragsize};
422         struct msghdr msg = {
423                 .msg_name       = NULL,
424                 .msg_namelen    = 0,
425                 .msg_iov        = &fragiov,
426                 .msg_iovlen     = 1,
427                 .msg_control    = NULL,
428                 .msg_controllen = 0,
429                 .msg_flags      = 0
430         };
431         mm_segment_t oldmm = get_fs();
432         int          rc;
433
434         /* NB we can't trust socket ops to either consume our iovs
435          * or leave them alone, so we only receive 1 frag at a time. */
436         LASSERT (conn->ksnc_rx_niov > 0);
437         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
438
439         set_fs (KERNEL_DS);
440         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
441         /* NB this is just a boolean............................^ */
442         set_fs (oldmm);
443
444         if (rc <= 0)
445                 return (rc);
446
447         /* received something... */
448         conn->ksnc_peer->ksnp_last_alive = jiffies;
449         conn->ksnc_rx_deadline = jiffies + 
450                                  ksocknal_tunables.ksnd_io_timeout * HZ;
451         mb();                           /* order with setting rx_started */
452         conn->ksnc_rx_started = 1;
453
454         conn->ksnc_rx_nob_wanted -= rc;
455         conn->ksnc_rx_nob_left -= rc;
456                 
457         if (rc < fragsize) {
458                 iov->iov_base = (void *)(vaddr + rc);
459                 iov->iov_len = fragsize - rc;
460                 return (-EAGAIN);
461         }
462
463         conn->ksnc_rx_iov++;
464         conn->ksnc_rx_niov--;
465         return (1);
466 }
467
468 int
469 ksocknal_recv_kiov (ksock_conn_t *conn)
470 {
471         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
472         struct page  *page = kiov->kiov_page;
473         int           offset = kiov->kiov_offset;
474         int           fragsize = kiov->kiov_len;
475         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
476         struct iovec  fragiov = { .iov_base = (void *)vaddr,
477                                   .iov_len  = fragsize};
478         struct msghdr msg = {
479                 .msg_name       = NULL,
480                 .msg_namelen    = 0,
481                 .msg_iov        = &fragiov,
482                 .msg_iovlen     = 1,
483                 .msg_control    = NULL,
484                 .msg_controllen = 0,
485                 .msg_flags      = 0
486         };
487         mm_segment_t oldmm = get_fs();
488         int          rc;
489
490         /* NB we can't trust socket ops to either consume our iovs
491          * or leave them alone, so we only receive 1 frag at a time. */
492         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
493         LASSERT (conn->ksnc_rx_nkiov > 0);
494         LASSERT (offset + fragsize <= PAGE_SIZE);
495
496         set_fs (KERNEL_DS);
497         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
498         /* NB this is just a boolean............................^ */
499         set_fs (oldmm);
500
501         kunmap (page);
502         
503         if (rc <= 0)
504                 return (rc);
505         
506         /* received something... */
507         conn->ksnc_peer->ksnp_last_alive = jiffies;
508         conn->ksnc_rx_deadline = jiffies + 
509                                  ksocknal_tunables.ksnd_io_timeout * HZ;
510         mb();                           /* order with setting rx_started */
511         conn->ksnc_rx_started = 1;
512
513         conn->ksnc_rx_nob_wanted -= rc;
514         conn->ksnc_rx_nob_left -= rc;
515                 
516         if (rc < fragsize) {
517                 kiov->kiov_offset = offset + rc;
518                 kiov->kiov_len = fragsize - rc;
519                 return (-EAGAIN);
520         }
521
522         conn->ksnc_rx_kiov++;
523         conn->ksnc_rx_nkiov--;
524         return (1);
525 }
526
527 int
528 ksocknal_receive (ksock_conn_t *conn) 
529 {
530         /* Return 1 on success, 0 on EOF, < 0 on error.
531          * Caller checks ksnc_rx_nob_wanted to determine
532          * progress/completion. */
533         int     rc;
534         ENTRY;
535         
536         if (ksocknal_data.ksnd_stall_rx != 0) {
537                 set_current_state (TASK_UNINTERRUPTIBLE);
538                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
539         }
540
541         rc = ksocknal_getconnsock (conn);
542         if (rc != 0) {
543                 LASSERT (conn->ksnc_closing);
544                 return (-ESHUTDOWN);
545         }
546
547         for (;;) {
548                 if (conn->ksnc_rx_niov != 0)
549                         rc = ksocknal_recv_iov (conn);
550                 else
551                         rc = ksocknal_recv_kiov (conn);
552
553                 if (rc <= 0) {
554                         /* error/EOF or partial receive */
555                         if (rc == -EAGAIN) {
556                                 rc = 1;
557                         } else if (rc == 0 && conn->ksnc_rx_started) {
558                                 /* EOF in the middle of a message */
559                                 rc = -EPROTO;
560                         }
561                         break;
562                 }
563
564                 /* Completed a fragment */
565
566                 if (conn->ksnc_rx_nob_wanted == 0) {
567                         /* Completed a message segment (header or payload) */
568                         if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
569                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
570                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
571                                 /* Remind the socket to ack eagerly... */
572                                 ksocknal_eager_ack(conn);
573                         }
574                         rc = 1;
575                         break;
576                 }
577         }
578
579         ksocknal_putconnsock (conn);
580         RETURN (rc);
581 }
582
583 #if SOCKNAL_ZC
584 void
585 ksocknal_zc_callback (zccd_t *zcd)
586 {
587         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
588         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
589         unsigned long  flags;
590         ENTRY;
591
592         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
593
594         spin_lock_irqsave (&sched->kss_lock, flags);
595
596         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
597         wake_up (&sched->kss_waitq);
598
599         spin_unlock_irqrestore (&sched->kss_lock, flags);
600         EXIT;
601 }
602 #endif
603
604 void
605 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
606 {
607         ksock_ltx_t   *ltx;
608         ENTRY;
609
610         if (tx->tx_conn != NULL) {
611                 /* This tx got queued on a conn; do the accounting... */
612                 atomic_sub (tx->tx_nob, &tx->tx_conn->ksnc_tx_nob);
613 #if SOCKNAL_ZC
614                 /* zero copy completion isn't always from
615                  * process_transmit() so it needs to keep a ref on
616                  * tx_conn... */
617                 if (asynch)
618                         ksocknal_put_conn (tx->tx_conn);
619 #else
620                 LASSERT (!asynch);
621 #endif
622         }
623
624         if (tx->tx_isfwd) {             /* was a forwarded packet? */
625                 kpr_fwd_done (&ksocknal_data.ksnd_router,
626                               KSOCK_TX_2_KPR_FWD_DESC (tx), 
627                               (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
628                 EXIT;
629                 return;
630         }
631
632         /* local send */
633         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
634
635         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
636                       (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
637
638         ksocknal_free_ltx (ltx);
639         EXIT;
640 }
641
642 void
643 ksocknal_tx_launched (ksock_tx_t *tx) 
644 {
645 #if SOCKNAL_ZC
646         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
647                 ksock_conn_t  *conn = tx->tx_conn;
648                 
649                 /* zccd skbufs are still in-flight.  First take a ref on
650                  * conn, so it hangs about for ksocknal_tx_done... */
651                 atomic_inc (&conn->ksnc_refcount);
652
653                 /* ...then drop the initial ref on zccd, so the zero copy
654                  * callback can occur */
655                 zccd_put (&tx->tx_zccd);
656                 return;
657         }
658 #endif
659         /* Any zero-copy-ness (if any) has completed; I can complete the
660          * transmit now, avoiding an extra schedule */
661         ksocknal_tx_done (tx, 0);
662 }
663
664 int
665 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
666 {
667         unsigned long  flags;
668         int            rc;
669        
670         rc = ksocknal_transmit (conn, tx);
671
672         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
673
674         if (tx->tx_resid == 0) {
675                 /* Sent everything OK */
676                 LASSERT (rc == 0);
677
678                 ksocknal_tx_launched (tx);
679                 return (0);
680         }
681
682         if (rc == -EAGAIN)
683                 return (rc);
684
685         if (rc == -ENOMEM) {
686                 /* Queue on ksnd_enomem_conns for retry after a timeout */
687                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
688
689                 /* enomem list takes over scheduler's ref... */
690                 LASSERT (conn->ksnc_tx_scheduled);
691                 list_add_tail(&conn->ksnc_tx_list,
692                               &ksocknal_data.ksnd_enomem_conns);
693                 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
694                                    ksocknal_data.ksnd_reaper_waketime))
695                         wake_up (&ksocknal_data.ksnd_reaper_waitq);
696                 
697                 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
698                 return (rc);
699         }
700
701         /* Actual error */
702         LASSERT (rc < 0);
703
704         if (!conn->ksnc_closing)
705                 CERROR("[%p] Error %d on write to "LPX64
706                        " ip %d.%d.%d.%d:%d\n", conn, rc,
707                        conn->ksnc_peer->ksnp_nid,
708                        HIPQUAD(conn->ksnc_ipaddr),
709                        conn->ksnc_port);
710
711         ksocknal_close_conn_and_siblings (conn, rc);
712         ksocknal_tx_launched (tx);
713
714         return (rc);
715 }
716
717 void
718 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
719 {
720         unsigned long     flags;
721
722         /* called holding write lock on ksnd_global_lock */
723
724         LASSERT (!route->ksnr_deleted);
725         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
726         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
727         LASSERT (!route->ksnr_connecting);
728         
729         if (ksocknal_tunables.ksnd_typed_conns)
730                 route->ksnr_connecting = 
731                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
732         else
733                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
734
735         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
736         
737         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
738         
739         list_add_tail (&route->ksnr_connect_list,
740                        &ksocknal_data.ksnd_autoconnectd_routes);
741         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
742         
743         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
744 }
745
746 ksock_peer_t *
747 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
748 {
749         char          ipbuf[PTL_NALFMT_SIZE];
750         ptl_nid_t     target_nid;
751         int           rc;
752         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
753
754         if (peer != NULL)
755                 return (peer);
756
757         if (tx->tx_isfwd) {
758                 CERROR ("Can't send packet to "LPX64
759                        " %s: routed target is not a peer\n",
760                         nid, portals_nid2str(SOCKNAL, nid, ipbuf));
761                 return (NULL);
762         }
763
764         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
765                          &target_nid);
766         if (rc != 0) {
767                 CERROR ("Can't route to "LPX64" %s: router error %d\n",
768                         nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
769                 return (NULL);
770         }
771
772         peer = ksocknal_find_peer_locked (target_nid);
773         if (peer != NULL)
774                 return (peer);
775
776         CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
777                 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
778         return (NULL);
779 }
780
781 ksock_conn_t *
782 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
783 {
784         struct list_head *tmp;
785         ksock_conn_t     *typed = NULL;
786         int               tnob  = 0;
787         ksock_conn_t     *fallback = NULL;
788         int               fnob     = 0;
789
790         /* Find the conn with the shortest tx queue */
791         list_for_each (tmp, &peer->ksnp_conns) {
792                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
793                 int           nob = atomic_read(&c->ksnc_tx_nob) +
794                                         c->ksnc_sock->sk->sk_wmem_queued;
795
796                 LASSERT (!c->ksnc_closing);
797
798                 if (fallback == NULL || nob < fnob) {
799                         fallback = c;
800                         fnob     = nob;
801                 }
802
803                 if (!ksocknal_tunables.ksnd_typed_conns)
804                         continue;
805
806                 switch (c->ksnc_type) {
807                 default:
808                         LBUG();
809                 case SOCKNAL_CONN_ANY:
810                         break;
811                 case SOCKNAL_CONN_BULK_IN:
812                         continue;
813                 case SOCKNAL_CONN_BULK_OUT:
814                         if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
815                                 continue;
816                         break;
817                 case SOCKNAL_CONN_CONTROL:
818                         if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
819                                 continue;
820                         break;
821                 }
822
823                 if (typed == NULL || nob < tnob) {
824                         typed = c;
825                         tnob  = nob;
826                 }
827         }
828
829         /* prefer the typed selection */
830         return ((typed != NULL) ? typed : fallback);
831 }
832
833 void
834 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
835 {
836         unsigned long  flags;
837         ksock_sched_t *sched = conn->ksnc_scheduler;
838
839         /* called holding global lock (read or irq-write) and caller may
840          * not have dropped this lock between finding conn and calling me,
841          * so we don't need the {get,put}connsock dance to deref
842          * ksnc_sock... */
843         LASSERT(!conn->ksnc_closing);
844         LASSERT(tx->tx_resid == tx->tx_nob);
845         
846         CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", 
847                 conn->ksnc_peer->ksnp_nid,
848                 HIPQUAD(conn->ksnc_ipaddr),
849                 conn->ksnc_port);
850
851         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
852         tx->tx_conn = conn;
853
854 #if SOCKNAL_ZC
855         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
856         /* NB this sets 1 ref on zccd, so the callback can only occur after
857          * I've released this ref. */
858 #endif
859         spin_lock_irqsave (&sched->kss_lock, flags);
860
861         conn->ksnc_tx_deadline = jiffies + 
862                                  ksocknal_tunables.ksnd_io_timeout * HZ;
863         mb();                                   /* order with list_add_tail */
864
865         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
866                 
867         if (conn->ksnc_tx_ready &&      /* able to send */
868             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
869                 /* +1 ref for scheduler */
870                 atomic_inc (&conn->ksnc_refcount);
871                 list_add_tail (&conn->ksnc_tx_list, 
872                                &sched->kss_tx_conns);
873                 conn->ksnc_tx_scheduled = 1;
874                 wake_up (&sched->kss_waitq);
875         }
876
877         spin_unlock_irqrestore (&sched->kss_lock, flags);
878 }
879
880 ksock_route_t *
881 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
882 {
883         struct list_head  *tmp;
884         ksock_route_t     *route;
885         ksock_route_t     *candidate = NULL;
886         int                found = 0;
887         int                bits;
888         
889         list_for_each (tmp, &peer->ksnp_routes) {
890                 route = list_entry (tmp, ksock_route_t, ksnr_list);
891                 bits  = route->ksnr_connected;
892                 
893                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES ||
894                     (bits & (1 << SOCKNAL_CONN_ANY)) != 0 ||
895                     route->ksnr_connecting != 0) {
896                         /* All typed connections have been established, or
897                          * an untyped connection has been established, or
898                          * connections are currently being established */
899                         found = 1;
900                         continue;
901                 }
902
903                 /* too soon to retry this guy? */
904                 if (!time_after_eq (jiffies, route->ksnr_timeout))
905                         continue;
906                 
907                 /* always do eager routes */
908                 if (route->ksnr_eager)
909                         return (route);
910
911                 if (candidate == NULL) {
912                         /* If we don't find any other route that is fully
913                          * connected or connecting, the first connectable
914                          * route is returned.  If it fails to connect, it
915                          * will get placed at the end of the list */
916                         candidate = route;
917                 }
918         }
919  
920         return (found ? NULL : candidate);
921 }
922
923 ksock_route_t *
924 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
925 {
926         struct list_head  *tmp;
927         ksock_route_t     *route;
928
929         list_for_each (tmp, &peer->ksnp_routes) {
930                 route = list_entry (tmp, ksock_route_t, ksnr_list);
931                 
932                 if (route->ksnr_connecting != 0)
933                         return (route);
934         }
935         
936         return (NULL);
937 }
938
939 int
940 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
941 {
942         unsigned long     flags;
943         ksock_peer_t     *peer;
944         ksock_conn_t     *conn;
945         ksock_route_t    *route;
946         rwlock_t         *g_lock;
947         
948         /* Ensure the frags we've been given EXACTLY match the number of
949          * bytes we want to send.  Many TCP/IP stacks disregard any total
950          * size parameters passed to them and just look at the frags. 
951          *
952          * We always expect at least 1 mapped fragment containing the
953          * complete portals header. */
954         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
955                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
956         LASSERT (tx->tx_niov >= 1);
957         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
958
959         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
960                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
961                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
962
963         tx->tx_conn = NULL;                     /* only set when assigned a conn */
964         tx->tx_resid = tx->tx_nob;
965         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
966
967         g_lock = &ksocknal_data.ksnd_global_lock;
968         read_lock (g_lock);
969         
970         peer = ksocknal_find_target_peer_locked (tx, nid);
971         if (peer == NULL) {
972                 read_unlock (g_lock);
973                 return (-EHOSTUNREACH);
974         }
975
976         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
977                 conn = ksocknal_find_conn_locked (tx, peer);
978                 if (conn != NULL) {
979                         /* I've got no autoconnect routes that need to be
980                          * connecting and I do have an actual connection... */
981                         ksocknal_queue_tx_locked (tx, conn);
982                         read_unlock (g_lock);
983                         return (0);
984                 }
985         }
986         
987         /* Making one or more connections; I'll need a write lock... */
988
989         atomic_inc (&peer->ksnp_refcount);      /* +1 ref for me while I unlock */
990         read_unlock (g_lock);
991         write_lock_irqsave (g_lock, flags);
992         
993         if (peer->ksnp_closing) {               /* peer deleted as I blocked! */
994                 write_unlock_irqrestore (g_lock, flags);
995                 ksocknal_put_peer (peer);
996                 return (-EHOSTUNREACH);
997         }
998         ksocknal_put_peer (peer);               /* drop ref I got above */
999
1000         for (;;) {
1001                 /* launch any/all autoconnections that need it */
1002                 route = ksocknal_find_connectable_route_locked (peer);
1003                 if (route == NULL)
1004                         break;
1005
1006                 ksocknal_launch_autoconnect_locked (route);
1007         }
1008
1009         conn = ksocknal_find_conn_locked (tx, peer);
1010         if (conn != NULL) {
1011                 /* Connection exists; queue message on it */
1012                 ksocknal_queue_tx_locked (tx, conn);
1013                 write_unlock_irqrestore (g_lock, flags);
1014                 return (0);
1015         }
1016
1017         route = ksocknal_find_connecting_route_locked (peer);
1018         if (route != NULL) {
1019                 /* At least 1 connection is being established; queue the
1020                  * message... */
1021                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1022                 write_unlock_irqrestore (g_lock, flags);
1023                 return (0);
1024         }
1025         
1026         write_unlock_irqrestore (g_lock, flags);
1027         return (-EHOSTUNREACH);
1028 }
1029
1030 ptl_err_t
1031 ksocknal_sendmsg(nal_cb_t     *nal, 
1032                  void         *private, 
1033                  lib_msg_t    *cookie,
1034                  ptl_hdr_t    *hdr, 
1035                  int           type, 
1036                  ptl_nid_t     nid, 
1037                  ptl_pid_t     pid,
1038                  unsigned int  payload_niov, 
1039                  struct iovec *payload_iov, 
1040                  ptl_kiov_t   *payload_kiov,
1041                  size_t        payload_offset,
1042                  size_t        payload_nob)
1043 {
1044         ksock_ltx_t  *ltx;
1045         int           desc_size;
1046         int           rc;
1047
1048         /* NB 'private' is different depending on what we're sending.
1049          * Just ignore it... */
1050
1051         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1052                " pid %d\n", payload_nob, payload_niov, nid , pid);
1053
1054         LASSERT (payload_nob == 0 || payload_niov > 0);
1055         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1056
1057         /* It must be OK to kmap() if required */
1058         LASSERT (payload_kiov == NULL || !in_interrupt ());
1059         /* payload is either all vaddrs or all pages */
1060         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1061         
1062         if (payload_iov != NULL)
1063                 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1064         else
1065                 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1066         
1067         if (in_interrupt() ||
1068             type == PTL_MSG_ACK ||
1069             type == PTL_MSG_REPLY) {
1070                 /* Can't block if in interrupt or responding to an incoming
1071                  * message */
1072                 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1073         } else {
1074                 PORTAL_ALLOC(ltx, desc_size);
1075         }
1076         
1077         if (ltx == NULL) {
1078                 CERROR("Can't allocate tx desc type %d size %d %s\n",
1079                        type, desc_size, in_interrupt() ? "(intr)" : "");
1080                 return (PTL_NO_SPACE);
1081         }
1082
1083         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1084
1085         ltx->ltx_desc_size = desc_size;
1086         
1087         /* We always have 1 mapped frag for the header */
1088         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1089         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1090         ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1091         ltx->ltx_hdr = *hdr;
1092         
1093         ltx->ltx_private = private;
1094         ltx->ltx_cookie = cookie;
1095         
1096         ltx->ltx_tx.tx_isfwd = 0;
1097         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1098
1099         if (payload_iov != NULL) {
1100                 /* payload is all mapped */
1101                 ltx->ltx_tx.tx_kiov  = NULL;
1102                 ltx->ltx_tx.tx_nkiov = 0;
1103
1104                 ltx->ltx_tx.tx_niov = 
1105                         1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1106                                             payload_niov, payload_iov,
1107                                             payload_offset, payload_nob);
1108         } else {
1109                 /* payload is all pages */
1110                 ltx->ltx_tx.tx_niov = 1;
1111
1112                 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1113                 ltx->ltx_tx.tx_nkiov =
1114                         lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1115                                          payload_niov, payload_kiov,
1116                                          payload_offset, payload_nob);
1117         }
1118
1119         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1120         if (rc == 0)
1121                 return (PTL_OK);
1122         
1123         ksocknal_free_ltx(ltx);
1124         return (PTL_FAIL);
1125 }
1126
1127 ptl_err_t
1128 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
1129                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1130                unsigned int payload_niov, struct iovec *payload_iov,
1131                size_t payload_offset, size_t payload_len)
1132 {
1133         return (ksocknal_sendmsg(nal, private, cookie,
1134                                  hdr, type, nid, pid,
1135                                  payload_niov, payload_iov, NULL,
1136                                  payload_offset, payload_len));
1137 }
1138
1139 ptl_err_t
1140 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
1141                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1142                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1143                      size_t payload_offset, size_t payload_len)
1144 {
1145         return (ksocknal_sendmsg(nal, private, cookie,
1146                                  hdr, type, nid, pid,
1147                                  payload_niov, NULL, payload_kiov,
1148                                  payload_offset, payload_len));
1149 }
1150
1151 void
1152 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1153 {
1154         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1155         ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1156         int           rc;
1157         
1158         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1159                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1160
1161         /* I'm the gateway; must be the last hop */
1162         if (nid == ksocknal_lib.ni.nid)
1163                 nid = fwd->kprfd_target_nid;
1164
1165         /* setup iov for hdr */
1166         ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1167         ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1168
1169         ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
1170         ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1171         ftx->ftx_tx.tx_niov  = 1;
1172         ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
1173         ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1174         ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
1175
1176         rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1177         if (rc != 0)
1178                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1179 }
1180
1181 int
1182 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1183 {
1184         long    pid = kernel_thread (fn, arg, 0);
1185
1186         if (pid < 0)
1187                 return ((int)pid);
1188
1189         atomic_inc (&ksocknal_data.ksnd_nthreads);
1190         return (0);
1191 }
1192
1193 void
1194 ksocknal_thread_fini (void)
1195 {
1196         atomic_dec (&ksocknal_data.ksnd_nthreads);
1197 }
1198
1199 void
1200 ksocknal_fmb_callback (void *arg, int error)
1201 {
1202         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1203         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1204         ptl_hdr_t         *hdr = (ptl_hdr_t *)page_address(fmb->fmb_kiov[0].kiov_page);
1205         ksock_conn_t      *conn = NULL;
1206         ksock_sched_t     *sched;
1207         unsigned long      flags;
1208         char               ipbuf[PTL_NALFMT_SIZE];
1209         char               ipbuf2[PTL_NALFMT_SIZE];
1210
1211         if (error != 0)
1212                 CERROR("Failed to route packet from "
1213                        LPX64" %s to "LPX64" %s: %d\n",
1214                        NTOH__u64(hdr->src_nid),
1215                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->src_nid), ipbuf),
1216                        NTOH__u64(hdr->dest_nid),
1217                        portals_nid2str(SOCKNAL, NTOH__u64(hdr->dest_nid), ipbuf2),
1218                        error);
1219         else
1220                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1221                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
1222
1223         /* drop peer ref taken on init */
1224         ksocknal_put_peer (fmb->fmb_peer);
1225
1226         spin_lock_irqsave (&fmp->fmp_lock, flags);
1227
1228         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1229         fmp->fmp_nactive_fmbs--;
1230
1231         if (!list_empty (&fmp->fmp_blocked_conns)) {
1232                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1233                                    ksock_conn_t, ksnc_rx_list);
1234                 list_del (&conn->ksnc_rx_list);
1235         }
1236
1237         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1238
1239         if (conn == NULL)
1240                 return;
1241
1242         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1243         LASSERT (conn->ksnc_rx_scheduled);
1244         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1245
1246         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1247
1248         sched = conn->ksnc_scheduler;
1249
1250         spin_lock_irqsave (&sched->kss_lock, flags);
1251
1252         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1253         wake_up (&sched->kss_waitq);
1254
1255         spin_unlock_irqrestore (&sched->kss_lock, flags);
1256 }
1257
1258 ksock_fmb_t *
1259 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1260 {
1261         int               payload_nob = conn->ksnc_rx_nob_left;
1262         unsigned long     flags;
1263         ksock_fmb_pool_t *pool;
1264         ksock_fmb_t      *fmb;
1265
1266         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1267         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1268
1269         if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1270                 pool = &ksocknal_data.ksnd_small_fmp;
1271         else
1272                 pool = &ksocknal_data.ksnd_large_fmp;
1273
1274         spin_lock_irqsave (&pool->fmp_lock, flags);
1275
1276         if (!list_empty (&pool->fmp_idle_fmbs)) {
1277                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1278                                  ksock_fmb_t, fmb_list);
1279                 list_del (&fmb->fmb_list);
1280                 pool->fmp_nactive_fmbs++;
1281                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1282
1283                 return (fmb);
1284         }
1285
1286         /* deschedule until fmb free */
1287
1288         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1289
1290         list_add_tail (&conn->ksnc_rx_list,
1291                        &pool->fmp_blocked_conns);
1292
1293         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1294         return (NULL);
1295 }
1296
1297 int
1298 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1299 {
1300         int       payload_nob = conn->ksnc_rx_nob_left;
1301         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1302         int       niov = 0;
1303         int       nob = payload_nob;
1304
1305         LASSERT (conn->ksnc_rx_scheduled);
1306         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1307         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1308         LASSERT (payload_nob >= 0);
1309         LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1310         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1311         LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1312
1313         /* Take a ref on the conn's peer to prevent module unload before
1314          * forwarding completes. */
1315         fmb->fmb_peer = conn->ksnc_peer;
1316         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1317
1318         /* Copy the header we just read into the forwarding buffer.  If
1319          * there's payload, start reading reading it into the buffer,
1320          * otherwise the forwarding buffer can be kicked off
1321          * immediately. */
1322         fmb->fmb_hdr = conn->ksnc_hdr;
1323
1324         while (nob > 0) {
1325                 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1326                 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1327                 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1328                 nob -= PAGE_SIZE;
1329                 niov++;
1330         }
1331
1332         kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1333                      payload_nob, niov, fmb->fmb_kiov,
1334                      ksocknal_fmb_callback, fmb);
1335
1336         if (payload_nob == 0) {         /* got complete packet already */
1337                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1338                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid);
1339
1340                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1341
1342                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1343                 return (1);
1344         }
1345
1346         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1347         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1348         
1349         /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1350          * buffer */
1351         LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1352
1353         conn->ksnc_rx_niov = 0;
1354         conn->ksnc_rx_nkiov = niov;
1355         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1356         memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1357         
1358         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1359                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1360         return (0);
1361 }
1362
1363 void
1364 ksocknal_fwd_parse (ksock_conn_t *conn)
1365 {
1366         ksock_peer_t *peer;
1367         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1368         ptl_nid_t     src_nid = NTOH__u64 (conn->ksnc_hdr.src_nid);
1369         int           body_len = NTOH__u32 (conn->ksnc_hdr.payload_length);
1370         char str[PTL_NALFMT_SIZE];
1371         char str2[PTL_NALFMT_SIZE];
1372
1373         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1374                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1375
1376         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1377         LASSERT (conn->ksnc_rx_scheduled);
1378
1379         if (body_len < 0) {                 /* length corrupt (overflow) */
1380                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1381                        "packet size %d illegal\n",
1382                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1383                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1384                        body_len);
1385
1386                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1387                 return;
1388         }
1389
1390         if (!kpr_routing(&ksocknal_data.ksnd_router)) {    /* not forwarding */
1391                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1392                        " (%s): not forwarding\n",
1393                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1394                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1395                 /* on to new packet (skip this one's body) */
1396                 ksocknal_new_packet (conn, body_len);
1397                 return;
1398         }
1399
1400         if (body_len > PTL_MTU) {      /* too big to forward */
1401                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1402                         "(%s): packet size %d too big\n",
1403                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1404                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1405                         body_len);
1406                 /* on to new packet (skip this one's body) */
1407                 ksocknal_new_packet (conn, body_len);
1408                 return;
1409         }
1410
1411         /* should have gone direct */
1412         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1413         if (peer != NULL) {
1414                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1415                         "(%s): target is a peer\n",
1416                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1417                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1418                 ksocknal_put_peer (peer);  /* drop ref from get above */
1419
1420                 /* on to next packet (skip this one's body) */
1421                 ksocknal_new_packet (conn, body_len);
1422                 return;
1423         }
1424
1425         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1426         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1427         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1428 }
1429
1430 int
1431 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1432 {
1433         static char ksocknal_slop_buffer[4096];
1434
1435         int   nob;
1436         int   niov;
1437         int   skipped;
1438
1439         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1440                 conn->ksnc_rx_started = 0;
1441                 mb ();                          /* racing with timeout thread */
1442                 
1443                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1444                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1445                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1446
1447                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1448                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1449                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1450                 conn->ksnc_rx_niov = 1;
1451
1452                 conn->ksnc_rx_kiov = NULL;
1453                 conn->ksnc_rx_nkiov = 0;
1454                 return (1);
1455         }
1456
1457         /* Set up to skip as much a possible now.  If there's more left
1458          * (ran out of iov entries) we'll get called again */
1459
1460         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1461         conn->ksnc_rx_nob_left = nob_to_skip;
1462         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1463         skipped = 0;
1464         niov = 0;
1465
1466         do {
1467                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1468
1469                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1470                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1471                 niov++;
1472                 skipped += nob;
1473                 nob_to_skip -=nob;
1474
1475         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1476                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1477
1478         conn->ksnc_rx_niov = niov;
1479         conn->ksnc_rx_kiov = NULL;
1480         conn->ksnc_rx_nkiov = 0;
1481         conn->ksnc_rx_nob_wanted = skipped;
1482         return (0);
1483 }
1484
1485 int
1486 ksocknal_process_receive (ksock_conn_t *conn)
1487 {
1488         ksock_fmb_t  *fmb;
1489         int           rc;
1490         
1491         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1492
1493         /* doesn't need a forwarding buffer */
1494         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1495                 goto try_read;
1496
1497  get_fmb:
1498         fmb = ksocknal_get_idle_fmb (conn);
1499         if (fmb == NULL) {
1500                 /* conn descheduled waiting for idle fmb */
1501                 return (0);
1502         }
1503
1504         if (ksocknal_init_fmb (conn, fmb)) {
1505                 /* packet forwarded */
1506                 return (0);
1507         }
1508
1509  try_read:
1510         /* NB: sched lock NOT held */
1511         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1512                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1513                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1514                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1515
1516         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1517
1518         rc = ksocknal_receive(conn);
1519
1520         if (rc <= 0) {
1521                 LASSERT (rc != -EAGAIN);
1522
1523                 if (rc == 0)
1524                         CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1525                                conn, conn->ksnc_peer->ksnp_nid,
1526                                HIPQUAD(conn->ksnc_ipaddr),
1527                                conn->ksnc_port);
1528                 else if (!conn->ksnc_closing)
1529                         CERROR ("[%p] Error %d on read from "LPX64
1530                                 " ip %d.%d.%d.%d:%d\n",
1531                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1532                                 HIPQUAD(conn->ksnc_ipaddr),
1533                                 conn->ksnc_port);
1534
1535                 ksocknal_close_conn_and_siblings (conn, rc);
1536                 return (rc == 0 ? -ESHUTDOWN : rc);
1537         }
1538
1539         if (conn->ksnc_rx_nob_wanted != 0) {
1540                 /* short read */
1541                 return (-EAGAIN);
1542         }
1543         
1544         switch (conn->ksnc_rx_state) {
1545         case SOCKNAL_RX_HEADER:
1546                 if (conn->ksnc_hdr.type != HTON__u32(PTL_MSG_HELLO) &&
1547                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1548                         /* This packet isn't for me */
1549                         ksocknal_fwd_parse (conn);
1550                         switch (conn->ksnc_rx_state) {
1551                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1552                                 return (0);     /* => come back later */
1553                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1554                                 goto try_read;  /* => go read it */
1555                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1556                                 goto get_fmb;   /* => go get a fwd msg buffer */
1557                         default:
1558                                 LBUG ();
1559                         }
1560                         /* Not Reached */
1561                 }
1562
1563                 /* sets wanted_len, iovs etc */
1564                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1565
1566                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1567                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1568                         goto try_read;          /* go read the payload */
1569                 }
1570                 /* Fall through (completed packet for me) */
1571
1572         case SOCKNAL_RX_BODY:
1573                 /* payload all received */
1574                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
1575                 /* Fall through */
1576
1577         case SOCKNAL_RX_SLOP:
1578                 /* starting new packet? */
1579                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1580                         return (0);     /* come back later */
1581                 goto try_read;          /* try to finish reading slop now */
1582
1583         case SOCKNAL_RX_BODY_FWD:
1584                 /* payload all received */
1585                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1586                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1587                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1588                         conn->ksnc_rx_nob_left);
1589
1590                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1591                  * conn->ksnc_cookie */
1592                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1593                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1594
1595                 /* no slop in forwarded packets */
1596                 LASSERT (conn->ksnc_rx_nob_left == 0);
1597
1598                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1599                 return (0);                     /* (later) */
1600
1601         default:
1602                 break;
1603         }
1604
1605         /* Not Reached */
1606         LBUG ();
1607         return (-EINVAL);                       /* keep gcc happy */
1608 }
1609
1610 ptl_err_t
1611 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1612                unsigned int niov, struct iovec *iov, 
1613                size_t offset, size_t mlen, size_t rlen)
1614 {
1615         ksock_conn_t *conn = (ksock_conn_t *)private;
1616
1617         LASSERT (mlen <= rlen);
1618         LASSERT (niov <= PTL_MD_MAX_IOV);
1619         
1620         conn->ksnc_cookie = msg;
1621         conn->ksnc_rx_nob_wanted = mlen;
1622         conn->ksnc_rx_nob_left   = rlen;
1623
1624         conn->ksnc_rx_nkiov = 0;
1625         conn->ksnc_rx_kiov = NULL;
1626         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1627         conn->ksnc_rx_niov =
1628                 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1629                                 niov, iov, offset, mlen);
1630
1631         LASSERT (mlen == 
1632                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1633                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1634
1635         return (PTL_OK);
1636 }
1637
1638 ptl_err_t
1639 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1640                      unsigned int niov, ptl_kiov_t *kiov, 
1641                      size_t offset, size_t mlen, size_t rlen)
1642 {
1643         ksock_conn_t *conn = (ksock_conn_t *)private;
1644
1645         LASSERT (mlen <= rlen);
1646         LASSERT (niov <= PTL_MD_MAX_IOV);
1647         
1648         conn->ksnc_cookie = msg;
1649         conn->ksnc_rx_nob_wanted = mlen;
1650         conn->ksnc_rx_nob_left   = rlen;
1651
1652         conn->ksnc_rx_niov = 0;
1653         conn->ksnc_rx_iov  = NULL;
1654         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1655         conn->ksnc_rx_nkiov = 
1656                 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1657                                  niov, kiov, offset, mlen);
1658
1659         LASSERT (mlen == 
1660                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1661                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1662
1663         return (PTL_OK);
1664 }
1665
1666 int ksocknal_scheduler (void *arg)
1667 {
1668         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1669         ksock_conn_t      *conn;
1670         ksock_tx_t        *tx;
1671         unsigned long      flags;
1672         int                rc;
1673         int                nloops = 0;
1674         int                id = sched - ksocknal_data.ksnd_schedulers;
1675         char               name[16];
1676
1677         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1678         kportal_daemonize (name);
1679         kportal_blockallsigs ();
1680
1681 #if (CONFIG_SMP && CPU_AFFINITY)
1682         if ((cpu_online_map & (1 << id)) != 0) {
1683 #if 1
1684                 current->cpus_allowed = (1 << id);
1685 #else
1686                 set_cpus_allowed (current, 1<<id);
1687 #endif
1688         } else {
1689                 CERROR ("Can't set CPU affinity for %s\n", name);
1690         }
1691 #endif /* CONFIG_SMP && CPU_AFFINITY */
1692         
1693         spin_lock_irqsave (&sched->kss_lock, flags);
1694
1695         while (!ksocknal_data.ksnd_shuttingdown) {
1696                 int did_something = 0;
1697
1698                 /* Ensure I progress everything semi-fairly */
1699
1700                 if (!list_empty (&sched->kss_rx_conns)) {
1701                         conn = list_entry(sched->kss_rx_conns.next,
1702                                           ksock_conn_t, ksnc_rx_list);
1703                         list_del(&conn->ksnc_rx_list);
1704
1705                         LASSERT(conn->ksnc_rx_scheduled);
1706                         LASSERT(conn->ksnc_rx_ready);
1707
1708                         /* clear rx_ready in case receive isn't complete.
1709                          * Do it BEFORE we call process_recv, since
1710                          * data_ready can set it any time after we release
1711                          * kss_lock. */
1712                         conn->ksnc_rx_ready = 0;
1713                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1714                         
1715                         rc = ksocknal_process_receive(conn);
1716                         
1717                         spin_lock_irqsave(&sched->kss_lock, flags);
1718
1719                         /* I'm the only one that can clear this flag */
1720                         LASSERT(conn->ksnc_rx_scheduled);
1721
1722                         /* Did process_receive get everything it wanted? */
1723                         if (rc == 0)
1724                                 conn->ksnc_rx_ready = 1;
1725                         
1726                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1727                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1728                                 /* Conn blocked for a forwarding buffer.
1729                                  * It will get queued for my attention when
1730                                  * one becomes available (and it might just
1731                                  * already have been!).  Meanwhile my ref
1732                                  * on it stays put. */
1733                         } else if (conn->ksnc_rx_ready) {
1734                                 /* reschedule for rx */
1735                                 list_add_tail (&conn->ksnc_rx_list,
1736                                                &sched->kss_rx_conns);
1737                         } else {
1738                                 conn->ksnc_rx_scheduled = 0;
1739                                 /* drop my ref */
1740                                 ksocknal_put_conn(conn);
1741                         }
1742
1743                         did_something = 1;
1744                 }
1745
1746                 if (!list_empty (&sched->kss_tx_conns)) {
1747                         conn = list_entry(sched->kss_tx_conns.next,
1748                                           ksock_conn_t, ksnc_tx_list);
1749                         list_del (&conn->ksnc_tx_list);
1750                         
1751                         LASSERT(conn->ksnc_tx_scheduled);
1752                         LASSERT(conn->ksnc_tx_ready);
1753                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1754                         
1755                         tx = list_entry(conn->ksnc_tx_queue.next,
1756                                         ksock_tx_t, tx_list);
1757                         /* dequeue now so empty list => more to send */
1758                         list_del(&tx->tx_list);
1759                         
1760                         /* Clear tx_ready in case send isn't complete.  Do
1761                          * it BEFORE we call process_transmit, since
1762                          * write_space can set it any time after we release
1763                          * kss_lock. */
1764                         conn->ksnc_tx_ready = 0;
1765                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1766
1767                         rc = ksocknal_process_transmit(conn, tx);
1768
1769                         spin_lock_irqsave (&sched->kss_lock, flags);
1770
1771                         if (rc == -ENOMEM || rc == -EAGAIN) {
1772                                 /* Incomplete send: replace tx on HEAD of tx_queue */
1773                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1774                         } else {
1775                                 /* Complete send; assume space for more */
1776                                 conn->ksnc_tx_ready = 1;
1777                         }
1778
1779                         if (rc == -ENOMEM) {
1780                                 /* Do nothing; after a short timeout, this
1781                                  * conn will be reposted on kss_tx_conns. */
1782                         } else if (conn->ksnc_tx_ready &&
1783                                    !list_empty (&conn->ksnc_tx_queue)) {
1784                                 /* reschedule for tx */
1785                                 list_add_tail (&conn->ksnc_tx_list, 
1786                                                &sched->kss_tx_conns);
1787                         } else {
1788                                 conn->ksnc_tx_scheduled = 0;
1789                                 /* drop my ref */
1790                                 ksocknal_put_conn (conn);
1791                         }
1792                                 
1793                         did_something = 1;
1794                 }
1795 #if SOCKNAL_ZC
1796                 if (!list_empty (&sched->kss_zctxdone_list)) {
1797                         ksock_tx_t *tx =
1798                                 list_entry(sched->kss_zctxdone_list.next,
1799                                            ksock_tx_t, tx_list);
1800                         did_something = 1;
1801
1802                         list_del (&tx->tx_list);
1803                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1804
1805                         ksocknal_tx_done (tx, 1);
1806
1807                         spin_lock_irqsave (&sched->kss_lock, flags);
1808                 }
1809 #endif
1810                 if (!did_something ||           /* nothing to do */
1811                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1812                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1813
1814                         nloops = 0;
1815
1816                         if (!did_something) {   /* wait for something to do */
1817 #if SOCKNAL_ZC
1818                                 rc = wait_event_interruptible (sched->kss_waitq,
1819                                                                ksocknal_data.ksnd_shuttingdown ||
1820                                                                !list_empty(&sched->kss_rx_conns) ||
1821                                                                !list_empty(&sched->kss_tx_conns) ||
1822                                                                !list_empty(&sched->kss_zctxdone_list));
1823 #else
1824                                 rc = wait_event_interruptible (sched->kss_waitq,
1825                                                                ksocknal_data.ksnd_shuttingdown ||
1826                                                                !list_empty(&sched->kss_rx_conns) ||
1827                                                                !list_empty(&sched->kss_tx_conns));
1828 #endif
1829                                 LASSERT (rc == 0);
1830                         } else
1831                                our_cond_resched();
1832
1833                         spin_lock_irqsave (&sched->kss_lock, flags);
1834                 }
1835         }
1836
1837         spin_unlock_irqrestore (&sched->kss_lock, flags);
1838         ksocknal_thread_fini ();
1839         return (0);
1840 }
1841
1842 void
1843 ksocknal_data_ready (struct sock *sk, int n)
1844 {
1845         unsigned long  flags;
1846         ksock_conn_t  *conn;
1847         ksock_sched_t *sched;
1848         ENTRY;
1849
1850         /* interleave correctly with closing sockets... */
1851         read_lock (&ksocknal_data.ksnd_global_lock);
1852
1853         conn = sk->sk_user_data;
1854         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1855                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1856                 sk->sk_data_ready (sk, n);
1857         } else {
1858                 sched = conn->ksnc_scheduler;
1859
1860                 spin_lock_irqsave (&sched->kss_lock, flags);
1861
1862                 conn->ksnc_rx_ready = 1;
1863
1864                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1865                         list_add_tail(&conn->ksnc_rx_list,
1866                                       &sched->kss_rx_conns);
1867                         conn->ksnc_rx_scheduled = 1;
1868                         /* extra ref for scheduler */
1869                         atomic_inc (&conn->ksnc_refcount);
1870
1871                         wake_up (&sched->kss_waitq);
1872                 }
1873
1874                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1875         }
1876
1877         read_unlock (&ksocknal_data.ksnd_global_lock);
1878
1879         EXIT;
1880 }
1881
1882 void
1883 ksocknal_write_space (struct sock *sk)
1884 {
1885         unsigned long  flags;
1886         ksock_conn_t  *conn;
1887         ksock_sched_t *sched;
1888
1889         /* interleave correctly with closing sockets... */
1890         read_lock (&ksocknal_data.ksnd_global_lock);
1891
1892         conn = sk->sk_user_data;
1893
1894         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1895                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1896                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1897                                       " ready" : " blocked"),
1898                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1899                                       " scheduled" : " idle"),
1900                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1901                                       " empty" : " queued"));
1902
1903         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1904                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1905                 sk->sk_write_space (sk);
1906
1907                 read_unlock (&ksocknal_data.ksnd_global_lock);
1908                 return;
1909         }
1910
1911         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1912                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1913
1914                 sched = conn->ksnc_scheduler;
1915
1916                 spin_lock_irqsave (&sched->kss_lock, flags);
1917
1918                 conn->ksnc_tx_ready = 1;
1919
1920                 if (!conn->ksnc_tx_scheduled && // not being progressed
1921                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1922                         list_add_tail (&conn->ksnc_tx_list,
1923                                        &sched->kss_tx_conns);
1924                         conn->ksnc_tx_scheduled = 1;
1925                         /* extra ref for scheduler */
1926                         atomic_inc (&conn->ksnc_refcount);
1927
1928                         wake_up (&sched->kss_waitq);
1929                 }
1930
1931                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1932         }
1933
1934         read_unlock (&ksocknal_data.ksnd_global_lock);
1935 }
1936
1937 int
1938 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1939 {
1940         int           rc;
1941         mm_segment_t  oldmm = get_fs();
1942
1943         while (nob > 0) {
1944                 struct iovec  iov = {
1945                         .iov_base = buffer,
1946                         .iov_len  = nob
1947                 };
1948                 struct msghdr msg = {
1949                         .msg_name       = NULL,
1950                         .msg_namelen    = 0,
1951                         .msg_iov        = &iov,
1952                         .msg_iovlen     = 1,
1953                         .msg_control    = NULL,
1954                         .msg_controllen = 0,
1955                         .msg_flags      = 0
1956                 };
1957
1958                 set_fs (KERNEL_DS);
1959                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1960                 set_fs (oldmm);
1961                 
1962                 if (rc < 0)
1963                         return (rc);
1964
1965                 if (rc == 0) {
1966                         CERROR ("Unexpected zero rc\n");
1967                         return (-ECONNABORTED);
1968                 }
1969
1970                 buffer = ((char *)buffer) + rc;
1971                 nob -= rc;
1972         }
1973         
1974         return (0);
1975 }
1976
1977 int
1978 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
1979 {
1980         int           rc;
1981         mm_segment_t  oldmm = get_fs();
1982         
1983         while (nob > 0) {
1984                 struct iovec  iov = {
1985                         .iov_base = buffer,
1986                         .iov_len  = nob
1987                 };
1988                 struct msghdr msg = {
1989                         .msg_name       = NULL,
1990                         .msg_namelen    = 0,
1991                         .msg_iov        = &iov,
1992                         .msg_iovlen     = 1,
1993                         .msg_control    = NULL,
1994                         .msg_controllen = 0,
1995                         .msg_flags      = 0
1996                 };
1997
1998                 set_fs (KERNEL_DS);
1999                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
2000                 set_fs (oldmm);
2001                 
2002                 if (rc < 0)
2003                         return (rc);
2004
2005                 if (rc == 0)
2006                         return (-ECONNABORTED);
2007
2008                 buffer = ((char *)buffer) + rc;
2009                 nob -= rc;
2010         }
2011         
2012         return (0);
2013 }
2014
2015 int
2016 ksocknal_hello (struct socket *sock, ptl_nid_t *nid, int *type,
2017                 __u64 *incarnation)
2018 {
2019         int                 rc;
2020         ptl_hdr_t           hdr;
2021         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2022         char                ipbuf[PTL_NALFMT_SIZE];
2023         char                ipbuf2[PTL_NALFMT_SIZE];
2024
2025         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2026
2027         memset (&hdr, 0, sizeof (hdr));
2028         hmv->magic         = __cpu_to_le32 (PORTALS_PROTO_MAGIC);
2029         hmv->version_major = __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
2030         hmv->version_minor = __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
2031
2032         hdr.src_nid = __cpu_to_le64 (ksocknal_lib.ni.nid);
2033         hdr.type    = __cpu_to_le32 (PTL_MSG_HELLO);
2034
2035         hdr.msg.hello.type = __cpu_to_le32 (*type);
2036         hdr.msg.hello.incarnation =
2037                 __cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2038
2039         /* Assume sufficient socket buffering for this message */
2040         rc = ksocknal_sock_write (sock, &hdr, sizeof (hdr));
2041         if (rc != 0) {
2042                 CERROR ("Error %d sending HELLO to "LPX64" %s\n",
2043                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2044                 return (rc);
2045         }
2046
2047         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2048         if (rc != 0) {
2049                 CERROR ("Error %d reading HELLO from "LPX64" %s\n",
2050                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2051                 return (rc);
2052         }
2053
2054         if (hmv->magic != __le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2055                 CERROR ("Bad magic %#08x (%#08x expected) from "LPX64" %s\n",
2056                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC, *nid,
2057                         portals_nid2str(SOCKNAL, *nid, ipbuf));
2058                 return (-EPROTO);
2059         }
2060
2061         if (hmv->version_major != __cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2062             hmv->version_minor != __cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2063                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2064                         " from "LPX64" %s\n",
2065                         __le16_to_cpu (hmv->version_major),
2066                         __le16_to_cpu (hmv->version_minor),
2067                         PORTALS_PROTO_VERSION_MAJOR,
2068                         PORTALS_PROTO_VERSION_MINOR,
2069                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2070                 return (-EPROTO);
2071         }
2072
2073 #if (PORTALS_PROTO_VERSION_MAJOR != 0)
2074 # error "This code only understands protocol version 0.x"
2075 #endif
2076         /* version 0 sends magic/version as the dest_nid of a 'hello' header,
2077          * so read the rest of it in now... */
2078
2079         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2080         if (rc != 0) {
2081                 CERROR ("Error %d reading rest of HELLO hdr from "LPX64" %s\n",
2082                         rc, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf));
2083                 return (rc);
2084         }
2085
2086         /* ...and check we got what we expected */
2087         if (hdr.type != __cpu_to_le32 (PTL_MSG_HELLO) ||
2088             hdr.payload_length != __cpu_to_le32 (0)) {
2089                 CERROR ("Expecting a HELLO hdr with 0 payload,"
2090                         " but got type %d with %d payload from "LPX64" %s\n",
2091                         __le32_to_cpu (hdr.type),
2092                         __le32_to_cpu (hdr.payload_length), *nid,
2093                         portals_nid2str(SOCKNAL, *nid, ipbuf));
2094                 return (-EPROTO);
2095         }
2096
2097         if (__le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2098                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY\n");
2099                 return (-EPROTO);
2100         }
2101
2102         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2103                 *nid = __le64_to_cpu(hdr.src_nid);
2104         } else if (*nid != __le64_to_cpu (hdr.src_nid)) {
2105                 CERROR ("Connected to nid "LPX64" %s, but expecting "LPX64" %s\n",
2106                         __le64_to_cpu (hdr.src_nid),
2107                         portals_nid2str(SOCKNAL,
2108                                         __le64_to_cpu(hdr.src_nid),
2109                                         ipbuf),
2110                         *nid, portals_nid2str(SOCKNAL, *nid, ipbuf2));
2111                 return (-EPROTO);
2112         }
2113
2114         if (*type == SOCKNAL_CONN_NONE) {
2115                 /* I've accepted this connection; peer determines type */
2116                 *type = __le32_to_cpu(hdr.msg.hello.type);
2117                 switch (*type) {
2118                 case SOCKNAL_CONN_ANY:
2119                 case SOCKNAL_CONN_CONTROL:
2120                         break;
2121                 case SOCKNAL_CONN_BULK_IN:
2122                         *type = SOCKNAL_CONN_BULK_OUT;
2123                         break;
2124                 case SOCKNAL_CONN_BULK_OUT:
2125                         *type = SOCKNAL_CONN_BULK_IN;
2126                         break;
2127                 default:
2128                         CERROR ("Unexpected type %d from "LPX64" %s\n",
2129                                 *type, *nid,
2130                                 portals_nid2str(SOCKNAL, *nid, ipbuf));
2131                         return (-EPROTO);
2132                 }
2133         } else if (__le32_to_cpu(hdr.msg.hello.type) != SOCKNAL_CONN_NONE) {
2134                 CERROR ("Mismatched types: me %d "LPX64" %s %d\n",
2135                         *type, *nid, portals_nid2str(SOCKNAL, *nid, ipbuf),
2136                         __le32_to_cpu(hdr.msg.hello.type));
2137                 return (-EPROTO);
2138         }
2139
2140         *incarnation = __le64_to_cpu(hdr.msg.hello.incarnation);
2141
2142         return (0);
2143 }
2144
2145 int
2146 ksocknal_setup_sock (struct socket *sock)
2147 {
2148         mm_segment_t    oldmm = get_fs ();
2149         int             rc;
2150         int             option;
2151         struct linger   linger;
2152
2153 #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0))
2154         sock->sk->sk_allocation = GFP_NOFS;
2155 #else
2156         sock->sk->allocation = GFP_NOFS;
2157 #endif
2158
2159         /* Ensure this socket aborts active sends immediately when we close
2160          * it. */
2161
2162         linger.l_onoff = 0;
2163         linger.l_linger = 0;
2164
2165         set_fs (KERNEL_DS);
2166         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2167                               (char *)&linger, sizeof (linger));
2168         set_fs (oldmm);
2169         if (rc != 0) {
2170                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2171                 return (rc);
2172         }
2173
2174         option = -1;
2175         set_fs (KERNEL_DS);
2176         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2177                                     (char *)&option, sizeof (option));
2178         set_fs (oldmm);
2179         if (rc != 0) {
2180                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2181                 return (rc);
2182         }
2183
2184 #if SOCKNAL_USE_KEEPALIVES
2185         /* Keepalives: If 3/4 of the timeout elapses, start probing every
2186          * second until the timeout elapses. */
2187
2188         option = (ksocknal_tunables.ksnd_io_timeout * 3) / 4;
2189         set_fs (KERNEL_DS);
2190         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2191                                     (char *)&option, sizeof (option));
2192         set_fs (oldmm);
2193         if (rc != 0) {
2194                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2195                 return (rc);
2196         }
2197         
2198         option = 1;
2199         set_fs (KERNEL_DS);
2200         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2201                                     (char *)&option, sizeof (option));
2202         set_fs (oldmm);
2203         if (rc != 0) {
2204                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2205                 return (rc);
2206         }
2207         
2208         option = ksocknal_tunables.ksnd_io_timeout / 4;
2209         set_fs (KERNEL_DS);
2210         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2211                                     (char *)&option, sizeof (option));
2212         set_fs (oldmm);
2213         if (rc != 0) {
2214                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2215                 return (rc);
2216         }
2217
2218         option = 1;
2219         set_fs (KERNEL_DS);
2220         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2221                               (char *)&option, sizeof (option));
2222         set_fs (oldmm);
2223         if (rc != 0) {
2224                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2225                 return (rc);
2226         }
2227 #endif
2228         return (0);
2229 }
2230
2231 int
2232 ksocknal_connect_peer (ksock_route_t *route, int type)
2233 {
2234         struct sockaddr_in  peer_addr;
2235         mm_segment_t        oldmm = get_fs();
2236         struct timeval      tv;
2237         int                 fd;
2238         struct socket      *sock;
2239         int                 rc;
2240         char                ipbuf[PTL_NALFMT_SIZE];
2241
2242         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2243         if (rc != 0) {
2244                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2245                 return (rc);
2246         }
2247
2248         /* Ugh; have to map_fd for compatibility with sockets passed in
2249          * from userspace.  And we actually need the sock->file refcounting
2250          * that this gives you :) */
2251
2252         fd = sock_map_fd (sock);
2253         if (fd < 0) {
2254                 sock_release (sock);
2255                 CERROR ("sock_map_fd error %d\n", fd);
2256                 return (fd);
2257         }
2258
2259         /* NB the fd now owns the ref on sock->file */
2260         LASSERT (sock->file != NULL);
2261         LASSERT (file_count(sock->file) == 1);
2262
2263         /* Set the socket timeouts, so our connection attempt completes in
2264          * finite time */
2265         tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
2266         tv.tv_usec = 0;
2267
2268         set_fs (KERNEL_DS);
2269         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2270                               (char *)&tv, sizeof (tv));
2271         set_fs (oldmm);
2272         if (rc != 0) {
2273                 CERROR ("Can't set send timeout %d: %d\n", 
2274                         ksocknal_tunables.ksnd_io_timeout, rc);
2275                 goto out;
2276         }
2277         
2278         set_fs (KERNEL_DS);
2279         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2280                               (char *)&tv, sizeof (tv));
2281         set_fs (oldmm);
2282         if (rc != 0) {
2283                 CERROR ("Can't set receive timeout %d: %d\n",
2284                         ksocknal_tunables.ksnd_io_timeout, rc);
2285                 goto out;
2286         }
2287
2288         {
2289                 int  option = 1;
2290                 
2291                 set_fs (KERNEL_DS);
2292                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2293                                             (char *)&option, sizeof (option));
2294                 set_fs (oldmm);
2295                 if (rc != 0) {
2296                         CERROR ("Can't disable nagle: %d\n", rc);
2297                         goto out;
2298                 }
2299         }
2300         
2301         if (route->ksnr_buffer_size != 0) {
2302                 int option = route->ksnr_buffer_size;
2303                 
2304                 set_fs (KERNEL_DS);
2305                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2306                                       (char *)&option, sizeof (option));
2307                 set_fs (oldmm);
2308                 if (rc != 0) {
2309                         CERROR ("Can't set send buffer %d: %d\n",
2310                                 route->ksnr_buffer_size, rc);
2311                         goto out;
2312                 }
2313
2314                 set_fs (KERNEL_DS);
2315                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2316                                       (char *)&option, sizeof (option));
2317                 set_fs (oldmm);
2318                 if (rc != 0) {
2319                         CERROR ("Can't set receive buffer %d: %d\n",
2320                                 route->ksnr_buffer_size, rc);
2321                         goto out;
2322                 }
2323         }
2324         
2325         memset (&peer_addr, 0, sizeof (peer_addr));
2326         peer_addr.sin_family = AF_INET;
2327         peer_addr.sin_port = htons (route->ksnr_port);
2328         peer_addr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2329         
2330         rc = sock->ops->connect (sock, (struct sockaddr *)&peer_addr, 
2331                                  sizeof (peer_addr), sock->file->f_flags);
2332         if (rc != 0) {
2333                 CERROR ("Error %d connecting to "LPX64" %s\n", rc,
2334                         route->ksnr_peer->ksnp_nid,
2335                         portals_nid2str(SOCKNAL,
2336                                         route->ksnr_peer->ksnp_nid,
2337                                         ipbuf));
2338                 goto out;
2339         }
2340         
2341         rc = ksocknal_create_conn (route, sock, route->ksnr_irq_affinity, type);
2342         if (rc == 0) {
2343                 /* Take an extra ref on sock->file to compensate for the
2344                  * upcoming close which will lose fd's ref on it. */
2345                 get_file (sock->file);
2346         }
2347
2348  out:
2349         sys_close (fd);
2350         return (rc);
2351 }
2352
2353 void
2354 ksocknal_autoconnect (ksock_route_t *route)
2355 {
2356         LIST_HEAD        (zombies);
2357         ksock_tx_t       *tx;
2358         ksock_peer_t     *peer;
2359         unsigned long     flags;
2360         int               rc;
2361         int               type;
2362         
2363         for (;;) {
2364                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2365                         if ((route->ksnr_connecting & (1 << type)) != 0)
2366                                 break;
2367                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2368
2369                 rc = ksocknal_connect_peer (route, type);
2370
2371                 if (rc != 0)
2372                         break;
2373                 
2374                 /* successfully autoconnected: create_conn did the
2375                  * route/conn binding and scheduled any blocked packets */
2376
2377                 if (route->ksnr_connecting == 0) {
2378                         /* No more connections required */
2379                         return;
2380                 }
2381         }
2382
2383         /* Connection attempt failed */
2384
2385         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2386
2387         peer = route->ksnr_peer;
2388         route->ksnr_connecting = 0;
2389
2390         /* This is a retry rather than a new connection */
2391         LASSERT (route->ksnr_retry_interval != 0);
2392         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2393         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2394                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2395
2396         if (!list_empty (&peer->ksnp_tx_queue) &&
2397             ksocknal_find_connecting_route_locked (peer) == NULL) {
2398                 LASSERT (list_empty (&peer->ksnp_conns));
2399
2400                 /* None of the connections that the blocked packets are
2401                  * waiting for have been successful.  Complete them now... */
2402                 do {
2403                         tx = list_entry (peer->ksnp_tx_queue.next,
2404                                          ksock_tx_t, tx_list);
2405                         list_del (&tx->tx_list);
2406                         list_add_tail (&tx->tx_list, &zombies);
2407                 } while (!list_empty (&peer->ksnp_tx_queue));
2408         }
2409
2410         /* make this route least-favourite for re-selection */
2411         if (!route->ksnr_deleted) {
2412                 list_del(&route->ksnr_list);
2413                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2414         }
2415         
2416         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2417
2418         while (!list_empty (&zombies)) {
2419                 char ipbuf[PTL_NALFMT_SIZE];
2420                 char ipbuf2[PTL_NALFMT_SIZE];
2421                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2422
2423                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2424                         NTOH__u32 (tx->tx_hdr->type),
2425                         NTOH__u32 (tx->tx_hdr->payload_length),
2426                         NTOH__u64 (tx->tx_hdr->src_nid),
2427                         portals_nid2str(SOCKNAL,
2428                                         NTOH__u64(tx->tx_hdr->src_nid),
2429                                         ipbuf),
2430                         NTOH__u64 (tx->tx_hdr->dest_nid),
2431                         portals_nid2str(SOCKNAL,
2432                                         NTOH__u64(tx->tx_hdr->src_nid),
2433                                         ipbuf2));
2434
2435                 list_del (&tx->tx_list);
2436                 /* complete now */
2437                 ksocknal_tx_done (tx, 0);
2438         }
2439 }
2440
2441 int
2442 ksocknal_autoconnectd (void *arg)
2443 {
2444         long               id = (long)arg;
2445         char               name[16];
2446         unsigned long      flags;
2447         ksock_route_t     *route;
2448         int                rc;
2449
2450         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2451         kportal_daemonize (name);
2452         kportal_blockallsigs ();
2453
2454         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2455
2456         while (!ksocknal_data.ksnd_shuttingdown) {
2457
2458                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2459                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2460                                             ksock_route_t, ksnr_connect_list);
2461                         
2462                         list_del (&route->ksnr_connect_list);
2463                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2464
2465                         ksocknal_autoconnect (route);
2466                         ksocknal_put_route (route);
2467
2468                         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2469                         continue;
2470                 }
2471                 
2472                 spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2473
2474                 rc = wait_event_interruptible (ksocknal_data.ksnd_autoconnectd_waitq,
2475                                                ksocknal_data.ksnd_shuttingdown ||
2476                                                !list_empty (&ksocknal_data.ksnd_autoconnectd_routes));
2477
2478                 spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2479         }
2480
2481         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2482
2483         ksocknal_thread_fini ();
2484         return (0);
2485 }
2486
2487 ksock_conn_t *
2488 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2489 {
2490         /* We're called with a shared lock on ksnd_global_lock */
2491         ksock_conn_t      *conn;
2492         struct list_head  *ctmp;
2493         ksock_sched_t     *sched;
2494
2495         list_for_each (ctmp, &peer->ksnp_conns) {
2496                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2497                 sched = conn->ksnc_scheduler;
2498
2499                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2500                 LASSERT (!conn->ksnc_closing);
2501                 
2502                 if (conn->ksnc_rx_started &&
2503                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2504                         /* Timed out incomplete incoming message */
2505                         atomic_inc (&conn->ksnc_refcount);
2506                         CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2507                                 peer->ksnp_nid, conn, HIPQUAD(conn->ksnc_ipaddr));
2508                         return (conn);
2509                 }
2510                 
2511                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2512                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2513                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2514                         /* Timed out messages queued for sending, or
2515                          * messages buffered in the socket's send buffer */
2516                         atomic_inc (&conn->ksnc_refcount);
2517                         CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n", 
2518                                 peer->ksnp_nid, 
2519                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2520                                 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2521                                 HIPQUAD(conn->ksnc_ipaddr));
2522                         return (conn);
2523                 }
2524         }
2525
2526         return (NULL);
2527 }
2528
2529 void
2530 ksocknal_check_peer_timeouts (int idx)
2531 {
2532         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2533         struct list_head *ptmp;
2534         ksock_peer_t     *peer;
2535         ksock_conn_t     *conn;
2536
2537  again:
2538         /* NB. We expect to have a look at all the peers and not find any
2539          * connections to time out, so we just use a shared lock while we
2540          * take a look... */
2541         read_lock (&ksocknal_data.ksnd_global_lock);
2542
2543         list_for_each (ptmp, peers) {
2544                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2545                 conn = ksocknal_find_timed_out_conn (peer);
2546                 
2547                 if (conn != NULL) {
2548                         read_unlock (&ksocknal_data.ksnd_global_lock);
2549
2550                         CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2551                                 peer->ksnp_nid,
2552                                 HIPQUAD(conn->ksnc_ipaddr),
2553                                 conn->ksnc_port);
2554                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2555                         
2556                         /* NB we won't find this one again, but we can't
2557                          * just proceed with the next peer, since we dropped
2558                          * ksnd_global_lock and it might be dead already! */
2559                         ksocknal_put_conn (conn);
2560                         goto again;
2561                 }
2562         }
2563
2564         read_unlock (&ksocknal_data.ksnd_global_lock);
2565 }
2566
2567 int
2568 ksocknal_reaper (void *arg)
2569 {
2570         wait_queue_t       wait;
2571         unsigned long      flags;
2572         ksock_conn_t      *conn;
2573         ksock_sched_t     *sched;
2574         struct list_head   enomem_conns;
2575         int                nenomem_conns;
2576         int                timeout;
2577         int                i;
2578         int                peer_index = 0;
2579         unsigned long      deadline = jiffies;
2580         
2581         kportal_daemonize ("ksocknal_reaper");
2582         kportal_blockallsigs ();
2583
2584         INIT_LIST_HEAD(&enomem_conns);
2585         init_waitqueue_entry (&wait, current);
2586
2587         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2588
2589         while (!ksocknal_data.ksnd_shuttingdown) {
2590
2591                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2592                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2593                                            ksock_conn_t, ksnc_list);
2594                         list_del (&conn->ksnc_list);
2595                         
2596                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2597
2598                         ksocknal_terminate_conn (conn);
2599                         ksocknal_put_conn (conn);
2600
2601                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2602                         continue;
2603                 }
2604
2605                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2606                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2607                                            ksock_conn_t, ksnc_list);
2608                         list_del (&conn->ksnc_list);
2609                         
2610                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2611
2612                         ksocknal_destroy_conn (conn);
2613
2614                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2615                         continue;
2616                 }
2617
2618                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2619                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2620                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2621                 }
2622
2623                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2624
2625                 /* reschedule all the connections that stalled with ENOMEM... */
2626                 nenomem_conns = 0;
2627                 while (!list_empty (&enomem_conns)) {
2628                         conn = list_entry (enomem_conns.next,
2629                                            ksock_conn_t, ksnc_tx_list);
2630                         list_del (&conn->ksnc_tx_list);
2631
2632                         sched = conn->ksnc_scheduler;
2633
2634                         spin_lock_irqsave (&sched->kss_lock, flags);
2635
2636                         LASSERT (conn->ksnc_tx_scheduled);
2637                         conn->ksnc_tx_ready = 1;
2638                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2639                         wake_up (&sched->kss_waitq);
2640
2641                         spin_unlock_irqrestore (&sched->kss_lock, flags);
2642                         nenomem_conns++;
2643                 }
2644                 
2645                 /* careful with the jiffy wrap... */
2646                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2647                         const int n = 4;
2648                         const int p = 1;
2649                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2650                         
2651                         /* Time to check for timeouts on a few more peers: I do
2652                          * checks every 'p' seconds on a proportion of the peer
2653                          * table and I need to check every connection 'n' times
2654                          * within a timeout interval, to ensure I detect a
2655                          * timeout on any connection within (n+1)/n times the
2656                          * timeout interval. */
2657
2658                         if (ksocknal_tunables.ksnd_io_timeout > n * p)
2659                                 chunk = (chunk * n * p) / 
2660                                         ksocknal_tunables.ksnd_io_timeout;
2661                         if (chunk == 0)
2662                                 chunk = 1;
2663
2664                         for (i = 0; i < chunk; i++) {
2665                                 ksocknal_check_peer_timeouts (peer_index);
2666                                 peer_index = (peer_index + 1) % 
2667                                              ksocknal_data.ksnd_peer_hash_size;
2668                         }
2669
2670                         deadline += p * HZ;
2671                 }
2672
2673                 if (nenomem_conns != 0) {
2674                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2675                          * This also prevents me getting woken immediately
2676                          * if any go back on my enomem list. */
2677                         timeout = SOCKNAL_ENOMEM_RETRY;
2678                 }
2679                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2680
2681                 set_current_state (TASK_INTERRUPTIBLE);
2682                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2683
2684                 if (!ksocknal_data.ksnd_shuttingdown &&
2685                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2686                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2687                         schedule_timeout (timeout);
2688
2689                 set_current_state (TASK_RUNNING);
2690                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2691
2692                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2693         }
2694
2695         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2696
2697         ksocknal_thread_fini ();
2698         return (0);
2699 }
2700
2701 nal_cb_t ksocknal_lib = {
2702         nal_data:       &ksocknal_data,                /* NAL private data */
2703         cb_send:         ksocknal_send,
2704         cb_send_pages:   ksocknal_send_pages,
2705         cb_recv:         ksocknal_recv,
2706         cb_recv_pages:   ksocknal_recv_pages,
2707         cb_read:         ksocknal_read,
2708         cb_write:        ksocknal_write,
2709         cb_malloc:       ksocknal_malloc,
2710         cb_free:         ksocknal_free,
2711         cb_printf:       ksocknal_printf,
2712         cb_cli:          ksocknal_cli,
2713         cb_sti:          ksocknal_sti,
2714         cb_callback:     ksocknal_callback,
2715         cb_dist:         ksocknal_dist
2716 };