Whamcloud - gitweb
land v0.9.1 on HEAD, in preparation for a 1.0.x branch
[fs/lustre-release.git] / lustre / portals / knals / toenal / toenal_cb.c
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  *
4  * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5  *   Author: Zach Brown <zab@zabbo.net>
6  *   Author: Peter J. Braam <braam@clusterfs.com>
7  *   Author: Phil Schwan <phil@clusterfs.com>
8  *   Author: Eric Barton <eric@bartonsoftware.com>
9  *   Author: Kedar Sovani <kedar@calsoftinc.com>
10  *   Author: Amey Inamdar <amey@calsoftinc.com>
11  *   
12  *   This file is part of Portals, http://www.sf.net/projects/lustre/
13  *
14  *   Portals is free software; you can redistribute it and/or
15  *   modify it under the terms of version 2 of the GNU General Public
16  *   License as published by the Free Software Foundation.
17  *
18  *   Portals is distributed in the hope that it will be useful,
19  *   but WITHOUT ANY WARRANTY; without even the implied warranty of
20  *   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
21  *   GNU General Public License for more details.
22  *
23  *   You should have received a copy of the GNU General Public License
24  *   along with Portals; if not, write to the Free Software
25  *   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
26  *
27  */
28
29 #include <linux/poll.h>
30 #include "toenal.h"
31
32 atomic_t   ktoenal_packets_received;
33 long       ktoenal_packets_launched;
34 long       ktoenal_packets_transmitted;
35
36 /*
37  *  LIB functions follow
38  *
39  */
40 int
41 ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr,
42               user_ptr src_addr, size_t len)
43 {
44         CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
45                nal->ni.nid, (long)len, src_addr, dst_addr);
46
47         memcpy( dst_addr, src_addr, len );
48         return 0;
49 }
50
51 int
52 ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
53                void *src_addr, size_t len)
54 {
55         CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
56                nal->ni.nid, (long)len, src_addr, dst_addr);
57
58         memcpy( dst_addr, src_addr, len );
59         return 0;
60 }
61
62 int 
63 ktoenal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
64                          ptl_event_t *ev)
65 {
66         CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
67                nal->ni.nid, eq, ev);
68
69         if (eq->event_callback != NULL) 
70                 eq->event_callback(ev);
71
72         return 0;
73 }
74
75 void *
76 ktoenal_malloc(nal_cb_t *nal, size_t len)
77 {
78         void *buf;
79
80         PORTAL_ALLOC(buf, len);
81
82         if (buf != NULL)
83                 memset(buf, 0, len);
84
85         return (buf);
86 }
87
88 void
89 ktoenal_free(nal_cb_t *nal, void *buf, size_t len)
90 {
91         PORTAL_FREE(buf, len);
92 }
93
94 void
95 ktoenal_printf(nal_cb_t *nal, const char *fmt, ...)
96 {
97         va_list ap;
98         char msg[256];
99
100         va_start (ap, fmt);
101         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
102         va_end (ap);
103
104         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
105
106         CDEBUG (D_NET, "%s", msg);
107 }
108
109 void
110 ktoenal_cli(nal_cb_t *nal, unsigned long *flags)
111 {
112         ksock_nal_data_t *data = nal->nal_data;
113
114         spin_lock(&data->ksnd_nal_cb_lock);
115 }
116
117 void
118 ktoenal_sti(nal_cb_t *nal, unsigned long *flags)
119 {
120         ksock_nal_data_t *data;
121         data = nal->nal_data;
122
123         spin_unlock(&data->ksnd_nal_cb_lock);
124 }
125
126 int
127 ktoenal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
128 {
129         /* I would guess that if ktoenal_get_conn(nid) == NULL,
130            and we're not routing, then 'nid' is very distant :) */
131         if ( nal->ni.nid == nid ) {
132                 *dist = 0;
133         } else {
134                 *dist = 1;
135         }
136
137         return 0;
138 }
139
140 ksock_ltx_t *
141 ktoenal_get_ltx (int may_block)
142 {
143         unsigned long   flags;
144         ksock_ltx_t *ltx = NULL;
145         
146         for (;;)
147         {
148                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
149         
150                 if (!list_empty (&ktoenal_data.ksnd_idle_ltx_list))
151                 {
152                         ltx = list_entry (ktoenal_data.ksnd_idle_ltx_list.next, ksock_ltx_t, ltx_tx.tx_list);
153                         list_del (&ltx->ltx_tx.tx_list);
154                         break;
155                 }
156
157                 if (!may_block)
158                 {
159                         if (!list_empty (&ktoenal_data.ksnd_idle_nblk_ltx_list))
160                         {
161                                 ltx = list_entry (ktoenal_data.ksnd_idle_nblk_ltx_list.next, 
162                                                   ksock_ltx_t, ltx_tx.tx_list);
163                                 list_del (&ltx->ltx_tx.tx_list);
164                         }
165                         break;
166                 }
167                 
168                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
169                 
170                 wait_event (ktoenal_data.ksnd_idle_ltx_waitq,
171                             !list_empty (&ktoenal_data.ksnd_idle_ltx_list));
172         }
173
174         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
175
176         return (ltx);
177 }
178
179 int
180 ktoenal_sendmsg (struct file *sock, struct iovec *iov, int niov, int nob, int flags)
181 {
182         /* NB This procedure "consumes" iov (actually we do, tcp_sendmsg doesn't)
183          */
184         mm_segment_t oldmm;
185         int           rc;
186
187         LASSERT (niov > 0);
188         LASSERT (nob > 0);
189         
190         oldmm = get_fs();
191         set_fs (KERNEL_DS);
192
193 #ifdef PORTAL_DEBUG
194         {
195                 int total_nob;
196                 int i;
197                 
198                 for (i = total_nob = 0; i < niov; i++)
199                         total_nob += iov[i].iov_len;
200                 
201                 LASSERT (nob == total_nob);
202         }
203 #endif        
204         LASSERT (!in_interrupt());
205        
206         rc = sock->f_op->writev(sock, iov, niov, NULL);
207
208         set_fs (oldmm);
209
210         if (rc > 0)                             /* sent something? */
211         {
212                 nob = rc;                       /* consume iov */
213                 for (;;)
214                 {
215                         LASSERT (niov > 0);
216                         
217                         if (iov->iov_len >= nob)
218                         {
219                                 iov->iov_len -= nob;
220                                 iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
221                                 break;
222                         }
223                         nob -= iov->iov_len;
224                         iov->iov_len = 0;
225                         iov++;
226                         niov--;
227                 }
228         }
229
230         return (rc);
231 }
232
233 int
234 ktoenal_recvmsg(struct file *sock, struct iovec *iov, int niov, int toread)
235 {
236         /* NB This procedure "consumes" iov (actually tcp_recvmsg does)
237          */
238         mm_segment_t oldmm;
239         int ret, i, len = 0, origlen = 0;
240         
241         PROF_START(our_recvmsg);
242         for(i = 0; i < niov; i++) {
243                 len += iov[i].iov_len;
244                 if(len >= toread)
245                         break;
246         }
247
248         if(len >= toread) {
249                 origlen = iov[i].iov_len;
250                 iov[i].iov_len -= (len - toread);
251         }
252         else {  /* i == niov */
253                 i = niov - 1;
254         }
255
256         oldmm = get_fs();
257         set_fs(KERNEL_DS);
258
259         ret = sock->f_op->readv(sock, iov, i + 1, NULL);
260         
261         set_fs(oldmm);
262
263         if(origlen)
264                 iov[i].iov_len = origlen;
265
266         PROF_FINISH(our_recvmsg);
267         return ret;
268 }
269
270 void
271 ktoenal_process_transmit (ksock_conn_t *conn, unsigned long *irq_flags)
272 {
273         ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
274         int         rc;
275         
276         LASSERT (conn->ksnc_tx_scheduled);
277         LASSERT (conn->ksnc_tx_ready);
278         LASSERT (!list_empty (&conn->ksnc_tx_queue));
279
280         /* assume transmit will complete now, so dequeue while I've got the lock */
281         list_del (&tx->tx_list);
282
283         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
284
285         LASSERT (tx->tx_nob > 0);
286
287         conn->ksnc_tx_ready = 0;                /* write_space may race with me and set ready */
288         mb();                                   /* => clear BEFORE trying to write */
289
290         rc = ktoenal_sendmsg (conn->ksnc_file,
291                                tx->tx_iov, tx->tx_niov, tx->tx_nob,
292                                list_empty (&conn->ksnc_tx_queue) ? 
293                                MSG_DONTWAIT : (MSG_DONTWAIT | MSG_MORE));
294
295         CDEBUG (D_NET, "send(%d) %d\n", tx->tx_nob, rc);
296
297         if (rc < 0)                             /* error */
298         {
299                 if (rc == -EAGAIN)              /* socket full => */
300                         rc = 0;                 /* nothing sent */
301                 else
302                 {
303                         //warning FIXME: handle socket errors properly
304                         CERROR ("Error socknal send(%d) %p: %d\n", tx->tx_nob, conn, rc);
305                         rc = tx->tx_nob;        /* kid on for now whole packet went */
306                 }
307         }
308
309         if (rc == tx->tx_nob)                   /* everything went */
310         {
311                 conn->ksnc_tx_ready = 1;        /* assume more can go (ASAP) */
312                 ktoenal_put_conn (conn);       /* release packet's ref */
313
314                 if (tx->tx_isfwd)               /* was a forwarded packet? */
315                 {
316                         kpr_fwd_done (&ktoenal_data.ksnd_router,
317                                       KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
318
319                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
320                 }
321                 else                            /* local send */
322                 {
323                         ksock_ltx_t *ltx = KSOCK_TX_2_KSOCK_LTX (tx);
324
325                         lib_finalize (&ktoenal_lib, ltx->ltx_private, ltx->ltx_cookie);
326
327                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
328                         
329                         list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
330
331                         /* normal tx desc => wakeup anyone blocking for one */
332                         if (ltx->ltx_idle == &ktoenal_data.ksnd_idle_ltx_list &&
333                             waitqueue_active (&ktoenal_data.ksnd_idle_ltx_waitq))
334                                 wake_up (&ktoenal_data.ksnd_idle_ltx_waitq);
335                 }
336                 ktoenal_packets_transmitted++;
337         }
338         else
339         {
340                 tx->tx_nob -= rc;
341
342                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
343
344                 /* back onto HEAD of tx_queue */
345                 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
346         }
347
348         if (!conn->ksnc_tx_ready ||             /* no space to write now */
349             list_empty (&conn->ksnc_tx_queue))  /* nothing to write */
350         {
351                 conn->ksnc_tx_scheduled = 0;    /* not being scheduled */
352                 ktoenal_put_conn (conn);       /* release scheduler's ref */
353         }
354         else                                    /* let scheduler call me again */
355                 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
356 }
357
358 void
359 ktoenal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx)
360 {
361         unsigned long flags;
362         int           nob = tx->tx_nob;
363         struct iovec *iov = tx->tx_iov;
364         int           niov = 1;
365         
366         LASSERT (nob >= sizeof (ptl_hdr_t));
367
368         /* Truncate iov to exactly match total packet length
369          * since socket sendmsg pays no attention to requested length.
370          */
371         for (;;)
372         {
373                 LASSERT (niov <= tx->tx_niov);
374                 
375                 if (iov->iov_len >= nob)
376                 {
377                         iov->iov_len = nob;
378                         break;
379                 }
380                 nob -= iov->iov_len;
381                 iov++;
382                 niov++;
383         }
384         tx->tx_niov = niov;
385         
386         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
387         list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
388
389         if (conn->ksnc_tx_ready &&              /* able to send */
390             !conn->ksnc_tx_scheduled)           /* not scheduled to send */
391         {
392                 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
393                 conn->ksnc_tx_scheduled = 1;
394                 atomic_inc (&conn->ksnc_refcount); /* extra ref for scheduler */
395                 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
396                         wake_up (&ktoenal_data.ksnd_sched_waitq);
397         }
398
399         ktoenal_packets_launched++;
400         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
401 }
402
403 int
404 ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
405               ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
406               unsigned int payload_niov, struct iovec *payload_iov, size_t payload_len)
407 {
408         ptl_nid_t     gatewaynid;
409         ksock_conn_t *conn;
410         ksock_ltx_t  *ltx;
411         int           rc;
412         int           i;
413
414         /* By this point, as it happens, we have absolutely no idea what
415          * 'private' is.  It might be ksock_nal_data or it might be ksock_conn.
416          * Ha ha, isn't that a funny joke?
417          *
418          * FIXME: this is not the right way to fix this; the right way is to
419          * always pass in the same kind of structure.  This is hard right now.
420          * To revisit this issue, set a breakpoint in here and watch for when
421          * it's called from lib_finalize.  I think this occurs when we send a
422          * packet as a side-effect of another packet, such as when an ACK has
423          * been requested. -phil */
424
425         CDEBUG(D_NET, "sending %d bytes from [%d](%p,%d)... to nid: "
426                LPX64" pid %d\n", (int)payload_len, payload_niov,
427                payload_niov > 0 ? payload_iov[0].iov_base : NULL,
428                (int)(payload_niov > 0 ? payload_iov[0].iov_len : 0), nid, pid);
429
430         if ((conn = ktoenal_get_conn (nid)) == NULL)
431         {
432                 /* It's not a peer; try to find a gateway */
433                 rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, payload_niov,
434                                  &gatewaynid);
435                 if (rc != 0)
436                 {
437                         CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
438                         return (-1);
439                 }
440
441                 if ((conn = ktoenal_get_conn (gatewaynid)) == NULL)
442                 {
443                         CERROR ("Can't route to "LPX64": gateway "LPX64" is not a peer\n", 
444                                 nid, gatewaynid);
445                         return (-1);
446                 }
447         }
448
449         /* This transmit has now got a ref on conn */
450
451         /* I may not block for a transmit descriptor if I might block the
452          * receiver, or an interrupt handler. */
453         ltx = ktoenal_get_ltx (!(type == PTL_MSG_ACK ||
454                                  type == PTL_MSG_REPLY ||
455                                  in_interrupt ()));
456         if (ltx == NULL)
457         {
458                 CERROR ("Can't allocate tx desc\n");
459                 ktoenal_put_conn (conn);
460                 return (-1);
461         }
462         
463         /* Init common (to sends and forwards) packet part */
464         ltx->ltx_tx.tx_isfwd = 0;
465         ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
466         ltx->ltx_tx.tx_niov = 1 + payload_niov;
467         ltx->ltx_tx.tx_iov = ltx->ltx_iov;
468
469         /* Init local send packet (storage for hdr, finalize() args, iov) */
470         ltx->ltx_hdr = *hdr;
471         ltx->ltx_private = private;
472         ltx->ltx_cookie = cookie;
473
474         ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
475         ltx->ltx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
476
477         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
478
479         for (i = 0; i < payload_niov; i++)
480         {
481                 ltx->ltx_iov[1 + i].iov_base = payload_iov[i].iov_base;
482                 ltx->ltx_iov[1 + i].iov_len  = payload_iov[i].iov_len;
483         }
484
485         ktoenal_launch_packet (conn, &ltx->ltx_tx);
486         return (0);
487 }
488
489 void
490 ktoenal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
491 {
492         ksock_conn_t *conn;
493         ptl_nid_t     nid = fwd->kprfd_gateway_nid;
494         ksock_tx_t   *tx  = (ksock_tx_t *)&fwd->kprfd_scratch;
495
496         CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd, 
497                 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
498
499         if (nid == ktoenal_lib.ni.nid)         /* I'm the gateway; must be the last hop */
500                 nid = fwd->kprfd_target_nid;
501         
502         conn = ktoenal_get_conn (nid);
503         if (conn == NULL)
504         {
505                 CERROR ("[%p] fwd to "LPX64" isn't a peer\n", fwd, nid);
506                 kpr_fwd_done (&ktoenal_data.ksnd_router, fwd, -EHOSTUNREACH);
507                 return;
508         }
509
510         /* This forward has now got a ref on conn */
511
512         tx->tx_isfwd = 1;                       /* This is a forwarding packet */
513         tx->tx_nob   = fwd->kprfd_nob;
514         tx->tx_niov  = fwd->kprfd_niov;
515         tx->tx_iov   = fwd->kprfd_iov;
516
517         ktoenal_launch_packet (conn, tx);
518 }
519
520 int
521 ktoenal_thread_start (int (*fn)(void *arg), void *arg)
522 {
523         long    pid = kernel_thread (fn, arg, 0);
524
525         if (pid < 0)
526                 return ((int)pid);
527
528         atomic_inc (&ktoenal_data.ksnd_nthreads);
529         return (0);
530 }
531
532 void
533 ktoenal_thread_fini (void)
534 {
535         atomic_dec (&ktoenal_data.ksnd_nthreads);
536 }
537
538 void
539 ktoenal_fmb_callback (void *arg, int error)
540 {
541         ksock_fmb_t       *fmb = (ksock_fmb_t *)arg;
542         ptl_hdr_t         *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
543         ksock_conn_t      *conn;
544         unsigned long     flags;
545
546         CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": %d\n", 
547                 hdr->src_nid, hdr->dest_nid, error);
548
549         if (error != 0)
550                 CERROR ("Failed to route packet from "LPX64" to "LPX64": %d\n", 
551                         hdr->src_nid, hdr->dest_nid, error);
552
553         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
554         
555         list_add (&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
556
557         if (!list_empty (&fmb->fmb_pool->fmp_blocked_conns))
558         {
559                 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next, ksock_conn_t, ksnc_rx_list);
560                 list_del (&conn->ksnc_rx_list);
561
562                 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
563                 LASSERT (conn->ksnc_rx_scheduled);
564                 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
565
566                 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
567                 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
568
569                 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
570                         wake_up (&ktoenal_data.ksnd_sched_waitq);
571         }
572
573         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
574 }
575
576 ksock_fmb_t *
577 ktoenal_get_idle_fmb (ksock_conn_t *conn)
578 {
579         /* NB called with sched lock held */
580         int               payload_nob = conn->ksnc_rx_nob_left;
581         int               packet_nob = sizeof (ptl_hdr_t) + payload_nob;
582         ksock_fmb_pool_t *pool;
583         ksock_fmb_t      *fmb;
584         
585         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
586
587         if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
588                 pool = &ktoenal_data.ksnd_small_fmp;
589         else
590                 pool = &ktoenal_data.ksnd_large_fmp;
591         
592         if (!list_empty (&pool->fmp_idle_fmbs))
593         {
594                 fmb = list_entry (pool->fmp_idle_fmbs.next, ksock_fmb_t, fmb_list);
595                 list_del (&fmb->fmb_list);
596                 return (fmb);
597         }
598
599         /* deschedule until fmb free */
600
601         conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
602
603         list_add_tail (&conn->ksnc_rx_list,
604                        &pool->fmp_blocked_conns);
605         return (NULL);
606 }
607
608
609 int
610 ktoenal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
611 {
612         int payload_nob = conn->ksnc_rx_nob_left;
613         int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
614         int niov;                               /* at least the header */
615         int nob;
616         
617         LASSERT (conn->ksnc_rx_scheduled);
618         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
619         LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
620         LASSERT (payload_nob >= 0);
621         LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
622         LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
623         
624         /* Got a forwarding buffer; copy the header we just read into the
625          * forwarding buffer.  If there's payload start reading reading it
626          * into the buffer, otherwise the forwarding buffer can be kicked
627          * off immediately.
628          *
629          * NB fmb->fmb_iov spans the WHOLE packet.
630          *    conn->ksnc_rx_iov spans just the payload.
631          */
632
633         fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
634                 
635         memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t)); /* copy header */
636
637         if (payload_nob == 0)                   /* got complete packet already */
638         {
639                 atomic_inc (&ktoenal_packets_received);
640
641                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n", conn,
642                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, packet_nob);
643
644                 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
645
646                 kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid, 
647                               packet_nob, 1, fmb->fmb_iov, 
648                               ktoenal_fmb_callback, fmb);
649
650                 kpr_fwd_start (&ktoenal_data.ksnd_router, &fmb->fmb_fwd); /* forward it now */
651
652                 ktoenal_new_packet (conn, 0);  /* on to next packet */
653                 return (1);
654         }
655
656         niov = 1;
657         if (packet_nob <= PAGE_SIZE)            /* whole packet fits in first page */
658                 fmb->fmb_iov[0].iov_len = packet_nob;
659         else
660         {
661                 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
662                 nob = packet_nob - PAGE_SIZE;
663                 
664                 do
665                 {
666                         LASSERT (niov < fmb->fmb_npages);
667                         fmb->fmb_iov[niov].iov_base = page_address (fmb->fmb_pages[niov]);
668                         fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
669                         nob -= PAGE_SIZE;
670                         niov++;
671                 } while (nob > 0);
672         }
673
674         kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid, 
675                       packet_nob, niov, fmb->fmb_iov, 
676                       ktoenal_fmb_callback, fmb);
677
678         /* stash router's descriptor ready for call to kpr_fwd_start */        
679         conn->ksnc_cookie = &fmb->fmb_fwd;
680
681         conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
682
683         /* payload is desc's iov-ed buffer, but skipping the hdr */
684         LASSERT (niov <= sizeof (conn->ksnc_rx_iov) / sizeof (conn->ksnc_rx_iov[0]));
685
686         conn->ksnc_rx_iov[0].iov_base = (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) + sizeof (ptl_hdr_t));
687         conn->ksnc_rx_iov[0].iov_len = fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
688
689         if (niov > 1)
690                 memcpy (&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1], (niov - 1) * sizeof (struct iovec));
691
692         conn->ksnc_rx_niov = niov;
693
694         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
695                 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, payload_nob);
696         return (0);
697 }
698
699 void
700 ktoenal_fwd_parse (ksock_conn_t *conn)
701 {
702         ksock_conn_t *conn2;
703         int           body_len;
704
705         CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
706                 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
707
708         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
709         LASSERT (conn->ksnc_rx_scheduled);
710
711         body_len = conn->ksnc_hdr.payload_length;
712
713         if (body_len < 0)                               /* length corrupt */
714         {
715                 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d illegal\n",
716                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
717                 ktoenal_new_packet (conn, 0);          /* on to new packet */
718                 return;
719         }
720
721         if (body_len > PTL_MTU)         /* too big to forward */
722         {
723                 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d too big\n",
724                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
725                 ktoenal_new_packet (conn, body_len);    /* on to new packet (skip this one's body) */
726                 return;
727         }
728
729         conn2 = ktoenal_get_conn (conn->ksnc_hdr.dest_nid); /* should have gone direct */
730         if (conn2 != NULL)
731         {
732                 CERROR ("dropping packet from "LPX64" for "LPX64": target is a peer\n",
733                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid);
734                 ktoenal_put_conn (conn2);          /* drop ref from get above */
735
736                 ktoenal_new_packet (conn, body_len);  /* on to next packet (skip this one's body) */
737                 return;
738         }
739
740         conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;       /* Getting FMB now */
741         conn->ksnc_rx_nob_left = body_len;              /* stash packet size */
742         conn->ksnc_rx_nob_wanted = body_len;            /* (no slop) */
743 }
744
745 int
746 ktoenal_new_packet (ksock_conn_t *conn, int nob_to_skip)
747 {
748         static char ktoenal_slop_buffer[4096];
749
750         int   nob;
751         int   niov;
752         int   skipped;
753
754         if (nob_to_skip == 0)                   /* right at next packet boundary now */
755         {
756                 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
757                 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
758                 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
759
760                 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
761                 conn->ksnc_rx_iov[0].iov_len  = sizeof (ptl_hdr_t);
762                 conn->ksnc_rx_niov = 1;
763                 return (1);
764         }
765
766         /* set up to skip as much a possible now */
767         /* if there's more left (ran out of iov entries) we'll get called again */
768
769         conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
770         conn->ksnc_rx_nob_left = nob_to_skip;
771         skipped = 0;
772         niov = 0;
773
774         do
775         {
776                 nob = MIN (nob_to_skip, sizeof (ktoenal_slop_buffer));
777
778                 conn->ksnc_rx_iov[niov].iov_base = ktoenal_slop_buffer;
779                 conn->ksnc_rx_iov[niov].iov_len  = nob;
780                 niov++;
781                 skipped += nob;
782                 nob_to_skip -=nob;
783
784         } while (nob_to_skip != 0 &&            /* mustn't overflow conn's rx iov */
785                  niov < sizeof (conn->ksnc_rx_iov)/sizeof (conn->ksnc_rx_iov[0]));
786
787         conn->ksnc_rx_niov = niov;
788         conn->ksnc_rx_nob_wanted = skipped;
789         return (0);
790 }
791
792 void
793 ktoenal_process_receive (ksock_conn_t *conn, unsigned long *irq_flags)
794 {
795         ksock_fmb_t *fmb;
796         int          len;
797         LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
798         LASSERT (conn->ksnc_rx_scheduled);
799         LASSERT (conn->ksnc_rx_ready);
800
801         /* NB: sched lock held */
802         CDEBUG(D_NET, "conn %p\n", conn);
803
804         if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB)     /* doesn't need a forwarding buffer */
805         {
806                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
807                 goto try_read;
808         }
809
810  get_fmb:
811         /* NB: sched lock held */
812         fmb = ktoenal_get_idle_fmb (conn);
813         if (fmb == NULL)                        /* conn descheduled waiting for idle fmb */
814                 return;
815
816         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
817         
818         if (ktoenal_init_fmb (conn, fmb)) /* packet forwarded ? */
819                 goto out;               /* come back later for next packet */
820
821  try_read:
822         /* NB: sched lock NOT held */
823         LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
824                  conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
825                  conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
826                  conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
827
828         LASSERT (conn->ksnc_rx_niov > 0);
829         LASSERT (conn->ksnc_rx_nob_wanted > 0);
830
831         conn->ksnc_rx_ready = 0;                /* data ready may race with me and set ready */
832         mb();                                   /* => clear BEFORE trying to read */
833
834         /* NB ktoenal_recvmsg "consumes" the iov passed to it */
835         len = ktoenal_recvmsg(conn->ksnc_file,
836                                conn->ksnc_rx_iov, conn->ksnc_rx_niov,
837                                conn->ksnc_rx_nob_wanted);
838         CDEBUG (D_NET, "%p read(%d) %d\n", conn, conn->ksnc_rx_nob_wanted, len);
839
840         if (len <= 0)                           /* nothing ready (EAGAIN) or EOF or error */
841         {
842                 if (len != -EAGAIN &&           /* ! nothing to read now */
843                     len != 0)                   /* ! nothing to read ever */
844                 {
845                         // warning FIXME: handle socket errors properly
846                         CERROR ("Error socknal read(%d) %p: %d\n",
847                                 conn->ksnc_rx_nob_wanted, conn, len);
848                 }
849                 goto out;                       /* come back when there's data ready */
850         }
851
852         LASSERT (len <= conn->ksnc_rx_nob_wanted);
853         conn->ksnc_rx_nob_wanted -= len;
854         conn->ksnc_rx_nob_left -= len;
855
856         if (conn->ksnc_rx_nob_wanted != 0)      /* short read */
857                 goto out;                       /* try again later */
858
859         conn->ksnc_rx_ready = 1;                /* assume there's more to be had */
860
861         switch (conn->ksnc_rx_state)
862         {
863         case SOCKNAL_RX_HEADER:
864                 if (conn->ksnc_hdr.dest_nid != ktoenal_lib.ni.nid) /* It's not for me */
865                 {
866                         ktoenal_fwd_parse (conn);
867                         switch (conn->ksnc_rx_state)
868                         {
869                         case SOCKNAL_RX_HEADER: /* skipped this packet (zero payload) */
870                                 goto out;       /* => come back later */
871                         case SOCKNAL_RX_SLOP:   /* skipping this packet's body */
872                                 goto try_read;  /* => go read it */
873                         case SOCKNAL_RX_GET_FMB: /* forwarding */
874                                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
875                                 goto get_fmb;   /* => go get a fwd msg buffer */
876                         default:
877                                 break;
878                         }
879                         /* Not Reached */
880                         LBUG ();
881                 }
882
883                 PROF_START(lib_parse);
884                 lib_parse(&ktoenal_lib, &conn->ksnc_hdr, conn); /* sets wanted_len, iovs etc */
885                 PROF_FINISH(lib_parse);
886
887                 if (conn->ksnc_rx_nob_wanted != 0) /* need to get some payload? */
888                 {
889                         conn->ksnc_rx_state = SOCKNAL_RX_BODY;
890                         goto try_read;          /* go read the payload */
891                 }
892                 /* Fall through (completed packet for me) */
893
894         case SOCKNAL_RX_BODY:
895                 atomic_inc (&ktoenal_packets_received);
896                 lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie); /* packet is done now */
897                 /* Fall through */
898
899         case SOCKNAL_RX_SLOP:
900                 if (ktoenal_new_packet (conn, conn->ksnc_rx_nob_left)) /* starting new packet? */
901                         goto out;               /* come back later */
902                 goto try_read;                  /* try to finish reading slop now */
903
904         case SOCKNAL_RX_BODY_FWD:
905                 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n", conn,
906                         conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
907
908                 atomic_inc (&ktoenal_packets_received);
909
910                 /* ktoenal_init_fmb() stashed router descriptor in conn->ksnc_cookie */
911                 kpr_fwd_start (&ktoenal_data.ksnd_router, (kpr_fwd_desc_t *)conn->ksnc_cookie);
912
913                 LASSERT (conn->ksnc_rx_nob_left == 0); /* no slop in forwarded packets */
914
915                 ktoenal_new_packet (conn, 0);  /* on to next packet */
916                 goto out;                       /* (later) */
917
918         default:
919                 break;
920         }
921
922         /* Not Reached */
923         LBUG ();
924
925  out:
926         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
927
928         if (!conn->ksnc_rx_ready)               /* no data there to read? */
929         {
930                 conn->ksnc_rx_scheduled = 0;    /* let socket callback schedule again */
931                 ktoenal_put_conn (conn);       /* release scheduler's ref */
932         }
933         else                                    /* let scheduler call me again */
934                 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
935 }
936
937 int
938 ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg,
939              unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
940 {
941         ksock_conn_t *conn = (ksock_conn_t *)private;
942         int           i;
943
944         conn->ksnc_cookie = msg;
945
946         LASSERT (niov <= PTL_MD_MAX_IOV);
947         for (i = 0; i < niov; i++)
948         {
949                 conn->ksnc_rx_iov[i].iov_len = iov[i].iov_len;
950                 conn->ksnc_rx_iov[i].iov_base = iov[i].iov_base;
951         }
952
953         conn->ksnc_rx_niov       = niov;
954         conn->ksnc_rx_nob_wanted = mlen;
955         conn->ksnc_rx_nob_left   = rlen;
956
957         return (rlen);
958 }
959
960 int
961 ktoenal_scheduler (void *arg)
962 {
963         unsigned long      flags;
964         ksock_conn_t      *conn;
965         int                rc;
966         int                nloops = 0;
967
968         kportal_daemonize ("ktoenal_sched");
969         kportal_blockallsigs ();
970         
971         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
972
973         while (!ktoenal_data.ksnd_shuttingdown)
974         {
975                 int did_something = 0;
976
977                 /* Ensure I progress everything semi-fairly */
978
979                 if (!list_empty (&ktoenal_data.ksnd_rx_conns))
980                 {
981                         did_something = 1;
982                         conn = list_entry (ktoenal_data.ksnd_rx_conns.next,
983                                            ksock_conn_t, ksnc_rx_list);
984                         list_del (&conn->ksnc_rx_list);
985
986                         ktoenal_process_receive (conn, &flags); /* drops & regains ksnd_sched_lock */
987                 }
988
989                 if (!list_empty (&ktoenal_data.ksnd_tx_conns))
990                 {
991                         did_something = 1;
992                         conn = list_entry (ktoenal_data.ksnd_tx_conns.next,
993                                            ksock_conn_t, ksnc_tx_list);
994
995                         list_del (&conn->ksnc_tx_list);
996                         ktoenal_process_transmit (conn, &flags); /* drops and regains ksnd_sched_lock */
997                 }
998
999                 if (!did_something ||           /* nothing to do */
1000                     ++nloops == SOCKNAL_RESCHED) /* hogging CPU? */
1001                 {
1002                         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1003
1004                         nloops = 0;
1005
1006                         if (!did_something) {   /* wait for something to do */
1007                                 rc = wait_event_interruptible (ktoenal_data.ksnd_sched_waitq,
1008                                                                ktoenal_data.ksnd_shuttingdown ||
1009                                                                !list_empty (&ktoenal_data.ksnd_rx_conns) ||
1010                                                                !list_empty (&ktoenal_data.ksnd_tx_conns));
1011                                 LASSERT (rc == 0);
1012                         } else 
1013                                 our_cond_resched();
1014
1015                         spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1016                 }
1017         }
1018
1019         spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1020         ktoenal_thread_fini ();
1021         return (0);
1022 }
1023
1024
1025 int
1026 ktoenal_reaper (void *arg)
1027 {
1028         unsigned long      flags;
1029         ksock_conn_t      *conn;
1030         int                rc;
1031         
1032         kportal_daemonize ("ktoenal_reaper");
1033         kportal_blockallsigs ();
1034
1035         while (!ktoenal_data.ksnd_shuttingdown)
1036         {
1037                 spin_lock_irqsave (&ktoenal_data.ksnd_reaper_lock, flags);
1038
1039                 if (list_empty (&ktoenal_data.ksnd_reaper_list))
1040                         conn = NULL;
1041                 else
1042                 {
1043                         conn = list_entry (ktoenal_data.ksnd_reaper_list.next,
1044                                            ksock_conn_t, ksnc_list);
1045                         list_del (&conn->ksnc_list);
1046                 }
1047
1048                 spin_unlock_irqrestore (&ktoenal_data.ksnd_reaper_lock, flags);
1049
1050                 if (conn != NULL)
1051                         ktoenal_close_conn (conn);
1052                 else {
1053                         rc = wait_event_interruptible (ktoenal_data.ksnd_reaper_waitq,
1054                                                        ktoenal_data.ksnd_shuttingdown ||
1055                                                        !list_empty(&ktoenal_data.ksnd_reaper_list));
1056                         LASSERT (rc == 0);
1057                 }
1058         }
1059
1060         ktoenal_thread_fini ();
1061         return (0);
1062 }
1063
1064 #define POLLREAD        (POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI)
1065 #define POLLWRITE       (POLLOUT | POLLWRNORM | POLLWRBAND)
1066
1067 int
1068 ktoenal_pollthread(void *arg)
1069 {
1070         unsigned int mask;
1071         struct list_head *tmp;
1072         ksock_conn_t *conn;
1073         
1074         /* Save the task struct for waking it up */
1075         ktoenal_data.ksnd_pollthread_tsk = current; 
1076         
1077         kportal_daemonize ("ktoenal_pollthread");
1078         kportal_blockallsigs ();
1079         
1080         poll_initwait(&ktoenal_data.ksnd_pwait);
1081         
1082         while(!ktoenal_data.ksnd_shuttingdown) {
1083                 
1084                 set_current_state(TASK_INTERRUPTIBLE);
1085                 
1086                 read_lock (&ktoenal_data.ksnd_socklist_lock);
1087                 list_for_each(tmp, &ktoenal_data.ksnd_socklist) {
1088                         
1089                         conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1090                         atomic_inc(&conn->ksnc_refcount);
1091                         read_unlock (&ktoenal_data.ksnd_socklist_lock);
1092                         
1093                         mask = conn->ksnc_file->f_op->poll(conn->ksnc_file,
1094                                   ktoenal_data.ksnd_slistchange ? 
1095                                   &ktoenal_data.ksnd_pwait : NULL);
1096                          
1097                         if(mask & POLLREAD) {
1098                                 ktoenal_data_ready(conn);
1099                                                         
1100                         } 
1101                         if (mask & POLLWRITE) {
1102                                 ktoenal_write_space(conn);  
1103                               
1104                         }
1105                         if (mask & (POLLERR | POLLHUP)) {
1106                                          /* Do error processing */          
1107                         }      
1108                         
1109                         read_lock (&ktoenal_data.ksnd_socklist_lock);
1110                         if(atomic_dec_and_test(&conn->ksnc_refcount))
1111                                 _ktoenal_put_conn(conn);
1112                 }
1113                 ktoenal_data.ksnd_slistchange = 0;
1114                 read_unlock (&ktoenal_data.ksnd_socklist_lock);
1115                 
1116                 schedule_timeout(MAX_SCHEDULE_TIMEOUT);
1117                 if(ktoenal_data.ksnd_slistchange) {
1118                         poll_freewait(&ktoenal_data.ksnd_pwait); 
1119                         poll_initwait(&ktoenal_data.ksnd_pwait);
1120                 }
1121          }
1122         poll_freewait(&ktoenal_data.ksnd_pwait);
1123         ktoenal_thread_fini();
1124         return (0);
1125 }
1126
1127 void
1128 ktoenal_data_ready (ksock_conn_t *conn)
1129 {
1130         unsigned long  flags;
1131         ENTRY;
1132
1133         if (!test_and_set_bit (0, &conn->ksnc_rx_ready)) { 
1134                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1135
1136                 if (!conn->ksnc_rx_scheduled) {  /* not being progressed */
1137                         list_add_tail (&conn->ksnc_rx_list, 
1138                                         &ktoenal_data.ksnd_rx_conns);
1139                         conn->ksnc_rx_scheduled = 1;
1140                         /* extra ref for scheduler */
1141                         atomic_inc (&conn->ksnc_refcount);
1142
1143                         /* This is done to avoid the effects of a sequence
1144                          * of events in which the rx_ready is lost
1145                          */
1146                         conn->ksnc_rx_ready=1;
1147                           
1148                         if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1149                                 wake_up (&ktoenal_data.ksnd_sched_waitq);
1150                 }
1151
1152                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1153         }
1154
1155         EXIT;
1156 }
1157
1158 void
1159 ktoenal_write_space (ksock_conn_t *conn)
1160 {
1161         unsigned long  flags;
1162
1163         CDEBUG (D_NET, "conn %p%s%s%s\n",
1164                          conn,
1165                         (conn == NULL) ? "" : (test_bit (0, &conn->ksnc_tx_ready) ? " ready" : " blocked"),
1166                         (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ? " scheduled" : " idle"),
1167                         (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ? " empty" : " queued"));
1168
1169
1170         if (!test_and_set_bit (0, &conn->ksnc_tx_ready)) {
1171                 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1172
1173                 if (!list_empty (&conn->ksnc_tx_queue) && /* packets to send */
1174                                 !conn->ksnc_tx_scheduled) { /* not being progressed */
1175
1176                         list_add_tail (&conn->ksnc_tx_list, 
1177                                         &ktoenal_data.ksnd_tx_conns);
1178                         conn->ksnc_tx_scheduled = 1;
1179                         /* extra ref for scheduler */
1180                         atomic_inc (&conn->ksnc_refcount);
1181
1182                         if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1183                                 wake_up (&ktoenal_data.ksnd_sched_waitq);
1184                 }
1185                 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1186         }
1187 }
1188
1189 nal_cb_t ktoenal_lib = {
1190         nal_data:       &ktoenal_data,                /* NAL private data */
1191         cb_send:         ktoenal_send,
1192         cb_recv:         ktoenal_recv,
1193         cb_read:         ktoenal_read,
1194         cb_write:        ktoenal_write,
1195         cb_callback:     ktoenal_callback,
1196         cb_malloc:       ktoenal_malloc,
1197         cb_free:         ktoenal_free,
1198         cb_printf:       ktoenal_printf,
1199         cb_cli:          ktoenal_cli,
1200         cb_sti:          ktoenal_sti,
1201         cb_dist:         ktoenal_dist
1202 };