Whamcloud - gitweb
* Added socknal multi-frag I/O
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27 #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0))
28 # include <linux/syscalls.h>
29 #endif
30
31 /*
32  *  LIB functions follow
33  *
34  */
35 int
36 ksocknal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
37 {
38         /* I would guess that if ksocknal_get_peer (nid) == NULL,
39            and we're not routing, then 'nid' is very distant :) */
40         if (nal->libnal_ni.ni_pid.nid == nid) {
41                 *dist = 0;
42         } else {
43                 *dist = 1;
44         }
45
46         return 0;
47 }
48
49 void
50 ksocknal_free_ltx (ksock_ltx_t *ltx)
51 {
52         atomic_dec(&ksocknal_data.ksnd_nactive_ltxs);
53         PORTAL_FREE(ltx, ltx->ltx_desc_size);
54 }
55
56 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
57 struct page *
58 ksocknal_kvaddr_to_page (unsigned long vaddr)
59 {
60         struct page *page;
61
62         if (vaddr >= VMALLOC_START &&
63             vaddr < VMALLOC_END)
64                 page = vmalloc_to_page ((void *)vaddr);
65 #if CONFIG_HIGHMEM
66         else if (vaddr >= PKMAP_BASE &&
67                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
68                 page = vmalloc_to_page ((void *)vaddr);
69                 /* in 2.4 ^ just walks the page tables */
70 #endif
71         else
72                 page = virt_to_page (vaddr);
73
74         if (page == NULL ||
75             !VALID_PAGE (page))
76                 return (NULL);
77
78         return (page);
79 }
80 #endif
81
82 int
83 ksocknal_send_iov (ksock_conn_t *conn, ksock_tx_t *tx)
84 {
85         struct socket *sock = conn->ksnc_sock;
86         struct iovec  *iov = tx->tx_iov;
87 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
88         unsigned long  vaddr = (unsigned long)iov->iov_base
89         int            offset = vaddr & (PAGE_SIZE - 1);
90         int            zcsize = MIN (iov->iov_len, PAGE_SIZE - offset);
91         struct page   *page;
92 #endif
93         int            nob;
94         int            rc;
95
96         /* NB we can't trust socket ops to either consume our iovs
97          * or leave them alone. */
98         LASSERT (tx->tx_niov > 0);
99         
100 #if (SOCKNAL_ZC && SOCKNAL_VADDR_ZC)
101         if (zcsize >= ksocknal_data.ksnd_zc_min_frag &&
102             (sock->sk->route_caps & NETIF_F_SG) &&
103             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
104             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
105                 int msgflg = MSG_DONTWAIT;
106                 
107                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
108                        (void *)vaddr, page, page_address(page), offset, zcsize);
109
110                 if (!list_empty (&conn->ksnc_tx_queue) ||
111                     zcsize < tx->tx_resid)
112                         msgflg |= MSG_MORE;
113                 
114                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, msgflg, &tx->tx_zccd);
115         } else
116 #endif
117         {
118 #if SOCKNAL_SINGLE_FRAG_TX
119                 struct iovec    scratch;
120                 struct iovec   *scratchiov = &scratch;
121                 int             niov = 1;
122 #else
123                 struct iovec   *scratchiov = conn->ksnc_tx_scratch_iov;
124                 int             niov = tx->tx_niov;
125 #endif
126                 struct msghdr msg = {
127                         .msg_name       = NULL,
128                         .msg_namelen    = 0,
129                         .msg_iov        = scratchiov,
130                         .msg_iovlen     = niov,
131                         .msg_control    = NULL,
132                         .msg_controllen = 0,
133                         .msg_flags      = MSG_DONTWAIT
134                 };
135                 mm_segment_t oldmm = get_fs();
136                 int  i;
137
138                 for (nob = i = 0; i < niov; i++) {
139                         scratchiov[i] = tx->tx_iov[i];
140                         nob += scratchiov[i].iov_len;
141                 }
142
143                 if (!list_empty(&conn->ksnc_tx_queue) ||
144                     nob < tx->tx_resid)
145                         msg.msg_flags |= MSG_MORE;
146                 
147                 set_fs (KERNEL_DS);
148                 rc = sock_sendmsg(sock, &msg, nob);
149                 set_fs (oldmm);
150         } 
151
152         if (rc <= 0)                            /* sent nothing? */
153                 return (rc);
154
155         nob = rc;
156         LASSERT (nob <= tx->tx_resid);
157         tx->tx_resid -= nob;
158
159         /* "consume" iov */
160         do {
161                 LASSERT (tx->tx_niov > 0);
162                 
163                 if (nob < iov->iov_len) {
164                         iov->iov_base = (void *)(((unsigned long)(iov->iov_base)) + nob);
165                         iov->iov_len -= nob;
166                         return (rc);
167                 }
168
169                 nob -= iov->iov_len;
170                 tx->tx_iov = ++iov;
171                 tx->tx_niov--;
172         } while (nob != 0);
173         
174         return (rc);
175 }
176
177 int
178 ksocknal_send_kiov (ksock_conn_t *conn, ksock_tx_t *tx)
179 {
180         struct socket *sock = conn->ksnc_sock;
181         ptl_kiov_t    *kiov = tx->tx_kiov;
182         int            rc;
183         int            nob;
184         
185         /* NB we can't trust socket ops to either consume our iovs
186          * or leave them alone. */
187         LASSERT (tx->tx_niov == 0);
188         LASSERT (tx->tx_nkiov > 0);
189
190 #if SOCKNAL_ZC
191         if (kiov->kiov_len >= ksocknal_tunables.ksnd_zc_min_frag &&
192             (sock->sk->route_caps & NETIF_F_SG) &&
193             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM))) {
194                 struct page   *page = kiov->kiov_page;
195                 int            offset = kiov->kiov_offset;
196                 int            fragsize = kiov->kiov_len;
197                 int            msgflg = MSG_DONTWAIT;
198
199                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
200                                page, offset, kiov->kiov_len);
201
202                 if (!list_empty(&conn->ksnc_tx_queue) ||
203                     fragsize < tx->tx_resid)
204                         msgflg |= MSG_MORE;
205
206                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize, msgflg,
207                                        &tx->tx_zccd);
208         } else
209 #endif
210         {
211 #if SOCKNAL_SINGLE_FRAG_TX || !SOCKNAL_RISK_KMAP_DEADLOCK
212                 struct iovec  scratch;
213                 struct iovec *scratchiov = &scratch;
214                 int           niov = 1;
215 #else
216 #warning "XXX risk of kmap deadlock on multiple frags..."
217                 struct iovec *scratchiov = conn->ksnc_tx_scratch_iov;
218                 int           niov = tx->tx_nkiov;
219 #endif
220                 struct msghdr msg = {
221                         .msg_name       = NULL,
222                         .msg_namelen    = 0,
223                         .msg_iov        = scratchiov,
224                         .msg_iovlen     = niov,
225                         .msg_control    = NULL,
226                         .msg_controllen = 0,
227                         .msg_flags      = MSG_DONTWAIT
228                 };
229                 mm_segment_t  oldmm = get_fs();
230                 int           i;
231                 
232                 for (nob = i = 0; i < niov; i++) {
233                         scratchiov[i].iov_base = kmap(kiov[i].kiov_page) +
234                                                  kiov[i].kiov_offset;
235                         nob += scratchiov[i].iov_len = kiov[i].kiov_len;
236                 }
237
238                 if (!list_empty(&conn->ksnc_tx_queue) ||
239                     nob < tx->tx_resid)
240                         msg.msg_flags |= MSG_DONTWAIT;
241
242                 set_fs (KERNEL_DS);
243                 rc = sock_sendmsg(sock, &msg, nob);
244                 set_fs (oldmm);
245
246                 for (i = 0; i < niov; i++)
247                         kunmap(kiov[i].kiov_page);
248         }
249
250         if (rc <= 0)                            /* sent nothing? */
251                 return (rc);
252
253         nob = rc;
254         LASSERT (nob <= tx->tx_resid);
255         tx->tx_resid -= nob;
256
257         do {
258                 LASSERT(tx->tx_nkiov > 0);
259                 
260                 if (nob < kiov->kiov_len) {
261                         kiov->kiov_offset += nob;
262                         kiov->kiov_len -= nob;
263                         return rc;
264                 }
265                 
266                 nob -= kiov->kiov_len;
267                 tx->tx_kiov = ++kiov;
268                 tx->tx_nkiov--;
269         } while (nob != 0);
270
271         return (rc);
272 }
273
274 int
275 ksocknal_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
276 {
277         int      rc;
278         int      bufnob;
279         
280         if (ksocknal_data.ksnd_stall_tx != 0) {
281                 set_current_state (TASK_UNINTERRUPTIBLE);
282                 schedule_timeout (ksocknal_data.ksnd_stall_tx * HZ);
283         }
284
285         LASSERT (tx->tx_resid != 0);
286
287         rc = ksocknal_getconnsock (conn);
288         if (rc != 0) {
289                 LASSERT (conn->ksnc_closing);
290                 return (-ESHUTDOWN);
291         }
292
293         do {
294                 if (ksocknal_data.ksnd_enomem_tx > 0) {
295                         /* testing... */
296                         ksocknal_data.ksnd_enomem_tx--;
297                         rc = -EAGAIN;
298                 } else if (tx->tx_niov != 0) {
299                         rc = ksocknal_send_iov (conn, tx);
300                 } else {
301                         rc = ksocknal_send_kiov (conn, tx);
302                 }
303
304                 bufnob = conn->ksnc_sock->sk->sk_wmem_queued;
305                 if (rc > 0)                     /* sent something? */
306                         conn->ksnc_tx_bufnob += rc; /* account it */
307                 
308                 if (bufnob < conn->ksnc_tx_bufnob) {
309                         /* allocated send buffer bytes < computed; infer
310                          * something got ACKed */
311                         conn->ksnc_tx_deadline = jiffies + 
312                                                  ksocknal_tunables.ksnd_io_timeout * HZ;
313                         conn->ksnc_peer->ksnp_last_alive = jiffies;
314                         conn->ksnc_tx_bufnob = bufnob;
315                         mb();
316                 }
317
318                 if (rc <= 0) { /* Didn't write anything? */
319                         unsigned long  flags;
320                         ksock_sched_t *sched;
321
322                         if (rc == 0) /* some stacks return 0 instead of -EAGAIN */
323                                 rc = -EAGAIN;
324
325                         if (rc != -EAGAIN)
326                                 break;
327
328                         /* Check if EAGAIN is due to memory pressure */
329
330                         sched = conn->ksnc_scheduler;
331                         spin_lock_irqsave(&sched->kss_lock, flags);
332                                 
333                         if (!test_bit(SOCK_NOSPACE, &conn->ksnc_sock->flags) &&
334                             !conn->ksnc_tx_ready) {
335                                 /* SOCK_NOSPACE is set when the socket fills
336                                  * and cleared in the write_space callback
337                                  * (which also sets ksnc_tx_ready).  If
338                                  * SOCK_NOSPACE and ksnc_tx_ready are BOTH
339                                  * zero, I didn't fill the socket and
340                                  * write_space won't reschedule me, so I
341                                  * return -ENOMEM to get my caller to retry
342                                  * after a timeout */
343                                 rc = -ENOMEM;
344                         }
345
346                         spin_unlock_irqrestore(&sched->kss_lock, flags);
347                         break;
348                 }
349
350                 /* socket's wmem_queued now includes 'rc' bytes */
351                 atomic_sub (rc, &conn->ksnc_tx_nob);
352                 rc = 0;
353
354         } while (tx->tx_resid != 0);
355
356         ksocknal_putconnsock (conn);
357         return (rc);
358 }
359
360 void
361 ksocknal_eager_ack (ksock_conn_t *conn)
362 {
363         int            opt = 1;
364         mm_segment_t   oldmm = get_fs();
365         struct socket *sock = conn->ksnc_sock;
366         
367         /* Remind the socket to ACK eagerly.  If I don't, the socket might
368          * think I'm about to send something it could piggy-back the ACK
369          * on, introducing delay in completing zero-copy sends in my
370          * peer. */
371
372         set_fs(KERNEL_DS);
373         sock->ops->setsockopt (sock, SOL_TCP, TCP_QUICKACK,
374                                (char *)&opt, sizeof (opt));
375         set_fs(oldmm);
376 }
377
378 int
379 ksocknal_recv_iov (ksock_conn_t *conn)
380 {
381 #if SOCKNAL_SINGLE_FRAG_RX
382         struct iovec  scratch;
383         struct iovec *scratchiov = &scratch;
384         int           niov = 1;
385 #else
386         struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
387         int           niov = conn->ksnc_rx_niov;
388 #endif
389         struct iovec *iov = conn->ksnc_rx_iov;
390         struct msghdr msg = {
391                 .msg_name       = NULL,
392                 .msg_namelen    = 0,
393                 .msg_iov        = scratchiov,
394                 .msg_iovlen     = niov,
395                 .msg_control    = NULL,
396                 .msg_controllen = 0,
397                 .msg_flags      = 0
398         };
399         mm_segment_t oldmm = get_fs();
400         int          nob;
401         int          i;
402         int          rc;
403
404         /* NB we can't trust socket ops to either consume our iovs
405          * or leave them alone. */
406         LASSERT (niov > 0);
407
408         for (nob = i = 0; i < niov; i++) {
409                 scratchiov[i] = iov[i];
410                 nob += scratchiov[i].iov_len;
411         }
412         LASSERT (nob <= conn->ksnc_rx_nob_wanted);
413
414         set_fs (KERNEL_DS);
415         rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
416         /* NB this is just a boolean..........................^ */
417         set_fs (oldmm);
418
419         if (rc <= 0)
420                 return (rc);
421
422         /* received something... */
423         nob = rc;
424
425         conn->ksnc_peer->ksnp_last_alive = jiffies;
426         conn->ksnc_rx_deadline = jiffies + 
427                                  ksocknal_tunables.ksnd_io_timeout * HZ;
428         mb();                           /* order with setting rx_started */
429         conn->ksnc_rx_started = 1;
430
431         conn->ksnc_rx_nob_wanted -= nob;
432         conn->ksnc_rx_nob_left -= nob;
433
434         do {
435                 LASSERT (conn->ksnc_rx_niov > 0);
436                 
437                 if (nob < iov->iov_len) {
438                         iov->iov_len -= nob;
439                         iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
440                         return (-EAGAIN);
441                 }
442                 
443                 nob -= iov->iov_len;
444                 conn->ksnc_rx_iov = ++iov;
445                 conn->ksnc_rx_niov--;
446         } while (nob != 0);
447
448         return (rc);
449 }
450
451 int
452 ksocknal_recv_kiov (ksock_conn_t *conn)
453 {
454 #if SOCKNAL_SINGLE_FRAG_RX || !SOCKNAL_RISK_KMAP_DEADLOCK
455         struct iovec  scratch;
456         struct iovec *scratchiov = &scratch;
457         int           niov = 1;
458 #else
459 #warning "XXX risk of kmap deadlock on multiple frags..."
460         struct iovec *scratchiov = conn->ksnc_rx_scratch_iov;
461         int           niov = conn->ksnc_rx_nkiov;
462 #endif   
463         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
464         struct msghdr msg = {
465                 .msg_name       = NULL,
466                 .msg_namelen    = 0,
467                 .msg_iov        = scratchiov,
468                 .msg_iovlen     = niov,
469                 .msg_control    = NULL,
470                 .msg_controllen = 0,
471                 .msg_flags      = 0
472         };
473         mm_segment_t oldmm = get_fs();
474         int          nob;
475         int          i;
476         int          rc;
477
478         LASSERT (conn->ksnc_rx_nkiov > 0);
479
480         /* NB we can't trust socket ops to either consume our iovs
481          * or leave them alone. */
482         for (nob = i = 0; i < niov; i++) {
483                 scratchiov[i].iov_base = kmap(kiov[i].kiov_page) + kiov[i].kiov_offset;
484                 nob += scratchiov[i].iov_len = kiov[i].kiov_len;
485         }
486         LASSERT (nob <= conn->ksnc_rx_nob_wanted);
487
488         set_fs (KERNEL_DS);
489         rc = sock_recvmsg (conn->ksnc_sock, &msg, nob, MSG_DONTWAIT);
490         /* NB this is just a boolean.......................^ */
491         set_fs (oldmm);
492
493         for (i = 0; i < niov; i++)
494                 kunmap(kiov[i].kiov_page);
495
496         if (rc <= 0)
497                 return (rc);
498         
499         /* received something... */
500         nob = rc;
501
502         conn->ksnc_peer->ksnp_last_alive = jiffies;
503         conn->ksnc_rx_deadline = jiffies + 
504                                  ksocknal_tunables.ksnd_io_timeout * HZ;
505         mb();                           /* order with setting rx_started */
506         conn->ksnc_rx_started = 1;
507
508         conn->ksnc_rx_nob_wanted -= nob;
509         conn->ksnc_rx_nob_left -= nob;
510
511         do {
512                 LASSERT (conn->ksnc_rx_nkiov > 0);
513                 
514                 if (nob < kiov->kiov_len) {
515                         kiov->kiov_offset += nob;
516                         kiov->kiov_len -= nob;
517                         return -EAGAIN;
518                 }
519                 
520                 nob -= kiov->kiov_len;
521                 conn->ksnc_rx_kiov = ++kiov;
522                 conn->ksnc_rx_nkiov--;
523         } while (nob != 0);
524
525         return 1;
526 }
527
528 int
529 ksocknal_receive (ksock_conn_t *conn) 
530 {
531         /* Return 1 on success, 0 on EOF, < 0 on error.
532          * Caller checks ksnc_rx_nob_wanted to determine
533          * progress/completion. */
534         int     rc;
535         ENTRY;
536         
537         if (ksocknal_data.ksnd_stall_rx != 0) {
538                 set_current_state (TASK_UNINTERRUPTIBLE);
539                 schedule_timeout (ksocknal_data.ksnd_stall_rx * HZ);
540         }
541
542         rc = ksocknal_getconnsock (conn);
543         if (rc != 0) {
544                 LASSERT (conn->ksnc_closing);
545                 return (-ESHUTDOWN);
546         }
547
548         for (;;) {
549                 if (conn->ksnc_rx_niov != 0)
550                         rc = ksocknal_recv_iov (conn);
551                 else
552                         rc = ksocknal_recv_kiov (conn);
553
554                 if (rc <= 0) {
555                         /* error/EOF or partial receive */
556                         if (rc == -EAGAIN) {
557                                 rc = 1;
558                         } else if (rc == 0 && conn->ksnc_rx_started) {
559                                 /* EOF in the middle of a message */
560                                 rc = -EPROTO;
561                         }
562                         break;
563                 }
564
565                 /* Completed a fragment */
566
567                 if (conn->ksnc_rx_nob_wanted == 0) {
568                         /* Completed a message segment (header or payload) */
569                         if ((ksocknal_tunables.ksnd_eager_ack & conn->ksnc_type) != 0 &&
570                             (conn->ksnc_rx_state ==  SOCKNAL_RX_BODY ||
571                              conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD)) {
572                                 /* Remind the socket to ack eagerly... */
573                                 ksocknal_eager_ack(conn);
574                         }
575                         rc = 1;
576                         break;
577                 }
578         }
579
580         ksocknal_putconnsock (conn);
581         RETURN (rc);
582 }
583
584 #if SOCKNAL_ZC
585 void
586 ksocknal_zc_callback (zccd_t *zcd)
587 {
588         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
589         ksock_sched_t *sched = tx->tx_conn->ksnc_scheduler;
590         unsigned long  flags;
591         ENTRY;
592
593         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
594
595         spin_lock_irqsave (&sched->kss_lock, flags);
596
597         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
598         wake_up (&sched->kss_waitq);
599
600         spin_unlock_irqrestore (&sched->kss_lock, flags);
601         EXIT;
602 }
603 #endif
604
605 void
606 ksocknal_tx_done (ksock_tx_t *tx, int asynch)
607 {
608         ksock_ltx_t   *ltx;
609         ENTRY;
610
611         if (tx->tx_conn != NULL) {
612 #if SOCKNAL_ZC
613                 /* zero copy completion isn't always from
614                  * process_transmit() so it needs to keep a ref on
615                  * tx_conn... */
616                 if (asynch)
617                         ksocknal_put_conn (tx->tx_conn);
618 #else
619                 LASSERT (!asynch);
620 #endif
621         }
622
623         if (tx->tx_isfwd) {             /* was a forwarded packet? */
624                 kpr_fwd_done (&ksocknal_data.ksnd_router,
625                               KSOCK_TX_2_KPR_FWD_DESC (tx), 
626                               (tx->tx_resid == 0) ? 0 : -ECONNABORTED);
627                 EXIT;
628                 return;
629         }
630
631         /* local send */
632         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
633
634         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie,
635                       (tx->tx_resid == 0) ? PTL_OK : PTL_FAIL);
636
637         ksocknal_free_ltx (ltx);
638         EXIT;
639 }
640
641 void
642 ksocknal_tx_launched (ksock_tx_t *tx) 
643 {
644 #if SOCKNAL_ZC
645         if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
646                 ksock_conn_t  *conn = tx->tx_conn;
647                 
648                 /* zccd skbufs are still in-flight.  First take a ref on
649                  * conn, so it hangs about for ksocknal_tx_done... */
650                 atomic_inc (&conn->ksnc_refcount);
651
652                 /* ...then drop the initial ref on zccd, so the zero copy
653                  * callback can occur */
654                 zccd_put (&tx->tx_zccd);
655                 return;
656         }
657 #endif
658         /* Any zero-copy-ness (if any) has completed; I can complete the
659          * transmit now, avoiding an extra schedule */
660         ksocknal_tx_done (tx, 0);
661 }
662
663 int
664 ksocknal_process_transmit (ksock_conn_t *conn, ksock_tx_t *tx)
665 {
666         unsigned long  flags;
667         int            rc;
668        
669         rc = ksocknal_transmit (conn, tx);
670
671         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_resid, rc);
672
673         if (tx->tx_resid == 0) {
674                 /* Sent everything OK */
675                 LASSERT (rc == 0);
676
677                 ksocknal_tx_launched (tx);
678                 return (0);
679         }
680
681         if (rc == -EAGAIN)
682                 return (rc);
683
684         if (rc == -ENOMEM) {
685                 static int counter;
686
687                 counter++;   /* exponential backoff warnings */
688                 if ((counter & (-counter)) == counter)
689                         CWARN("%d ENOMEM tx %p\n", counter, conn);
690
691                 /* Queue on ksnd_enomem_conns for retry after a timeout */
692                 spin_lock_irqsave(&ksocknal_data.ksnd_reaper_lock, flags);
693
694                 /* enomem list takes over scheduler's ref... */
695                 LASSERT (conn->ksnc_tx_scheduled);
696                 list_add_tail(&conn->ksnc_tx_list,
697                               &ksocknal_data.ksnd_enomem_conns);
698                 if (!time_after_eq(jiffies + SOCKNAL_ENOMEM_RETRY,
699                                    ksocknal_data.ksnd_reaper_waketime))
700                         wake_up (&ksocknal_data.ksnd_reaper_waitq);
701                 
702                 spin_unlock_irqrestore(&ksocknal_data.ksnd_reaper_lock, flags);
703                 return (rc);
704         }
705
706         /* Actual error */
707         LASSERT (rc < 0);
708
709         if (!conn->ksnc_closing)
710                 CERROR("[%p] Error %d on write to "LPX64
711                        " ip %d.%d.%d.%d:%d\n", conn, rc,
712                        conn->ksnc_peer->ksnp_nid,
713                        HIPQUAD(conn->ksnc_ipaddr),
714                        conn->ksnc_port);
715
716         ksocknal_close_conn_and_siblings (conn, rc);
717         ksocknal_tx_launched (tx);
718
719         return (rc);
720 }
721
722 void
723 ksocknal_launch_autoconnect_locked (ksock_route_t *route)
724 {
725         unsigned long     flags;
726
727         /* called holding write lock on ksnd_global_lock */
728
729         LASSERT (!route->ksnr_deleted);
730         LASSERT ((route->ksnr_connected & (1 << SOCKNAL_CONN_ANY)) == 0);
731         LASSERT ((route->ksnr_connected & KSNR_TYPED_ROUTES) != KSNR_TYPED_ROUTES);
732         LASSERT (route->ksnr_connecting == 0);
733         
734         if (ksocknal_tunables.ksnd_typed_conns)
735                 route->ksnr_connecting = 
736                         KSNR_TYPED_ROUTES & ~route->ksnr_connected;
737         else
738                 route->ksnr_connecting = (1 << SOCKNAL_CONN_ANY);
739
740         atomic_inc (&route->ksnr_refcount);     /* extra ref for asynchd */
741         
742         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
743         
744         list_add_tail (&route->ksnr_connect_list,
745                        &ksocknal_data.ksnd_autoconnectd_routes);
746         wake_up (&ksocknal_data.ksnd_autoconnectd_waitq);
747         
748         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
749 }
750
751 ksock_peer_t *
752 ksocknal_find_target_peer_locked (ksock_tx_t *tx, ptl_nid_t nid)
753 {
754         char          ipbuf[PTL_NALFMT_SIZE];
755         ptl_nid_t     target_nid;
756         int           rc;
757         ksock_peer_t *peer = ksocknal_find_peer_locked (nid);
758
759         if (peer != NULL)
760                 return (peer);
761
762         if (tx->tx_isfwd) {
763                 CERROR ("Can't send packet to "LPX64
764                        " %s: routed target is not a peer\n",
765                         nid, portals_nid2str(SOCKNAL, nid, ipbuf));
766                 return (NULL);
767         }
768
769         rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, tx->tx_nob,
770                          &target_nid);
771         if (rc != 0) {
772                 CERROR ("Can't route to "LPX64" %s: router error %d\n",
773                         nid, portals_nid2str(SOCKNAL, nid, ipbuf), rc);
774                 return (NULL);
775         }
776
777         peer = ksocknal_find_peer_locked (target_nid);
778         if (peer != NULL)
779                 return (peer);
780
781         CERROR ("Can't send packet to "LPX64" %s: no peer entry\n",
782                 target_nid, portals_nid2str(SOCKNAL, target_nid, ipbuf));
783         return (NULL);
784 }
785
786 ksock_conn_t *
787 ksocknal_find_conn_locked (ksock_tx_t *tx, ksock_peer_t *peer)
788 {
789         struct list_head *tmp;
790         ksock_conn_t     *typed = NULL;
791         int               tnob  = 0;
792         ksock_conn_t     *fallback = NULL;
793         int               fnob     = 0;
794         ksock_conn_t     *conn;
795
796         list_for_each (tmp, &peer->ksnp_conns) {
797                 ksock_conn_t *c = list_entry(tmp, ksock_conn_t, ksnc_list);
798 #if SOCKNAL_ROUND_ROBIN
799                 const int     nob = 0;
800 #else
801                 int           nob = atomic_read(&c->ksnc_tx_nob) +
802                                         c->ksnc_sock->sk->sk_wmem_queued;
803 #endif
804                 LASSERT (!c->ksnc_closing);
805
806                 if (fallback == NULL || nob < fnob) {
807                         fallback = c;
808                         fnob     = nob;
809                 }
810
811                 if (!ksocknal_tunables.ksnd_typed_conns)
812                         continue;
813
814                 switch (c->ksnc_type) {
815                 default:
816                         LBUG();
817                 case SOCKNAL_CONN_ANY:
818                         break;
819                 case SOCKNAL_CONN_BULK_IN:
820                         continue;
821                 case SOCKNAL_CONN_BULK_OUT:
822                         if (tx->tx_nob < ksocknal_tunables.ksnd_min_bulk)
823                                 continue;
824                         break;
825                 case SOCKNAL_CONN_CONTROL:
826                         if (tx->tx_nob >= ksocknal_tunables.ksnd_min_bulk)
827                                 continue;
828                         break;
829                 }
830
831                 if (typed == NULL || nob < tnob) {
832                         typed = c;
833                         tnob  = nob;
834                 }
835         }
836
837         /* prefer the typed selection */
838         conn = (typed != NULL) ? typed : fallback;
839
840 #if SOCKNAL_ROUND_ROBIN
841         if (conn != NULL) {
842                 /* round-robin all else being equal */
843                 list_del (&conn->ksnc_list);
844                 list_add_tail (&conn->ksnc_list, &peer->ksnp_conns);
845         }
846 #endif
847         return conn;
848 }
849
850 void
851 ksocknal_queue_tx_locked (ksock_tx_t *tx, ksock_conn_t *conn)
852 {
853         unsigned long  flags;
854         ksock_sched_t *sched = conn->ksnc_scheduler;
855
856         /* called holding global lock (read or irq-write) and caller may
857          * not have dropped this lock between finding conn and calling me,
858          * so we don't need the {get,put}connsock dance to deref
859          * ksnc_sock... */
860         LASSERT(!conn->ksnc_closing);
861         LASSERT(tx->tx_resid == tx->tx_nob);
862         
863         CDEBUG (D_NET, "Sending to "LPX64" ip %d.%d.%d.%d:%d\n", 
864                 conn->ksnc_peer->ksnp_nid,
865                 HIPQUAD(conn->ksnc_ipaddr),
866                 conn->ksnc_port);
867
868         atomic_add (tx->tx_nob, &conn->ksnc_tx_nob);
869         tx->tx_conn = conn;
870
871 #if SOCKNAL_ZC
872         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
873         /* NB this sets 1 ref on zccd, so the callback can only occur after
874          * I've released this ref. */
875 #endif
876         spin_lock_irqsave (&sched->kss_lock, flags);
877
878         if (list_empty(&conn->ksnc_tx_queue) &&
879             conn->ksnc_sock->sk->sk_wmem_queued == 0) {
880                 /* First packet starts the timeout */
881                 conn->ksnc_tx_deadline = jiffies +
882                                          ksocknal_tunables.ksnd_io_timeout * HZ;
883                 conn->ksnc_tx_bufnob = 0;
884                 mb();    /* order with adding to tx_queue */
885         }
886
887         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
888                 
889         if (conn->ksnc_tx_ready &&      /* able to send */
890             !conn->ksnc_tx_scheduled) { /* not scheduled to send */
891                 /* +1 ref for scheduler */
892                 atomic_inc (&conn->ksnc_refcount);
893                 list_add_tail (&conn->ksnc_tx_list, 
894                                &sched->kss_tx_conns);
895                 conn->ksnc_tx_scheduled = 1;
896                 wake_up (&sched->kss_waitq);
897         }
898
899         spin_unlock_irqrestore (&sched->kss_lock, flags);
900 }
901
902 ksock_route_t *
903 ksocknal_find_connectable_route_locked (ksock_peer_t *peer)
904 {
905         struct list_head  *tmp;
906         ksock_route_t     *route;
907         int                bits;
908         
909         list_for_each (tmp, &peer->ksnp_routes) {
910                 route = list_entry (tmp, ksock_route_t, ksnr_list);
911                 bits  = route->ksnr_connected;
912
913                 /* All typed connections established? */
914                 if ((bits & KSNR_TYPED_ROUTES) == KSNR_TYPED_ROUTES)
915                         continue;
916
917                 /* Untyped connection established? */
918                 if ((bits & (1 << SOCKNAL_CONN_ANY)) != 0)
919                         continue;
920
921                 /* connection being established? */
922                 if (route->ksnr_connecting != 0)
923                         continue;
924
925                 /* too soon to retry this guy? */
926                 if (!time_after_eq (jiffies, route->ksnr_timeout))
927                         continue;
928                 
929                 return (route);
930         }
931         
932         return (NULL);
933 }
934
935 ksock_route_t *
936 ksocknal_find_connecting_route_locked (ksock_peer_t *peer)
937 {
938         struct list_head  *tmp;
939         ksock_route_t     *route;
940
941         list_for_each (tmp, &peer->ksnp_routes) {
942                 route = list_entry (tmp, ksock_route_t, ksnr_list);
943                 
944                 if (route->ksnr_connecting != 0)
945                         return (route);
946         }
947         
948         return (NULL);
949 }
950
951 int
952 ksocknal_launch_packet (ksock_tx_t *tx, ptl_nid_t nid)
953 {
954         unsigned long     flags;
955         ksock_peer_t     *peer;
956         ksock_conn_t     *conn;
957         ksock_route_t    *route;
958         rwlock_t         *g_lock;
959         
960         /* Ensure the frags we've been given EXACTLY match the number of
961          * bytes we want to send.  Many TCP/IP stacks disregard any total
962          * size parameters passed to them and just look at the frags. 
963          *
964          * We always expect at least 1 mapped fragment containing the
965          * complete portals header. */
966         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
967                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
968         LASSERT (tx->tx_niov >= 1);
969         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
970
971         CDEBUG (D_NET, "packet %p type %d, nob %d niov %d nkiov %d\n",
972                 tx, ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, 
973                 tx->tx_nob, tx->tx_niov, tx->tx_nkiov);
974
975         tx->tx_conn = NULL;                     /* only set when assigned a conn */
976         tx->tx_resid = tx->tx_nob;
977         tx->tx_hdr = (ptl_hdr_t *)tx->tx_iov[0].iov_base;
978
979         g_lock = &ksocknal_data.ksnd_global_lock;
980 #if !SOCKNAL_ROUND_ROBIN
981         read_lock (g_lock);
982
983         peer = ksocknal_find_target_peer_locked (tx, nid);
984         if (peer == NULL) {
985                 read_unlock (g_lock);
986                 return (-EHOSTUNREACH);
987         }
988
989         if (ksocknal_find_connectable_route_locked(peer) == NULL) {
990                 conn = ksocknal_find_conn_locked (tx, peer);
991                 if (conn != NULL) {
992                         /* I've got no autoconnect routes that need to be
993                          * connecting and I do have an actual connection... */
994                         ksocknal_queue_tx_locked (tx, conn);
995                         read_unlock (g_lock);
996                         return (0);
997                 }
998         }
999  
1000         /* I'll need a write lock... */
1001         read_unlock (g_lock);
1002 #endif
1003         write_lock_irqsave(g_lock, flags);
1004
1005         peer = ksocknal_find_target_peer_locked (tx, nid);
1006         if (peer == NULL) {
1007                 write_unlock_irqrestore(g_lock, flags);
1008                 return (-EHOSTUNREACH);
1009         }
1010
1011         for (;;) {
1012                 /* launch any/all autoconnections that need it */
1013                 route = ksocknal_find_connectable_route_locked (peer);
1014                 if (route == NULL)
1015                         break;
1016
1017                 ksocknal_launch_autoconnect_locked (route);
1018         }
1019
1020         conn = ksocknal_find_conn_locked (tx, peer);
1021         if (conn != NULL) {
1022                 /* Connection exists; queue message on it */
1023                 ksocknal_queue_tx_locked (tx, conn);
1024                 write_unlock_irqrestore (g_lock, flags);
1025                 return (0);
1026         }
1027
1028         route = ksocknal_find_connecting_route_locked (peer);
1029         if (route != NULL) {
1030                 /* At least 1 connection is being established; queue the
1031                  * message... */
1032                 list_add_tail (&tx->tx_list, &peer->ksnp_tx_queue);
1033                 write_unlock_irqrestore (g_lock, flags);
1034                 return (0);
1035         }
1036         
1037         write_unlock_irqrestore (g_lock, flags);
1038         return (-EHOSTUNREACH);
1039 }
1040
1041 ptl_err_t
1042 ksocknal_sendmsg(lib_nal_t     *nal, 
1043                  void         *private, 
1044                  lib_msg_t    *cookie,
1045                  ptl_hdr_t    *hdr, 
1046                  int           type, 
1047                  ptl_nid_t     nid, 
1048                  ptl_pid_t     pid,
1049                  unsigned int  payload_niov, 
1050                  struct iovec *payload_iov, 
1051                  ptl_kiov_t   *payload_kiov,
1052                  size_t        payload_offset,
1053                  size_t        payload_nob)
1054 {
1055         ksock_ltx_t  *ltx;
1056         int           desc_size;
1057         int           rc;
1058
1059         /* NB 'private' is different depending on what we're sending.
1060          * Just ignore it... */
1061
1062         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1063                " pid %d\n", payload_nob, payload_niov, nid , pid);
1064
1065         LASSERT (payload_nob == 0 || payload_niov > 0);
1066         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1067
1068         /* It must be OK to kmap() if required */
1069         LASSERT (payload_kiov == NULL || !in_interrupt ());
1070         /* payload is either all vaddrs or all pages */
1071         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1072         
1073         if (payload_iov != NULL)
1074                 desc_size = offsetof(ksock_ltx_t, ltx_iov[1 + payload_niov]);
1075         else
1076                 desc_size = offsetof(ksock_ltx_t, ltx_kiov[payload_niov]);
1077         
1078         if (in_interrupt() ||
1079             type == PTL_MSG_ACK ||
1080             type == PTL_MSG_REPLY) {
1081                 /* Can't block if in interrupt or responding to an incoming
1082                  * message */
1083                 PORTAL_ALLOC_ATOMIC(ltx, desc_size);
1084         } else {
1085                 PORTAL_ALLOC(ltx, desc_size);
1086         }
1087         
1088         if (ltx == NULL) {
1089                 CERROR("Can't allocate tx desc type %d size %d %s\n",
1090                        type, desc_size, in_interrupt() ? "(intr)" : "");
1091                 return (PTL_NO_SPACE);
1092         }
1093
1094         atomic_inc(&ksocknal_data.ksnd_nactive_ltxs);
1095
1096         ltx->ltx_desc_size = desc_size;
1097         
1098         /* We always have 1 mapped frag for the header */
1099         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
1100         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
1101         ltx->ltx_iov[0].iov_len = sizeof(*hdr);
1102         ltx->ltx_hdr = *hdr;
1103         
1104         ltx->ltx_private = private;
1105         ltx->ltx_cookie = cookie;
1106         
1107         ltx->ltx_tx.tx_isfwd = 0;
1108         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_nob;
1109
1110         if (payload_iov != NULL) {
1111                 /* payload is all mapped */
1112                 ltx->ltx_tx.tx_kiov  = NULL;
1113                 ltx->ltx_tx.tx_nkiov = 0;
1114
1115                 ltx->ltx_tx.tx_niov = 
1116                         1 + lib_extract_iov(payload_niov, &ltx->ltx_iov[1],
1117                                             payload_niov, payload_iov,
1118                                             payload_offset, payload_nob);
1119         } else {
1120                 /* payload is all pages */
1121                 ltx->ltx_tx.tx_niov = 1;
1122
1123                 ltx->ltx_tx.tx_kiov = ltx->ltx_kiov;
1124                 ltx->ltx_tx.tx_nkiov =
1125                         lib_extract_kiov(payload_niov, ltx->ltx_kiov,
1126                                          payload_niov, payload_kiov,
1127                                          payload_offset, payload_nob);
1128         }
1129
1130         rc = ksocknal_launch_packet(&ltx->ltx_tx, nid);
1131         if (rc == 0)
1132                 return (PTL_OK);
1133         
1134         ksocknal_free_ltx(ltx);
1135         return (PTL_FAIL);
1136 }
1137
1138 ptl_err_t
1139 ksocknal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1140                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1141                unsigned int payload_niov, struct iovec *payload_iov,
1142                size_t payload_offset, size_t payload_len)
1143 {
1144         return (ksocknal_sendmsg(nal, private, cookie,
1145                                  hdr, type, nid, pid,
1146                                  payload_niov, payload_iov, NULL,
1147                                  payload_offset, payload_len));
1148 }
1149
1150 ptl_err_t
1151 ksocknal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie, 
1152                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1153                      unsigned int payload_niov, ptl_kiov_t *payload_kiov, 
1154                      size_t payload_offset, size_t payload_len)
1155 {
1156         return (ksocknal_sendmsg(nal, private, cookie,
1157                                  hdr, type, nid, pid,
1158                                  payload_niov, NULL, payload_kiov,
1159                                  payload_offset, payload_len));
1160 }
1161
1162 void
1163 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1164 {
1165         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
1166         ksock_ftx_t  *ftx = (ksock_ftx_t *)&fwd->kprfd_scratch;
1167         int           rc;
1168         
1169         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
1170                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
1171
1172         /* I'm the gateway; must be the last hop */
1173         if (nid == ksocknal_lib.libnal_ni.ni_pid.nid)
1174                 nid = fwd->kprfd_target_nid;
1175
1176         /* setup iov for hdr */
1177         ftx->ftx_iov.iov_base = fwd->kprfd_hdr;
1178         ftx->ftx_iov.iov_len = sizeof(ptl_hdr_t);
1179
1180         ftx->ftx_tx.tx_isfwd = 1;                  /* This is a forwarding packet */
1181         ftx->ftx_tx.tx_nob   = sizeof(ptl_hdr_t) + fwd->kprfd_nob;
1182         ftx->ftx_tx.tx_niov  = 1;
1183         ftx->ftx_tx.tx_iov   = &ftx->ftx_iov;
1184         ftx->ftx_tx.tx_nkiov = fwd->kprfd_niov;
1185         ftx->ftx_tx.tx_kiov  = fwd->kprfd_kiov;
1186
1187         rc = ksocknal_launch_packet (&ftx->ftx_tx, nid);
1188         if (rc != 0)
1189                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, rc);
1190 }
1191
1192 int
1193 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
1194 {
1195         long          pid = kernel_thread (fn, arg, 0);
1196         unsigned long flags;
1197
1198         if (pid < 0)
1199                 return ((int)pid);
1200
1201         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1202         ksocknal_data.ksnd_nthreads++;
1203         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1204         return (0);
1205 }
1206
1207 void
1208 ksocknal_thread_fini (void)
1209 {
1210         unsigned long flags;
1211
1212         write_lock_irqsave(&ksocknal_data.ksnd_global_lock, flags);
1213         ksocknal_data.ksnd_nthreads--;
1214         write_unlock_irqrestore(&ksocknal_data.ksnd_global_lock, flags);
1215 }
1216
1217 void
1218 ksocknal_fmb_callback (void *arg, int error)
1219 {
1220         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
1221         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
1222         ptl_hdr_t         *hdr = &fmb->fmb_hdr;
1223         ksock_conn_t      *conn = NULL;
1224         ksock_sched_t     *sched;
1225         unsigned long      flags;
1226         char               ipbuf[PTL_NALFMT_SIZE];
1227         char               ipbuf2[PTL_NALFMT_SIZE];
1228
1229         if (error != 0)
1230                 CERROR("Failed to route packet from "
1231                        LPX64" %s to "LPX64" %s: %d\n",
1232                        le64_to_cpu(hdr->src_nid),
1233                        portals_nid2str(SOCKNAL, le64_to_cpu(hdr->src_nid), ipbuf),
1234                        le64_to_cpu(hdr->dest_nid),
1235                        portals_nid2str(SOCKNAL, le64_to_cpu(hdr->dest_nid), ipbuf2),
1236                        error);
1237         else
1238                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
1239                         le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid));
1240
1241         /* drop peer ref taken on init */
1242         ksocknal_put_peer (fmb->fmb_peer);
1243
1244         spin_lock_irqsave (&fmp->fmp_lock, flags);
1245
1246         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
1247         fmp->fmp_nactive_fmbs--;
1248
1249         if (!list_empty (&fmp->fmp_blocked_conns)) {
1250                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
1251                                    ksock_conn_t, ksnc_rx_list);
1252                 list_del (&conn->ksnc_rx_list);
1253         }
1254
1255         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
1256
1257         if (conn == NULL)
1258                 return;
1259
1260         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
1261         LASSERT (conn->ksnc_rx_scheduled);
1262         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
1263
1264         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
1265
1266         sched = conn->ksnc_scheduler;
1267
1268         spin_lock_irqsave (&sched->kss_lock, flags);
1269
1270         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1271         wake_up (&sched->kss_waitq);
1272
1273         spin_unlock_irqrestore (&sched->kss_lock, flags);
1274 }
1275
1276 ksock_fmb_t *
1277 ksocknal_get_idle_fmb (ksock_conn_t *conn)
1278 {
1279         int               payload_nob = conn->ksnc_rx_nob_left;
1280         unsigned long     flags;
1281         ksock_fmb_pool_t *pool;
1282         ksock_fmb_t      *fmb;
1283
1284         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1285         LASSERT (kpr_routing(&ksocknal_data.ksnd_router));
1286
1287         if (payload_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
1288                 pool = &ksocknal_data.ksnd_small_fmp;
1289         else
1290                 pool = &ksocknal_data.ksnd_large_fmp;
1291
1292         spin_lock_irqsave (&pool->fmp_lock, flags);
1293
1294         if (!list_empty (&pool->fmp_idle_fmbs)) {
1295                 fmb = list_entry(pool->fmp_idle_fmbs.next,
1296                                  ksock_fmb_t, fmb_list);
1297                 list_del (&fmb->fmb_list);
1298                 pool->fmp_nactive_fmbs++;
1299                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
1300
1301                 return (fmb);
1302         }
1303
1304         /* deschedule until fmb free */
1305
1306         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
1307
1308         list_add_tail (&conn->ksnc_rx_list,
1309                        &pool->fmp_blocked_conns);
1310
1311         spin_unlock_irqrestore (&pool->fmp_lock, flags);
1312         return (NULL);
1313 }
1314
1315 int
1316 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
1317 {
1318         int       payload_nob = conn->ksnc_rx_nob_left;
1319         ptl_nid_t dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1320         int       niov = 0;
1321         int       nob = payload_nob;
1322
1323         LASSERT (conn->ksnc_rx_scheduled);
1324         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
1325         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
1326         LASSERT (payload_nob >= 0);
1327         LASSERT (payload_nob <= fmb->fmb_pool->fmp_buff_pages * PAGE_SIZE);
1328         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
1329         LASSERT (fmb->fmb_kiov[0].kiov_offset == 0);
1330
1331         /* Take a ref on the conn's peer to prevent module unload before
1332          * forwarding completes. */
1333         fmb->fmb_peer = conn->ksnc_peer;
1334         atomic_inc (&conn->ksnc_peer->ksnp_refcount);
1335
1336         /* Copy the header we just read into the forwarding buffer.  If
1337          * there's payload, start reading reading it into the buffer,
1338          * otherwise the forwarding buffer can be kicked off
1339          * immediately. */
1340         fmb->fmb_hdr = conn->ksnc_hdr;
1341
1342         while (nob > 0) {
1343                 LASSERT (niov < fmb->fmb_pool->fmp_buff_pages);
1344                 LASSERT (fmb->fmb_kiov[niov].kiov_offset == 0);
1345                 fmb->fmb_kiov[niov].kiov_len = MIN (PAGE_SIZE, nob);
1346                 nob -= PAGE_SIZE;
1347                 niov++;
1348         }
1349
1350         kpr_fwd_init(&fmb->fmb_fwd, dest_nid, &fmb->fmb_hdr,
1351                      payload_nob, niov, fmb->fmb_kiov,
1352                      ksocknal_fmb_callback, fmb);
1353
1354         if (payload_nob == 0) {         /* got complete packet already */
1355                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" fwd_start (immediate)\n",
1356                         conn, le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid);
1357
1358                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1359
1360                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1361                 return (1);
1362         }
1363
1364         conn->ksnc_cookie = fmb;                /* stash fmb for later */
1365         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1366         
1367         /* Set up conn->ksnc_rx_kiov to read the payload into fmb's kiov-ed
1368          * buffer */
1369         LASSERT (niov <= sizeof(conn->ksnc_rx_iov_space)/sizeof(ptl_kiov_t));
1370
1371         conn->ksnc_rx_niov = 0;
1372         conn->ksnc_rx_nkiov = niov;
1373         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1374         memcpy(conn->ksnc_rx_kiov, fmb->fmb_kiov, niov * sizeof(ptl_kiov_t));
1375         
1376         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1377                 le64_to_cpu(conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1378         return (0);
1379 }
1380
1381 void
1382 ksocknal_fwd_parse (ksock_conn_t *conn)
1383 {
1384         ksock_peer_t *peer;
1385         ptl_nid_t     dest_nid = le64_to_cpu(conn->ksnc_hdr.dest_nid);
1386         ptl_nid_t     src_nid = le64_to_cpu(conn->ksnc_hdr.src_nid);
1387         int           body_len = le32_to_cpu(conn->ksnc_hdr.payload_length);
1388         char str[PTL_NALFMT_SIZE];
1389         char str2[PTL_NALFMT_SIZE];
1390
1391         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1392                 src_nid, dest_nid, conn->ksnc_rx_nob_left);
1393
1394         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1395         LASSERT (conn->ksnc_rx_scheduled);
1396
1397         if (body_len < 0) {                 /* length corrupt (overflow) */
1398                 CERROR("dropping packet from "LPX64" (%s) for "LPX64" (%s): "
1399                        "packet size %d illegal\n",
1400                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1401                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1402                        body_len);
1403
1404                 ksocknal_new_packet (conn, 0);  /* on to new packet */
1405                 return;
1406         }
1407
1408         if (!kpr_routing(&ksocknal_data.ksnd_router)) {    /* not forwarding */
1409                 CERROR("dropping packet from "LPX64" (%s) for "LPX64
1410                        " (%s): not forwarding\n",
1411                        src_nid, portals_nid2str(TCPNAL, src_nid, str),
1412                        dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1413                 /* on to new packet (skip this one's body) */
1414                 ksocknal_new_packet (conn, body_len);
1415                 return;
1416         }
1417
1418         if (body_len > PTL_MTU) {      /* too big to forward */
1419                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1420                         "(%s): packet size %d too big\n",
1421                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1422                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2),
1423                         body_len);
1424                 /* on to new packet (skip this one's body) */
1425                 ksocknal_new_packet (conn, body_len);
1426                 return;
1427         }
1428
1429         /* should have gone direct */
1430         peer = ksocknal_get_peer (conn->ksnc_hdr.dest_nid);
1431         if (peer != NULL) {
1432                 CERROR ("dropping packet from "LPX64" (%s) for "LPX64
1433                         "(%s): target is a peer\n",
1434                         src_nid, portals_nid2str(TCPNAL, src_nid, str),
1435                         dest_nid, portals_nid2str(TCPNAL, dest_nid, str2));
1436                 ksocknal_put_peer (peer);  /* drop ref from get above */
1437
1438                 /* on to next packet (skip this one's body) */
1439                 ksocknal_new_packet (conn, body_len);
1440                 return;
1441         }
1442
1443         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1444         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1445         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1446 }
1447
1448 int
1449 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1450 {
1451         static char ksocknal_slop_buffer[4096];
1452
1453         int   nob;
1454         int   niov;
1455         int   skipped;
1456
1457         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1458                 conn->ksnc_rx_started = 0;
1459                 mb ();                          /* racing with timeout thread */
1460                 
1461                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1462                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1463                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1464
1465                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1466                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1467                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1468                 conn->ksnc_rx_niov = 1;
1469
1470                 conn->ksnc_rx_kiov = NULL;
1471                 conn->ksnc_rx_nkiov = 0;
1472                 return (1);
1473         }
1474
1475         /* Set up to skip as much a possible now.  If there's more left
1476          * (ran out of iov entries) we'll get called again */
1477
1478         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1479         conn->ksnc_rx_nob_left = nob_to_skip;
1480         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1481         skipped = 0;
1482         niov = 0;
1483
1484         do {
1485                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1486
1487                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1488                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1489                 niov++;
1490                 skipped += nob;
1491                 nob_to_skip -=nob;
1492
1493         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1494                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1495
1496         conn->ksnc_rx_niov = niov;
1497         conn->ksnc_rx_kiov = NULL;
1498         conn->ksnc_rx_nkiov = 0;
1499         conn->ksnc_rx_nob_wanted = skipped;
1500         return (0);
1501 }
1502
1503 int
1504 ksocknal_process_receive (ksock_conn_t *conn)
1505 {
1506         ksock_fmb_t  *fmb;
1507         int           rc;
1508         
1509         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1510
1511         /* doesn't need a forwarding buffer */
1512         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1513                 goto try_read;
1514
1515  get_fmb:
1516         fmb = ksocknal_get_idle_fmb (conn);
1517         if (fmb == NULL) {
1518                 /* conn descheduled waiting for idle fmb */
1519                 return (0);
1520         }
1521
1522         if (ksocknal_init_fmb (conn, fmb)) {
1523                 /* packet forwarded */
1524                 return (0);
1525         }
1526
1527  try_read:
1528         /* NB: sched lock NOT held */
1529         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1530                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1531                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1532                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1533
1534         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1535
1536         rc = ksocknal_receive(conn);
1537
1538         if (rc <= 0) {
1539                 LASSERT (rc != -EAGAIN);
1540
1541                 if (rc == 0)
1542                         CWARN ("[%p] EOF from "LPX64" ip %d.%d.%d.%d:%d\n",
1543                                conn, conn->ksnc_peer->ksnp_nid,
1544                                HIPQUAD(conn->ksnc_ipaddr),
1545                                conn->ksnc_port);
1546                 else if (!conn->ksnc_closing)
1547                         CERROR ("[%p] Error %d on read from "LPX64
1548                                 " ip %d.%d.%d.%d:%d\n",
1549                                 conn, rc, conn->ksnc_peer->ksnp_nid,
1550                                 HIPQUAD(conn->ksnc_ipaddr),
1551                                 conn->ksnc_port);
1552
1553                 ksocknal_close_conn_and_siblings (conn, rc);
1554                 return (rc == 0 ? -ESHUTDOWN : rc);
1555         }
1556
1557         if (conn->ksnc_rx_nob_wanted != 0) {
1558                 /* short read */
1559                 return (-EAGAIN);
1560         }
1561         
1562         switch (conn->ksnc_rx_state) {
1563         case SOCKNAL_RX_HEADER:
1564                 if (conn->ksnc_hdr.type != cpu_to_le32(PTL_MSG_HELLO) &&
1565                     le64_to_cpu(conn->ksnc_hdr.dest_nid) != 
1566                     ksocknal_lib.libnal_ni.ni_pid.nid) {
1567                         /* This packet isn't for me */
1568                         ksocknal_fwd_parse (conn);
1569                         switch (conn->ksnc_rx_state) {
1570                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1571                                 return (0);     /* => come back later */
1572                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1573                                 goto try_read;  /* => go read it */
1574                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1575                                 goto get_fmb;   /* => go get a fwd msg buffer */
1576                         default:
1577                                 LBUG ();
1578                         }
1579                         /* Not Reached */
1580                 }
1581
1582                 /* sets wanted_len, iovs etc */
1583                 rc = lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1584
1585                 if (rc != PTL_OK) {
1586                         /* I just received garbage: give up on this conn */
1587                         ksocknal_close_conn_and_siblings (conn, rc);
1588                         return (-EPROTO);
1589                 }
1590
1591                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1592                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1593                         goto try_read;          /* go read the payload */
1594                 }
1595                 /* Fall through (completed packet for me) */
1596
1597         case SOCKNAL_RX_BODY:
1598                 /* payload all received */
1599                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie, PTL_OK);
1600                 /* Fall through */
1601
1602         case SOCKNAL_RX_SLOP:
1603                 /* starting new packet? */
1604                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1605                         return (0);     /* come back later */
1606                 goto try_read;          /* try to finish reading slop now */
1607
1608         case SOCKNAL_RX_BODY_FWD:
1609                 /* payload all received */
1610                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1611                         conn, le64_to_cpu(conn->ksnc_hdr.src_nid),
1612                         le64_to_cpu(conn->ksnc_hdr.dest_nid),
1613                         conn->ksnc_rx_nob_left);
1614
1615                 /* forward the packet. NB ksocknal_init_fmb() put fmb into
1616                  * conn->ksnc_cookie */
1617                 fmb = (ksock_fmb_t *)conn->ksnc_cookie;
1618                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1619
1620                 /* no slop in forwarded packets */
1621                 LASSERT (conn->ksnc_rx_nob_left == 0);
1622
1623                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1624                 return (0);                     /* (later) */
1625
1626         default:
1627                 break;
1628         }
1629
1630         /* Not Reached */
1631         LBUG ();
1632         return (-EINVAL);                       /* keep gcc happy */
1633 }
1634
1635 ptl_err_t
1636 ksocknal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1637                unsigned int niov, struct iovec *iov, 
1638                size_t offset, size_t mlen, size_t rlen)
1639 {
1640         ksock_conn_t *conn = (ksock_conn_t *)private;
1641
1642         LASSERT (mlen <= rlen);
1643         LASSERT (niov <= PTL_MD_MAX_IOV);
1644         
1645         conn->ksnc_cookie = msg;
1646         conn->ksnc_rx_nob_wanted = mlen;
1647         conn->ksnc_rx_nob_left   = rlen;
1648
1649         conn->ksnc_rx_nkiov = 0;
1650         conn->ksnc_rx_kiov = NULL;
1651         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1652         conn->ksnc_rx_niov =
1653                 lib_extract_iov(PTL_MD_MAX_IOV, conn->ksnc_rx_iov,
1654                                 niov, iov, offset, mlen);
1655
1656         LASSERT (mlen == 
1657                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1658                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1659
1660         return (PTL_OK);
1661 }
1662
1663 ptl_err_t
1664 ksocknal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1665                      unsigned int niov, ptl_kiov_t *kiov, 
1666                      size_t offset, size_t mlen, size_t rlen)
1667 {
1668         ksock_conn_t *conn = (ksock_conn_t *)private;
1669
1670         LASSERT (mlen <= rlen);
1671         LASSERT (niov <= PTL_MD_MAX_IOV);
1672         
1673         conn->ksnc_cookie = msg;
1674         conn->ksnc_rx_nob_wanted = mlen;
1675         conn->ksnc_rx_nob_left   = rlen;
1676
1677         conn->ksnc_rx_niov = 0;
1678         conn->ksnc_rx_iov  = NULL;
1679         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1680         conn->ksnc_rx_nkiov = 
1681                 lib_extract_kiov(PTL_MD_MAX_IOV, conn->ksnc_rx_kiov,
1682                                  niov, kiov, offset, mlen);
1683
1684         LASSERT (mlen == 
1685                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1686                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1687
1688         return (PTL_OK);
1689 }
1690
1691 static inline int
1692 ksocknal_sched_cansleep(ksock_sched_t *sched)
1693 {
1694         unsigned long flags;
1695         int           rc;
1696
1697         spin_lock_irqsave(&sched->kss_lock, flags);
1698
1699         rc = (!ksocknal_data.ksnd_shuttingdown &&
1700 #if SOCKNAL_ZC
1701               list_empty(&sched->kss_zctxdone_list) &&
1702 #endif
1703               list_empty(&sched->kss_rx_conns) &&
1704               list_empty(&sched->kss_tx_conns));
1705         
1706         spin_unlock_irqrestore(&sched->kss_lock, flags);
1707         return (rc);
1708 }
1709
1710 int ksocknal_scheduler (void *arg)
1711 {
1712         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1713         ksock_conn_t      *conn;
1714         ksock_tx_t        *tx;
1715         unsigned long      flags;
1716         int                rc;
1717         int                nloops = 0;
1718         int                id = sched - ksocknal_data.ksnd_schedulers;
1719         char               name[16];
1720
1721         snprintf (name, sizeof (name),"ksocknald_%02d", id);
1722         kportal_daemonize (name);
1723         kportal_blockallsigs ();
1724
1725 #if (CONFIG_SMP && CPU_AFFINITY)
1726         id = ksocknal_sched2cpu(id);
1727         if (cpu_online(id)) {
1728                 cpumask_t m;
1729                 cpu_set(id, m);
1730                 set_cpus_allowed(current, m);
1731         } else {
1732                 CERROR ("Can't set CPU affinity for %s to %d\n", name, id);
1733         }
1734 #endif /* CONFIG_SMP && CPU_AFFINITY */
1735         
1736         spin_lock_irqsave (&sched->kss_lock, flags);
1737
1738         while (!ksocknal_data.ksnd_shuttingdown) {
1739                 int did_something = 0;
1740
1741                 /* Ensure I progress everything semi-fairly */
1742
1743                 if (!list_empty (&sched->kss_rx_conns)) {
1744                         conn = list_entry(sched->kss_rx_conns.next,
1745                                           ksock_conn_t, ksnc_rx_list);
1746                         list_del(&conn->ksnc_rx_list);
1747
1748                         LASSERT(conn->ksnc_rx_scheduled);
1749                         LASSERT(conn->ksnc_rx_ready);
1750
1751                         /* clear rx_ready in case receive isn't complete.
1752                          * Do it BEFORE we call process_recv, since
1753                          * data_ready can set it any time after we release
1754                          * kss_lock. */
1755                         conn->ksnc_rx_ready = 0;
1756                         spin_unlock_irqrestore(&sched->kss_lock, flags);
1757                         
1758                         rc = ksocknal_process_receive(conn);
1759                         
1760                         spin_lock_irqsave(&sched->kss_lock, flags);
1761
1762                         /* I'm the only one that can clear this flag */
1763                         LASSERT(conn->ksnc_rx_scheduled);
1764
1765                         /* Did process_receive get everything it wanted? */
1766                         if (rc == 0)
1767                                 conn->ksnc_rx_ready = 1;
1768                         
1769                         if (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP ||
1770                             conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB) {
1771                                 /* Conn blocked for a forwarding buffer.
1772                                  * It will get queued for my attention when
1773                                  * one becomes available (and it might just
1774                                  * already have been!).  Meanwhile my ref
1775                                  * on it stays put. */
1776                         } else if (conn->ksnc_rx_ready) {
1777                                 /* reschedule for rx */
1778                                 list_add_tail (&conn->ksnc_rx_list,
1779                                                &sched->kss_rx_conns);
1780                         } else {
1781                                 conn->ksnc_rx_scheduled = 0;
1782                                 /* drop my ref */
1783                                 ksocknal_put_conn(conn);
1784                         }
1785
1786                         did_something = 1;
1787                 }
1788
1789                 if (!list_empty (&sched->kss_tx_conns)) {
1790                         conn = list_entry(sched->kss_tx_conns.next,
1791                                           ksock_conn_t, ksnc_tx_list);
1792                         list_del (&conn->ksnc_tx_list);
1793                         
1794                         LASSERT(conn->ksnc_tx_scheduled);
1795                         LASSERT(conn->ksnc_tx_ready);
1796                         LASSERT(!list_empty(&conn->ksnc_tx_queue));
1797                         
1798                         tx = list_entry(conn->ksnc_tx_queue.next,
1799                                         ksock_tx_t, tx_list);
1800                         /* dequeue now so empty list => more to send */
1801                         list_del(&tx->tx_list);
1802                         
1803                         /* Clear tx_ready in case send isn't complete.  Do
1804                          * it BEFORE we call process_transmit, since
1805                          * write_space can set it any time after we release
1806                          * kss_lock. */
1807                         conn->ksnc_tx_ready = 0;
1808                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1809
1810                         rc = ksocknal_process_transmit(conn, tx);
1811
1812                         spin_lock_irqsave (&sched->kss_lock, flags);
1813
1814                         if (rc == -ENOMEM || rc == -EAGAIN) {
1815                                 /* Incomplete send: replace tx on HEAD of tx_queue */
1816                                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
1817                         } else {
1818                                 /* Complete send; assume space for more */
1819                                 conn->ksnc_tx_ready = 1;
1820                         }
1821
1822                         if (rc == -ENOMEM) {
1823                                 /* Do nothing; after a short timeout, this
1824                                  * conn will be reposted on kss_tx_conns. */
1825                         } else if (conn->ksnc_tx_ready &&
1826                                    !list_empty (&conn->ksnc_tx_queue)) {
1827                                 /* reschedule for tx */
1828                                 list_add_tail (&conn->ksnc_tx_list, 
1829                                                &sched->kss_tx_conns);
1830                         } else {
1831                                 conn->ksnc_tx_scheduled = 0;
1832                                 /* drop my ref */
1833                                 ksocknal_put_conn (conn);
1834                         }
1835                                 
1836                         did_something = 1;
1837                 }
1838 #if SOCKNAL_ZC
1839                 if (!list_empty (&sched->kss_zctxdone_list)) {
1840                         ksock_tx_t *tx =
1841                                 list_entry(sched->kss_zctxdone_list.next,
1842                                            ksock_tx_t, tx_list);
1843                         did_something = 1;
1844
1845                         list_del (&tx->tx_list);
1846                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1847
1848                         ksocknal_tx_done (tx, 1);
1849
1850                         spin_lock_irqsave (&sched->kss_lock, flags);
1851                 }
1852 #endif
1853                 if (!did_something ||           /* nothing to do */
1854                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1855                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1856
1857                         nloops = 0;
1858
1859                         if (!did_something) {   /* wait for something to do */
1860                                 rc = wait_event_interruptible (sched->kss_waitq,
1861                                                                !ksocknal_sched_cansleep(sched));
1862                                 LASSERT (rc == 0);
1863                         } else
1864                                our_cond_resched();
1865
1866                         spin_lock_irqsave (&sched->kss_lock, flags);
1867                 }
1868         }
1869
1870         spin_unlock_irqrestore (&sched->kss_lock, flags);
1871         ksocknal_thread_fini ();
1872         return (0);
1873 }
1874
1875 void
1876 ksocknal_data_ready (struct sock *sk, int n)
1877 {
1878         unsigned long  flags;
1879         ksock_conn_t  *conn;
1880         ksock_sched_t *sched;
1881         ENTRY;
1882
1883         /* interleave correctly with closing sockets... */
1884         read_lock (&ksocknal_data.ksnd_global_lock);
1885
1886         conn = sk->sk_user_data;
1887         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1888                 LASSERT (sk->sk_data_ready != &ksocknal_data_ready);
1889                 sk->sk_data_ready (sk, n);
1890         } else {
1891                 sched = conn->ksnc_scheduler;
1892
1893                 spin_lock_irqsave (&sched->kss_lock, flags);
1894
1895                 conn->ksnc_rx_ready = 1;
1896
1897                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1898                         list_add_tail(&conn->ksnc_rx_list,
1899                                       &sched->kss_rx_conns);
1900                         conn->ksnc_rx_scheduled = 1;
1901                         /* extra ref for scheduler */
1902                         atomic_inc (&conn->ksnc_refcount);
1903
1904                         wake_up (&sched->kss_waitq);
1905                 }
1906
1907                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1908         }
1909
1910         read_unlock (&ksocknal_data.ksnd_global_lock);
1911
1912         EXIT;
1913 }
1914
1915 void
1916 ksocknal_write_space (struct sock *sk)
1917 {
1918         unsigned long  flags;
1919         ksock_conn_t  *conn;
1920         ksock_sched_t *sched;
1921
1922         /* interleave correctly with closing sockets... */
1923         read_lock (&ksocknal_data.ksnd_global_lock);
1924
1925         conn = sk->sk_user_data;
1926
1927         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1928                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1929                (conn == NULL) ? "" : (conn->ksnc_tx_ready ?
1930                                       " ready" : " blocked"),
1931                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1932                                       " scheduled" : " idle"),
1933                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1934                                       " empty" : " queued"));
1935
1936         if (conn == NULL) {             /* raced with ksocknal_terminate_conn */
1937                 LASSERT (sk->sk_write_space != &ksocknal_write_space);
1938                 sk->sk_write_space (sk);
1939
1940                 read_unlock (&ksocknal_data.ksnd_global_lock);
1941                 return;
1942         }
1943
1944         if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1945                 sched = conn->ksnc_scheduler;
1946
1947                 spin_lock_irqsave (&sched->kss_lock, flags);
1948
1949                 clear_bit (SOCK_NOSPACE, &sk->sk_socket->flags);
1950                 conn->ksnc_tx_ready = 1;
1951
1952                 if (!conn->ksnc_tx_scheduled && // not being progressed
1953                     !list_empty(&conn->ksnc_tx_queue)){//packets to send
1954                         list_add_tail (&conn->ksnc_tx_list,
1955                                        &sched->kss_tx_conns);
1956                         conn->ksnc_tx_scheduled = 1;
1957                         /* extra ref for scheduler */
1958                         atomic_inc (&conn->ksnc_refcount);
1959
1960                         wake_up (&sched->kss_waitq);
1961                 }
1962
1963                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1964         }
1965
1966         read_unlock (&ksocknal_data.ksnd_global_lock);
1967 }
1968
1969 int
1970 ksocknal_sock_write (struct socket *sock, void *buffer, int nob)
1971 {
1972         int           rc;
1973         mm_segment_t  oldmm = get_fs();
1974
1975         while (nob > 0) {
1976                 struct iovec  iov = {
1977                         .iov_base = buffer,
1978                         .iov_len  = nob
1979                 };
1980                 struct msghdr msg = {
1981                         .msg_name       = NULL,
1982                         .msg_namelen    = 0,
1983                         .msg_iov        = &iov,
1984                         .msg_iovlen     = 1,
1985                         .msg_control    = NULL,
1986                         .msg_controllen = 0,
1987                         .msg_flags      = 0
1988                 };
1989
1990                 set_fs (KERNEL_DS);
1991                 rc = sock_sendmsg (sock, &msg, iov.iov_len);
1992                 set_fs (oldmm);
1993                 
1994                 if (rc < 0)
1995                         return (rc);
1996
1997                 if (rc == 0) {
1998                         CERROR ("Unexpected zero rc\n");
1999                         return (-ECONNABORTED);
2000                 }
2001
2002                 buffer = ((char *)buffer) + rc;
2003                 nob -= rc;
2004         }
2005         
2006         return (0);
2007 }
2008
2009 int
2010 ksocknal_sock_read (struct socket *sock, void *buffer, int nob)
2011 {
2012         int           rc;
2013         mm_segment_t  oldmm = get_fs();
2014         
2015         while (nob > 0) {
2016                 struct iovec  iov = {
2017                         .iov_base = buffer,
2018                         .iov_len  = nob
2019                 };
2020                 struct msghdr msg = {
2021                         .msg_name       = NULL,
2022                         .msg_namelen    = 0,
2023                         .msg_iov        = &iov,
2024                         .msg_iovlen     = 1,
2025                         .msg_control    = NULL,
2026                         .msg_controllen = 0,
2027                         .msg_flags      = 0
2028                 };
2029
2030                 set_fs (KERNEL_DS);
2031                 rc = sock_recvmsg (sock, &msg, iov.iov_len, 0);
2032                 set_fs (oldmm);
2033                 
2034                 if (rc < 0)
2035                         return (rc);
2036
2037                 if (rc == 0)
2038                         return (-ECONNABORTED);
2039
2040                 buffer = ((char *)buffer) + rc;
2041                 nob -= rc;
2042         }
2043         
2044         return (0);
2045 }
2046
2047 int
2048 ksocknal_send_hello (ksock_conn_t *conn, __u32 *ipaddrs, int nipaddrs)
2049 {
2050         /* CAVEAT EMPTOR: this byte flips 'ipaddrs' */
2051         struct socket      *sock = conn->ksnc_sock;
2052         ptl_hdr_t           hdr;
2053         ptl_magicversion_t *hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2054         int                 i;
2055         int                 rc;
2056
2057         LASSERT (conn->ksnc_type != SOCKNAL_CONN_NONE);
2058         LASSERT (nipaddrs <= SOCKNAL_MAX_INTERFACES);
2059
2060         /* No need for getconnsock/putconnsock */
2061         LASSERT (!conn->ksnc_closing);
2062
2063         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2064         hmv->magic         = cpu_to_le32 (PORTALS_PROTO_MAGIC);
2065         hmv->version_major = cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR);
2066         hmv->version_minor = cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR);
2067
2068         hdr.src_nid        = cpu_to_le64 (ksocknal_lib.libnal_ni.ni_pid.nid);
2069         hdr.type           = cpu_to_le32 (PTL_MSG_HELLO);
2070         hdr.payload_length = cpu_to_le32 (nipaddrs * sizeof(*ipaddrs));
2071
2072         hdr.msg.hello.type = cpu_to_le32 (conn->ksnc_type);
2073         hdr.msg.hello.incarnation =
2074                 cpu_to_le64 (ksocknal_data.ksnd_incarnation);
2075
2076         /* Receiver is eager */
2077         rc = ksocknal_sock_write (sock, &hdr, sizeof(hdr));
2078         if (rc != 0) {
2079                 CERROR ("Error %d sending HELLO hdr to %u.%u.%u.%u/%d\n",
2080                         rc, HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
2081                 return (rc);
2082         }
2083         
2084         if (nipaddrs == 0)
2085                 return (0);
2086         
2087         for (i = 0; i < nipaddrs; i++) {
2088                 ipaddrs[i] = __cpu_to_le32 (ipaddrs[i]);
2089         }
2090
2091         rc = ksocknal_sock_write (sock, ipaddrs, nipaddrs * sizeof(*ipaddrs));
2092         if (rc != 0)
2093                 CERROR ("Error %d sending HELLO payload (%d)"
2094                         " to %u.%u.%u.%u/%d\n", rc, nipaddrs, 
2095                         HIPQUAD(conn->ksnc_ipaddr), conn->ksnc_port);
2096         return (rc);
2097 }
2098
2099 int
2100 ksocknal_invert_type(int type)
2101 {
2102         switch (type)
2103         {
2104         case SOCKNAL_CONN_ANY:
2105         case SOCKNAL_CONN_CONTROL:
2106                 return (type);
2107         case SOCKNAL_CONN_BULK_IN:
2108                 return SOCKNAL_CONN_BULK_OUT;
2109         case SOCKNAL_CONN_BULK_OUT:
2110                 return SOCKNAL_CONN_BULK_IN;
2111         default:
2112                 return (SOCKNAL_CONN_NONE);
2113         }
2114 }
2115
2116 int
2117 ksocknal_recv_hello (ksock_conn_t *conn, ptl_nid_t *nid,
2118                      __u64 *incarnation, __u32 *ipaddrs)
2119 {
2120         struct socket      *sock = conn->ksnc_sock;
2121         int                 rc;
2122         int                 nips;
2123         int                 i;
2124         int                 type;
2125         ptl_hdr_t           hdr;
2126         ptl_magicversion_t *hmv;
2127
2128         hmv = (ptl_magicversion_t *)&hdr.dest_nid;
2129         LASSERT (sizeof (*hmv) == sizeof (hdr.dest_nid));
2130
2131         rc = ksocknal_sock_read (sock, hmv, sizeof (*hmv));
2132         if (rc != 0) {
2133                 CERROR ("Error %d reading HELLO from %u.%u.%u.%u\n",
2134                         rc, HIPQUAD(conn->ksnc_ipaddr));
2135                 return (rc);
2136         }
2137
2138         if (hmv->magic != le32_to_cpu (PORTALS_PROTO_MAGIC)) {
2139                 CERROR ("Bad magic %#08x (%#08x expected) from %u.%u.%u.%u\n",
2140                         __cpu_to_le32 (hmv->magic), PORTALS_PROTO_MAGIC,
2141                         HIPQUAD(conn->ksnc_ipaddr));
2142                 return (-EPROTO);
2143         }
2144
2145         if (hmv->version_major != cpu_to_le16 (PORTALS_PROTO_VERSION_MAJOR) ||
2146             hmv->version_minor != cpu_to_le16 (PORTALS_PROTO_VERSION_MINOR)) {
2147                 CERROR ("Incompatible protocol version %d.%d (%d.%d expected)"
2148                         " from %u.%u.%u.%u\n",
2149                         le16_to_cpu (hmv->version_major),
2150                         le16_to_cpu (hmv->version_minor),
2151                         PORTALS_PROTO_VERSION_MAJOR,
2152                         PORTALS_PROTO_VERSION_MINOR,
2153                         HIPQUAD(conn->ksnc_ipaddr));
2154                 return (-EPROTO);
2155         }
2156
2157 #if (PORTALS_PROTO_VERSION_MAJOR != 1)
2158 # error "This code only understands protocol version 1.x"
2159 #endif
2160         /* version 1 sends magic/version as the dest_nid of a 'hello'
2161          * header, followed by payload full of interface IP addresses.
2162          * Read the rest of it in now... */
2163
2164         rc = ksocknal_sock_read (sock, hmv + 1, sizeof (hdr) - sizeof (*hmv));
2165         if (rc != 0) {
2166                 CERROR ("Error %d reading rest of HELLO hdr from %u.%u.%u.%u\n",
2167                         rc, HIPQUAD(conn->ksnc_ipaddr));
2168                 return (rc);
2169         }
2170
2171         /* ...and check we got what we expected */
2172         if (hdr.type != cpu_to_le32 (PTL_MSG_HELLO)) {
2173                 CERROR ("Expecting a HELLO hdr,"
2174                         " but got type %d from %u.%u.%u.%u\n",
2175                         le32_to_cpu (hdr.type),
2176                         HIPQUAD(conn->ksnc_ipaddr));
2177                 return (-EPROTO);
2178         }
2179
2180         if (le64_to_cpu(hdr.src_nid) == PTL_NID_ANY) {
2181                 CERROR("Expecting a HELLO hdr with a NID, but got PTL_NID_ANY"
2182                        "from %u.%u.%u.%u\n", HIPQUAD(conn->ksnc_ipaddr));
2183                 return (-EPROTO);
2184         }
2185
2186         if (*nid == PTL_NID_ANY) {              /* don't know peer's nid yet */
2187                 *nid = le64_to_cpu(hdr.src_nid);
2188         } else if (*nid != le64_to_cpu (hdr.src_nid)) {
2189                 CERROR ("Connected to nid "LPX64"@%u.%u.%u.%u "
2190                         "but expecting "LPX64"\n",
2191                         le64_to_cpu (hdr.src_nid),
2192                         HIPQUAD(conn->ksnc_ipaddr), *nid);
2193                 return (-EPROTO);
2194         }
2195
2196         type = __le32_to_cpu(hdr.msg.hello.type);
2197
2198         if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2199                 /* I've accepted this connection; peer determines type */
2200                 conn->ksnc_type = ksocknal_invert_type(type);
2201                 if (conn->ksnc_type == SOCKNAL_CONN_NONE) {
2202                         CERROR ("Unexpected type %d from "LPX64"@%u.%u.%u.%u\n",
2203                                 type, *nid, HIPQUAD(conn->ksnc_ipaddr));
2204                         return (-EPROTO);
2205                 }
2206         } else if (ksocknal_invert_type(type) != conn->ksnc_type) {
2207                 CERROR ("Mismatched types: me %d, "LPX64"@%u.%u.%u.%u %d\n",
2208                         conn->ksnc_type, *nid, HIPQUAD(conn->ksnc_ipaddr),
2209                         le32_to_cpu(hdr.msg.hello.type));
2210                 return (-EPROTO);
2211         }
2212
2213         *incarnation = le64_to_cpu(hdr.msg.hello.incarnation);
2214
2215         nips = __le32_to_cpu (hdr.payload_length) / sizeof (__u32);
2216
2217         if (nips > SOCKNAL_MAX_INTERFACES ||
2218             nips * sizeof(__u32) != __le32_to_cpu (hdr.payload_length)) {
2219                 CERROR("Bad payload length %d from "LPX64"@%u.%u.%u.%u\n",
2220                        __le32_to_cpu (hdr.payload_length),
2221                        *nid, HIPQUAD(conn->ksnc_ipaddr));
2222         }
2223
2224         if (nips == 0)
2225                 return (0);
2226         
2227         rc = ksocknal_sock_read (sock, ipaddrs, nips * sizeof(*ipaddrs));
2228         if (rc != 0) {
2229                 CERROR ("Error %d reading IPs from "LPX64"@%u.%u.%u.%u\n",
2230                         rc, *nid, HIPQUAD(conn->ksnc_ipaddr));
2231                 return (rc);
2232         }
2233
2234         for (i = 0; i < nips; i++) {
2235                 ipaddrs[i] = __le32_to_cpu(ipaddrs[i]);
2236                 
2237                 if (ipaddrs[i] == 0) {
2238                         CERROR("Zero IP[%d] from "LPX64"@%u.%u.%u.%u\n",
2239                                i, *nid, HIPQUAD(conn->ksnc_ipaddr));
2240                         return (-EPROTO);
2241                 }
2242         }
2243
2244         return (nips);
2245 }
2246
2247 int
2248 ksocknal_get_conn_tunables (ksock_conn_t *conn, int *txmem, int *rxmem, int *nagle)
2249 {
2250         mm_segment_t   oldmm = get_fs ();
2251         struct socket *sock = conn->ksnc_sock;
2252         int            len;
2253         int            rc;
2254
2255         rc = ksocknal_getconnsock (conn);
2256         if (rc != 0) {
2257                 LASSERT (conn->ksnc_closing);
2258                 *txmem = *rxmem = *nagle = 0;
2259                 return (-ESHUTDOWN);
2260         }
2261         
2262         set_fs (KERNEL_DS);
2263
2264         len = sizeof(*txmem);
2265         rc = sock_getsockopt(sock, SOL_SOCKET, SO_SNDBUF,
2266                              (char *)txmem, &len);
2267         if (rc == 0) {
2268                 len = sizeof(*rxmem);
2269                 rc = sock_getsockopt(sock, SOL_SOCKET, SO_RCVBUF,
2270                                      (char *)rxmem, &len);
2271         }
2272         if (rc == 0) {
2273                 len = sizeof(*nagle);
2274                 rc = sock->ops->getsockopt(sock, SOL_TCP, TCP_NODELAY,
2275                                            (char *)nagle, &len);
2276         }
2277
2278         set_fs (oldmm);
2279         ksocknal_putconnsock (conn);
2280
2281         if (rc == 0)
2282                 *nagle = !*nagle;
2283         else
2284                 *txmem = *rxmem = *nagle = 0;
2285                 
2286         return (rc);
2287 }
2288
2289 int
2290 ksocknal_setup_sock (struct socket *sock)
2291 {
2292         mm_segment_t    oldmm = get_fs ();
2293         int             rc;
2294         int             option;
2295         int             keep_idle;
2296         int             keep_intvl;
2297         int             keep_count;
2298         int             do_keepalive;
2299         struct linger   linger;
2300
2301         sock->sk->sk_allocation = GFP_NOFS;
2302
2303         /* Ensure this socket aborts active sends immediately when we close
2304          * it. */
2305
2306         linger.l_onoff = 0;
2307         linger.l_linger = 0;
2308
2309         set_fs (KERNEL_DS);
2310         rc = sock_setsockopt (sock, SOL_SOCKET, SO_LINGER,
2311                               (char *)&linger, sizeof (linger));
2312         set_fs (oldmm);
2313         if (rc != 0) {
2314                 CERROR ("Can't set SO_LINGER: %d\n", rc);
2315                 return (rc);
2316         }
2317
2318         option = -1;
2319         set_fs (KERNEL_DS);
2320         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_LINGER2,
2321                                     (char *)&option, sizeof (option));
2322         set_fs (oldmm);
2323         if (rc != 0) {
2324                 CERROR ("Can't set SO_LINGER2: %d\n", rc);
2325                 return (rc);
2326         }
2327
2328         if (!ksocknal_tunables.ksnd_nagle) {
2329                 option = 1;
2330                 
2331                 set_fs (KERNEL_DS);
2332                 rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_NODELAY,
2333                                             (char *)&option, sizeof (option));
2334                 set_fs (oldmm);
2335                 if (rc != 0) {
2336                         CERROR ("Can't disable nagle: %d\n", rc);
2337                         return (rc);
2338                 }
2339         }
2340         
2341         if (ksocknal_tunables.ksnd_buffer_size > 0) {
2342                 option = ksocknal_tunables.ksnd_buffer_size;
2343                 
2344                 set_fs (KERNEL_DS);
2345                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDBUF,
2346                                       (char *)&option, sizeof (option));
2347                 set_fs (oldmm);
2348                 if (rc != 0) {
2349                         CERROR ("Can't set send buffer %d: %d\n",
2350                                 option, rc);
2351                         return (rc);
2352                 }
2353
2354                 set_fs (KERNEL_DS);
2355                 rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVBUF,
2356                                       (char *)&option, sizeof (option));
2357                 set_fs (oldmm);
2358                 if (rc != 0) {
2359                         CERROR ("Can't set receive buffer %d: %d\n",
2360                                 option, rc);
2361                         return (rc);
2362                 }
2363         }
2364
2365         /* snapshot tunables */
2366         keep_idle  = ksocknal_tunables.ksnd_keepalive_idle;
2367         keep_count = ksocknal_tunables.ksnd_keepalive_count;
2368         keep_intvl = ksocknal_tunables.ksnd_keepalive_intvl;
2369         
2370         do_keepalive = (keep_idle > 0 && keep_count > 0 && keep_intvl > 0);
2371
2372         option = (do_keepalive ? 1 : 0);
2373         set_fs (KERNEL_DS);
2374         rc = sock_setsockopt (sock, SOL_SOCKET, SO_KEEPALIVE, 
2375                               (char *)&option, sizeof (option));
2376         set_fs (oldmm);
2377         if (rc != 0) {
2378                 CERROR ("Can't set SO_KEEPALIVE: %d\n", rc);
2379                 return (rc);
2380         }
2381
2382         if (!do_keepalive)
2383                 return (0);
2384
2385         set_fs (KERNEL_DS);
2386         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPIDLE,
2387                                     (char *)&keep_idle, sizeof (keep_idle));
2388         set_fs (oldmm);
2389         if (rc != 0) {
2390                 CERROR ("Can't set TCP_KEEPIDLE: %d\n", rc);
2391                 return (rc);
2392         }
2393
2394         set_fs (KERNEL_DS);
2395         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPINTVL,
2396                                     (char *)&keep_intvl, sizeof (keep_intvl));
2397         set_fs (oldmm);
2398         if (rc != 0) {
2399                 CERROR ("Can't set TCP_KEEPINTVL: %d\n", rc);
2400                 return (rc);
2401         }
2402
2403         set_fs (KERNEL_DS);
2404         rc = sock->ops->setsockopt (sock, SOL_TCP, TCP_KEEPCNT,
2405                                     (char *)&keep_count, sizeof (keep_count));
2406         set_fs (oldmm);
2407         if (rc != 0) {
2408                 CERROR ("Can't set TCP_KEEPCNT: %d\n", rc);
2409                 return (rc);
2410         }
2411
2412         return (0);
2413 }
2414
2415 static int
2416 ksocknal_connect_sock(struct socket **sockp, int *may_retry, 
2417                       ksock_route_t *route, int local_port)
2418 {
2419         struct sockaddr_in  locaddr;
2420         struct sockaddr_in  srvaddr;
2421         struct socket      *sock;
2422         int                 rc;
2423         int                 option;
2424         mm_segment_t        oldmm = get_fs();
2425         struct timeval      tv;
2426
2427         memset(&locaddr, 0, sizeof(locaddr)); 
2428         locaddr.sin_family = AF_INET; 
2429         locaddr.sin_port = htons(local_port);
2430         locaddr.sin_addr.s_addr = 
2431                 (route->ksnr_myipaddr != 0) ? htonl(route->ksnr_myipaddr) 
2432                                             : INADDR_ANY;
2433  
2434         memset (&srvaddr, 0, sizeof (srvaddr));
2435         srvaddr.sin_family = AF_INET;
2436         srvaddr.sin_port = htons (route->ksnr_port);
2437         srvaddr.sin_addr.s_addr = htonl (route->ksnr_ipaddr);
2438
2439         *may_retry = 0;
2440
2441         rc = sock_create (PF_INET, SOCK_STREAM, 0, &sock);
2442         *sockp = sock;
2443         if (rc != 0) {
2444                 CERROR ("Can't create autoconnect socket: %d\n", rc);
2445                 return (rc);
2446         }
2447
2448         /* Ugh; have to map_fd for compatibility with sockets passed in
2449          * from userspace.  And we actually need the sock->file refcounting
2450          * that this gives you :) */
2451
2452         rc = sock_map_fd (sock);
2453         if (rc < 0) {
2454                 sock_release (sock);
2455                 CERROR ("sock_map_fd error %d\n", rc);
2456                 return (rc);
2457         }
2458
2459         /* NB the file descriptor (rc) now owns the ref on sock->file */
2460         LASSERT (sock->file != NULL);
2461         LASSERT (file_count(sock->file) == 1);
2462
2463         get_file(sock->file);                /* extra ref makes sock->file */
2464         sys_close(rc);                       /* survive this close */
2465
2466         /* Still got a single ref on sock->file */
2467         LASSERT (file_count(sock->file) == 1);
2468
2469         /* Set the socket timeouts, so our connection attempt completes in
2470          * finite time */
2471         tv.tv_sec = ksocknal_tunables.ksnd_io_timeout;
2472         tv.tv_usec = 0;
2473
2474         set_fs (KERNEL_DS);
2475         rc = sock_setsockopt (sock, SOL_SOCKET, SO_SNDTIMEO,
2476                               (char *)&tv, sizeof (tv));
2477         set_fs (oldmm);
2478         if (rc != 0) {
2479                 CERROR ("Can't set send timeout %d: %d\n", 
2480                         ksocknal_tunables.ksnd_io_timeout, rc);
2481                 goto failed;
2482         }
2483         
2484         set_fs (KERNEL_DS);
2485         rc = sock_setsockopt (sock, SOL_SOCKET, SO_RCVTIMEO,
2486                               (char *)&tv, sizeof (tv));
2487         set_fs (oldmm);
2488         if (rc != 0) {
2489                 CERROR ("Can't set receive timeout %d: %d\n",
2490                         ksocknal_tunables.ksnd_io_timeout, rc);
2491                 goto failed;
2492         }
2493
2494         set_fs (KERNEL_DS);
2495         option = 1;
2496         rc = sock_setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, 
2497                              (char *)&option, sizeof (option)); 
2498         set_fs (oldmm);
2499         if (rc != 0) {
2500                 CERROR("Can't set SO_REUSEADDR for socket: %d\n", rc);
2501                 goto failed;
2502         }
2503
2504         rc = sock->ops->bind(sock, 
2505                              (struct sockaddr *)&locaddr, sizeof(locaddr));
2506         if (rc == -EADDRINUSE) {
2507                 CDEBUG(D_NET, "Port %d already in use\n", local_port);
2508                 *may_retry = 1;
2509                 goto failed;
2510         }
2511         if (rc != 0) {
2512                 CERROR("Error trying to bind to reserved port %d: %d\n",
2513                        local_port, rc);
2514                 goto failed;
2515         }
2516
2517         rc = sock->ops->connect(sock,
2518                                 (struct sockaddr *)&srvaddr, sizeof(srvaddr),
2519                                 sock->file->f_flags);
2520         if (rc == 0)
2521                 return 0;
2522
2523         /* EADDRNOTAVAIL probably means we're already connected to the same
2524          * peer/port on the same local port on a differently typed
2525          * connection.  Let our caller retry with a different local
2526          * port... */
2527         *may_retry = (rc == -EADDRNOTAVAIL);
2528
2529         CDEBUG(*may_retry ? D_NET : D_ERROR,
2530                "Error %d connecting %u.%u.%u.%u/%d -> %u.%u.%u.%u/%d\n", rc,
2531                HIPQUAD(route->ksnr_myipaddr), local_port,
2532                HIPQUAD(route->ksnr_ipaddr), route->ksnr_port);
2533
2534  failed:
2535         fput(sock->file);
2536         return rc;
2537 }
2538
2539 int
2540 ksocknal_connect_peer (ksock_route_t *route, int type)
2541 {
2542         struct socket      *sock;
2543         int                 rc;
2544         int                 port;
2545         int                 may_retry;
2546         
2547         /* Iterate through reserved ports.  When typed connections are
2548          * used, we will need to bind to multiple ports, but we only know
2549          * this at connect time.  But, by that time we've already called
2550          * bind() so we need a new socket. */
2551
2552         for (port = 1023; port > 512; --port) {
2553
2554                 rc = ksocknal_connect_sock(&sock, &may_retry, route, port);
2555
2556                 if (rc == 0) {
2557                         rc = ksocknal_create_conn(route, sock, type);
2558                         fput(sock->file);
2559                         return rc;
2560                 }
2561
2562                 if (!may_retry)
2563                         return rc;
2564         }
2565
2566         CERROR("Out of ports trying to bind to a reserved port\n");
2567         return (-EADDRINUSE);
2568 }
2569
2570 void
2571 ksocknal_autoconnect (ksock_route_t *route)
2572 {
2573         LIST_HEAD        (zombies);
2574         ksock_tx_t       *tx;
2575         ksock_peer_t     *peer;
2576         unsigned long     flags;
2577         int               rc;
2578         int               type;
2579         
2580         for (;;) {
2581                 for (type = 0; type < SOCKNAL_CONN_NTYPES; type++)
2582                         if ((route->ksnr_connecting & (1 << type)) != 0)
2583                                 break;
2584                 LASSERT (type < SOCKNAL_CONN_NTYPES);
2585
2586                 rc = ksocknal_connect_peer (route, type);
2587                 if (rc != 0)
2588                         break;
2589                 
2590                 /* successfully autoconnected: create_conn did the
2591                  * route/conn binding and scheduled any blocked packets */
2592
2593                 if (route->ksnr_connecting == 0) {
2594                         /* No more connections required */
2595                         return;
2596                 }
2597         }
2598
2599         /* Connection attempt failed */
2600
2601         write_lock_irqsave (&ksocknal_data.ksnd_global_lock, flags);
2602
2603         peer = route->ksnr_peer;
2604         route->ksnr_connecting = 0;
2605
2606         /* This is a retry rather than a new connection */
2607         LASSERT (route->ksnr_retry_interval != 0);
2608         route->ksnr_timeout = jiffies + route->ksnr_retry_interval;
2609         route->ksnr_retry_interval = MIN (route->ksnr_retry_interval * 2,
2610                                           SOCKNAL_MAX_RECONNECT_INTERVAL);
2611
2612         if (!list_empty (&peer->ksnp_tx_queue) &&
2613             ksocknal_find_connecting_route_locked (peer) == NULL) {
2614                 LASSERT (list_empty (&peer->ksnp_conns));
2615
2616                 /* None of the connections that the blocked packets are
2617                  * waiting for have been successful.  Complete them now... */
2618                 do {
2619                         tx = list_entry (peer->ksnp_tx_queue.next,
2620                                          ksock_tx_t, tx_list);
2621                         list_del (&tx->tx_list);
2622                         list_add_tail (&tx->tx_list, &zombies);
2623                 } while (!list_empty (&peer->ksnp_tx_queue));
2624         }
2625
2626 #if 0           /* irrelevent with only eager routes */
2627         if (!route->ksnr_deleted) {
2628                 /* make this route least-favourite for re-selection */
2629                 list_del(&route->ksnr_list);
2630                 list_add_tail(&route->ksnr_list, &peer->ksnp_routes);
2631         }
2632 #endif        
2633         write_unlock_irqrestore (&ksocknal_data.ksnd_global_lock, flags);
2634
2635         while (!list_empty (&zombies)) {
2636                 char ipbuf[PTL_NALFMT_SIZE];
2637                 char ipbuf2[PTL_NALFMT_SIZE];
2638                 tx = list_entry (zombies.next, ksock_tx_t, tx_list);
2639
2640                 CERROR ("Deleting packet type %d len %d ("LPX64" %s->"LPX64" %s)\n",
2641                         le32_to_cpu (tx->tx_hdr->type),
2642                         le32_to_cpu (tx->tx_hdr->payload_length),
2643                         le64_to_cpu (tx->tx_hdr->src_nid),
2644                         portals_nid2str(SOCKNAL,
2645                                         le64_to_cpu(tx->tx_hdr->src_nid),
2646                                         ipbuf),
2647                         le64_to_cpu (tx->tx_hdr->dest_nid),
2648                         portals_nid2str(SOCKNAL,
2649                                         le64_to_cpu(tx->tx_hdr->src_nid),
2650                                         ipbuf2));
2651
2652                 list_del (&tx->tx_list);
2653                 /* complete now */
2654                 ksocknal_tx_done (tx, 0);
2655         }
2656 }
2657
2658 int
2659 ksocknal_autoconnectd (void *arg)
2660 {
2661         long               id = (long)arg;
2662         char               name[16];
2663         unsigned long      flags;
2664         ksock_route_t     *route;
2665         int                rc;
2666
2667         snprintf (name, sizeof (name), "ksocknal_ad%02ld", id);
2668         kportal_daemonize (name);
2669         kportal_blockallsigs ();
2670
2671         spin_lock_irqsave (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2672
2673         while (!ksocknal_data.ksnd_shuttingdown) {
2674
2675                 if (!list_empty (&ksocknal_data.ksnd_autoconnectd_routes)) {
2676                         route = list_entry (ksocknal_data.ksnd_autoconnectd_routes.next,
2677                                             ksock_route_t, ksnr_connect_list);
2678
2679                         list_del (&route->ksnr_connect_list);
2680                         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2681
2682                         ksocknal_autoconnect (route);
2683                         ksocknal_put_route (route);
2684
2685                         spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock,
2686                                           flags);
2687                         continue;
2688                 }
2689
2690                 spin_unlock_irqrestore(&ksocknal_data.ksnd_autoconnectd_lock,
2691                                        flags);
2692
2693                 rc = wait_event_interruptible(ksocknal_data.ksnd_autoconnectd_waitq,
2694                                               ksocknal_data.ksnd_shuttingdown ||
2695                                               !list_empty(&ksocknal_data.ksnd_autoconnectd_routes));
2696
2697                 spin_lock_irqsave(&ksocknal_data.ksnd_autoconnectd_lock, flags);
2698         }
2699
2700         spin_unlock_irqrestore (&ksocknal_data.ksnd_autoconnectd_lock, flags);
2701
2702         ksocknal_thread_fini ();
2703         return (0);
2704 }
2705
2706 ksock_conn_t *
2707 ksocknal_find_timed_out_conn (ksock_peer_t *peer) 
2708 {
2709         /* We're called with a shared lock on ksnd_global_lock */
2710         ksock_conn_t      *conn;
2711         struct list_head  *ctmp;
2712
2713         list_for_each (ctmp, &peer->ksnp_conns) {
2714                 conn = list_entry (ctmp, ksock_conn_t, ksnc_list);
2715
2716                 /* Don't need the {get,put}connsock dance to deref ksnc_sock... */
2717                 LASSERT (!conn->ksnc_closing);
2718
2719                 if (conn->ksnc_sock->sk->sk_err != 0) {
2720                         /* Something (e.g. failed keepalive) set the socket error */
2721                         atomic_inc (&conn->ksnc_refcount);
2722                         CERROR ("Socket error %d: "LPX64" %p %d.%d.%d.%d\n",
2723                                 conn->ksnc_sock->sk->sk_err, peer->ksnp_nid,
2724                                 conn, HIPQUAD(conn->ksnc_ipaddr));
2725                         return (conn);
2726                 }
2727
2728                 if (conn->ksnc_rx_started &&
2729                     time_after_eq (jiffies, conn->ksnc_rx_deadline)) {
2730                         /* Timed out incomplete incoming message */
2731                         atomic_inc (&conn->ksnc_refcount);
2732                         CERROR ("Timed out RX from "LPX64" %p %d.%d.%d.%d\n",
2733                                 peer->ksnp_nid,conn,HIPQUAD(conn->ksnc_ipaddr));
2734                         return (conn);
2735                 }
2736
2737                 if ((!list_empty (&conn->ksnc_tx_queue) ||
2738                      conn->ksnc_sock->sk->sk_wmem_queued != 0) &&
2739                     time_after_eq (jiffies, conn->ksnc_tx_deadline)) {
2740                         /* Timed out messages queued for sending or
2741                          * buffered in the socket's send buffer */
2742                         atomic_inc (&conn->ksnc_refcount);
2743                         CERROR ("Timed out TX to "LPX64" %s%d %p %d.%d.%d.%d\n",
2744                                 peer->ksnp_nid,
2745                                 list_empty (&conn->ksnc_tx_queue) ? "" : "Q ",
2746                                 conn->ksnc_sock->sk->sk_wmem_queued, conn,
2747                                 HIPQUAD(conn->ksnc_ipaddr));
2748                         return (conn);
2749                 }
2750         }
2751
2752         return (NULL);
2753 }
2754
2755 void
2756 ksocknal_check_peer_timeouts (int idx)
2757 {
2758         struct list_head *peers = &ksocknal_data.ksnd_peers[idx];
2759         struct list_head *ptmp;
2760         ksock_peer_t     *peer;
2761         ksock_conn_t     *conn;
2762
2763  again:
2764         /* NB. We expect to have a look at all the peers and not find any
2765          * connections to time out, so we just use a shared lock while we
2766          * take a look... */
2767         read_lock (&ksocknal_data.ksnd_global_lock);
2768
2769         list_for_each (ptmp, peers) {
2770                 peer = list_entry (ptmp, ksock_peer_t, ksnp_list);
2771                 conn = ksocknal_find_timed_out_conn (peer);
2772                 
2773                 if (conn != NULL) {
2774                         read_unlock (&ksocknal_data.ksnd_global_lock);
2775
2776                         CERROR ("Timeout out conn->"LPX64" ip %d.%d.%d.%d:%d\n",
2777                                 peer->ksnp_nid,
2778                                 HIPQUAD(conn->ksnc_ipaddr),
2779                                 conn->ksnc_port);
2780                         ksocknal_close_conn_and_siblings (conn, -ETIMEDOUT);
2781                         
2782                         /* NB we won't find this one again, but we can't
2783                          * just proceed with the next peer, since we dropped
2784                          * ksnd_global_lock and it might be dead already! */
2785                         ksocknal_put_conn (conn);
2786                         goto again;
2787                 }
2788         }
2789
2790         read_unlock (&ksocknal_data.ksnd_global_lock);
2791 }
2792
2793 int
2794 ksocknal_reaper (void *arg)
2795 {
2796         wait_queue_t       wait;
2797         unsigned long      flags;
2798         ksock_conn_t      *conn;
2799         ksock_sched_t     *sched;
2800         struct list_head   enomem_conns;
2801         int                nenomem_conns;
2802         int                timeout;
2803         int                i;
2804         int                peer_index = 0;
2805         unsigned long      deadline = jiffies;
2806         
2807         kportal_daemonize ("ksocknal_reaper");
2808         kportal_blockallsigs ();
2809
2810         INIT_LIST_HEAD(&enomem_conns);
2811         init_waitqueue_entry (&wait, current);
2812
2813         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2814
2815         while (!ksocknal_data.ksnd_shuttingdown) {
2816
2817                 if (!list_empty (&ksocknal_data.ksnd_deathrow_conns)) {
2818                         conn = list_entry (ksocknal_data.ksnd_deathrow_conns.next,
2819                                            ksock_conn_t, ksnc_list);
2820                         list_del (&conn->ksnc_list);
2821                         
2822                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2823
2824                         ksocknal_terminate_conn (conn);
2825                         ksocknal_put_conn (conn);
2826
2827                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2828                         continue;
2829                 }
2830
2831                 if (!list_empty (&ksocknal_data.ksnd_zombie_conns)) {
2832                         conn = list_entry (ksocknal_data.ksnd_zombie_conns.next,
2833                                            ksock_conn_t, ksnc_list);
2834                         list_del (&conn->ksnc_list);
2835                         
2836                         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2837
2838                         ksocknal_destroy_conn (conn);
2839
2840                         spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2841                         continue;
2842                 }
2843
2844                 if (!list_empty (&ksocknal_data.ksnd_enomem_conns)) {
2845                         list_add(&enomem_conns, &ksocknal_data.ksnd_enomem_conns);
2846                         list_del_init(&ksocknal_data.ksnd_enomem_conns);
2847                 }
2848
2849                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2850
2851                 /* reschedule all the connections that stalled with ENOMEM... */
2852                 nenomem_conns = 0;
2853                 while (!list_empty (&enomem_conns)) {
2854                         conn = list_entry (enomem_conns.next,
2855                                            ksock_conn_t, ksnc_tx_list);
2856                         list_del (&conn->ksnc_tx_list);
2857
2858                         sched = conn->ksnc_scheduler;
2859
2860                         spin_lock_irqsave (&sched->kss_lock, flags);
2861
2862                         LASSERT (conn->ksnc_tx_scheduled);
2863                         conn->ksnc_tx_ready = 1;
2864                         list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
2865                         wake_up (&sched->kss_waitq);
2866
2867                         spin_unlock_irqrestore (&sched->kss_lock, flags);
2868                         nenomem_conns++;
2869                 }
2870                 
2871                 /* careful with the jiffy wrap... */
2872                 while ((timeout = (int)(deadline - jiffies)) <= 0) {
2873                         const int n = 4;
2874                         const int p = 1;
2875                         int       chunk = ksocknal_data.ksnd_peer_hash_size;
2876                         
2877                         /* Time to check for timeouts on a few more peers: I do
2878                          * checks every 'p' seconds on a proportion of the peer
2879                          * table and I need to check every connection 'n' times
2880                          * within a timeout interval, to ensure I detect a
2881                          * timeout on any connection within (n+1)/n times the
2882                          * timeout interval. */
2883
2884                         if (ksocknal_tunables.ksnd_io_timeout > n * p)
2885                                 chunk = (chunk * n * p) / 
2886                                         ksocknal_tunables.ksnd_io_timeout;
2887                         if (chunk == 0)
2888                                 chunk = 1;
2889
2890                         for (i = 0; i < chunk; i++) {
2891                                 ksocknal_check_peer_timeouts (peer_index);
2892                                 peer_index = (peer_index + 1) % 
2893                                              ksocknal_data.ksnd_peer_hash_size;
2894                         }
2895
2896                         deadline += p * HZ;
2897                 }
2898
2899                 if (nenomem_conns != 0) {
2900                         /* Reduce my timeout if I rescheduled ENOMEM conns.
2901                          * This also prevents me getting woken immediately
2902                          * if any go back on my enomem list. */
2903                         timeout = SOCKNAL_ENOMEM_RETRY;
2904                 }
2905                 ksocknal_data.ksnd_reaper_waketime = jiffies + timeout;
2906
2907                 set_current_state (TASK_INTERRUPTIBLE);
2908                 add_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2909
2910                 if (!ksocknal_data.ksnd_shuttingdown &&
2911                     list_empty (&ksocknal_data.ksnd_deathrow_conns) &&
2912                     list_empty (&ksocknal_data.ksnd_zombie_conns))
2913                         schedule_timeout (timeout);
2914
2915                 set_current_state (TASK_RUNNING);
2916                 remove_wait_queue (&ksocknal_data.ksnd_reaper_waitq, &wait);
2917
2918                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
2919         }
2920
2921         spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
2922
2923         ksocknal_thread_fini ();
2924         return (0);
2925 }
2926
2927 lib_nal_t ksocknal_lib = {
2928         libnal_data:       &ksocknal_data,      /* NAL private data */
2929         libnal_send:        ksocknal_send,
2930         libnal_send_pages:  ksocknal_send_pages,
2931         libnal_recv:        ksocknal_recv,
2932         libnal_recv_pages:  ksocknal_recv_pages,
2933         libnal_dist:        ksocknal_dist
2934 };