Whamcloud - gitweb
- merge 0.7rc1 from b_devel to HEAD (20030612 merge point)
[fs/lustre-release.git] / lnet / klnds / socklnd / socklnd_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *
10  *   This file is part of Portals, http://www.sf.net/projects/sandiaportals/
11  *
12  *   Portals is free software; you can redistribute it and/or
13  *   modify it under the terms of version 2 of the GNU General Public
14  *   License as published by the Free Software Foundation.
15  *
16  *   Portals is distributed in the hope that it will be useful,
17  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
18  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
19  *   GNU General Public License for more details.
20  *
21  *   You should have received a copy of the GNU General Public License
22  *   along with Portals; if not, write to the Free Software
23  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
24  */
25
26 #include "socknal.h"
27
28 atomic_t   ksocknal_packets_received;
29 atomic_t   ksocknal_packets_launched;
30 atomic_t   ksocknal_packets_being_sent;
31
32 #if SOCKNAL_ZC
33 int        ksocknal_do_zc = 1;
34 int        ksocknal_zc_min_frag = 2048;
35 #endif
36
37 /*
38  *  LIB functions follow
39  *
40  */
41 int
42 ksocknal_read(nal_cb_t *nal, void *private, void *dst_addr,
43               user_ptr src_addr, size_t len)
44 {
45         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
46                nal->ni.nid, (long)len, src_addr, dst_addr);
47
48         memcpy( dst_addr, src_addr, len );
49         return 0;
50 }
51
52 int
53 ksocknal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
54                void *src_addr, size_t len)
55 {
56         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
57                nal->ni.nid, (long)len, src_addr, dst_addr);
58
59         memcpy( dst_addr, src_addr, len );
60         return 0;
61 }
62
63 int
64 ksocknal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
65                          ptl_event_t *ev)
66 {
67         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
68                nal->ni.nid, eq, ev);
69
70         if (eq->event_callback != NULL)
71                 eq->event_callback(ev);
72
73         return 0;
74 }
75
76 void *
77 ksocknal_malloc(nal_cb_t *nal, size_t len)
78 {
79         void *buf;
80
81         PORTAL_ALLOC(buf, len);
82
83         if (buf != NULL)
84                 memset(buf, 0, len);
85
86         return (buf);
87 }
88
89 void
90 ksocknal_free(nal_cb_t *nal, void *buf, size_t len)
91 {
92         PORTAL_FREE(buf, len);
93 }
94
95 void
96 ksocknal_printf(nal_cb_t *nal, const char *fmt, ...)
97 {
98         va_list ap;
99         char msg[256];
100
101         va_start (ap, fmt);
102         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
103         va_end (ap);
104
105         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
106
107         CDEBUG (D_NET, "%s", msg);
108 }
109
110 void
111 ksocknal_cli(nal_cb_t *nal, unsigned long *flags)
112 {
113         ksock_nal_data_t *data = nal->nal_data;
114
115         spin_lock(&data->ksnd_nal_cb_lock);
116 }
117
118 void
119 ksocknal_sti(nal_cb_t *nal, unsigned long *flags)
120 {
121         ksock_nal_data_t *data;
122         data = nal->nal_data;
123
124         spin_unlock(&data->ksnd_nal_cb_lock);
125 }
126
127 int
128 ksocknal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
129 {
130         /* I would guess that if ksocknal_get_conn(nid) == NULL,
131            and we're not routing, then 'nid' is very distant :) */
132         if ( nal->ni.nid == nid ) {
133                 *dist = 0;
134         } else {
135                 *dist = 1;
136         }
137
138         return 0;
139 }
140
141 ksock_ltx_t *
142 ksocknal_get_ltx (int may_block)
143 {
144         long             flags;
145         ksock_ltx_t *ltx = NULL;
146
147         for (;;) {
148                 spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
149
150                 if (!list_empty (&ksocknal_data.ksnd_idle_ltx_list)) {
151                         ltx = list_entry(ksocknal_data.ksnd_idle_ltx_list.next,
152                                          ksock_ltx_t, ltx_tx.tx_list);
153                         list_del (&ltx->ltx_tx.tx_list);
154                         break;
155                 }
156
157                 if (!may_block) {
158                         if (!list_empty(&ksocknal_data.ksnd_idle_nblk_ltx_list)) {
159                                 ltx = list_entry(ksocknal_data.ksnd_idle_nblk_ltx_list.next,
160                                                  ksock_ltx_t, ltx_tx.tx_list);
161                                 list_del (&ltx->ltx_tx.tx_list);
162                         }
163                         break;
164                 }
165
166                 spin_unlock_irqrestore(&ksocknal_data.ksnd_idle_ltx_lock,
167                                        flags);
168
169                 wait_event (ksocknal_data.ksnd_idle_ltx_waitq,
170                             !list_empty (&ksocknal_data.ksnd_idle_ltx_list));
171         }
172
173         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
174
175         return (ltx);
176 }
177
178 #if SOCKNAL_ZC
179 struct page *
180 ksocknal_kvaddr_to_page (unsigned long vaddr)
181 {
182         struct page *page;
183
184         if (vaddr >= VMALLOC_START &&
185             vaddr < VMALLOC_END)
186                 page = vmalloc_to_page ((void *)vaddr);
187 #if CONFIG_HIGHMEM
188         else if (vaddr >= PKMAP_BASE &&
189                  vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
190                 page = vmalloc_to_page ((void *)vaddr);
191                 /* in 2.4 ^ just walks the page tables */
192 #endif
193         else
194                 page = virt_to_page (vaddr);
195
196         if (page == NULL ||
197             !VALID_PAGE (page))
198                 return (NULL);
199
200         return (page);
201 }
202 #endif
203
204 int
205 ksocknal_send_iov (struct socket *sock, ksock_tx_t *tx, int more)
206 {
207         struct iovec  *iov = tx->tx_iov;
208         int            fragsize = iov->iov_len;
209         unsigned long  vaddr = (unsigned long)iov->iov_base;
210 #if SOCKNAL_ZC
211         int            offset = vaddr & (PAGE_SIZE - 1);
212         int            zcsize = MIN (fragsize, PAGE_SIZE - offset);
213         struct page   *page;
214 #endif
215         int            rc;
216
217         /* NB we can't trust socket ops to either consume our iovs
218          * or leave them alone, so we only send 1 frag at a time. */
219         LASSERT (fragsize <= tx->tx_nob);
220         LASSERT (tx->tx_niov > 0);
221         more |= (tx->tx_niov > 1);
222         
223 #if SOCKNAL_ZC
224         if (ksocknal_do_zc &&
225             (sock->sk->route_caps & NETIF_F_SG) &&
226             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
227             zcsize >= ksocknal_zc_min_frag &&
228             (page = ksocknal_kvaddr_to_page (vaddr)) != NULL) {
229                 
230                 CDEBUG(D_NET, "vaddr %p, page %p->%p + offset %x for %d\n",
231                        (void *)vaddr, page, page_address(page), offset, zcsize);
232
233                 more |= (zcsize < fragsize);
234
235                 rc = tcp_sendpage_zccd(sock, page, offset, zcsize, 
236                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
237                                        &tx->tx_zccd);
238         } else
239 #endif
240         {
241                 /* NB don't pass tx's iov; sendmsg may or may not update it */
242                 struct iovec fragiov = { .iov_base = (void *)vaddr,
243                                          .iov_len  = fragsize};
244                 struct msghdr msg = {
245                         .msg_name       = NULL,
246                         .msg_namelen    = 0,
247                         .msg_iov        = &fragiov,
248                         .msg_iovlen     = 1,
249                         .msg_control    = NULL,
250                         .msg_controllen = 0,
251                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
252                 };
253                 mm_segment_t oldmm = get_fs();
254                 
255                 set_fs (KERNEL_DS);
256                 rc = sock->sk->prot->sendmsg(sock->sk, &msg, fragsize);
257                 set_fs (oldmm);
258         } 
259
260         if (rc <= 0)
261                 return (rc);
262
263         tx->tx_nob -= rc;
264
265         if (rc < fragsize) {
266                 /* didn't send whole frag */
267                 iov->iov_base = (void *)(vaddr + rc);
268                 iov->iov_len  = fragsize - rc;
269                 return (-EAGAIN);
270         }
271
272         /* everything went */
273         LASSERT (rc == fragsize);
274         tx->tx_iov++;
275         tx->tx_niov--;
276         return (1);
277 }
278
279 int
280 ksocknal_send_kiov (struct socket *sock, ksock_tx_t *tx, int more)
281 {
282         ptl_kiov_t    *kiov = tx->tx_kiov;
283         int            fragsize = kiov->kiov_len;
284         struct page   *page = kiov->kiov_page;
285         int            offset = kiov->kiov_offset;
286         int            rc;
287
288         /* NB we can't trust socket ops to either consume our iovs
289          * or leave them alone, so we only send 1 frag at a time. */
290         LASSERT (fragsize <= tx->tx_nob);
291         LASSERT (offset + fragsize <= PAGE_SIZE);
292         LASSERT (tx->tx_nkiov > 0);
293         more |= (tx->tx_nkiov > 1);
294
295 #if SOCKNAL_ZC
296         if (ksocknal_do_zc &&
297             (sock->sk->route_caps & NETIF_F_SG) &&
298             (sock->sk->route_caps & (NETIF_F_IP_CSUM | NETIF_F_NO_CSUM | NETIF_F_HW_CSUM)) &&
299             fragsize >= ksocknal_zc_min_frag) {
300
301                 CDEBUG(D_NET, "page %p + offset %x for %d\n",
302                                page, offset, fragsize);
303
304                 rc = tcp_sendpage_zccd(sock, page, offset, fragsize,
305                                        more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT,
306                                        &tx->tx_zccd);
307         } else
308 #endif
309         {
310                 char *addr = ((char *)kmap (page)) + offset;
311                 struct iovec fragiov = {.iov_base = addr,
312                                         .iov_len  = fragsize};
313                 struct msghdr msg = {
314                         .msg_name       = NULL,
315                         .msg_namelen    = 0,
316                         .msg_iov        = &fragiov,
317                         .msg_iovlen     = 1,
318                         .msg_control    = NULL,
319                         .msg_controllen = 0,
320                         .msg_flags      = more ? (MSG_DONTWAIT | MSG_MORE) : MSG_DONTWAIT
321                 };
322                 mm_segment_t  oldmm = get_fs();
323                 
324                 set_fs (KERNEL_DS);
325                 rc = sock->sk->prot->sendmsg(sock->sk, &msg, fragsize);
326                 set_fs (oldmm);
327                 kunmap (page);
328         }
329
330         if (rc <= 0)
331                 return (rc);
332
333         tx->tx_nob -= rc;
334
335         if (rc < fragsize) {
336                 /* didn't send whole frag */
337                 kiov->kiov_offset = offset + rc;
338                 kiov->kiov_len    = fragsize - rc;
339                 return (-EAGAIN);
340         }
341
342         /* everything went */
343         LASSERT (rc == fragsize);
344         tx->tx_kiov++;
345         tx->tx_nkiov--;
346         return (1);
347 }
348
349 int
350 ksocknal_sendmsg (struct socket *sock, ksock_tx_t *tx, int more)
351 {
352         int    rc;
353         int    sent_some = 0;
354         ENTRY;
355         
356         LASSERT (!in_interrupt());
357
358         for (;;) {
359                 if (tx->tx_niov != 0)
360                         rc = ksocknal_send_iov (sock, tx, more || tx->tx_nkiov != 0);
361                 else
362                         rc = ksocknal_send_kiov (sock, tx, more);
363
364                 /* Interpret a zero rc the same as -EAGAIN (Adaptech TOE) */
365                 if (rc <= 0)                    /* error or partial send */
366                         RETURN ((sent_some || rc == -EAGAIN) ? 0 : rc);
367                 
368                 if (tx->tx_nob == 0)            /* sent everything */
369                         RETURN (0);
370
371                 sent_some = 1;
372         }
373 }
374
375 int
376 ksocknal_recv_iov (ksock_conn_t *conn)
377 {
378         struct iovec *iov = conn->ksnc_rx_iov;
379         int           fragsize  = iov->iov_len;
380         unsigned long vaddr = (unsigned long)iov->iov_base;
381         struct iovec  fragiov = { .iov_base = (void *)vaddr,
382                                   .iov_len  = fragsize};
383         struct msghdr msg = {
384                 .msg_name       = NULL,
385                 .msg_namelen    = 0,
386                 .msg_iov        = &fragiov,
387                 .msg_iovlen     = 1,
388                 .msg_control    = NULL,
389                 .msg_controllen = 0,
390                 .msg_flags      = 0
391         };
392         mm_segment_t oldmm = get_fs();
393         int          rc;
394
395         /* NB we can't trust socket ops to either consume our iovs
396          * or leave them alone, so we only receive 1 frag at a time. */
397         LASSERT (conn->ksnc_rx_niov > 0);
398         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
399         
400         set_fs (KERNEL_DS);
401         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
402         /* NB this is just a boolean............................^ */
403         set_fs (oldmm);
404
405         if (rc <= 0)
406                 return (rc);
407
408         conn->ksnc_rx_nob_wanted -= rc;
409         conn->ksnc_rx_nob_left -= rc;
410                 
411         if (rc < fragsize) {
412                 iov->iov_base = (void *)(vaddr + rc);
413                 iov->iov_len = fragsize - rc;
414                 return (-EAGAIN);
415         }
416
417         LASSERT (rc == fragsize);
418         conn->ksnc_rx_iov++;
419         conn->ksnc_rx_niov--;
420         return (1);
421 }
422
423 int
424 ksocknal_recv_kiov (ksock_conn_t *conn)
425 {
426         ptl_kiov_t   *kiov = conn->ksnc_rx_kiov;
427         struct page  *page = kiov->kiov_page;
428         int           offset = kiov->kiov_offset;
429         int           fragsize = kiov->kiov_len;
430         unsigned long vaddr = ((unsigned long)kmap (page)) + offset;
431         struct iovec  fragiov = { .iov_base = (void *)vaddr,
432                                   .iov_len  = fragsize};
433         struct msghdr msg = {
434                 .msg_name       = NULL,
435                 .msg_namelen    = 0,
436                 .msg_iov        = &fragiov,
437                 .msg_iovlen     = 1,
438                 .msg_control    = NULL,
439                 .msg_controllen = 0,
440                 .msg_flags      = 0
441         };
442         mm_segment_t oldmm = get_fs();
443         int          rc;
444
445         /* NB we can't trust socket ops to either consume our iovs
446          * or leave them alone, so we only receive 1 frag at a time. */
447         LASSERT (fragsize <= conn->ksnc_rx_nob_wanted);
448         LASSERT (conn->ksnc_rx_nkiov > 0);
449         LASSERT (offset + fragsize <= PAGE_SIZE);
450         
451         set_fs (KERNEL_DS);
452         rc = sock_recvmsg (conn->ksnc_sock, &msg, fragsize, MSG_DONTWAIT);
453         /* NB this is just a boolean............................^ */
454         set_fs (oldmm);
455         kunmap (page);
456         
457         if (rc <= 0)
458                 return (rc);
459         
460         conn->ksnc_rx_nob_wanted -= rc;
461         conn->ksnc_rx_nob_left -= rc;
462                 
463         if (rc < fragsize) {
464                 kiov->kiov_offset = offset + rc;
465                 kiov->kiov_len = fragsize - rc;
466                 return (-EAGAIN);
467         }
468
469         LASSERT (rc == fragsize);
470         conn->ksnc_rx_kiov++;
471         conn->ksnc_rx_nkiov--;
472         return (1);
473 }
474
475 int
476 ksocknal_recvmsg (ksock_conn_t *conn) 
477 {
478         int    rc;
479         int    got_some = 0;
480         ENTRY;
481         
482         LASSERT (!in_interrupt ());
483
484         for (;;) {
485                 LASSERT (conn->ksnc_rx_nob_wanted > 0);
486                 
487                 if (conn->ksnc_rx_niov != 0)
488                         rc = ksocknal_recv_iov (conn);
489                 else
490                         rc = ksocknal_recv_kiov (conn);
491
492                 /* CAVEAT EMPTOR: we return...
493                  * <= 0 for error (0 == EOF) and > 0 for success (unlike sendmsg()) */
494
495                 if (rc <= 0)                    /* error/EOF or partial receive */
496                         RETURN ((got_some || rc == -EAGAIN) ? 1 : rc);
497                 
498                 if (conn->ksnc_rx_nob_wanted == 0)
499                         RETURN (1);
500
501                 got_some = 0;
502         }
503 }
504
505 #if SOCKNAL_ZC
506 void
507 ksocknal_zc_callback (zccd_t *zcd)
508 {
509         ksock_tx_t    *tx = KSOCK_ZCCD_2_TX(zcd);
510         ksock_sched_t *sched = tx->tx_sched;
511         unsigned long  flags;
512         ENTRY;
513
514         /* Schedule tx for cleanup (can't do it now due to lock conflicts) */
515
516         spin_lock_irqsave (&sched->kss_lock, flags);
517
518         list_add_tail (&tx->tx_list, &sched->kss_zctxdone_list);
519         if (waitqueue_active (&sched->kss_waitq))
520                 wake_up (&sched->kss_waitq);
521
522         spin_unlock_irqrestore (&sched->kss_lock, flags);
523         EXIT;
524 }
525 #endif
526
527 void
528 ksocknal_tx_done (ksock_tx_t *tx)
529 {
530         long           flags;
531         ksock_ltx_t   *ltx;
532         ENTRY;
533
534         atomic_dec (&ksocknal_packets_being_sent);
535
536         if (tx->tx_isfwd) {             /* was a forwarded packet? */
537                 kpr_fwd_done (&ksocknal_data.ksnd_router,
538                               KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
539                 EXIT;
540                 return;
541         }
542
543         /* local send */
544         ltx = KSOCK_TX_2_KSOCK_LTX (tx);
545
546         lib_finalize (&ksocknal_lib, ltx->ltx_private, ltx->ltx_cookie);
547
548         spin_lock_irqsave (&ksocknal_data.ksnd_idle_ltx_lock, flags);
549
550         list_add_tail (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
551
552         /* normal tx desc => wakeup anyone blocking for one */
553         if (ltx->ltx_idle == &ksocknal_data.ksnd_idle_ltx_list &&
554             waitqueue_active (&ksocknal_data.ksnd_idle_ltx_waitq))
555                 wake_up (&ksocknal_data.ksnd_idle_ltx_waitq);
556
557         spin_unlock_irqrestore (&ksocknal_data.ksnd_idle_ltx_lock, flags);
558         EXIT;
559 }
560
561 void
562 ksocknal_process_transmit (ksock_sched_t *sched, long *irq_flags)
563 {
564         ksock_conn_t *conn;
565         ksock_tx_t *tx;
566         int         rc;
567
568         LASSERT (!list_empty (&sched->kss_tx_conns));
569         conn = list_entry(sched->kss_tx_conns.next, ksock_conn_t, ksnc_tx_list);
570         list_del (&conn->ksnc_tx_list);
571
572         LASSERT (conn->ksnc_tx_scheduled);
573         LASSERT (conn->ksnc_tx_ready);
574         LASSERT (!list_empty (&conn->ksnc_tx_queue));
575         tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
576         /* assume transmit will complete now, so dequeue while I've got lock */
577         list_del (&tx->tx_list);
578
579         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
580
581         LASSERT (tx->tx_nob > 0);
582
583         conn->ksnc_tx_ready = 0;/* write_space may race with me and set ready */
584         mb();                   /* => clear BEFORE trying to write */
585
586         rc = ksocknal_sendmsg (conn->ksnc_sock, tx, 
587                                !list_empty (&conn->ksnc_tx_queue)); /* more to come? */
588
589         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_nob, rc);
590
591         if (rc != 0) {
592 #warning FIXME: handle socket errors properly
593                 CERROR("Error socknal send(%d) %p: %d\n", tx->tx_nob, conn, rc);
594                 /* kid on for now the whole packet went.
595                  * NB when we handle the error better, we'll still need to
596                  * block for zccd completion.
597                  */
598                 tx->tx_nob = 0;
599         }
600
601         if (tx->tx_nob == 0)                    /* nothing left to send */
602         {
603                 /* everything went; assume more can go, so prevent write_space locking */
604                 conn->ksnc_tx_ready = 1;
605
606                 ksocknal_put_conn (conn);       /* release packet's ref */
607                 atomic_inc (&ksocknal_packets_being_sent);
608 #if SOCKNAL_ZC
609                 if (atomic_read (&tx->tx_zccd.zccd_count) != 1) {
610                         /* zccd skbufs are still in-flight.  Release my
611                          * initial ref on zccd, so callback can occur */
612                         zccd_put (&tx->tx_zccd);
613                 } else
614 #endif
615                         ksocknal_tx_done (tx);
616
617                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
618         } else {
619                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
620
621                 /* back onto HEAD of tx_queue */
622                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
623         }
624
625         if (!conn->ksnc_tx_ready ||             /* no space to write now */
626             list_empty (&conn->ksnc_tx_queue)) {/* nothing to write */
627                 conn->ksnc_tx_scheduled = 0;    /* not being scheduled */
628                 ksocknal_put_conn (conn);       /* release scheduler's ref */
629         } else                                 /* let scheduler call me again */
630                 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
631 }
632
633 void
634 ksocknal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx)
635 {
636         unsigned long  flags;
637         ksock_sched_t *sched = conn->ksnc_scheduler;
638
639         /* Ensure the frags we've been given EXACTLY match the number of
640          * bytes we want to send.  Many TCP/IP stacks disregard any total
641          * size parameters passed to them and just look at the frags. 
642          *
643          * We always expect at least 1 mapped fragment containing the
644          * complete portals header.
645          */
646         LASSERT (lib_iov_nob (tx->tx_niov, tx->tx_iov) +
647                  lib_kiov_nob (tx->tx_nkiov, tx->tx_kiov) == tx->tx_nob);
648         LASSERT (tx->tx_niov >= 1);
649         LASSERT (tx->tx_iov[0].iov_len >= sizeof (ptl_hdr_t));
650         
651         CDEBUG (D_NET, "type %d, nob %d niov %d nkiov %d\n",
652                 ((ptl_hdr_t *)tx->tx_iov[0].iov_base)->type, tx->tx_nob, 
653                 tx->tx_niov, tx->tx_nkiov);
654
655 #if SOCKNAL_ZC
656         zccd_init (&tx->tx_zccd, ksocknal_zc_callback);
657         /* NB this sets 1 ref on zccd, so the callback can only occur
658          * after I've released this ref */
659         tx->tx_sched = sched;
660 #endif
661         spin_lock_irqsave (&sched->kss_lock, flags);
662
663         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
664
665         if (conn->ksnc_tx_ready &&              /* able to send */
666             !conn->ksnc_tx_scheduled) {          /* not scheduled to send */
667                 list_add_tail (&conn->ksnc_tx_list, &sched->kss_tx_conns);
668                 conn->ksnc_tx_scheduled = 1;
669                 atomic_inc (&conn->ksnc_refcount); /* extra ref for scheduler */
670                 if (waitqueue_active (&sched->kss_waitq))
671                         wake_up (&sched->kss_waitq);
672         }
673
674         spin_unlock_irqrestore (&sched->kss_lock, flags);
675
676         atomic_inc (&ksocknal_packets_launched);
677 }
678
679 ksock_conn_t *
680 ksocknal_send_target (ptl_nid_t nid) 
681 {
682         ptl_nid_t     gatewaynid;
683         ksock_conn_t *conn;
684         int           rc;
685
686         if ((conn = ksocknal_get_conn (nid)) == NULL) {
687                 /* It's not a peer; try to find a gateway */
688                 rc = kpr_lookup (&ksocknal_data.ksnd_router, nid, &gatewaynid);
689                 if (rc != 0) {
690                         CERROR("Can't route to "LPX64": router error %d\n",
691                                nid, rc);
692                         return (NULL);
693                 }
694
695                 if ((conn = ksocknal_get_conn (gatewaynid)) == NULL) {
696                         CERROR ("Can't route to "LPX64": gateway "LPX64
697                                 " is not a peer\n", nid, gatewaynid);
698                         return (NULL);
699                 }
700         }
701
702         return (conn);
703 }
704
705 ksock_ltx_t *
706 ksocknal_setup_hdr (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
707                     ptl_hdr_t *hdr, int type)
708 {
709         ksock_ltx_t  *ltx;
710
711         /* I may not block for a transmit descriptor if I might block the
712          * receiver, or an interrupt handler. */
713         ltx = ksocknal_get_ltx (!(type == PTL_MSG_ACK ||
714                                   type == PTL_MSG_REPLY ||
715                                   in_interrupt ()));
716         if (ltx == NULL) {
717                 CERROR ("Can't allocate tx desc\n");
718                 return (NULL);
719         }
720
721         /* Init local send packet (storage for hdr, finalize() args) */
722         ltx->ltx_hdr = *hdr;
723         ltx->ltx_private = private;
724         ltx->ltx_cookie = cookie;
725         
726         /* Init common ltx_tx */
727         ltx->ltx_tx.tx_isfwd = 0;
728         ltx->ltx_tx.tx_nob = sizeof (*hdr);
729
730         /* We always have 1 mapped frag for the header */
731         ltx->ltx_tx.tx_niov = 1;
732         ltx->ltx_tx.tx_iov = &ltx->ltx_iov_space.hdr;
733         ltx->ltx_tx.tx_iov[0].iov_base = &ltx->ltx_hdr;
734         ltx->ltx_tx.tx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
735
736         ltx->ltx_tx.tx_kiov  = NULL;
737         ltx->ltx_tx.tx_nkiov = 0;
738
739         return (ltx);
740 }
741
742 int
743 ksocknal_send (nal_cb_t *nal, void *private, lib_msg_t *cookie,
744                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
745                unsigned int payload_niov, struct iovec *payload_iov,
746                size_t payload_len)
747 {
748         ksock_ltx_t  *ltx;
749         ksock_conn_t *conn;
750
751         /* NB 'private' is different depending on what we're sending.
752          * Just ignore it until we can rely on it
753          *
754          * Also, the return code from this procedure is ignored.
755          * If we can't send, we must still complete with lib_finalize().
756          * We'll have to wait for 3.2 to return an error event.
757          */
758
759         CDEBUG(D_NET,
760                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64
761                " pid %d\n", payload_len, payload_niov, nid, pid);
762
763         conn = ksocknal_send_target (nid);
764         if (conn == NULL) {
765                 lib_finalize (&ksocknal_lib, private, cookie);
766                 return (-1);
767         }
768
769         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
770         if (ltx == NULL) {
771                 ksocknal_put_conn (conn);
772                 lib_finalize (&ksocknal_lib, private, cookie);
773                 return (-1);
774         }
775
776         /* append the payload_iovs to the one pointing at the header */
777         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
778         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
779
780         memcpy (ltx->ltx_tx.tx_iov + 1, payload_iov,
781                 payload_niov * sizeof (*payload_iov));
782         ltx->ltx_tx.tx_niov = 1 + payload_niov;
783         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
784
785         ksocknal_launch_packet (conn, &ltx->ltx_tx);
786         return (0);
787 }
788
789 int
790 ksocknal_send_pages (nal_cb_t *nal, void *private, lib_msg_t *cookie, 
791                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
792                      unsigned int payload_niov, ptl_kiov_t *payload_iov, size_t payload_len)
793 {
794         ksock_ltx_t *ltx;
795         ksock_conn_t *conn;
796         
797         /* NB 'private' is different depending on what we're sending.
798          * Just ignore it until we can rely on it */
799
800         CDEBUG(D_NET,
801                "sending "LPSZ" bytes in %d mapped frags to nid: "LPX64" pid %d\n",
802                payload_len, payload_niov, nid, pid);
803
804         conn = ksocknal_send_target (nid);
805         if (conn == NULL)
806                 return (-1);
807
808         ltx = ksocknal_setup_hdr (nal, private, cookie, hdr, type);
809         if (ltx == NULL) {
810                 ksocknal_put_conn (conn);
811                 return (-1);
812         }
813
814         LASSERT (ltx->ltx_tx.tx_niov == 1 && ltx->ltx_tx.tx_nkiov == 0);
815         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
816         
817         ltx->ltx_tx.tx_kiov = ltx->ltx_iov_space.payload.kiov;
818         memcpy (ltx->ltx_tx.tx_kiov, payload_iov, 
819                 payload_niov * sizeof (*payload_iov));
820         ltx->ltx_tx.tx_nkiov = payload_niov;
821         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
822
823         ksocknal_launch_packet (conn, &ltx->ltx_tx);
824         return (0);
825 }
826
827 void
828 ksocknal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
829 {
830         ksock_conn_t *conn;
831         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
832         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
833
834         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
835                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
836
837         /* I'm the gateway; must be the last hop */
838         if (nid == ksocknal_lib.ni.nid)
839                 nid = fwd->kprfd_target_nid;
840
841         conn = ksocknal_get_conn (nid);
842         if (conn == NULL) {
843                 CERROR ("[%p] fwd to "LPX64" isn't a peer\n", fwd, nid);
844                 kpr_fwd_done (&ksocknal_data.ksnd_router, fwd, -EHOSTUNREACH);
845                 return;
846         }
847
848         /* This forward has now got a ref on conn */
849
850         tx->tx_isfwd = 1;                   /* This is a forwarding packet */
851         tx->tx_nob   = fwd->kprfd_nob;
852         tx->tx_niov  = fwd->kprfd_niov;
853         tx->tx_iov   = fwd->kprfd_iov;
854         tx->tx_nkiov = 0;
855         tx->tx_kiov  = NULL;
856         
857         ksocknal_launch_packet (conn, tx);
858 }
859
860 int
861 ksocknal_thread_start (int (*fn)(void *arg), void *arg)
862 {
863         long    pid = kernel_thread (fn, arg, 0);
864
865         if (pid < 0)
866                 return ((int)pid);
867
868         atomic_inc (&ksocknal_data.ksnd_nthreads);
869         return (0);
870 }
871
872 void
873 ksocknal_thread_fini (void)
874 {
875         atomic_dec (&ksocknal_data.ksnd_nthreads);
876 }
877
878 void
879 ksocknal_fmb_callback (void *arg, int error)
880 {
881         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
882         ksock_fmb_pool_t  *fmp = fmb->fmb_pool;
883         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
884         ksock_conn_t      *conn = NULL;
885         ksock_sched_t     *sched;
886         long               flags;
887
888         if (error != 0)
889                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
890                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),
891                        error);
892         else
893                 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": OK\n",
894                         NTOH__u64 (hdr->src_nid), NTOH__u64 (hdr->dest_nid));
895
896         spin_lock_irqsave (&fmp->fmp_lock, flags);
897
898         list_add (&fmb->fmb_list, &fmp->fmp_idle_fmbs);
899
900         if (!list_empty (&fmp->fmp_blocked_conns)) {
901                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next,
902                                    ksock_conn_t, ksnc_rx_list);
903                 list_del (&conn->ksnc_rx_list);
904         }
905
906         spin_unlock_irqrestore (&fmp->fmp_lock, flags);
907
908         if (conn == NULL)
909                 return;
910
911         CDEBUG (D_NET, "Scheduling conn %p\n", conn);
912         LASSERT (conn->ksnc_rx_scheduled);
913         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
914
915         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
916
917         sched = conn->ksnc_scheduler;
918
919         spin_lock_irqsave (&sched->kss_lock, flags);
920
921         list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
922
923         if (waitqueue_active (&sched->kss_waitq))
924                 wake_up (&sched->kss_waitq);
925
926         spin_unlock_irqrestore (&sched->kss_lock, flags);
927 }
928
929 ksock_fmb_t *
930 ksocknal_get_idle_fmb (ksock_conn_t *conn)
931 {
932         int               payload_nob = conn->ksnc_rx_nob_left;
933         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
934         long              flags;
935         ksock_fmb_pool_t *pool;
936         ksock_fmb_t      *fmb;
937
938         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
939         LASSERT (ksocknal_data.ksnd_fmbs != NULL);
940
941         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
942                 pool = &ksocknal_data.ksnd_small_fmp;
943         else
944                 pool = &ksocknal_data.ksnd_large_fmp;
945
946         spin_lock_irqsave (&pool->fmp_lock, flags);
947
948         if (!list_empty (&pool->fmp_idle_fmbs)) {
949                 fmb = list_entry(pool->fmp_idle_fmbs.next,
950                                  ksock_fmb_t, fmb_list);
951                 list_del (&fmb->fmb_list);
952                 spin_unlock_irqrestore (&pool->fmp_lock, flags);
953
954                 return (fmb);
955         }
956
957         /* deschedule until fmb free */
958
959         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
960
961         list_add_tail (&conn->ksnc_rx_list,
962                        &pool->fmp_blocked_conns);
963
964         spin_unlock_irqrestore (&pool->fmp_lock, flags);
965         return (NULL);
966 }
967
968
969 int
970 ksocknal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
971 {
972         int payload_nob = conn->ksnc_rx_nob_left;
973         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
974         ptl_nid_t dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
975         int niov;                               /* at least the header */
976         int nob;
977
978         LASSERT (conn->ksnc_rx_scheduled);
979         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
980         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
981         LASSERT (payload_nob >= 0);
982         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
983         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
984
985         /* Got a forwarding buffer; copy the header we just read into the
986          * forwarding buffer.  If there's payload start reading reading it
987          * into the buffer, otherwise the forwarding buffer can be kicked
988          * off immediately.
989          *
990          * NB fmb->fmb_iov spans the WHOLE packet.
991          *    conn->ksnc_rx_iov spans just the payload.
992          */
993
994         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
995
996         /* copy header */
997         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t));
998
999         if (payload_nob == 0) {         /* got complete packet already */
1000                 atomic_inc (&ksocknal_packets_received);
1001
1002                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n",
1003                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1004                         dest_nid, packet_nob);
1005
1006                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
1007
1008                 kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1009                               packet_nob, 1, fmb->fmb_iov,
1010                               ksocknal_fmb_callback, fmb);
1011
1012                 /* forward it now */
1013                 kpr_fwd_start (&ksocknal_data.ksnd_router, &fmb->fmb_fwd);
1014
1015                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1016                 return (1);
1017         }
1018
1019         niov = 1;
1020         if (packet_nob <= PAGE_SIZE) {  /* whole packet fits in first page */
1021                 fmb->fmb_iov[0].iov_len = packet_nob;
1022         } else {
1023                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
1024                 nob = packet_nob - PAGE_SIZE;
1025
1026                 do {
1027                         LASSERT (niov < fmb->fmb_npages);
1028                         fmb->fmb_iov[niov].iov_base =
1029                                 page_address (fmb->fmb_pages[niov]);
1030                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
1031                         nob -= PAGE_SIZE;
1032                         niov++;
1033                 } while (nob > 0);
1034         }
1035
1036         kpr_fwd_init (&fmb->fmb_fwd, dest_nid,
1037                       packet_nob, niov, fmb->fmb_iov,
1038                       ksocknal_fmb_callback, fmb);
1039
1040         /* stash router's descriptor ready for call to kpr_fwd_start */
1041         conn->ksnc_cookie = &fmb->fmb_fwd;
1042
1043         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
1044
1045         /* payload is desc's iov-ed buffer, but skipping the hdr */
1046         LASSERT (niov <= sizeof (conn->ksnc_rx_iov_space) /
1047                  sizeof (struct iovec));
1048
1049         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1050         conn->ksnc_rx_iov[0].iov_base =
1051                 (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) +
1052                          sizeof (ptl_hdr_t));
1053         conn->ksnc_rx_iov[0].iov_len =
1054                 fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
1055
1056         if (niov > 1)
1057                 memcpy(&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1],
1058                        (niov - 1) * sizeof (struct iovec));
1059
1060         conn->ksnc_rx_niov = niov;
1061
1062         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
1063                 NTOH__u64 (conn->ksnc_hdr.src_nid), dest_nid, payload_nob);
1064         return (0);
1065 }
1066
1067 void
1068 ksocknal_fwd_parse (ksock_conn_t *conn)
1069 {
1070         ksock_conn_t *conn2;
1071         ptl_nid_t     dest_nid = NTOH__u64 (conn->ksnc_hdr.dest_nid);
1072         int           body_len = NTOH__u32 (PTL_HDR_LENGTH(&conn->ksnc_hdr));
1073
1074         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
1075                 NTOH__u64 (conn->ksnc_hdr.src_nid),
1076                 dest_nid, conn->ksnc_rx_nob_left);
1077
1078         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
1079         LASSERT (conn->ksnc_rx_scheduled);
1080
1081         if (body_len < 0) {                 /* length corrupt (overflow) */
1082                 CERROR("dropping packet from "LPX64" for "LPX64": packet "
1083                        "size %d illegal\n", NTOH__u64 (conn->ksnc_hdr.src_nid),
1084                        dest_nid, body_len);
1085                 ksocknal_new_packet (conn, 0);          /* on to new packet */
1086                 return;
1087         }
1088
1089         if (ksocknal_data.ksnd_fmbs == NULL) {        /* not forwarding */
1090                 CERROR("dropping packet from "LPX64" for "LPX64": not "
1091                        "forwarding\n", conn->ksnc_hdr.src_nid,
1092                        conn->ksnc_hdr.dest_nid);
1093                 /* on to new packet (skip this one's body) */
1094                 ksocknal_new_packet (conn, body_len);
1095                 return;
1096         }
1097
1098         if (body_len > SOCKNAL_MAX_FWD_PAYLOAD) {      /* too big to forward */
1099                 CERROR ("dropping packet from "LPX64" for "LPX64
1100                         ": packet size %d too big\n", conn->ksnc_hdr.src_nid,
1101                         conn->ksnc_hdr.dest_nid, body_len);
1102                 /* on to new packet (skip this one's body) */
1103                 ksocknal_new_packet (conn, body_len);
1104                 return;
1105         }
1106
1107         /* should have gone direct */
1108         conn2 = ksocknal_get_conn (conn->ksnc_hdr.dest_nid);
1109         if (conn2 != NULL) {
1110                 CERROR ("dropping packet from "LPX64" for "LPX64
1111                         ": target is a peer\n", conn->ksnc_hdr.src_nid,
1112                         conn->ksnc_hdr.dest_nid);
1113                 ksocknal_put_conn (conn2);  /* drop ref from get above */
1114
1115                 /* on to next packet (skip this one's body) */
1116                 ksocknal_new_packet (conn, body_len);
1117                 return;
1118         }
1119
1120         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
1121         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
1122         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
1123 }
1124
1125 int
1126 ksocknal_new_packet (ksock_conn_t *conn, int nob_to_skip)
1127 {
1128         static char ksocknal_slop_buffer[4096];
1129
1130         int   nob;
1131         int   niov;
1132         int   skipped;
1133
1134         if (nob_to_skip == 0) {         /* right at next packet boundary now */
1135                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
1136                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
1137                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
1138
1139                 conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1140                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
1141                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
1142                 conn->ksnc_rx_niov = 1;
1143
1144                 conn->ksnc_rx_kiov = NULL;
1145                 conn->ksnc_rx_nkiov = 0;
1146                 return (1);
1147         }
1148
1149         /* Set up to skip as much a possible now.  If there's more left
1150          * (ran out of iov entries) we'll get called again */
1151
1152         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
1153         conn->ksnc_rx_nob_left = nob_to_skip;
1154         conn->ksnc_rx_iov = (struct iovec *)&conn->ksnc_rx_iov_space;
1155         skipped = 0;
1156         niov = 0;
1157
1158         do {
1159                 nob = MIN (nob_to_skip, sizeof (ksocknal_slop_buffer));
1160
1161                 conn->ksnc_rx_iov[niov].iov_base = ksocknal_slop_buffer;
1162                 conn->ksnc_rx_iov[niov].iov_len  = nob;
1163                 niov++;
1164                 skipped += nob;
1165                 nob_to_skip -=nob;
1166
1167         } while (nob_to_skip != 0 &&    /* mustn't overflow conn's rx iov */
1168                  niov < sizeof(conn->ksnc_rx_iov_space) / sizeof (struct iovec));
1169
1170         conn->ksnc_rx_niov = niov;
1171         conn->ksnc_rx_kiov = NULL;
1172         conn->ksnc_rx_nkiov = 0;
1173         conn->ksnc_rx_nob_wanted = skipped;
1174         return (0);
1175 }
1176
1177 void
1178 ksocknal_process_receive (ksock_sched_t *sched, long *irq_flags)
1179 {
1180         ksock_conn_t *conn;
1181         ksock_fmb_t  *fmb;
1182         int           rc;
1183
1184         /* NB: sched->ksnc_lock lock held */
1185
1186         LASSERT (!list_empty (&sched->kss_rx_conns));
1187         conn = list_entry(sched->kss_rx_conns.next, ksock_conn_t, ksnc_rx_list);
1188         list_del (&conn->ksnc_rx_list);
1189
1190         spin_unlock_irqrestore (&sched->kss_lock, *irq_flags);
1191
1192         CDEBUG(D_NET, "sched %p conn %p\n", sched, conn);
1193         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
1194         LASSERT (conn->ksnc_rx_scheduled);
1195         LASSERT (conn->ksnc_rx_ready);
1196
1197         /* doesn't need a forwarding buffer */
1198         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)
1199                 goto try_read;
1200
1201  get_fmb:
1202         fmb = ksocknal_get_idle_fmb (conn);
1203         if (fmb == NULL) {      /* conn descheduled waiting for idle fmb */
1204                 spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1205                 return;
1206         }
1207
1208         if (ksocknal_init_fmb (conn, fmb)) /* packet forwarded ? */
1209                 goto out;               /* come back later for next packet */
1210
1211  try_read:
1212         /* NB: sched lock NOT held */
1213         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
1214                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
1215                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
1216                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
1217
1218         LASSERT (conn->ksnc_rx_nob_wanted > 0);
1219
1220         conn->ksnc_rx_ready = 0;/* data ready may race with me and set ready */
1221         mb();                   /* => clear BEFORE trying to read */
1222
1223         rc = ksocknal_recvmsg(conn);
1224
1225         if (rc == 0)
1226                 goto out;
1227         if (rc < 0) {
1228 #warning FIXME: handle socket errors properly
1229                 CERROR ("Error socknal read %p: %d\n", conn, rc);
1230                 goto out;
1231         }
1232
1233         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
1234                 goto out;                       /* try again later */
1235
1236         /* got all I wanted, assume there's more - prevent data_ready locking */
1237         conn->ksnc_rx_ready = 1;
1238
1239         switch (conn->ksnc_rx_state) {
1240         case SOCKNAL_RX_HEADER:
1241                 /* It's not for me */
1242                 if (conn->ksnc_hdr.type != PTL_MSG_HELLO &&
1243                     NTOH__u64(conn->ksnc_hdr.dest_nid) != ksocknal_lib.ni.nid) {
1244                         ksocknal_fwd_parse (conn);
1245                         switch (conn->ksnc_rx_state) {
1246                         case SOCKNAL_RX_HEADER: /* skipped (zero payload) */
1247                                 goto out;       /* => come back later */
1248                         case SOCKNAL_RX_SLOP:   /* skipping packet's body */
1249                                 goto try_read;  /* => go read it */
1250                         case SOCKNAL_RX_GET_FMB: /* forwarding */
1251                                 goto get_fmb;   /* => go get a fwd msg buffer */
1252                         default:
1253                                 LBUG ();
1254                         }
1255                         /* Not Reached */
1256                 }
1257
1258                 PROF_START(lib_parse);
1259                 /* sets wanted_len, iovs etc */
1260                 lib_parse(&ksocknal_lib, &conn->ksnc_hdr, conn);
1261                 PROF_FINISH(lib_parse);
1262
1263                 if (conn->ksnc_rx_nob_wanted != 0) { /* need to get payload? */
1264                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
1265                         goto try_read;          /* go read the payload */
1266                 }
1267                 /* Fall through (completed packet for me) */
1268
1269         case SOCKNAL_RX_BODY:
1270                 atomic_inc (&ksocknal_packets_received);
1271                 /* packet is done now */
1272                 lib_finalize(&ksocknal_lib, NULL, conn->ksnc_cookie);
1273                 /* Fall through */
1274
1275         case SOCKNAL_RX_SLOP:
1276                 /* starting new packet? */
1277                 if (ksocknal_new_packet (conn, conn->ksnc_rx_nob_left))
1278                         goto out;       /* come back later */
1279                 goto try_read;          /* try to finish reading slop now */
1280
1281         case SOCKNAL_RX_BODY_FWD:
1282                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n",
1283                         conn, NTOH__u64 (conn->ksnc_hdr.src_nid),
1284                         NTOH__u64 (conn->ksnc_hdr.dest_nid),
1285                         conn->ksnc_rx_nob_left);
1286
1287                 atomic_inc (&ksocknal_packets_received);
1288
1289                 /* ksocknal_init_fmb() put router desc. in conn->ksnc_cookie */
1290                 kpr_fwd_start (&ksocknal_data.ksnd_router,
1291                                (kpr_fwd_desc_t *)conn->ksnc_cookie);
1292
1293                 /* no slop in forwarded packets */
1294                 LASSERT (conn->ksnc_rx_nob_left == 0);
1295
1296                 ksocknal_new_packet (conn, 0);  /* on to next packet */
1297                 goto out;                       /* (later) */
1298
1299         default:
1300         }
1301
1302         /* Not Reached */
1303         LBUG ();
1304
1305  out:
1306         spin_lock_irqsave (&sched->kss_lock, *irq_flags);
1307
1308         /* no data there to read? */
1309         if (!conn->ksnc_rx_ready) {
1310                 /* let socket callback schedule again */
1311                 conn->ksnc_rx_scheduled = 0;
1312                 ksocknal_put_conn (conn);       /* release scheduler's ref */
1313         } else                              /* let scheduler call me again */
1314                 list_add_tail (&conn->ksnc_rx_list, &sched->kss_rx_conns);
1315 }
1316
1317 int
1318 ksocknal_recv (nal_cb_t *nal, void *private, lib_msg_t *msg,
1319                unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
1320 {
1321         ksock_conn_t *conn = (ksock_conn_t *)private;
1322
1323         LASSERT (mlen <= rlen);
1324         LASSERT (niov <= PTL_MD_MAX_IOV);
1325         
1326         conn->ksnc_cookie = msg;
1327         conn->ksnc_rx_nob_wanted = mlen;
1328         conn->ksnc_rx_nob_left   = rlen;
1329
1330         conn->ksnc_rx_nkiov = 0;
1331         conn->ksnc_rx_kiov = NULL;
1332         conn->ksnc_rx_niov = niov;
1333         conn->ksnc_rx_iov = conn->ksnc_rx_iov_space.iov;
1334         memcpy (conn->ksnc_rx_iov, iov, niov * sizeof (*iov));
1335
1336         LASSERT (mlen == 
1337                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1338                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1339
1340         return (rlen);
1341 }
1342
1343 int
1344 ksocknal_recv_pages (nal_cb_t *nal, void *private, lib_msg_t *msg,
1345                      unsigned int niov, ptl_kiov_t *kiov, size_t mlen, size_t rlen)
1346 {
1347         ksock_conn_t *conn = (ksock_conn_t *)private;
1348
1349         LASSERT (mlen <= rlen);
1350         LASSERT (niov <= PTL_MD_MAX_IOV);
1351         
1352         conn->ksnc_cookie = msg;
1353         conn->ksnc_rx_nob_wanted = mlen;
1354         conn->ksnc_rx_nob_left   = rlen;
1355
1356         conn->ksnc_rx_niov = 0;
1357         conn->ksnc_rx_iov  = NULL;
1358         conn->ksnc_rx_nkiov = niov;
1359         conn->ksnc_rx_kiov = conn->ksnc_rx_iov_space.kiov;
1360         memcpy (conn->ksnc_rx_kiov, kiov, niov * sizeof (*kiov));
1361
1362         LASSERT (mlen == 
1363                  lib_iov_nob (conn->ksnc_rx_niov, conn->ksnc_rx_iov) +
1364                  lib_kiov_nob (conn->ksnc_rx_nkiov, conn->ksnc_rx_kiov));
1365
1366         return (rlen);
1367 }
1368
1369 int ksocknal_scheduler (void *arg)
1370 {
1371         ksock_sched_t     *sched = (ksock_sched_t *)arg;
1372         unsigned long      flags;
1373         int                rc;
1374         int                nloops = 0;
1375         int                id = sched - ksocknal_data.ksnd_schedulers;
1376         char               name[16];
1377 #if (CONFIG_SMP && CPU_AFFINITY)
1378 #if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0))
1379         int                cpu = cpu_logical_map(id % num_online_cpus());
1380 #else
1381 #warning "Take care of architecure specific logical APIC map"
1382         int cpu = 1;    /* Have to change later. */
1383 #endif /* LINUX_VERSION_CODE */
1384         
1385         set_cpus_allowed (current, 1 << cpu);
1386         id = cpu;
1387 #endif /* CONFIG_SMP && CPU_AFFINITY */
1388
1389         snprintf (name, sizeof (name),"ksocknald[%d]", id);
1390         kportal_daemonize (name);
1391         kportal_blockallsigs ();
1392         
1393         spin_lock_irqsave (&sched->kss_lock, flags);
1394
1395         while (!ksocknal_data.ksnd_shuttingdown) {
1396                 int did_something = 0;
1397
1398                 /* Ensure I progress everything semi-fairly */
1399
1400                 if (!list_empty (&sched->kss_rx_conns)) {
1401                         did_something = 1;
1402                         /* drops & regains kss_lock */
1403                         ksocknal_process_receive (sched, &flags);
1404                 }
1405
1406                 if (!list_empty (&sched->kss_tx_conns)) {
1407                         did_something = 1;
1408                         /* drops and regains kss_lock */
1409                         ksocknal_process_transmit (sched, &flags);
1410                 }
1411 #if SOCKNAL_ZC
1412                 if (!list_empty (&sched->kss_zctxdone_list)) {
1413                         ksock_tx_t *tx =
1414                                 list_entry(sched->kss_zctxdone_list.next,
1415                                            ksock_tx_t, tx_list);
1416                         did_something = 1;
1417
1418                         list_del (&tx->tx_list);
1419                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1420
1421                         ksocknal_tx_done (tx);
1422
1423                         spin_lock_irqsave (&sched->kss_lock, flags);
1424                 }
1425 #endif
1426                 if (!did_something ||           /* nothing to do */
1427                     ++nloops == SOCKNAL_RESCHED) { /* hogging CPU? */
1428                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1429
1430                         nloops = 0;
1431
1432                         if (!did_something) {   /* wait for something to do */
1433 #if SOCKNAL_ZC
1434                                 rc = wait_event_interruptible (sched->kss_waitq,
1435                                                                ksocknal_data.ksnd_shuttingdown ||
1436                                                                !list_empty(&sched->kss_rx_conns) ||
1437                                                                !list_empty(&sched->kss_tx_conns) ||
1438                                                                !list_empty(&sched->kss_zctxdone_list));
1439 #else
1440                                 rc = wait_event_interruptible (sched->kss_waitq,
1441                                                                ksocknal_data.ksnd_shuttingdown ||
1442                                                                !list_empty(&sched->kss_rx_conns) ||
1443                                                                !list_empty(&sched->kss_tx_conns));
1444 #endif
1445                                 LASSERT (rc == 0);
1446                         } else
1447                                our_cond_resched();
1448
1449                         spin_lock_irqsave (&sched->kss_lock, flags);
1450                 }
1451         }
1452
1453         spin_unlock_irqrestore (&sched->kss_lock, flags);
1454         ksocknal_thread_fini ();
1455         return (0);
1456 }
1457
1458 void
1459 ksocknal_data_ready (struct sock *sk, int n)
1460 {
1461         unsigned long  flags;
1462         ksock_conn_t  *conn;
1463         ksock_sched_t *sched;
1464         ENTRY;
1465
1466         /* interleave correctly with closing sockets... */
1467         read_lock (&ksocknal_data.ksnd_socklist_lock);
1468
1469         conn = sk->user_data;
1470         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1471                 LASSERT (sk->data_ready != &ksocknal_data_ready);
1472                 sk->data_ready (sk, n);
1473         } else if (!conn->ksnc_rx_ready) {        /* new news */
1474                 /* Set ASAP in case of concurrent calls to me */
1475                 conn->ksnc_rx_ready = 1;
1476
1477                 sched = conn->ksnc_scheduler;
1478
1479                 spin_lock_irqsave (&sched->kss_lock, flags);
1480
1481                 /* Set again (process_receive may have cleared while I blocked for the lock) */
1482                 conn->ksnc_rx_ready = 1;
1483
1484                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1485                         list_add_tail(&conn->ksnc_rx_list,
1486                                       &sched->kss_rx_conns);
1487                         conn->ksnc_rx_scheduled = 1;
1488                         /* extra ref for scheduler */
1489                         atomic_inc (&conn->ksnc_refcount);
1490
1491                         if (waitqueue_active (&sched->kss_waitq))
1492                                 wake_up (&sched->kss_waitq);
1493                 }
1494
1495                 spin_unlock_irqrestore (&sched->kss_lock, flags);
1496         }
1497
1498         read_unlock (&ksocknal_data.ksnd_socklist_lock);
1499
1500         EXIT;
1501 }
1502
1503 void
1504 ksocknal_write_space (struct sock *sk)
1505 {
1506         unsigned long  flags;
1507         ksock_conn_t  *conn;
1508         ksock_sched_t *sched;
1509
1510         /* interleave correctly with closing sockets... */
1511         read_lock (&ksocknal_data.ksnd_socklist_lock);
1512
1513         conn = sk->user_data;
1514
1515         CDEBUG(D_NET, "sk %p wspace %d low water %d conn %p%s%s%s\n",
1516                sk, tcp_wspace(sk), SOCKNAL_TX_LOW_WATER(sk), conn,
1517                (conn == NULL) ? "" : (test_bit (0, &conn->ksnc_tx_ready) ?
1518                                       " ready" : " blocked"),
1519                (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ?
1520                                       " scheduled" : " idle"),
1521                (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ?
1522                                       " empty" : " queued"));
1523
1524         if (conn == NULL) {             /* raced with ksocknal_close_sock */
1525                 LASSERT (sk->write_space != &ksocknal_write_space);
1526                 sk->write_space (sk);
1527         } else if (tcp_wspace(sk) >= SOCKNAL_TX_LOW_WATER(sk)) { /* got enough space */
1528                 clear_bit (SOCK_NOSPACE, &sk->socket->flags);
1529
1530                 if (!conn->ksnc_tx_ready) {      /* new news */
1531                         /* Set ASAP in case of concurrent calls to me */
1532                         conn->ksnc_tx_ready = 1;
1533
1534                         sched = conn->ksnc_scheduler;
1535
1536                         spin_lock_irqsave (&sched->kss_lock, flags);
1537
1538                         /* Set again (process_transmit may have
1539                            cleared while I blocked for the lock) */
1540                         conn->ksnc_tx_ready = 1;
1541
1542                         if (!conn->ksnc_tx_scheduled && // not being progressed
1543                             !list_empty(&conn->ksnc_tx_queue)){//packets to send
1544                                 list_add_tail (&conn->ksnc_tx_list,
1545                                                &sched->kss_tx_conns);
1546                                 conn->ksnc_tx_scheduled = 1;
1547                                 /* extra ref for scheduler */
1548                                 atomic_inc (&conn->ksnc_refcount);
1549
1550                                 if (waitqueue_active (&sched->kss_waitq))
1551                                         wake_up (&sched->kss_waitq);
1552                         }
1553
1554                         spin_unlock_irqrestore (&sched->kss_lock, flags);
1555                 }
1556         }
1557
1558         read_unlock (&ksocknal_data.ksnd_socklist_lock);
1559 }
1560
1561 int
1562 ksocknal_reaper (void *arg)
1563 {
1564         unsigned long      flags;
1565         ksock_conn_t      *conn;
1566         int                rc;
1567         
1568         kportal_daemonize ("ksocknal_reaper");
1569         kportal_blockallsigs ();
1570
1571         while (!ksocknal_data.ksnd_shuttingdown) {
1572                 spin_lock_irqsave (&ksocknal_data.ksnd_reaper_lock, flags);
1573
1574                 if (list_empty (&ksocknal_data.ksnd_reaper_list)) {
1575                         conn = NULL;
1576                 } else {
1577                         conn = list_entry (ksocknal_data.ksnd_reaper_list.next,
1578                                            ksock_conn_t, ksnc_list);
1579                         list_del (&conn->ksnc_list);
1580                 }
1581
1582                 spin_unlock_irqrestore (&ksocknal_data.ksnd_reaper_lock, flags);
1583
1584                 if (conn != NULL)
1585                         ksocknal_close_conn (conn);
1586                 else {
1587                         rc = wait_event_interruptible (ksocknal_data.ksnd_reaper_waitq,
1588                                                        ksocknal_data.ksnd_shuttingdown ||
1589                                                        !list_empty(&ksocknal_data.ksnd_reaper_list));
1590                         LASSERT (rc == 0);
1591                 }
1592         }
1593
1594         ksocknal_thread_fini ();
1595         return (0);
1596 }
1597
1598 nal_cb_t ksocknal_lib = {
1599         nal_data:       &ksocknal_data,                /* NAL private data */
1600         cb_send:         ksocknal_send,
1601         cb_send_pages:   ksocknal_send_pages,
1602         cb_recv:         ksocknal_recv,
1603         cb_recv_pages:   ksocknal_recv_pages,
1604         cb_read:         ksocknal_read,
1605         cb_write:        ksocknal_write,
1606         cb_callback:     ksocknal_callback,
1607         cb_malloc:       ksocknal_malloc,
1608         cb_free:         ksocknal_free,
1609         cb_printf:       ksocknal_printf,
1610         cb_cli:          ksocknal_cli,
1611         cb_sti:          ksocknal_sti,
1612         cb_dist:         ksocknal_dist
1613 };