Whamcloud - gitweb
Branch: b1_4_smallfix
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
28 # include <linux/syscalls.h>
29 #endif
30
31 /*
32  *  LIB functions follow
33  *
34  */
35 int
36 ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
37 {
38         /* I would guess that if ksocknal_get_peer (nid) == NULL,
39            and we're not routing, then 'nid' is very distant :) */
40         if (nal->libnal_ni.ni_pid.nid == nid) {
41                 *dist = 0;
42         } else {
43                 *dist = 1;
44         }
45
46         return 0;
47 }
48
49 void
50 ksocknal_free_ltx (ksock_ltx_t *ltx)
51 {
52         atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
53         PORTAL_FREE(ltx, ltx->ltx_desc_size);
54 }
55
56 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
57 struct page *
58 ksocknal_kvaddr_to_page (unsigned long vaddr)
59 {
60         struct page *page;
61
62         if (vaddr >= VMALLOC_START &&
63             vaddr < VMALLOC_END)
64                 page = vmalloc_to_page ((void *)vaddr);
65 #if CONFIG_HIGHMEM
66         else if (vaddr >= PKMAP_BASE &&
67                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
68                 page = vmalloc_to_page ((void *)vaddr);
69                 /* in 2.4 ^ just walks the page tables */
70 #endif
71         else
72                 page = virt_to_page (vaddr);
73
74         if (page == NULL ||
75             !VALID_PAGE (page))
76                 return (NULL);
77
78         return (page);
79 }
80 #endif
81
82 int
83 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
84 {
85         struct socket *sock = conn->ksnc_sock;
86         struct iovec  *iov = tx->tx_iov;
87 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
88         unsigned long  vaddr = (unsigned long)iov->iov_base
89         int            offset = vaddr & (PAGE_SIZE - 1);
90         int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
91         struct page   *page;
92 #endif
93         int            nob;
94         int            rc;
95
96         /* NB we can't trust socket ops to either consume our iovs
97          * or leave them alone. */
98         LASSERT (tx->tx_niov > 0);
99         
100 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
101         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
102             (sock->sk->route_caps & NETIF_F_SG) &&
103             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
104             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
105                 int msgflg = MSG_DONTWAIT;
106                 
107                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
108                        (void *)vaddr, page, page_address(page), offset, zcsize);
109
110                 if (!list_empty (&conn->ksnc_tx_queue) ||
111                     zcsize < tx->tx_resid)
112                         msgflg |= MSG_MORE;
113                 
114                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
115         } else
116 #endif
117         {
118 #if SOCKNAL_SINGLE_FRAG_TX
119                 struct iovec    scratch;
120                 struct iovec   *scratchiov = &scratch;
121                 int             niov = 1;
122 #else
123                 struct iovec   *scratchiov = conn->ksnc_tx_scratch_iov;
124                 int             niov = tx->tx_niov;
125 #endif
126                 struct msghdr msg = {
127                         .msg_name       = NULL,
128                         .msg_namelen    = 0,
129                         .msg_iov        = scratchiov,
130                         .msg_iovlen     = niov,
131                         .msg_control    = NULL,
132                         .msg_controllen = 0,
133                         .msg_flags      = MSG_DONTWAIT
134                 };
135                 mm_segment_t oldmm = get_fs();
136                 int  i;
137
138                 for (nob = i = 0; i < niov; i++) {
139                         scratchiov[i] = tx->tx_iov[i];
140                         nob += scratchiov[i].iov_len;
141                 }
142
143                 if (!list_empty(&conn->ksnc_tx_queue) ||
144                     nob < tx->tx_resid)
145                         msg.msg_flags |= MSG_MORE;
146                 
147                 set_fs (KERNEL_DS);
148                 rc = sock_sendmsg(sock, &msg, nob);
149                 set_fs (oldmm);
150         } 
151
152         if (rc <= 0)                            /* sent nothing? */
153                 return (rc);
154
155         nob = rc;
156         LASSERT (nob <= tx->tx_resid);
157         tx->tx_resid -= nob;
158
159         /* "consume" iov */
160         do {
161                 LASSERT (tx->tx_niov > 0);
162                 
163                 if (nob < iov->iov_len) {
164                         iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
165                         iov->iov_len -= nob;
166                         return (rc);
167                 }
168
169                 nob -= iov->iov_len;
170                 tx->tx_iov = ++iov;
171                 tx->tx_niov--;
172         } while (nob != 0);
173         
174         return (rc);
175 }
176
177 int
178 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
179 {
180         struct socket *sock = conn->ksnc_sock;
181         ptl_kiov_t    *kiov = tx->tx_kiov;
182         int            rc;
183         int            nob;
184         
185         /* NB we can't trust socket ops to either consume our iovs
186          * or leave them alone. */
187         LASSERT (tx->tx_niov == 0);
188         LASSERT (tx->tx_nkiov > 0);
189
190 #if SOCKNAL_ZC
191         if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
192             (sock->sk->route_caps & NETIF_F_SG) &&
193             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
194                 struct page   *page = kiov->kiov_page;
195                 int            offset = kiov->kiov_offset;
196                 int            fragsize = kiov->kiov_len;
197                 int            msgflg = MSG_DONTWAIT;
198
199                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
200                                page, offset, kiov->kiov_len);
201
202                 if (!list_empty(&conn->ksnc_tx_queue) ||
203                     fragsize < tx->tx_resid)
204                         msgflg |= MSG_MORE;
205
206                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
207                                        &tx->tx_zccd);
208         } else
209 #endif
210         {
211 #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
212                 struct iovec  scratch;
213                 struct iovec *scratchiov = &scratch;
214                 int           niov = 1;
215 #else
216 #ifdef CONFIG_HIGHMEM
217 #warning "XXX risk of kmap deadlock on multiple frags..."
218 #endif
219                 struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
220                 int           niov = tx->tx_nkiov;
221 #endif
222                 struct msghdr msg = {
223                         .msg_name       = NULL,
224                         .msg_namelen    = 0,
225                         .msg_iov        = scratchiov,
226                         .msg_iovlen     = niov,
227                         .msg_control    = NULL,
228                         .msg_controllen = 0,
229                         .msg_flags      = MSG_DONTWAIT
230                 };
231                 mm_segment_t  oldmm = get_fs();
232                 int           i;
233                 
234                 for (nob = i = 0; i < niov; i++) {
235                         scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
236                                                  kiov[i].kiov_offset;
237                         nob += scratchiov[i].iov_len = kiov[i].kiov_len;
238                 }
239
240                 if (!list_empty(&conn->ksnc_tx_queue) ||
241                     nob < tx->tx_resid)
242                         msg.msg_flags |= MSG_DONTWAIT;
243
244                 set_fs (KERNEL_DS);
245                 rc = sock_sendmsg(sock, &msg, nob);
246                 set_fs (oldmm);
247
248                 for (i = 0; i < niov; i++)
249                         kunmap(kiov[i].kiov_page);
250         }
251
252         if (rc <= 0)                            /* sent nothing? */
253                 return (rc);
254
255         nob = rc;
256         LASSERT (nob <= tx->tx_resid);
257         tx->tx_resid -= nob;
258
259         do {
260                 LASSERT(tx->tx_nkiov > 0);
261                 
262                 if (nob < kiov->kiov_len) {
263                         kiov->kiov_offset += nob;
264                         kiov->kiov_len -= nob;
265                         return rc;
266                 }
267                 
268                 nob -= kiov->kiov_len;
269                 tx->tx_kiov = ++kiov;
270                 tx->tx_nkiov--;
271         } while (nob != 0);
272
273         return (rc);
274 }
275
276 int
277 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
278 {
279         int      rc;
280         int      bufnob;
281         
282         if (ksocknal_data.ksnd_stall_tx != 0) {
283                 set_current_state (TASK_UNINTERRUPTIBLE);
284                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
285         }
286
287         LASSERT (tx->tx_resid != 0);
288
289         rc = ksocknal_getconnsock (conn);
290         if (rc != 0) {
291                 LASSERT (conn->ksnc_closing);
292                 return (-ESHUTDOWN);
293         }
294
295         do {
296                 if (ksocknal_data.ksnd_enomem_tx > 0) {
297                         /* testing... */
298                         ksocknal_data.ksnd_enomem_tx--;
299                         rc = -EAGAIN;
300                 } else if (tx->tx_niov != 0) {
301                         rc = ksocknal_send_iov (conn, tx);
302                 } else {
303                         rc = ksocknal_send_kiov (conn, tx);
304                 }
305
306                 bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
307                 if (rc > 0)                     /* sent something? */
308                         conn->ksnc_tx_bufnob += rc; /* account it */
309                 
310                 if (bufnob < conn->ksnc_tx_bufnob) {
311                         /* allocated send buffer bytes < computed; infer
312                          * something got ACKed */
313                         conn->ksnc_tx_deadline = jiffies + 
314                                                  ksocknal_tunables.ksnd_io_timeout * HZ;
315                         conn->ksnc_peer->ksnp_last_alive = jiffies;
316                         conn->ksnc_tx_bufnob = bufnob;
317                         mb();
318                 }
319
320                 if (rc <= 0) { /* Didn't write anything? */
321                         unsigned long  flags;
322                         ksock_sched_t *sched;
323
324                         if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
325                                 rc = -EAGAIN;
326
327                         if (rc != -EAGAIN)
328                                 break;
329
330                         /* Check if EAGAIN is due to memory pressure */
331
332                         sched = conn->ksnc_scheduler;
333                         spin_lock_irqsave(&sched->kss_lock, flags);
334                                 
335                         if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
336                             !conn->ksnc_tx_ready) {
337                                 /* SOCK_NOSPACE is set when the socket fills
338                                  * and cleared in the write_space callback
339                                  * (which also sets ksnc_tx_ready).  If
340                                  * SOCK_NOSPACE and ksnc_tx_ready are BOTH
341                                  * zero, I didn't fill the socket and
342                                  * write_space won't reschedule me, so I
343                                  * return -ENOMEM to get my caller to retry
344                                  * after a timeout */
345                                 rc = -ENOMEM;
346                         }
347
348                         spin_unlock_irqrestore(&sched->kss_lock, flags);
349                         break;
350                 }
351
352                 /* socket's wmem_queued now includes 'rc' bytes */
353                 atomic_sub (rc, &conn->ksnc_tx_nob);
354                 rc = 0;
355
356         } while (tx->tx_resid != 0);
357
358         ksocknal_putconnsock (conn);
359         return (rc);
360 }
361
362 void
363 ksocknal_eager_ack (ksock_conn_t *conn)
364 {
365         int            opt = 1;
366         mm_segment_t   oldmm = get_fs();
367         struct socket *sock = conn->ksnc_sock;
368         
369         /* Remind the socket to ACK eagerly.  If I don't, the socket might
370          * think I'm about to send something it could piggy-back the ACK
371          * on, introducing delay in completing zero-copy sends in my
372          * peer. */
373
374         set_fs(KERNEL_DS);
375         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
376                                (char *)&opt, sizeof (opt));
377         set_fs(oldmm);
378 }
379
380 int
381 ksocknal_recv_iov (ksock_conn_t *conn)
382 {
383 #if SOCKNAL_SINGLE_FRAG_RX
384         struct iovec  scratch;
385         struct iovec *scratchiov = &scratch;
386         int           niov = 1;
387 #else
388         struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
389         int           niov = conn->ksnc_rx_niov;
390 #endif
391         struct iovec *iov = conn->ksnc_rx_iov;
392         struct msghdr msg = {
393                 .msg_name       = NULL,
394                 .msg_namelen    = 0,
395                 .msg_iov        = scratchiov,
396                 .msg_iovlen     = niov,
397                 .msg_control    = NULL,
398                 .msg_controllen = 0,
399                 .msg_flags      = 0
400         };
401         mm_segment_t oldmm = get_fs();
402         int          nob;
403         int          i;
404         int          rc;
405
406         /* NB we can't trust socket ops to either consume our iovs
407          * or leave them alone. */
408         LASSERT (niov > 0);
409
410         for (nob = i = 0; i < niov; i++) {
411                 scratchiov[i] = iov[i];
412                 nob += scratchiov[i].iov_len;
413         }
414         LASSERT (nob <= conn->ksnc_rx_nob_wanted);
415
416         set_fs (KERNEL_DS);
417         rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
418         /* NB this is just a boolean..........................^ */
419         set_fs (oldmm);
420
421         if (rc <= 0)
422                 return (rc);
423
424         /* received something... */
425         nob = rc;
426
427         conn->ksnc_peer->ksnp_last_alive = jiffies;
428         conn->ksnc_rx_deadline = jiffies + 
429                                  ksocknal_tunables.ksnd_io_timeout * HZ;
430         mb();                           /* order with setting rx_started */
431         conn->ksnc_rx_started = 1;
432
433         conn->ksnc_rx_nob_wanted -= nob;
434         conn->ksnc_rx_nob_left -= nob;
435
436         do {
437                 LASSERT (conn->ksnc_rx_niov > 0);
438                 
439                 if (nob < iov->iov_len) {
440                         iov->iov_len -= nob;
441                         iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
442                         return (-EAGAIN);
443                 }
444                 
445                 nob -= iov->iov_len;
446                 conn->ksnc_rx_iov = ++iov;
447                 conn->ksnc_rx_niov--;
448         } while (nob != 0);
449
450         return (rc);
451 }
452
453 int
454 ksocknal_recv_kiov (ksock_conn_t *conn)
455 {
456 #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
457         struct iovec  scratch;
458         struct iovec *scratchiov = &scratch;
459         int           niov = 1;
460 #else
461 #ifdef CONFIG_HIGHMEM
462 #warning "XXX risk of kmap deadlock on multiple frags..."
463 #endif
464         struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
465         int           niov = conn->ksnc_rx_nkiov;
466 #endif
467         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
468         struct msghdr msg = {
469                 .msg_name       = NULL,
470                 .msg_namelen    = 0,
471                 .msg_iov        = scratchiov,
472                 .msg_iovlen     = niov,
473                 .msg_control    = NULL,
474                 .msg_controllen = 0,
475                 .msg_flags      = 0
476         };
477         mm_segment_t oldmm = get_fs();
478         int          nob;
479         int          i;
480         int          rc;
481
482         LASSERT (conn->ksnc_rx_nkiov > 0);
483
484         /* NB we can't trust socket ops to either consume our iovs
485          * or leave them alone. */
486         for (nob = i = 0; i < niov; i++) {
487                 scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
488                 nob += scratchiov[i].iov_len = kiov[i].kiov_len;
489         }
490         LASSERT (nob <= conn->ksnc_rx_nob_wanted);
491
492         set_fs (KERNEL_DS);
493         rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
494         /* NB this is just a boolean.......................^ */
495         set_fs (oldmm);
496
497         for (i = 0; i < niov; i++)
498                 kunmap(kiov[i].kiov_page);
499
500         if (rc <= 0)
501                 return (rc);
502         
503         /* received something... */
504         nob = rc;
505
506         conn->ksnc_peer->ksnp_last_alive = jiffies;
507         conn->ksnc_rx_deadline = jiffies + 
508                                  ksocknal_tunables.ksnd_io_timeout * HZ;
509         mb();                           /* order with setting rx_started */
510         conn->ksnc_rx_started = 1;
511
512         conn->ksnc_rx_nob_wanted -= nob;
513         conn->ksnc_rx_nob_left -= nob;
514
515         do {
516                 LASSERT (conn->ksnc_rx_nkiov > 0);
517                 
518                 if (nob < kiov->kiov_len) {
519                         kiov->kiov_offset += nob;
520                         kiov->kiov_len -= nob;
521                         return -EAGAIN;
522                 }
523                 
524                 nob -= kiov->kiov_len;
525                 conn->ksnc_rx_kiov = ++kiov;
526                 conn->ksnc_rx_nkiov--;
527         } while (nob != 0);
528
529         return 1;
530 }
531
532 int
533 ksocknal_receive (ksock_conn_t *conn) 
534 {
535         /* Return 1 on success, 0 on EOF, < 0 on error.
536          * Caller checks ksnc_rx_nob_wanted to determine
537          * progress/completion. */
538         int     rc;
539         ENTRY;
540         
541         if (ksocknal_data.ksnd_stall_rx != 0) {
542                 set_current_state (TASK_UNINTERRUPTIBLE);
543                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
544         }
545
546         rc = ksocknal_getconnsock (conn);
547         if (rc != 0) {
548                 LASSERT (conn->ksnc_closing);
549                 return (-ESHUTDOWN);
550         }
551
552         for (;;) {
553                 if (conn->ksnc_rx_niov != 0)
554                         rc = ksocknal_recv_iov (conn);
555                 else
556                         rc = ksocknal_recv_kiov (conn);
557
558                 if (rc <= 0) {
559                         /* error/EOF or partial receive */
560                         if (rc == -EAGAIN) {
561                                 rc = 1;
562                         } else if (rc == 0 && conn->ksnc_rx_started) {
563                                 /* EOF in the middle of a message */
564                                 rc = -EPROTO;
565                         }
566                         break;
567                 }
568
569                 /* Completed a fragment */
570
571                 if (conn->ksnc_rx_nob_wanted == 0) {
572                         /* Completed a message segment (header or payload) */
573                         if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
574                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
575                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
576                                 /* Remind the socket to ack eagerly... */
577                                 ksocknal_eager_ack(conn);
578                         }
579                         rc = 1;
580                         break;
581                 }
582         }
583
584         ksocknal_putconnsock (conn);
585         RETURN (rc);
586 }
587
588 #if SOCKNAL_ZC
589 void
590 ksocknal_zc_callback (zccd_t *zcd)
591 {
592         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
593         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
594         unsigned long  flags;
595         ENTRY;
596
597         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
598
599         spin_lock_irqsave (&sched->kss_lock, flags);
600
601         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
602         wake_up (&sched->kss_waitq);
603
604         spin_unlock_irqrestore (&sched->kss_lock, flags);
605         EXIT;
606 }
607 #endif
608
609 void
610 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
611 {
612         ksock_ltx_t   *ltx;
613         ENTRY;
614
615         if (tx->tx_conn != NULL) {
616 #if SOCKNAL_ZC
617                 /* zero copy completion isn't always from
618                  * process_transmit() so it needs to keep a ref on
619                  * tx_conn... */
620                 if (asynch)
621                         ksocknal_put_conn (tx->tx_conn);
622 #else
623                 LASSERT (!asynch);
624 #endif
625         }
626
627         if (tx->tx_isfwd) {             /* was a forwarded packet? */
628                 kpr_fwd_done (&ksocknal_data.ksnd_router,
629                               KSOCK_TX_2_KPR_FWD_DESC (tx), 
630                               (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
631                 EXIT;
632                 return;
633         }
634
635         /* local send */
636         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
637
638         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
639                       (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
640
641         ksocknal_free_ltx (ltx);
642         EXIT;
643 }
644
645 void
646 ksocknal_tx_launched (ksock_tx_t *tx) 
647 {
648 #if SOCKNAL_ZC
649         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
650                 ksock_conn_t  *conn = tx->tx_conn;
651                 
652                 /* zccd skbufs are still in-flight.  First take a ref on
653                  * conn, so it hangs about for ksocknal_tx_done... */
654                 atomic_inc (&conn->ksnc_refcount);
655
656                 /* ...then drop the initial ref on zccd, so the zero copy
657                  * callback can occur */
658                 zccd_put (&tx->tx_zccd);
659                 return;
660         }
661 #endif
662         /* Any zero-copy-ness (if any) has completed; I can complete the
663          * transmit now, avoiding an extra schedule */
664         ksocknal_tx_done (tx, 0);
665 }
666
667 int
668 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
669 {
670         unsigned long  flags;
671         int            rc;
672        
673         rc = ksocknal_transmit (conn, tx);
674
675         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
676
677         if (tx->tx_resid == 0) {
678                 /* Sent everything OK */
679                 LASSERT (rc == 0);
680
681                 ksocknal_tx_launched (tx);
682                 return (0);
683         }
684
685         if (rc == -EAGAIN)
686                 return (rc);
687
688         if (rc == -ENOMEM) {
689                 static int counter;
690
691                 counter++;   /* exponential backoff warnings */
692                 if ((counter & (-counter)) == counter)
693                         CWARN("%d ENOMEM tx %p\n", counter, conn);
694
695                 /* Queue on ksnd_enomem_conns for retry after a timeout */
696                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
697
698                 /* enomem list takes over scheduler's ref... */
699                 LASSERT (conn->ksnc_tx_scheduled);
700                 list_add_tail(&conn->ksnc_tx_list,
701                               &ksocknal_data.ksnd_enomem_conns);
702                 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
703                                    ksocknal_data.ksnd_reaper_waketime))
704                         wake_up (&ksocknal_data.ksnd_reaper_waitq);
705                 
706                 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
707                 return (rc);
708         }
709
710         /* Actual error */
711         LASSERT (rc < 0);
712
713         if (!conn->ksnc_closing)
714                 CERROR("[%p] Error %d on write to "LPX64
715                        " ip %d.%d.%d.%d:%d\n", conn, rc,
716                        conn->ksnc_peer->ksnp_nid,
717                        HIPQUAD(conn->ksnc_ipaddr),
718                        conn->ksnc_port);
719
720         ksocknal_close_conn_and_siblings (conn, rc);
721         ksocknal_tx_launched (tx);
722
723         return (rc);
724 }
725
726 void
727 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
728 {
729         unsigned long     flags;
730
731         /* called holding write lock on ksnd_global_lock */
732
733         LASSERT (!route->ksnr_deleted);
734         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
735         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
736         LASSERT (route->ksnr_connecting == 0);
737         
738         if (ksocknal_tunables.ksnd_typed_conns)
739                 route->ksnr_connecting = 
740                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
741         else
742                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
743
744         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
745         
746         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
747         
748         list_add_tail (&route->ksnr_connect_list,
749                        &ksocknal_data.ksnd_autoconnectd_routes);
750         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
751         
752         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
753 }
754
755 ksock_peer_t *
756 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
757 {
758         char          ipbuf[PTL_NALFMT_SIZE];
759         ptl_nid_t     target_nid;
760         int           rc;
761         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
762
763         if (peer != NULL)
764                 return (peer);
765
766         if (tx->tx_isfwd) {
767                 CERROR ("Can't send packet to "LPX64
768                        " %s: routed target is not a peer\n",
769                         nid, portals_nid2str(SOCKNAL, nid, ipbuf));
770                 return (NULL);
771         }
772
773         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
774                          &target_nid);
775         if (rc != 0) {
776                 CERROR ("Can't route to "LPX64" %s: router error %d\n",
777                         nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
778                 return (NULL);
779         }
780
781         peer = ksocknal_find_peer_locked (target_nid);
782         if (peer != NULL)
783                 return (peer);
784
785         CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
786                 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
787         return (NULL);
788 }
789
790 ksock_conn_t *
791 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
792 {
793         struct list_head *tmp;
794         ksock_conn_t     *typed = NULL;
795         int               tnob  = 0;
796         ksock_conn_t     *fallback = NULL;
797         int               fnob     = 0;
798         ksock_conn_t     *conn;
799
800         list_for_each (tmp, &peer->ksnp_conns) {
801                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
802 #if SOCKNAL_ROUND_ROBIN
803                 const int     nob = 0;
804 #else
805                 int           nob = atomic_read(&c->ksnc_tx_nob) +
806                                         c->ksnc_sock->sk->sk_wmem_queued;
807 #endif
808                 LASSERT (!c->ksnc_closing);
809
810                 if (fallback == NULL || nob < fnob) {
811                         fallback = c;
812                         fnob     = nob;
813                 }
814
815                 if (!ksocknal_tunables.ksnd_typed_conns)
816                         continue;
817
818                 switch (c->ksnc_type) {
819                 default:
820                         LBUG();
821                 case SOCKNAL_CONN_ANY:
822                         break;
823                 case SOCKNAL_CONN_BULK_IN:
824                         continue;
825                 case SOCKNAL_CONN_BULK_OUT:
826                         if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
827                                 continue;
828                         break;
829                 case SOCKNAL_CONN_CONTROL:
830                         if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
831                                 continue;
832                         break;
833                 }
834
835                 if (typed == NULL || nob < tnob) {
836                         typed = c;
837                         tnob  = nob;
838                 }
839         }
840
841         /* prefer the typed selection */
842         conn = (typed != NULL) ? typed : fallback;
843
844 #if SOCKNAL_ROUND_ROBIN
845         if (conn != NULL) {
846                 /* round-robin all else being equal */
847                 list_del (&conn->ksnc_list);
848                 list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
849         }
850 #endif
851         return conn;
852 }
853
854 void
855 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
856 {
857         unsigned long  flags;
858         ksock_sched_t *sched = conn->ksnc_scheduler;
859
860         /* called holding global lock (read or irq-write) and caller may
861          * not have dropped this lock between finding conn and calling me,
862          * so we don't need the {get,put}connsock dance to deref
863          * ksnc_sock... */
864         LASSERT(!conn->ksnc_closing);
865         LASSERT(tx->tx_resid == tx->tx_nob);
866         
867         CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", 
868                 conn->ksnc_peer->ksnp_nid,
869                 HIPQUAD(conn->ksnc_ipaddr),
870                 conn->ksnc_port);
871
872         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
873         tx->tx_conn = conn;
874
875 #if SOCKNAL_ZC
876         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
877         /* NB this sets 1 ref on zccd, so the callback can only occur after
878          * I've released this ref. */
879 #endif
880         spin_lock_irqsave (&sched->kss_lock, flags);
881
882         if (list_empty(&conn->ksnc_tx_queue) &&
883             conn->ksnc_sock->sk->sk_wmem_queued == 0) {
884                 /* First packet starts the timeout */
885                 conn->ksnc_tx_deadline = jiffies +
886                                          ksocknal_tunables.ksnd_io_timeout * HZ;
887                 conn->ksnc_tx_bufnob = 0;
888                 mb();    /* order with adding to tx_queue */
889         }
890
891         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
892                 
893         if (conn->ksnc_tx_ready &&      /* able to send */
894             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
895                 /* +1 ref for scheduler */
896                 atomic_inc (&conn->ksnc_refcount);
897                 list_add_tail (&conn->ksnc_tx_list, 
898                                &sched->kss_tx_conns);
899                 conn->ksnc_tx_scheduled = 1;
900                 wake_up (&sched->kss_waitq);
901         }
902
903         spin_unlock_irqrestore (&sched->kss_lock, flags);
904 }
905
906 ksock_route_t *
907 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
908 {
909         struct list_head  *tmp;
910         ksock_route_t     *route;
911         int                bits;
912         
913         list_for_each (tmp, &peer->ksnp_routes) {
914                 route = list_entry (tmp, ksock_route_t, ksnr_list);
915                 bits  = route->ksnr_connected;
916
917                 /* All typed connections established? */
918                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES)
919                         continue;
920
921                 /* Untyped connection established? */
922                 if ((bits & (1 << SOCKNAL_CONN_ANY)) != 0)
923                         continue;
924
925                 /* connection being established? */
926                 if (route->ksnr_connecting != 0)
927                         continue;
928
929                 /* too soon to retry this guy? */
930                 if (!time_after_eq (jiffies, route->ksnr_timeout))
931                         continue;
932                 
933                 return (route);
934         }
935         
936         return (NULL);
937 }
938
939 ksock_route_t *
940 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
941 {
942         struct list_head  *tmp;
943         ksock_route_t     *route;
944
945         list_for_each (tmp, &peer->ksnp_routes) {
946                 route = list_entry (tmp, ksock_route_t, ksnr_list);
947                 
948                 if (route->ksnr_connecting != 0)
949                         return (route);
950         }
951         
952         return (NULL);
953 }
954
955 int
956 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
957 {
958         unsigned long     flags;
959         ksock_peer_t     *peer;
960         ksock_conn_t     *conn;
961         ksock_route_t    *route;
962         rwlock_t         *g_lock;
963         
964         /* Ensure the frags we've been given EXACTLY match the number of
965          * bytes we want to send.  Many TCP/IP stacks disregard any total
966          * size parameters passed to them and just look at the frags. 
967          *
968          * We always expect at least 1 mapped fragment containing the
969          * complete portals header. */
970         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
971                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
972         LASSERT (tx->tx_niov >= 1);
973         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
974
975         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
976                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
977                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
978
979         tx->tx_conn = NULL;                     /* only set when assigned a conn */
980         tx->tx_resid = tx->tx_nob;
981         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
982
983         g_lock = &ksocknal_data.ksnd_global_lock;
984 #if !SOCKNAL_ROUND_ROBIN
985         read_lock (g_lock);
986
987         peer = ksocknal_find_target_peer_locked (tx, nid);
988         if (peer == NULL) {
989                 read_unlock (g_lock);
990                 return (-EHOSTUNREACH);
991         }
992
993         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
994                 conn = ksocknal_find_conn_locked (tx, peer);
995                 if (conn != NULL) {
996                         /* I've got no autoconnect routes that need to be
997                          * connecting and I do have an actual connection... */
998                         ksocknal_queue_tx_locked (tx, conn);
999                         read_unlock (g_lock);
1000                         return (0);
1001                 }
1002         }
1003  
1004         /* I'll need a write lock... */
1005         read_unlock (g_lock);
1006 #endif
1007         write_lock_irqsave(g_lock, flags);
1008
1009         peer = ksocknal_find_target_peer_locked (tx, nid);
1010         if (peer == NULL) {
1011                 write_unlock_irqrestore(g_lock, flags);
1012                 return (-EHOSTUNREACH);
1013         }
1014
1015         for (;;) {
1016                 /* launch any/all autoconnections that need it */
1017                 route = ksocknal_find_connectable_route_locked (peer);
1018                 if (route == NULL)
1019                         break;
1020
1021                 ksocknal_launch_autoconnect_locked (route);
1022         }
1023
1024         conn = ksocknal_find_conn_locked (tx, peer);
1025         if (conn != NULL) {
1026                 /* Connection exists; queue message on it */
1027                 ksocknal_queue_tx_locked (tx, conn);
1028                 write_unlock_irqrestore (g_lock, flags);
1029                 return (0);
1030         }
1031
1032         route = ksocknal_find_connecting_route_locked (peer);
1033         if (route != NULL) {
1034                 /* At least 1 connection is being established; queue the
1035                  * message... */
1036                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1037                 write_unlock_irqrestore (g_lock, flags);
1038                 return (0);
1039         }
1040         
1041         write_unlock_irqrestore (g_lock, flags);
1042         return (-EHOSTUNREACH);
1043 }
1044
1045 ptl_err_t
1046 ksocknal_sendmsg(lib_nal_t     *nal, 
1047                  void         *private, 
1048                  lib_msg_t    *cookie,
1049                  ptl_hdr_t    *hdr, 
1050                  int           type, 
1051                  ptl_nid_t     nid, 
1052                  ptl_pid_t     pid,
1053                  unsigned int  payload_niov, 
1054                  struct iovec *payload_iov, 
1055                  ptl_kiov_t   *payload_kiov,
1056                  size_t        payload_offset,
1057                  size_t        payload_nob)
1058 {
1059         ksock_ltx_t  *ltx;
1060         int           desc_size;
1061         int           rc;
1062
1063         /* NB 'private' is different depending on what we're sending.
1064          * Just ignore it... */
1065
1066         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1067                " pid %d\n", payload_nob, payload_niov, nid , pid);
1068
1069         LASSERT (payload_nob == 0 || payload_niov > 0);
1070         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1071
1072         /* It must be OK to kmap() if required */
1073         LASSERT (payload_kiov == NULL || !in_interrupt ());
1074         /* payload is either all vaddrs or all pages */
1075         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1076         
1077         if (payload_iov != NULL)
1078                 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1079         else
1080                 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1081         
1082         if (in_interrupt() ||
1083             type == PTL_MSG_ACK ||
1084             type == PTL_MSG_REPLY) {
1085                 /* Can't block if in interrupt or responding to an incoming
1086                  * message */
1087                 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1088         } else {
1089                 PORTAL_ALLOC(ltx, desc_size);
1090         }
1091         
1092         if (ltx == NULL) {
1093                 CERROR("Can't allocate tx desc type %d size %d %s\n",
1094                        type, desc_size, in_interrupt() ? "(intr)" : "");
1095                 return (PTL_NO_SPACE);
1096         }
1097
1098         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1099
1100         ltx->ltx_desc_size = desc_size;
1101         
1102         /* We always have 1 mapped frag for the header */
1103         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1104         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1105         ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1106         ltx->ltx_hdr = *hdr;
1107         
1108         ltx->ltx_private = private;
1109         ltx->ltx_cookie = cookie;
1110         
1111         ltx->ltx_tx.tx_isfwd = 0;
1112         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1113
1114         if (payload_iov != NULL) {
1115                 /* payload is all mapped */
1116                 ltx->ltx_tx.tx_kiov  = NULL;
1117                 ltx->ltx_tx.tx_nkiov = 0;
1118
1119                 ltx->ltx_tx.tx_niov = 
1120                         1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1121                                             payload_niov, payload_iov,
1122                                             payload_offset, payload_nob);
1123         } else {
1124                 /* payload is all pages */
1125                 ltx->ltx_tx.tx_niov = 1;
1126
1127                 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1128                 ltx->ltx_tx.tx_nkiov =
1129                         lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1130                                          payload_niov, payload_kiov,
1131                                          payload_offset, payload_nob);
1132         }
1133
1134         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1135         if (rc == 0)
1136                 return (PTL_OK);
1137         
1138         ksocknal_free_ltx(ltx);
1139         return (PTL_FAIL);
1140 }
1141
1142 ptl_err_t
1143 ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1144                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1145                unsigned int payload_niov, struct iovec *payload_iov,
1146                size_t payload_offset, size_t payload_len)
1147 {
1148         return (ksocknal_sendmsg(nal, private, cookie,
1149                                  hdr, type, nid, pid,
1150                                  payload_niov, payload_iov, NULL,
1151                                  payload_offset, payload_len));
1152 }
1153
1154 ptl_err_t
1155 ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1156                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1157                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1158                      size_t payload_offset, size_t payload_len)
1159 {
1160         return (ksocknal_sendmsg(nal, private, cookie,
1161                                  hdr, type, nid, pid,
1162                                  payload_niov, NULL, payload_kiov,
1163                                  payload_offset, payload_len));
1164 }
1165
1166 void
1167 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1168 {
1169         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1170         ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1171         int           rc;
1172         
1173         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1174                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1175
1176         /* I'm the gateway; must be the last hop */
1177         if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
1178                 nid = fwd->kprfd_target_nid;
1179
1180         /* setup iov for hdr */
1181         ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1182         ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1183
1184         ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
1185         ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1186         ftx->ftx_tx.tx_niov  = 1;
1187         ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
1188         ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1189         ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
1190
1191         rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1192         if (rc != 0)
1193                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1194 }
1195
1196 int
1197 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1198 {
1199         long          pid = kernel_thread (fn, arg, 0);
1200         unsigned long flags;
1201
1202         if (pid < 0)
1203                 return ((int)pid);
1204
1205         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1206         ksocknal_data.ksnd_nthreads++;
1207         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1208         return (0);
1209 }
1210
1211 void
1212 ksocknal_thread_fini (void)
1213 {
1214         unsigned long flags;
1215
1216         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1217         ksocknal_data.ksnd_nthreads--;
1218         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1219 }
1220
1221 void
1222 ksocknal_fmb_callback (void *arg, int error)
1223 {
1224         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1225         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1226         ptl_hdr_t         *hdr = &fmb->fmb_hdr;
1227         ksock_conn_t      *conn = NULL;
1228         ksock_sched_t     *sched;
1229         unsigned long      flags;
1230         char               ipbuf[PTL_NALFMT_SIZE];
1231         char               ipbuf2[PTL_NALFMT_SIZE];
1232
1233         if (error != 0)
1234                 CERROR("Failed to route packet from "
1235                        LPX64" %s to "LPX64" %s: %d\n",
1236                        le64_to_cpu(hdr->src_nid),
1237                        portals_nid2str(SOCKNAL, le64_to_cpu(hdr->src_nid), ipbuf),
1238                        le64_to_cpu(hdr->dest_nid),
1239                        portals_nid2str(SOCKNAL, le64_to_cpu(hdr->dest_nid), ipbuf2),
1240                        error);
1241         else
1242                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1243                         le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid));
1244
1245         /* drop peer ref taken on init */
1246         ksocknal_put_peer (fmb->fmb_peer);
1247
1248         spin_lock_irqsave (&fmp->fmp_lock, flags);
1249
1250         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1251         fmp->fmp_nactive_fmbs--;
1252
1253         if (!list_empty (&fmp->fmp_blocked_conns)) {
1254                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1255                                    ksock_conn_t, ksnc_rx_list);
1256                 list_del (&conn->ksnc_rx_list);
1257         }
1258
1259         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1260
1261         if (conn == NULL)
1262                 return;
1263
1264         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1265         LASSERT (conn->ksnc_rx_scheduled);
1266         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1267
1268         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1269
1270         sched = conn->ksnc_scheduler;
1271
1272         spin_lock_irqsave (&sched->kss_lock, flags);
1273
1274         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1275         wake_up (&sched->kss_waitq);
1276
1277         spin_unlock_irqrestore (&sched->kss_lock, flags);
1278 }
1279
1280 ksock_fmb_t *
1281 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1282 {
1283         int               payload_nob = conn->ksnc_rx_nob_left;
1284         unsigned long     flags;
1285         ksock_fmb_pool_t *pool;
1286         ksock_fmb_t      *fmb;
1287
1288         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1289         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1290
1291         if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1292                 pool = &ksocknal_data.ksnd_small_fmp;
1293         else
1294                 pool = &ksocknal_data.ksnd_large_fmp;
1295
1296         spin_lock_irqsave (&pool->fmp_lock, flags);
1297
1298         if (!list_empty (&pool->fmp_idle_fmbs)) {
1299                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1300                                  ksock_fmb_t, fmb_list);
1301                 list_del (&fmb->fmb_list);
1302                 pool->fmp_nactive_fmbs++;
1303                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1304
1305                 return (fmb);
1306         }
1307
1308         /* deschedule until fmb free */
1309
1310         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1311
1312         list_add_tail (&conn->ksnc_rx_list,
1313                        &pool->fmp_blocked_conns);
1314
1315         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1316         return (NULL);
1317 }
1318
1319 int
1320 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1321 {
1322         int       payload_nob = conn->ksnc_rx_nob_left;
1323         ptl_nid_t dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1324         int       niov = 0;
1325         int       nob = payload_nob;
1326
1327         LASSERT (conn->ksnc_rx_scheduled);
1328         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1329         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1330         LASSERT (payload_nob >= 0);
1331         LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1332         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1333         LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1334
1335         /* Take a ref on the conn's peer to prevent module unload before
1336          * forwarding completes. */
1337         fmb->fmb_peer = conn->ksnc_peer;
1338         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1339
1340         /* Copy the header we just read into the forwarding buffer.  If
1341          * there's payload, start reading reading it into the buffer,
1342          * otherwise the forwarding buffer can be kicked off
1343          * immediately. */
1344         fmb->fmb_hdr = conn->ksnc_hdr;
1345
1346         while (nob > 0) {
1347                 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1348                 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1349                 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1350                 nob -= PAGE_SIZE;
1351                 niov++;
1352         }
1353
1354         kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1355                      payload_nob, niov, fmb->fmb_kiov,
1356                      ksocknal_fmb_callback, fmb);
1357
1358         if (payload_nob == 0) {         /* got complete packet already */
1359                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1360                         conn, le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid);
1361
1362                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1363
1364                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1365                 return (1);
1366         }
1367
1368         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1369         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1370         
1371         /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1372          * buffer */
1373         LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1374
1375         conn->ksnc_rx_niov = 0;
1376         conn->ksnc_rx_nkiov = niov;
1377         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1378         memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1379         
1380         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1381                 le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1382         return (0);
1383 }
1384
1385 void
1386 ksocknal_fwd_parse (ksock_conn_t *conn)
1387 {
1388         ksock_peer_t *peer;
1389         ptl_nid_t     dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1390         ptl_nid_t     src_nid = le64_to_cpu(conn->ksnc_hdr.src_nid);
1391         int           body_len = le32_to_cpu(conn->ksnc_hdr.payload_length);
1392         char str[PTL_NALFMT_SIZE];
1393         char str2[PTL_NALFMT_SIZE];
1394
1395         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1396                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1397
1398         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1399         LASSERT (conn->ksnc_rx_scheduled);
1400
1401         if (body_len < 0) {                 /* length corrupt (overflow) */
1402                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1403                        "packet size %d illegal\n",
1404                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1405                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1406                        body_len);
1407
1408                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1409                 return;
1410         }
1411
1412         if (!kpr_routing(&ksocknal_data.ksnd_router)) {    /* not forwarding */
1413                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1414                        " (%s): not forwarding\n",
1415                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1416                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1417                 /* on to new packet (skip this one's body) */
1418                 ksocknal_new_packet (conn, body_len);
1419                 return;
1420         }
1421
1422         if (body_len > PTL_MTU) {      /* too big to forward */
1423                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1424                         "(%s): packet size %d too big\n",
1425                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1426                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1427                         body_len);
1428                 /* on to new packet (skip this one's body) */
1429                 ksocknal_new_packet (conn, body_len);
1430                 return;
1431         }
1432
1433         /* should have gone direct */
1434         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1435         if (peer != NULL) {
1436                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1437                         "(%s): target is a peer\n",
1438                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1439                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1440                 ksocknal_put_peer (peer);  /* drop ref from get above */
1441
1442                 /* on to next packet (skip this one's body) */
1443                 ksocknal_new_packet (conn, body_len);
1444                 return;
1445         }
1446
1447         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1448         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1449         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1450 }
1451
1452 int
1453 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1454 {
1455         static char ksocknal_slop_buffer[4096];
1456
1457         int   nob;
1458         int   niov;
1459         int   skipped;
1460
1461         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1462                 conn->ksnc_rx_started = 0;
1463                 mb ();                          /* racing with timeout thread */
1464                 
1465                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1466                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1467                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1468
1469                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1470                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1471                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1472                 conn->ksnc_rx_niov = 1;
1473
1474                 conn->ksnc_rx_kiov = NULL;
1475                 conn->ksnc_rx_nkiov = 0;
1476                 return (1);
1477         }
1478
1479         /* Set up to skip as much a possible now.  If there's more left
1480          * (ran out of iov entries) we'll get called again */
1481
1482         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1483         conn->ksnc_rx_nob_left = nob_to_skip;
1484         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1485         skipped = 0;
1486         niov = 0;
1487
1488         do {
1489                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1490
1491                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1492                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1493                 niov++;
1494                 skipped += nob;
1495                 nob_to_skip -=nob;
1496
1497         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1498                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1499
1500         conn->ksnc_rx_niov = niov;
1501         conn->ksnc_rx_kiov = NULL;
1502         conn->ksnc_rx_nkiov = 0;
1503         conn->ksnc_rx_nob_wanted = skipped;
1504         return (0);
1505 }
1506
1507 int
1508 ksocknal_process_receive (ksock_conn_t *conn)
1509 {
1510         ksock_fmb_t  *fmb;
1511         int           rc;
1512         
1513         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1514
1515         /* doesn't need a forwarding buffer */
1516         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1517                 goto try_read;
1518
1519  get_fmb:
1520         fmb = ksocknal_get_idle_fmb (conn);
1521         if (fmb == NULL) {
1522                 /* conn descheduled waiting for idle fmb */
1523                 return (0);
1524         }
1525
1526         if (ksocknal_init_fmb (conn, fmb)) {
1527                 /* packet forwarded */
1528                 return (0);
1529         }
1530
1531  try_read:
1532         /* NB: sched lock NOT held */
1533         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1534                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1535                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1536                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1537
1538         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1539
1540         rc = ksocknal_receive(conn);
1541
1542         if (rc <= 0) {
1543                 LASSERT (rc != -EAGAIN);
1544
1545                 if (rc == 0)
1546                         CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1547                                conn, conn->ksnc_peer->ksnp_nid,
1548                                HIPQUAD(conn->ksnc_ipaddr),
1549                                conn->ksnc_port);
1550                 else if (!conn->ksnc_closing)
1551                         CERROR ("[%p] Error %d on read from "LPX64
1552                                 " ip %d.%d.%d.%d:%d\n",
1553                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1554                                 HIPQUAD(conn->ksnc_ipaddr),
1555                                 conn->ksnc_port);
1556
1557                 ksocknal_close_conn_and_siblings (conn, rc);
1558                 return (rc == 0 ? -ESHUTDOWN : rc);
1559         }
1560
1561         if (conn->ksnc_rx_nob_wanted != 0) {
1562                 /* short read */
1563                 return (-EAGAIN);
1564         }
1565         
1566         switch (conn->ksnc_rx_state) {
1567         case SOCKNAL_RX_HEADER:
1568                 if (conn->ksnc_hdr.type != cpu_to_le32(PTL_MSG_HELLO) &&
1569                     le64_to_cpu(conn->ksnc_hdr.dest_nid) != 
1570                     ksocknal_lib.libnal_ni.ni_pid.nid) {
1571                         /* This packet isn't for me */
1572                         ksocknal_fwd_parse (conn);
1573                         switch (conn->ksnc_rx_state) {
1574                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1575                                 return (0);     /* => come back later */
1576                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1577                                 goto try_read;  /* => go read it */
1578                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1579                                 goto get_fmb;   /* => go get a fwd msg buffer */
1580                         default:
1581                                 LBUG ();
1582                         }
1583                         /* Not Reached */
1584                 }
1585
1586                 /* sets wanted_len, iovs etc */
1587                 rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1588
1589                 if (rc != PTL_OK) {
1590                         /* I just received garbage: give up on this conn */
1591                         ksocknal_close_conn_and_siblings (conn, rc);
1592                         return (-EPROTO);
1593                 }
1594
1595                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1596                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1597                         goto try_read;          /* go read the payload */
1598                 }
1599                 /* Fall through (completed packet for me) */
1600
1601         case SOCKNAL_RX_BODY:
1602                 /* payload all received */
1603                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
1604                 /* Fall through */
1605
1606         case SOCKNAL_RX_SLOP:
1607                 /* starting new packet? */
1608                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1609                         return (0);     /* come back later */
1610                 goto try_read;          /* try to finish reading slop now */
1611
1612         case SOCKNAL_RX_BODY_FWD:
1613                 /* payload all received */
1614                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1615                         conn, le64_to_cpu(conn->ksnc_hdr.src_nid),
1616                         le64_to_cpu(conn->ksnc_hdr.dest_nid),
1617                         conn->ksnc_rx_nob_left);
1618
1619                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1620                  * conn->ksnc_cookie */
1621                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1622                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1623
1624                 /* no slop in forwarded packets */
1625                 LASSERT (conn->ksnc_rx_nob_left == 0);
1626
1627                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1628                 return (0);                     /* (later) */
1629
1630         default:
1631                 break;
1632         }
1633
1634         /* Not Reached */
1635         LBUG ();
1636         return (-EINVAL);                       /* keep gcc happy */
1637 }
1638
1639 ptl_err_t
1640 ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1641                unsigned int niov, struct iovec *iov, 
1642                size_t offset, size_t mlen, size_t rlen)
1643 {
1644         ksock_conn_t *conn = (ksock_conn_t *)private;
1645
1646         LASSERT (mlen <= rlen);
1647         LASSERT (niov <= PTL_MD_MAX_IOV);
1648         
1649         conn->ksnc_cookie = msg;
1650         conn->ksnc_rx_nob_wanted = mlen;
1651         conn->ksnc_rx_nob_left   = rlen;
1652
1653         conn->ksnc_rx_nkiov = 0;
1654         conn->ksnc_rx_kiov = NULL;
1655         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1656         conn->ksnc_rx_niov =
1657                 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1658                                 niov, iov, offset, mlen);
1659
1660         LASSERT (mlen == 
1661                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1662                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1663
1664         return (PTL_OK);
1665 }
1666
1667 ptl_err_t
1668 ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1669                      unsigned int niov, ptl_kiov_t *kiov, 
1670                      size_t offset, size_t mlen, size_t rlen)
1671 {
1672         ksock_conn_t *conn = (ksock_conn_t *)private;
1673
1674         LASSERT (mlen <= rlen);
1675         LASSERT (niov <= PTL_MD_MAX_IOV);
1676         
1677         conn->ksnc_cookie = msg;
1678         conn->ksnc_rx_nob_wanted = mlen;
1679         conn->ksnc_rx_nob_left   = rlen;
1680
1681         conn->ksnc_rx_niov = 0;
1682         conn->ksnc_rx_iov  = NULL;
1683         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1684         conn->ksnc_rx_nkiov = 
1685                 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1686                                  niov, kiov, offset, mlen);
1687
1688         LASSERT (mlen == 
1689                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1690                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1691
1692         return (PTL_OK);
1693 }
1694
1695 static inline int
1696 ksocknal_sched_cansleep(ksock_sched_t *sched)
1697 {
1698         unsigned long flags;
1699         int           rc;
1700
1701         spin_lock_irqsave(&sched->kss_lock, flags);
1702
1703         rc = (!ksocknal_data.ksnd_shuttingdown &&
1704 #if SOCKNAL_ZC
1705               list_empty(&sched->kss_zctxdone_list) &&
1706 #endif
1707               list_empty(&sched->kss_rx_conns) &&
1708               list_empty(&sched->kss_tx_conns));
1709         
1710         spin_unlock_irqrestore(&sched->kss_lock, flags);
1711         return (rc);
1712 }
1713
1714 int ksocknal_scheduler (void *arg)
1715 {
1716         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1717         ksock_conn_t      *conn;
1718         ksock_tx_t        *tx;
1719         unsigned long      flags;
1720         int                rc;
1721         int                nloops = 0;
1722         int                id = sched - ksocknal_data.ksnd_schedulers;
1723         char               name[16];
1724
1725         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1726         kportal_daemonize (name);
1727         kportal_blockallsigs ();
1728
1729 #if (CONFIG_SMP && CPU_AFFINITY)
1730         id = ksocknal_sched2cpu(id);
1731         if (cpu_online(id)) {
1732                 cpumask_t m;
1733                 cpu_set(id, m);
1734                 set_cpus_allowed(current, m);
1735         } else {
1736                 CERROR ("Can't set CPU affinity for %s to %d\n", name, id);
1737         }
1738 #endif /* CONFIG_SMP && CPU_AFFINITY */
1739         
1740         spin_lock_irqsave (&sched->kss_lock, flags);
1741
1742         while (!ksocknal_data.ksnd_shuttingdown) {
1743                 int did_something = 0;
1744
1745                 /* Ensure I progress everything semi-fairly */
1746
1747                 if (!list_empty (&sched->kss_rx_conns)) {
1748                         conn = list_entry(sched->kss_rx_conns.next,
1749                                           ksock_conn_t, ksnc_rx_list);
1750                         list_del(&conn->ksnc_rx_list);
1751
1752                         LASSERT(conn->ksnc_rx_scheduled);
1753                         LASSERT(conn->ksnc_rx_ready);
1754
1755                         /* clear rx_ready in case receive isn't complete.
1756                          * Do it BEFORE we call process_recv, since
1757                          * data_ready can set it any time after we release
1758                          * kss_lock. */
1759                         conn->ksnc_rx_ready = 0;
1760                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1761                         
1762                         rc = ksocknal_process_receive(conn);
1763                         
1764                         spin_lock_irqsave(&sched->kss_lock, flags);
1765
1766                         /* I'm the only one that can clear this flag */
1767                         LASSERT(conn->ksnc_rx_scheduled);
1768
1769                         /* Did process_receive get everything it wanted? */
1770                         if (rc == 0)
1771                                 conn->ksnc_rx_ready = 1;
1772                         
1773                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1774                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1775                                 /* Conn blocked for a forwarding buffer.
1776                                  * It will get queued for my attention when
1777                                  * one becomes available (and it might just
1778                                  * already have been!).  Meanwhile my ref
1779                                  * on it stays put. */
1780                         } else if (conn->ksnc_rx_ready) {
1781                                 /* reschedule for rx */
1782                                 list_add_tail (&conn->ksnc_rx_list,
1783                                                &sched->kss_rx_conns);
1784                         } else {
1785                                 conn->ksnc_rx_scheduled = 0;
1786                                 /* drop my ref */
1787                                 ksocknal_put_conn(conn);
1788                         }
1789
1790                         did_something = 1;
1791                 }
1792
1793                 if (!list_empty (&sched->kss_tx_conns)) {
1794                         conn = list_entry(sched->kss_tx_conns.next,
1795                                           ksock_conn_t, ksnc_tx_list);
1796                         list_del (&conn->ksnc_tx_list);
1797                         
1798                         LASSERT(conn->ksnc_tx_scheduled);
1799                         LASSERT(conn->ksnc_tx_ready);
1800                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1801                         
1802                         tx = list_entry(conn->ksnc_tx_queue.next,
1803                                         ksock_tx_t, tx_list);
1804                         /* dequeue now so empty list => more to send */
1805                         list_del(&tx->tx_list);
1806                         
1807                         /* Clear tx_ready in case send isn't complete.  Do
1808                          * it BEFORE we call process_transmit, since
1809                          * write_space can set it any time after we release
1810                          * kss_lock. */
1811                         conn->ksnc_tx_ready = 0;
1812                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1813
1814                         rc = ksocknal_process_transmit(conn, tx);
1815
1816                         spin_lock_irqsave (&sched->kss_lock, flags);
1817
1818                         if (rc == -ENOMEM || rc == -EAGAIN) {
1819                                 /* Incomplete send: replace tx on HEAD of tx_queue */
1820                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1821                         } else {
1822                                 /* Complete send; assume space for more */
1823                                 conn->ksnc_tx_ready = 1;
1824                         }
1825
1826                         if (rc == -ENOMEM) {
1827                                 /* Do nothing; after a short timeout, this
1828                                  * conn will be reposted on kss_tx_conns. */
1829                         } else if (conn->ksnc_tx_ready &&
1830                                    !list_empty (&conn->ksnc_tx_queue)) {
1831                                 /* reschedule for tx */
1832                                 list_add_tail (&conn->ksnc_tx_list, 
1833                                                &sched->kss_tx_conns);
1834                         } else {
1835                                 conn->ksnc_tx_scheduled = 0;
1836                                 /* drop my ref */
1837                                 ksocknal_put_conn (conn);
1838                         }
1839                                 
1840                         did_something = 1;
1841                 }
1842 #if SOCKNAL_ZC
1843                 if (!list_empty (&sched->kss_zctxdone_list)) {
1844                         ksock_tx_t *tx =
1845                                 list_entry(sched->kss_zctxdone_list.next,
1846                                            ksock_tx_t, tx_list);
1847                         did_something = 1;
1848
1849                         list_del (&tx->tx_list);
1850                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1851
1852                         ksocknal_tx_done (tx, 1);
1853
1854                         spin_lock_irqsave (&sched->kss_lock, flags);
1855                 }
1856 #endif
1857                 if (!did_something ||           /* nothing to do */
1858                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1859                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1860
1861                         nloops = 0;
1862
1863                         if (!did_something) {   /* wait for something to do */
1864                                 rc = wait_event_interruptible (sched->kss_waitq,
1865                                                                !ksocknal_sched_cansleep(sched));
1866                                 LASSERT (rc == 0);
1867                         } else
1868                                our_cond_resched();
1869
1870                         spin_lock_irqsave (&sched->kss_lock, flags);
1871                 }
1872         }
1873
1874         spin_unlock_irqrestore (&sched->kss_lock, flags);
1875         ksocknal_thread_fini ();
1876         return (0);
1877 }
1878
1879 void
1880 ksocknal_data_ready (struct sock *sk, int n)
1881 {
1882         unsigned long  flags;
1883         ksock_conn_t  *conn;
1884         ksock_sched_t *sched;
1885         ENTRY;
1886
1887         /* interleave correctly with closing sockets... */
1888         read_lock (&ksocknal_data.ksnd_global_lock);
1889
1890         conn = sk->sk_user_data;
1891         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1892                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1893                 sk->sk_data_ready (sk, n);
1894         } else {
1895                 sched = conn->ksnc_scheduler;
1896
1897                 spin_lock_irqsave (&sched->kss_lock, flags);
1898
1899                 conn->ksnc_rx_ready = 1;
1900
1901                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1902                         list_add_tail(&conn->ksnc_rx_list,
1903                                       &sched->kss_rx_conns);
1904                         conn->ksnc_rx_scheduled = 1;
1905                         /* extra ref for scheduler */
1906                         atomic_inc (&conn->ksnc_refcount);
1907
1908                         wake_up (&sched->kss_waitq);
1909                 }
1910
1911                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1912         }
1913
1914         read_unlock (&ksocknal_data.ksnd_global_lock);
1915
1916         EXIT;
1917 }
1918
1919 void
1920 ksocknal_write_space (struct sock *sk)
1921 {
1922         unsigned long  flags;
1923         ksock_conn_t  *conn;
1924         ksock_sched_t *sched;
1925
1926         /* interleave correctly with closing sockets... */
1927         read_lock (&ksocknal_data.ksnd_global_lock);
1928
1929         conn = sk->sk_user_data;
1930
1931         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1932                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1933                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1934                                       " ready" : " blocked"),
1935                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1936                                       " scheduled" : " idle"),
1937                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1938                                       " empty" : " queued"));
1939
1940         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1941                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1942                 sk->sk_write_space (sk);
1943
1944                 read_unlock (&ksocknal_data.ksnd_global_lock);
1945                 return;
1946         }
1947
1948         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1949                 sched = conn->ksnc_scheduler;
1950
1951                 spin_lock_irqsave (&sched->kss_lock, flags);
1952
1953                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1954                 conn->ksnc_tx_ready = 1;
1955
1956                 if (!conn->ksnc_tx_scheduled && // not being progressed
1957                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1958                         list_add_tail (&conn->ksnc_tx_list,
1959                                        &sched->kss_tx_conns);
1960                         conn->ksnc_tx_scheduled = 1;
1961                         /* extra ref for scheduler */
1962                         atomic_inc (&conn->ksnc_refcount);
1963
1964                         wake_up (&sched->kss_waitq);
1965                 }
1966
1967                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1968         }
1969
1970         read_unlock (&ksocknal_data.ksnd_global_lock);
1971 }
1972
1973 int
1974 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1975 {
1976         int           rc;
1977         mm_segment_t  oldmm = get_fs();
1978
1979         while (nob > 0) {
1980                 struct iovec  iov = {
1981                         .iov_base = buffer,
1982                         .iov_len  = nob
1983                 };
1984                 struct msghdr msg = {
1985                         .msg_name       = NULL,
1986                         .msg_namelen    = 0,
1987                         .msg_iov        = &iov,
1988                         .msg_iovlen     = 1,
1989                         .msg_control    = NULL,
1990                         .msg_controllen = 0,
1991                         .msg_flags      = 0
1992                 };
1993
1994                 set_fs (KERNEL_DS);
1995                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1996                 set_fs (oldmm);
1997                 
1998                 if (rc < 0)
1999                         return (rc);
2000
2001                 if (rc == 0) {
2002                         CERROR ("Unexpected zero rc\n");
2003                         return (-ECONNABORTED);
2004                 }
2005
2006                 buffer = ((char *)buffer) + rc;
2007                 nob -= rc;
2008         }
2009         
2010         return (0);
2011 }
2012
2013 int
2014 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
2015 {
2016         int           rc;
2017         mm_segment_t  oldmm = get_fs();
2018         
2019         while (nob > 0) {
2020                 struct iovec  iov = {
2021                         .iov_base = buffer,
2022                         .iov_len  = nob
2023                 };
2024                 struct msghdr msg = {
2025                         .msg_name       = NULL,
2026                         .msg_namelen    = 0,
2027                         .msg_iov        = &iov,
2028                         .msg_iovlen     = 1,
2029                         .msg_control    = NULL,
2030                         .msg_controllen = 0,
2031                         .msg_flags      = 0
2032                 };
2033
2034                 set_fs (KERNEL_DS);
2035                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
2036                 set_fs (oldmm);
2037                 
2038                 if (rc < 0)
2039                         return (rc);
2040
2041                 if (rc == 0)
2042                         return (-ECONNABORTED);
2043
2044                 buffer = ((char *)buffer) + rc;
2045                 nob -= rc;
2046         }
2047         
2048         return (0);
2049 }
2050
2051 int
2052 ksocknal_send_hello (ksock_conn_t *conn, __u32 *ipaddrs, int nipaddrs)
2053 {
2054         /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
2055         struct socket      *sock = conn->ksnc_sock;
2056         ptl_hdr_t           hdr;
2057         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2058         int                 i;
2059         int                 rc;
2060
2061         LASSERT (conn->ksnc_type != SOCKNAL_CONN_NONE);
2062         LASSERT (nipaddrs <= SOCKNAL_MAX_INTERFACES);
2063
2064         /* No need for getconnsock/putconnsock */
2065         LASSERT (!conn->ksnc_closing);
2066
2067         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2068         hmv->magic         = cpu_to_le32 (PORTALS_PROTO_MAGIC);
2069         hmv->version_major = cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
2070         hmv->version_minor = cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
2071
2072         hdr.src_nid        = cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
2073         hdr.type           = cpu_to_le32 (PTL_MSG_HELLO);
2074         hdr.payload_length = cpu_to_le32 (nipaddrs * sizeof(*ipaddrs));
2075
2076         hdr.msg.hello.type = cpu_to_le32 (conn->ksnc_type);
2077         hdr.msg.hello.incarnation =
2078                 cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2079
2080         /* Receiver is eager */
2081         rc = ksocknal_sock_write (sock, &hdr, sizeof(hdr));
2082         if (rc != 0) {
2083                 CERROR ("Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
2084                         rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
2085                 return (rc);
2086         }
2087         
2088         if (nipaddrs == 0)
2089                 return (0);
2090         
2091         for (i = 0; i < nipaddrs; i++) {
2092                 ipaddrs[i] = __cpu_to_le32 (ipaddrs[i]);
2093         }
2094
2095         rc = ksocknal_sock_write (sock, ipaddrs, nipaddrs * sizeof(*ipaddrs));
2096         if (rc != 0)
2097                 CERROR ("Error %d sending HELLO payload (%d)"
2098                         " to %u.%u.%u.%u/%d\n", rc, nipaddrs, 
2099                         HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
2100         return (rc);
2101 }
2102
2103 int
2104 ksocknal_invert_type(int type)
2105 {
2106         switch (type)
2107         {
2108         case SOCKNAL_CONN_ANY:
2109         case SOCKNAL_CONN_CONTROL:
2110                 return (type);
2111         case SOCKNAL_CONN_BULK_IN:
2112                 return SOCKNAL_CONN_BULK_OUT;
2113         case SOCKNAL_CONN_BULK_OUT:
2114                 return SOCKNAL_CONN_BULK_IN;
2115         default:
2116                 return (SOCKNAL_CONN_NONE);
2117         }
2118 }
2119
2120 int
2121 ksocknal_recv_hello (ksock_conn_t *conn, ptl_nid_t *nid,
2122                      __u64 *incarnation, __u32 *ipaddrs)
2123 {
2124         struct socket      *sock = conn->ksnc_sock;
2125         int                 rc;
2126         int                 nips;
2127         int                 i;
2128         int                 type;
2129         ptl_hdr_t           hdr;
2130         ptl_magicversion_t *hmv;
2131
2132         hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2133         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2134
2135         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2136         if (rc != 0) {
2137                 CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
2138                         rc, HIPQUAD(conn->ksnc_ipaddr));
2139                 return (rc);
2140         }
2141
2142         if (hmv->magic != le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2143                 CERROR ("Bad magic %#08x (%#08x expected) from %u.%u.%u.%u\n",
2144                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC,
2145                         HIPQUAD(conn->ksnc_ipaddr));
2146                 return (-EPROTO);
2147         }
2148
2149         if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2150             hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2151                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2152                         " from %u.%u.%u.%u\n",
2153                         le16_to_cpu (hmv->version_major),
2154                         le16_to_cpu (hmv->version_minor),
2155                         PORTALS_PROTO_VERSION_MAJOR,
2156                         PORTALS_PROTO_VERSION_MINOR,
2157                         HIPQUAD(conn->ksnc_ipaddr));
2158                 return (-EPROTO);
2159         }
2160
2161 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
2162 # error "This code only understands protocol version 1.x"
2163 #endif
2164         /* version 1 sends magic/version as the dest_nid of a 'hello'
2165          * header, followed by payload full of interface IP addresses.
2166          * Read the rest of it in now... */
2167
2168         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2169         if (rc != 0) {
2170                 CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
2171                         rc, HIPQUAD(conn->ksnc_ipaddr));
2172                 return (rc);
2173         }
2174
2175         /* ...and check we got what we expected */
2176         if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
2177                 CERROR ("Expecting a HELLO hdr,"
2178                         " but got type %d from %u.%u.%u.%u\n",
2179                         le32_to_cpu (hdr.type),
2180                         HIPQUAD(conn->ksnc_ipaddr));
2181                 return (-EPROTO);
2182         }
2183
2184         if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2185                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY"
2186                        "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr));
2187                 return (-EPROTO);
2188         }
2189
2190         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2191                 *nid = le64_to_cpu(hdr.src_nid);
2192         } else if (*nid != le64_to_cpu (hdr.src_nid)) {
2193                 CERROR ("Connected to nid "LPX64"@%u.%u.%u.%u "
2194                         "but expecting "LPX64"\n",
2195                         le64_to_cpu (hdr.src_nid),
2196                         HIPQUAD(conn->ksnc_ipaddr), *nid);
2197                 return (-EPROTO);
2198         }
2199
2200         type = __le32_to_cpu(hdr.msg.hello.type);
2201
2202         if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2203                 /* I've accepted this connection; peer determines type */
2204                 conn->ksnc_type = ksocknal_invert_type(type);
2205                 if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2206                         CERROR ("Unexpected type %d from "LPX64"@%u.%u.%u.%u\n",
2207                                 type, *nid, HIPQUAD(conn->ksnc_ipaddr));
2208                         return (-EPROTO);
2209                 }
2210         } else if (ksocknal_invert_type(type) != conn->ksnc_type) {
2211                 CERROR ("Mismatched types: me %d, "LPX64"@%u.%u.%u.%u %d\n",
2212                         conn->ksnc_type, *nid, HIPQUAD(conn->ksnc_ipaddr),
2213                         le32_to_cpu(hdr.msg.hello.type));
2214                 return (-EPROTO);
2215         }
2216
2217         *incarnation = le64_to_cpu(hdr.msg.hello.incarnation);
2218
2219         nips = __le32_to_cpu (hdr.payload_length) / sizeof (__u32);
2220
2221         if (nips > SOCKNAL_MAX_INTERFACES ||
2222             nips * sizeof(__u32) != __le32_to_cpu (hdr.payload_length)) {
2223                 CERROR("Bad payload length %d from "LPX64"@%u.%u.%u.%u\n",
2224                        __le32_to_cpu (hdr.payload_length),
2225                        *nid, HIPQUAD(conn->ksnc_ipaddr));
2226         }
2227
2228         if (nips == 0)
2229                 return (0);
2230         
2231         rc = ksocknal_sock_read (sock, ipaddrs, nips * sizeof(*ipaddrs));
2232         if (rc != 0) {
2233                 CERROR ("Error %d reading IPs from "LPX64"@%u.%u.%u.%u\n",
2234                         rc, *nid, HIPQUAD(conn->ksnc_ipaddr));
2235                 return (rc);
2236         }
2237
2238         for (i = 0; i < nips; i++) {
2239                 ipaddrs[i] = __le32_to_cpu(ipaddrs[i]);
2240                 
2241                 if (ipaddrs[i] == 0) {
2242                         CERROR("Zero IP[%d] from "LPX64"@%u.%u.%u.%u\n",
2243                                i, *nid, HIPQUAD(conn->ksnc_ipaddr));
2244                         return (-EPROTO);
2245                 }
2246         }
2247
2248         return (nips);
2249 }
2250
2251 int
2252 ksocknal_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
2253 {
2254         mm_segment_t   oldmm = get_fs ();
2255         struct socket *sock = conn->ksnc_sock;
2256         int            len;
2257         int            rc;
2258
2259         rc = ksocknal_getconnsock (conn);
2260         if (rc != 0) {
2261                 LASSERT (conn->ksnc_closing);
2262                 *txmem = *rxmem = *nagle = 0;
2263                 return (-ESHUTDOWN);
2264         }
2265         
2266         set_fs (KERNEL_DS);
2267
2268         len = sizeof(*txmem);
2269         rc = sock_getsockopt(sock, SOL_SOCKET, SO_SNDBUF,
2270                              (char *)txmem, &len);
2271         if (rc == 0) {
2272                 len = sizeof(*rxmem);
2273                 rc = sock_getsockopt(sock, SOL_SOCKET, SO_RCVBUF,
2274                                      (char *)rxmem, &len);
2275         }
2276         if (rc == 0) {
2277                 len = sizeof(*nagle);
2278                 rc = sock->ops->getsockopt(sock, SOL_TCP, TCP_NODELAY,
2279                                            (char *)nagle, &len);
2280         }
2281
2282         set_fs (oldmm);
2283         ksocknal_putconnsock (conn);
2284
2285         if (rc == 0)
2286                 *nagle = !*nagle;
2287         else
2288                 *txmem = *rxmem = *nagle = 0;
2289                 
2290         return (rc);
2291 }
2292
2293 int
2294 ksocknal_setup_sock (struct socket *sock)
2295 {
2296         mm_segment_t    oldmm = get_fs ();
2297         int             rc;
2298         int             option;
2299         int             keep_idle;
2300         int             keep_intvl;
2301         int             keep_count;
2302         int             do_keepalive;
2303         struct linger   linger;
2304
2305         sock->sk->sk_allocation = GFP_NOFS;
2306
2307         /* Ensure this socket aborts active sends immediately when we close
2308          * it. */
2309
2310         linger.l_onoff = 0;
2311         linger.l_linger = 0;
2312
2313         set_fs (KERNEL_DS);
2314         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2315                               (char *)&linger, sizeof (linger));
2316         set_fs (oldmm);
2317         if (rc != 0) {
2318                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2319                 return (rc);
2320         }
2321
2322         option = -1;
2323         set_fs (KERNEL_DS);
2324         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2325                                     (char *)&option, sizeof (option));
2326         set_fs (oldmm);
2327         if (rc != 0) {
2328                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2329                 return (rc);
2330         }
2331
2332         if (!ksocknal_tunables.ksnd_nagle) {
2333                 option = 1;
2334                 
2335                 set_fs (KERNEL_DS);
2336                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2337                                             (char *)&option, sizeof (option));
2338                 set_fs (oldmm);
2339                 if (rc != 0) {
2340                         CERROR ("Can't disable nagle: %d\n", rc);
2341                         return (rc);
2342                 }
2343         }
2344         
2345         if (ksocknal_tunables.ksnd_buffer_size > 0) {
2346                 option = ksocknal_tunables.ksnd_buffer_size;
2347                 
2348                 set_fs (KERNEL_DS);
2349                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2350                                       (char *)&option, sizeof (option));
2351                 set_fs (oldmm);
2352                 if (rc != 0) {
2353                         CERROR ("Can't set send buffer %d: %d\n",
2354                                 option, rc);
2355                         return (rc);
2356                 }
2357
2358                 set_fs (KERNEL_DS);
2359                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2360                                       (char *)&option, sizeof (option));
2361                 set_fs (oldmm);
2362                 if (rc != 0) {
2363                         CERROR ("Can't set receive buffer %d: %d\n",
2364                                 option, rc);
2365                         return (rc);
2366                 }
2367         }
2368
2369         /* snapshot tunables */
2370         keep_idle  = ksocknal_tunables.ksnd_keepalive_idle;
2371         keep_count = ksocknal_tunables.ksnd_keepalive_count;
2372         keep_intvl = ksocknal_tunables.ksnd_keepalive_intvl;
2373         
2374         do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);
2375
2376         option = (do_keepalive ? 1 : 0);
2377         set_fs (KERNEL_DS);
2378         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2379                               (char *)&option, sizeof (option));
2380         set_fs (oldmm);
2381         if (rc != 0) {
2382                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2383                 return (rc);
2384         }
2385
2386         if (!do_keepalive)
2387                 return (0);
2388
2389         set_fs (KERNEL_DS);
2390         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2391                                     (char *)&keep_idle, sizeof (keep_idle));
2392         set_fs (oldmm);
2393         if (rc != 0) {
2394                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2395                 return (rc);
2396         }
2397
2398         set_fs (KERNEL_DS);
2399         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2400                                     (char *)&keep_intvl, sizeof (keep_intvl));
2401         set_fs (oldmm);
2402         if (rc != 0) {
2403                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2404                 return (rc);
2405         }
2406
2407         set_fs (KERNEL_DS);
2408         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2409                                     (char *)&keep_count, sizeof (keep_count));
2410         set_fs (oldmm);
2411         if (rc != 0) {
2412                 CERROR ("Can't set TCP_KEEPCNT: %d\n", rc);
2413                 return (rc);
2414         }
2415
2416         return (0);
2417 }
2418
2419 static int
2420 ksocknal_connect_sock(struct socket **sockp, int *may_retry, 
2421                       ksock_route_t *route, int local_port)
2422 {
2423         struct sockaddr_in  locaddr;
2424         struct sockaddr_in  srvaddr;
2425         struct socket      *sock;
2426         int                 rc;
2427         int                 option;
2428         mm_segment_t        oldmm = get_fs();
2429         struct timeval      tv;
2430
2431         memset(&locaddr, 0, sizeof(locaddr)); 
2432         locaddr.sin_family = AF_INET; 
2433         locaddr.sin_port = htons(local_port);
2434         locaddr.sin_addr.s_addr = 
2435                 (route->ksnr_myipaddr != 0) ? htonl(route->ksnr_myipaddr) 
2436                                             : INADDR_ANY;
2437  
2438         memset (&srvaddr, 0, sizeof (srvaddr));
2439         srvaddr.sin_family = AF_INET;
2440         srvaddr.sin_port = htons (route->ksnr_port);
2441         srvaddr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2442
2443         *may_retry = 0;
2444
2445         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2446         *sockp = sock;
2447         if (rc != 0) {
2448                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2449                 return (rc);
2450         }
2451
2452         /* Ugh; have to map_fd for compatibility with sockets passed in
2453          * from userspace.  And we actually need the sock->file refcounting
2454          * that this gives you :) */
2455
2456         rc = sock_map_fd (sock);
2457         if (rc < 0) {
2458                 sock_release (sock);
2459                 CERROR ("sock_map_fd error %d\n", rc);
2460                 return (rc);
2461         }
2462
2463         /* NB the file descriptor (rc) now owns the ref on sock->file */
2464         LASSERT (sock->file != NULL);
2465         LASSERT (file_count(sock->file) == 1);
2466
2467         get_file(sock->file);                /* extra ref makes sock->file */
2468         sys_close(rc);                       /* survive this close */
2469
2470         /* Still got a single ref on sock->file */
2471         LASSERT (file_count(sock->file) == 1);
2472
2473         /* Set the socket timeouts, so our connection attempt completes in
2474          * finite time */
2475         tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
2476         tv.tv_usec = 0;
2477
2478         set_fs (KERNEL_DS);
2479         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2480                               (char *)&tv, sizeof (tv));
2481         set_fs (oldmm);
2482         if (rc != 0) {
2483                 CERROR ("Can't set send timeout %d: %d\n", 
2484                         ksocknal_tunables.ksnd_io_timeout, rc);
2485                 goto failed;
2486         }
2487         
2488         set_fs (KERNEL_DS);
2489         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2490                               (char *)&tv, sizeof (tv));
2491         set_fs (oldmm);
2492         if (rc != 0) {
2493                 CERROR ("Can't set receive timeout %d: %d\n",
2494                         ksocknal_tunables.ksnd_io_timeout, rc);
2495                 goto failed;
2496         }
2497
2498         set_fs (KERNEL_DS);
2499         option = 1;
2500         rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
2501                              (char *)&option, sizeof (option)); 
2502         set_fs (oldmm);
2503         if (rc != 0) {
2504                 CERROR("Can't set SO_REUSEADDR for socket: %d\n", rc);
2505                 goto failed;
2506         }
2507
2508         rc = sock->ops->bind(sock, 
2509                              (struct sockaddr *)&locaddr, sizeof(locaddr));
2510         if (rc == -EADDRINUSE) {
2511                 CDEBUG(D_NET, "Port %d already in use\n", local_port);
2512                 *may_retry = 1;
2513                 goto failed;
2514         }
2515         if (rc != 0) {
2516                 CERROR("Error trying to bind to reserved port %d: %d\n",
2517                        local_port, rc);
2518                 goto failed;
2519         }
2520
2521         rc = sock->ops->connect(sock,
2522                                 (struct sockaddr *)&srvaddr, sizeof(srvaddr),
2523                                 sock->file->f_flags);
2524         if (rc == 0)
2525                 return 0;
2526
2527         /* EADDRNOTAVAIL probably means we're already connected to the same
2528          * peer/port on the same local port on a differently typed
2529          * connection.  Let our caller retry with a different local
2530          * port... */
2531         *may_retry = (rc == -EADDRNOTAVAIL);
2532
2533         CDEBUG(*may_retry ? D_NET : D_ERROR,
2534                "Error %d connecting %u.%u.%u.%u/%d -> %u.%u.%u.%u/%d\n", rc,
2535                HIPQUAD(route->ksnr_myipaddr), local_port,
2536                HIPQUAD(route->ksnr_ipaddr), route->ksnr_port);
2537
2538  failed:
2539         fput(sock->file);
2540         return rc;
2541 }
2542
2543 int
2544 ksocknal_connect_peer (ksock_route_t *route, int type)
2545 {
2546         struct socket      *sock;
2547         int                 rc;
2548         int                 port;
2549         int                 may_retry;
2550         
2551         /* Iterate through reserved ports.  When typed connections are
2552          * used, we will need to bind to multiple ports, but we only know
2553          * this at connect time.  But, by that time we've already called
2554          * bind() so we need a new socket. */
2555
2556         for (port = 1023; port > 512; --port) {
2557
2558                 rc = ksocknal_connect_sock(&sock, &may_retry, route, port);
2559
2560                 if (rc == 0) {
2561                         rc = ksocknal_create_conn(route, sock, type);
2562                         fput(sock->file);
2563                         return rc;
2564                 }
2565
2566                 if (!may_retry)
2567                         return rc;
2568         }
2569
2570         CERROR("Out of ports trying to bind to a reserved port\n");
2571         return (-EADDRINUSE);
2572 }
2573
2574 void
2575 ksocknal_autoconnect (ksock_route_t *route)
2576 {
2577         LIST_HEAD        (zombies);
2578         ksock_tx_t       *tx;
2579         ksock_peer_t     *peer;
2580         unsigned long     flags;
2581         int               rc;
2582         int               type;
2583         
2584         for (;;) {
2585                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2586                         if ((route->ksnr_connecting & (1 << type)) != 0)
2587                                 break;
2588                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2589
2590                 rc = ksocknal_connect_peer (route, type);
2591                 if (rc != 0)
2592                         break;
2593                 
2594                 /* successfully autoconnected: create_conn did the
2595                  * route/conn binding and scheduled any blocked packets */
2596
2597                 if (route->ksnr_connecting == 0) {
2598                         /* No more connections required */
2599                         return;
2600                 }
2601         }
2602
2603         /* Connection attempt failed */
2604
2605         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2606
2607         peer = route->ksnr_peer;
2608         route->ksnr_connecting = 0;
2609
2610         /* This is a retry rather than a new connection */
2611         LASSERT (route->ksnr_retry_interval != 0);
2612         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2613         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2614                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2615
2616         if (!list_empty (&peer->ksnp_tx_queue) &&
2617             ksocknal_find_connecting_route_locked (peer) == NULL) {
2618                 LASSERT (list_empty (&peer->ksnp_conns));
2619
2620                 /* None of the connections that the blocked packets are
2621                  * waiting for have been successful.  Complete them now... */
2622                 do {
2623                         tx = list_entry (peer->ksnp_tx_queue.next,
2624                                          ksock_tx_t, tx_list);
2625                         list_del (&tx->tx_list);
2626                         list_add_tail (&tx->tx_list, &zombies);
2627                 } while (!list_empty (&peer->ksnp_tx_queue));
2628         }
2629
2630 #if 0           /* irrelevent with only eager routes */
2631         if (!route->ksnr_deleted) {
2632                 /* make this route least-favourite for re-selection */
2633                 list_del(&route->ksnr_list);
2634                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2635         }
2636 #endif        
2637         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2638
2639         while (!list_empty (&zombies)) {
2640                 char ipbuf[PTL_NALFMT_SIZE];
2641                 char ipbuf2[PTL_NALFMT_SIZE];
2642                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2643
2644                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2645                         le32_to_cpu (tx->tx_hdr->type),
2646                         le32_to_cpu (tx->tx_hdr->payload_length),
2647                         le64_to_cpu (tx->tx_hdr->src_nid),
2648                         portals_nid2str(SOCKNAL,
2649                                         le64_to_cpu(tx->tx_hdr->src_nid),
2650                                         ipbuf),
2651                         le64_to_cpu (tx->tx_hdr->dest_nid),
2652                         portals_nid2str(SOCKNAL,
2653                                         le64_to_cpu(tx->tx_hdr->src_nid),
2654                                         ipbuf2));
2655
2656                 list_del (&tx->tx_list);
2657                 /* complete now */
2658                 ksocknal_tx_done (tx, 0);
2659         }
2660 }
2661
2662 int
2663 ksocknal_autoconnectd (void *arg)
2664 {
2665         long               id = (long)arg;
2666         char               name[16];
2667         unsigned long      flags;
2668         ksock_route_t     *route;
2669         int                rc;
2670
2671         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2672         kportal_daemonize (name);
2673         kportal_blockallsigs ();
2674
2675         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2676
2677         while (!ksocknal_data.ksnd_shuttingdown) {
2678
2679                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2680                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2681                                             ksock_route_t, ksnr_connect_list);
2682
2683                         list_del (&route->ksnr_connect_list);
2684                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2685
2686                         ksocknal_autoconnect (route);
2687                         ksocknal_put_route (route);
2688
2689                         spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock,
2690                                           flags);
2691                         continue;
2692                 }
2693
2694                 spin_unlock_irqrestore(&ksocknal_data.ksnd_autoconnectd_lock,
2695                                        flags);
2696
2697                 rc = wait_event_interruptible(ksocknal_data.ksnd_autoconnectd_waitq,
2698                                               ksocknal_data.ksnd_shuttingdown ||
2699                                               !list_empty(&ksocknal_data.ksnd_autoconnectd_routes));
2700
2701                 spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock, flags);
2702         }
2703
2704         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2705
2706         ksocknal_thread_fini ();
2707         return (0);
2708 }
2709
2710 ksock_conn_t *
2711 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2712 {
2713         /* We're called with a shared lock on ksnd_global_lock */
2714         ksock_conn_t      *conn;
2715         struct list_head  *ctmp;
2716
2717         list_for_each (ctmp, &peer->ksnp_conns) {
2718                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2719
2720                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2721                 LASSERT (!conn->ksnc_closing);
2722
2723                 if (conn->ksnc_sock->sk->sk_err != 0) {
2724                         /* Something (e.g. failed keepalive) set the socket error */
2725                         atomic_inc (&conn->ksnc_refcount);
2726                         CERROR ("Socket error %d: "LPX64" %p %d.%d.%d.%d\n",
2727                                 conn->ksnc_sock->sk->sk_err, peer->ksnp_nid,
2728                                 conn, HIPQUAD(conn->ksnc_ipaddr));
2729                         return (conn);
2730                 }
2731
2732                 if (conn->ksnc_rx_started &&
2733                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2734                         /* Timed out incomplete incoming message */
2735                         atomic_inc (&conn->ksnc_refcount);
2736                         CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2737                                 peer->ksnp_nid,conn,HIPQUAD(conn->ksnc_ipaddr));
2738                         return (conn);
2739                 }
2740
2741                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2742                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2743                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2744                         /* Timed out messages queued for sending or
2745                          * buffered in the socket's send buffer */
2746                         atomic_inc (&conn->ksnc_refcount);
2747                         CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n",
2748                                 peer->ksnp_nid,
2749                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2750                                 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2751                                 HIPQUAD(conn->ksnc_ipaddr));
2752                         return (conn);
2753                 }
2754         }
2755
2756         return (NULL);
2757 }
2758
2759 void
2760 ksocknal_check_peer_timeouts (int idx)
2761 {
2762         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2763         struct list_head *ptmp;
2764         ksock_peer_t     *peer;
2765         ksock_conn_t     *conn;
2766
2767  again:
2768         /* NB. We expect to have a look at all the peers and not find any
2769          * connections to time out, so we just use a shared lock while we
2770          * take a look... */
2771         read_lock (&ksocknal_data.ksnd_global_lock);
2772
2773         list_for_each (ptmp, peers) {
2774                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2775                 conn = ksocknal_find_timed_out_conn (peer);
2776                 
2777                 if (conn != NULL) {
2778                         read_unlock (&ksocknal_data.ksnd_global_lock);
2779
2780                         CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2781                                 peer->ksnp_nid,
2782                                 HIPQUAD(conn->ksnc_ipaddr),
2783                                 conn->ksnc_port);
2784                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2785                         
2786                         /* NB we won't find this one again, but we can't
2787                          * just proceed with the next peer, since we dropped
2788                          * ksnd_global_lock and it might be dead already! */
2789                         ksocknal_put_conn (conn);
2790                         goto again;
2791                 }
2792         }
2793
2794         read_unlock (&ksocknal_data.ksnd_global_lock);
2795 }
2796
2797 int
2798 ksocknal_reaper (void *arg)
2799 {
2800         wait_queue_t       wait;
2801         unsigned long      flags;
2802         ksock_conn_t      *conn;
2803         ksock_sched_t     *sched;
2804         struct list_head   enomem_conns;
2805         int                nenomem_conns;
2806         int                timeout;
2807         int                i;
2808         int                peer_index = 0;
2809         unsigned long      deadline = jiffies;
2810         
2811         kportal_daemonize ("ksocknal_reaper");
2812         kportal_blockallsigs ();
2813
2814         INIT_LIST_HEAD(&enomem_conns);
2815         init_waitqueue_entry (&wait, current);
2816
2817         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2818
2819         while (!ksocknal_data.ksnd_shuttingdown) {
2820
2821                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2822                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2823                                            ksock_conn_t, ksnc_list);
2824                         list_del (&conn->ksnc_list);
2825                         
2826                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2827
2828                         ksocknal_terminate_conn (conn);
2829                         ksocknal_put_conn (conn);
2830
2831                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2832                         continue;
2833                 }
2834
2835                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2836                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2837                                            ksock_conn_t, ksnc_list);
2838                         list_del (&conn->ksnc_list);
2839                         
2840                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2841
2842                         ksocknal_destroy_conn (conn);
2843
2844                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2845                         continue;
2846                 }
2847
2848                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2849                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2850                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2851                 }
2852
2853                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2854
2855                 /* reschedule all the connections that stalled with ENOMEM... */
2856                 nenomem_conns = 0;
2857                 while (!list_empty (&enomem_conns)) {
2858                         conn = list_entry (enomem_conns.next,
2859                                            ksock_conn_t, ksnc_tx_list);
2860                         list_del (&conn->ksnc_tx_list);
2861
2862                         sched = conn->ksnc_scheduler;
2863
2864                         spin_lock_irqsave (&sched->kss_lock, flags);
2865
2866                         LASSERT (conn->ksnc_tx_scheduled);
2867                         conn->ksnc_tx_ready = 1;
2868                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2869                         wake_up (&sched->kss_waitq);
2870
2871                         spin_unlock_irqrestore (&sched->kss_lock, flags);
2872                         nenomem_conns++;
2873                 }
2874                 
2875                 /* careful with the jiffy wrap... */
2876                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2877                         const int n = 4;
2878                         const int p = 1;
2879                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2880                         
2881                         /* Time to check for timeouts on a few more peers: I do
2882                          * checks every 'p' seconds on a proportion of the peer
2883                          * table and I need to check every connection 'n' times
2884                          * within a timeout interval, to ensure I detect a
2885                          * timeout on any connection within (n+1)/n times the
2886                          * timeout interval. */
2887
2888                         if (ksocknal_tunables.ksnd_io_timeout > n * p)
2889                                 chunk = (chunk * n * p) / 
2890                                         ksocknal_tunables.ksnd_io_timeout;
2891                         if (chunk == 0)
2892                                 chunk = 1;
2893
2894                         for (i = 0; i < chunk; i++) {
2895                                 ksocknal_check_peer_timeouts (peer_index);
2896                                 peer_index = (peer_index + 1) % 
2897                                              ksocknal_data.ksnd_peer_hash_size;
2898                         }
2899
2900                         deadline += p * HZ;
2901                 }
2902
2903                 if (nenomem_conns != 0) {
2904                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2905                          * This also prevents me getting woken immediately
2906                          * if any go back on my enomem list. */
2907                         timeout = SOCKNAL_ENOMEM_RETRY;
2908                 }
2909                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2910
2911                 set_current_state (TASK_INTERRUPTIBLE);
2912                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2913
2914                 if (!ksocknal_data.ksnd_shuttingdown &&
2915                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2916                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2917                         schedule_timeout (timeout);
2918
2919                 set_current_state (TASK_RUNNING);
2920                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2921
2922                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2923         }
2924
2925         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2926
2927         ksocknal_thread_fini ();
2928         return (0);
2929 }
2930
2931 lib_nal_t ksocknal_lib = {
2932         libnal_data:       &ksocknal_data,      /* NAL private data */
2933         libnal_send:        ksocknal_send,
2934         libnal_send_pages:  ksocknal_send_pages,
2935         libnal_recv:        ksocknal_recv,
2936         libnal_recv_pages:  ksocknal_recv_pages,
2937         libnal_dist:        ksocknal_dist
2938 };