1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2001, 2002 Cluster File Systems, Inc.
5 * Author: Zach Brown <zab@zabbo.net>
6 * Author: Peter J. Braam <braam@clusterfs.com>
7 * Author: Phil Schwan <phil@clusterfs.com>
8 * Author: Eric Barton <eric@bartonsoftware.com>
9 * Author: Kedar Sovani <kedar@calsoftinc.com>
10 * Author: Amey Inamdar <amey@calsoftinc.com>
12 * This file is part of Portals, http://www.sf.net/projects/lustre/
14 * Portals is free software; you can redistribute it and/or
15 * modify it under the terms of version 2 of the GNU General Public
16 * License as published by the Free Software Foundation.
18 * Portals is distributed in the hope that it will be useful,
19 * but WITHOUT ANY WARRANTY; without even the implied warranty of
20 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
21 * GNU General Public License for more details.
23 * You should have received a copy of the GNU General Public License
24 * along with Portals; if not, write to the Free Software
25 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
29 #include <linux/poll.h>
/* File-scope traffic counters.  'received' is atomic because it is bumped
 * from the receive/scheduler path; the other two are only written under
 * ksnd_sched_lock (see ktoenal_launch_packet / ktoenal_process_transmit). */
32 atomic_t ktoenal_packets_received;
33 long ktoenal_packets_launched;
34 long ktoenal_packets_transmitted;
37 * LIB functions follow
/* lib callback: copy 'len' bytes from 'src_addr' into 'dst_addr'.
 * Kernel-space NAL, so the "user pointer" is directly addressable and a
 * plain memcpy suffices.  NOTE(review): extraction gaps — the return
 * statement/braces are not visible here; verify against the original. */
41 ktoenal_read(nal_cb_t *nal, void *private, void *dst_addr,
42 user_ptr src_addr, size_t len)
44 CDEBUG(D_NET, LPX64": reading %ld bytes from %p -> %p\n",
45 nal->ni.nid, (long)len, src_addr, dst_addr);
47 memcpy( dst_addr, src_addr, len );
/* lib callback: mirror of ktoenal_read — copy 'len' bytes from kernel
 * 'src_addr' into the (kernel-addressable) destination 'dst_addr'. */
52 ktoenal_write(nal_cb_t *nal, void *private, user_ptr dst_addr,
53 void *src_addr, size_t len)
55 CDEBUG(D_NET, LPX64": writing %ld bytes from %p -> %p\n",
56 nal->ni.nid, (long)len, src_addr, dst_addr);
58 memcpy( dst_addr, src_addr, len );
/* lib callback: deliver event 'ev' on event queue 'eq' by invoking the
 * queue's registered event handler, if any. */
63 ktoenal_callback (nal_cb_t * nal, void *private, lib_eq_t *eq,
66 CDEBUG(D_NET, LPX64": callback eq %p ev %p\n",
69 if (eq->event_callback != NULL)
70 eq->event_callback(ev);
/* lib callback: allocate 'len' bytes via the portals allocator.
 * NOTE(review): the 'buf' declaration and return are outside this view —
 * presumably returns 'buf' (NULL on failure); confirm in original source. */
76 ktoenal_malloc(nal_cb_t *nal, size_t len)
80 PORTAL_ALLOC(buf, len);
/* lib callback: free a buffer previously obtained from ktoenal_malloc.
 * PORTAL_FREE needs the original length, hence the 'len' parameter. */
89 ktoenal_free(nal_cb_t *nal, void *buf, size_t len)
91 PORTAL_FREE(buf, len);
/* lib callback: printf-style debug output, routed through CDEBUG.
 * Formats into a fixed on-stack buffer; vsnprintf bounds the write and the
 * explicit NUL guards against pre-C99 vsnprintf truncation behaviour. */
95 ktoenal_printf(nal_cb_t *nal, const char *fmt, ...)
101 vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
104 msg[sizeof (msg) - 1] = 0; /* ensure terminated */
106 CDEBUG (D_NET, "%s", msg);
/* lib callback: enter the NAL critical section (lib-level "cli").
 * Takes the per-NAL callback spinlock; released by ktoenal_sti below.
 * 'flags' is unused here — plain spin_lock, no irq state saved. */
110 ktoenal_cli(nal_cb_t *nal, unsigned long *flags)
112 ksock_nal_data_t *data = nal->nal_data;
114 spin_lock(&data->ksnd_nal_cb_lock);
/* lib callback: leave the NAL critical section ("sti") — pairs with
 * ktoenal_cli, dropping ksnd_nal_cb_lock. */
118 ktoenal_sti(nal_cb_t *nal, unsigned long *flags)
120 ksock_nal_data_t *data;
121 data = nal->nal_data;
123 spin_unlock(&data->ksnd_nal_cb_lock);
/* lib callback: report network "distance" to 'nid' via *dist.
 * Visible logic only special-cases nid == our own nid; the non-local
 * branch (distance 1 / lookup) lies outside this view. */
127 ktoenal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
129 /* I would guess that if ktoenal_get_conn(nid) == NULL,
130 and we're not routing, then 'nid' is very distant :) */
131 if ( nal->ni.nid == nid ) {
/* Grab a local-transmit descriptor (ksock_ltx_t).
 * Two free lists exist: the normal idle list and a reserved non-blocking
 * list.  A blocking caller (may_block != 0) sleeps on ksnd_idle_ltx_waitq
 * until the normal list refills; a non-blocking caller falls back to the
 * reserved list and may return NULL.
 * NOTE(review): the loop/branch lines joining these fragments are missing
 * from this extraction; control flow here must be checked upstream. */
141 ktoenal_get_ltx (int may_block)
144 ksock_ltx_t *ltx = NULL;
148 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
150 if (!list_empty (&ktoenal_data.ksnd_idle_ltx_list))
152 ltx = list_entry (ktoenal_data.ksnd_idle_ltx_list.next, ksock_ltx_t, ltx_tx.tx_list);
153 list_del (&ltx->ltx_tx.tx_list);
159 if (!list_empty (&ktoenal_data.ksnd_idle_nblk_ltx_list))
161 ltx = list_entry (ktoenal_data.ksnd_idle_nblk_ltx_list.next,
162 ksock_ltx_t, ltx_tx.tx_list);
163 list_del (&ltx->ltx_tx.tx_list);
168 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
170 wait_event (ktoenal_data.ksnd_idle_ltx_waitq,
171 !list_empty (&ktoenal_data.ksnd_idle_ltx_list));
174 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
/* Write 'niov' iovec fragments (totalling 'nob' bytes) to the socket file
 * via f_op->writev.  On a partial send the iov array is advanced in place
 * ("consumed") so the caller can retry with the remainder.
 * Returns the writev result (bytes sent, or -errno).
 * NOTE(review): the consume loop is only partially visible here. */
180 ktoenal_sendmsg (struct file *sock, struct iovec *iov, int niov, int nob, int flags)
182 /* NB This procedure "consumes" iov (actually we do, tcp_sendmsg doesn't)
198 for (i = total_nob = 0; i < niov; i++)
199 total_nob += iov[i].iov_len;
201 LASSERT (nob == total_nob);
204 LASSERT (!in_interrupt());
206 rc = sock->f_op->writev(sock, iov, niov, NULL);
210 if (rc > 0) /* sent something? */
212 nob = rc; /* consume iov */
217 if (iov->iov_len >= nob)
220 iov->iov_base = (void *)(((unsigned long)iov->iov_base) + nob);
/* Read up to 'toread' bytes from the socket into 'iov'.  Walks the iov to
 * find how many entries cover 'toread', temporarily trims the last entry
 * so no more than 'toread' is requested, calls f_op->readv, then restores
 * the trimmed length.  Returns the readv result. */
234 ktoenal_recvmsg(struct file *sock, struct iovec *iov, int niov, int toread)
236 /* NB This procedure "consumes" iov (actually tcp_recvmsg does)
239 int ret, i, len = 0, origlen = 0;
241 PROF_START(our_recvmsg);
242 for(i = 0; i < niov; i++) {
243 len += iov[i].iov_len;
249 origlen = iov[i].iov_len;
250 iov[i].iov_len -= (len - toread);
252 else { /* i == niov */
259 ret = sock->f_op->readv(sock, iov, i + 1, NULL);
264 iov[i].iov_len = origlen;
266 PROF_FINISH(our_recvmsg);
/* Scheduler worker: push one transmit from the head of conn's tx queue.
 * Called with ksnd_sched_lock held (irqsave state in *irq_flags); the lock
 * is dropped around the actual sendmsg and re-taken before returning, so
 * the caller's lock state is preserved.
 * On complete send: releases the packet's conn ref, then either signals
 * the router (forwarded packet) or finalizes the lib message and recycles
 * the ltx descriptor.  On partial send: requeues the tx at the HEAD of the
 * queue.  Finally it either deschedules the conn (releasing the
 * scheduler's ref) or requeues it on ksnd_tx_conns for another pass. */
271 ktoenal_process_transmit (ksock_conn_t *conn, unsigned long *irq_flags)
273 ksock_tx_t *tx = list_entry (conn->ksnc_tx_queue.next, ksock_tx_t, tx_list);
276 LASSERT (conn->ksnc_tx_scheduled);
277 LASSERT (conn->ksnc_tx_ready);
278 LASSERT (!list_empty (&conn->ksnc_tx_queue));
280 /* assume transmit will complete now, so dequeue while I've got the lock */
281 list_del (&tx->tx_list);
283 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
285 LASSERT (tx->tx_nob > 0);
287 conn->ksnc_tx_ready = 0; /* write_space may race with me and set ready */
288 mb(); /* => clear BEFORE trying to write */
290 rc = ktoenal_sendmsg (conn->ksnc_file,
291 tx->tx_iov, tx->tx_niov, tx->tx_nob,
292 list_empty (&conn->ksnc_tx_queue) ?
293 MSG_DONTWAIT : (MSG_DONTWAIT | MSG_MORE));
295 CDEBUG (D_NET, "send(%d) %d\n", tx->tx_nob, rc);
297 if (rc < 0) /* error */
299 if (rc == -EAGAIN) /* socket full => */
300 rc = 0; /* nothing sent */
303 //warning FIXME: handle socket errors properly
304 CERROR ("Error socknal send(%d) %p: %d\n", tx->tx_nob, conn, rc);
305 rc = tx->tx_nob; /* kid on for now whole packet went */
309 if (rc == tx->tx_nob) /* everything went */
311 conn->ksnc_tx_ready = 1; /* assume more can go (ASAP) */
312 ktoenal_put_conn (conn); /* release packet's ref */
314 if (tx->tx_isfwd) /* was a forwarded packet? */
316 kpr_fwd_done (&ktoenal_data.ksnd_router,
317 KSOCK_TX_2_KPR_FWD_DESC (tx), 0);
319 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
321 else /* local send */
323 ksock_ltx_t *ltx = KSOCK_TX_2_KSOCK_LTX (tx);
325 lib_finalize (&ktoenal_lib, ltx->ltx_private, ltx->ltx_cookie);
327 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
329 list_add (&ltx->ltx_tx.tx_list, ltx->ltx_idle);
331 /* normal tx desc => wakeup anyone blocking for one */
332 if (ltx->ltx_idle == &ktoenal_data.ksnd_idle_ltx_list &&
333 waitqueue_active (&ktoenal_data.ksnd_idle_ltx_waitq))
334 wake_up (&ktoenal_data.ksnd_idle_ltx_waitq);
336 ktoenal_packets_transmitted++;
342 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
344 /* back onto HEAD of tx_queue */
345 list_add (&tx->tx_list, &conn->ksnc_tx_queue);
348 if (!conn->ksnc_tx_ready || /* no space to write now */
349 list_empty (&conn->ksnc_tx_queue)) /* nothing to write */
351 conn->ksnc_tx_scheduled = 0; /* not being scheduled */
352 ktoenal_put_conn (conn); /* release scheduler's ref */
354 else /* let scheduler call me again */
355 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
/* Queue 'tx' on conn's transmit queue and, if the socket is writable and
 * the conn is not already scheduled, put the conn on ksnd_tx_conns with an
 * extra ref for the scheduler and wake a scheduler thread.
 * The iov is first truncated to exactly tx_nob bytes because sendmsg pays
 * no attention to a requested length.
 * NOTE(review): the iov-truncation loop body is only partially visible. */
359 ktoenal_launch_packet (ksock_conn_t *conn, ksock_tx_t *tx)
362 int nob = tx->tx_nob;
363 struct iovec *iov = tx->tx_iov;
366 LASSERT (nob >= sizeof (ptl_hdr_t));
368 /* Truncate iov to exactly match total packet length
369 * since socket sendmsg pays no attention to requested length.
373 LASSERT (niov <= tx->tx_niov);
375 if (iov->iov_len >= nob)
386 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
387 list_add_tail (&tx->tx_list, &conn->ksnc_tx_queue);
389 if (conn->ksnc_tx_ready && /* able to send */
390 !conn->ksnc_tx_scheduled) /* not scheduled to send */
392 list_add_tail (&conn->ksnc_tx_list, &ktoenal_data.ksnd_tx_conns);
393 conn->ksnc_tx_scheduled = 1;
394 atomic_inc (&conn->ksnc_refcount); /* extra ref for scheduler */
395 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
396 wake_up (&ktoenal_data.ksnd_sched_waitq);
399 ktoenal_packets_launched++;
400 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
/* lib callback: send 'hdr' + payload to 'nid'.
 * Resolves the destination: direct peer connection first, else asks the
 * router (kpr_lookup) for a gateway and connects to that.  Acquires an ltx
 * descriptor (non-blocking when called from contexts that must not sleep —
 * ACK/REPLY generation), fills in header + payload iovs, and hands the
 * packet to ktoenal_launch_packet.
 * The transmit holds a ref on conn until ktoenal_process_transmit drops it. */
404 ktoenal_send(nal_cb_t *nal, void *private, lib_msg_t *cookie,
405 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
406 unsigned int payload_niov, struct iovec *payload_iov, size_t payload_len)
408 ptl_nid_t gatewaynid;
414 /* By this point, as it happens, we have absolutely no idea what
415 * 'private' is. It might be ksock_nal_data or it might be ksock_conn.
416 * Ha ha, isn't that a funny joke?
418 * FIXME: this is not the right way to fix this; the right way is to
419 * always pass in the same kind of structure. This is hard right now.
420 * To revisit this issue, set a breakpoint in here and watch for when
421 * it's called from lib_finalize. I think this occurs when we send a
422 * packet as a side-effect of another packet, such as when an ACK has
423 * been requested. -phil */
425 CDEBUG(D_NET, "sending %d bytes from [%d](%p,%d)... to nid: "
426 LPX64" pid %d\n", (int)payload_len, payload_niov,
427 payload_niov > 0 ? payload_iov[0].iov_base : NULL,
428 (int)(payload_niov > 0 ? payload_iov[0].iov_len : 0), nid, pid);
430 if ((conn = ktoenal_get_conn (nid)) == NULL)
432 /* It's not a peer; try to find a gateway */
433 rc = kpr_lookup (&ktoenal_data.ksnd_router, nid, payload_niov,
437 CERROR ("Can't route to "LPX64": router error %d\n", nid, rc);
441 if ((conn = ktoenal_get_conn (gatewaynid)) == NULL)
443 CERROR ("Can't route to "LPX64": gateway "LPX64" is not a peer\n",
449 /* This transmit has now got a ref on conn */
451 /* I may not block for a transmit descriptor if I might block the
452 * receiver, or an interrupt handler. */
453 ltx = ktoenal_get_ltx (!(type == PTL_MSG_ACK ||
454 type == PTL_MSG_REPLY ||
458 CERROR ("Can't allocate tx desc\n");
459 ktoenal_put_conn (conn);
463 /* Init common (to sends and forwards) packet part */
464 ltx->ltx_tx.tx_isfwd = 0;
465 ltx->ltx_tx.tx_nob = sizeof (*hdr) + payload_len;
466 ltx->ltx_tx.tx_niov = 1 + payload_niov;
467 ltx->ltx_tx.tx_iov = ltx->ltx_iov;
469 /* Init local send packet (storage for hdr, finalize() args, iov) */
471 ltx->ltx_private = private;
472 ltx->ltx_cookie = cookie;
474 ltx->ltx_iov[0].iov_base = &ltx->ltx_hdr;
475 ltx->ltx_iov[0].iov_len = sizeof (ltx->ltx_hdr);
477 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
479 for (i = 0; i < payload_niov; i++)
481 ltx->ltx_iov[1 + i].iov_base = payload_iov[i].iov_base;
482 ltx->ltx_iov[1 + i].iov_len = payload_iov[i].iov_len;
485 ktoenal_launch_packet (conn, &ltx->ltx_tx);
/* Router callback: transmit a forwarded packet described by 'fwd'.
 * Targets the gateway nid unless we ARE the gateway (last hop), in which
 * case the packet goes straight to the target nid.  The tx descriptor
 * lives in the router's scratch space, so no ltx allocation is needed.
 * On an unreachable destination, completes the forward with -EHOSTUNREACH. */
490 ktoenal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
493 ptl_nid_t nid = fwd->kprfd_gateway_nid;
494 ksock_tx_t *tx = (ksock_tx_t *)&fwd->kprfd_scratch;
496 CDEBUG (D_NET, "Forwarding [%p] -> "LPX64" ("LPX64"))\n", fwd,
497 fwd->kprfd_gateway_nid, fwd->kprfd_target_nid);
499 if (nid == ktoenal_lib.ni.nid) /* I'm the gateway; must be the last hop */
500 nid = fwd->kprfd_target_nid;
502 conn = ktoenal_get_conn (nid);
505 CERROR ("[%p] fwd to "LPX64" isn't a peer\n", fwd, nid);
506 kpr_fwd_done (&ktoenal_data.ksnd_router, fwd, -EHOSTUNREACH);
510 /* This forward has now got a ref on conn */
512 tx->tx_isfwd = 1; /* This is a forwarding packet */
513 tx->tx_nob = fwd->kprfd_nob;
514 tx->tx_niov = fwd->kprfd_niov;
515 tx->tx_iov = fwd->kprfd_iov;
517 ktoenal_launch_packet (conn, tx);
/* Spawn a kernel thread running fn(arg) and bump the live-thread count.
 * NOTE(review): the pid<0 error check is outside this view. */
521 ktoenal_thread_start (int (*fn)(void *arg), void *arg)
523 long pid = kernel_thread (fn, arg, 0);
528 atomic_inc (&ktoenal_data.ksnd_nthreads);
/* Called by each NAL thread on exit: decrement the live-thread count
 * (shutdown waits for this to reach zero). */
533 ktoenal_thread_fini (void)
535 atomic_dec (&ktoenal_data.ksnd_nthreads);
/* Router completion callback for a forwarded packet: the forwarding
 * buffer (fmb) is finished with.  Returns the fmb to its pool's idle list
 * and, if any connection was blocked waiting for an fmb of this pool,
 * reschedules the first such conn back onto ksnd_rx_conns and wakes a
 * scheduler.  'error' non-zero means the forward failed (logged only). */
539 ktoenal_fmb_callback (void *arg, int error)
541 ksock_fmb_t *fmb = (ksock_fmb_t *)arg;
542 ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(fmb->fmb_pages[0]);
546 CDEBUG (D_NET, "routed packet from "LPX64" to "LPX64": %d\n",
547 hdr->src_nid, hdr->dest_nid, error);
550 CERROR ("Failed to route packet from "LPX64" to "LPX64": %d\n",
551 hdr->src_nid, hdr->dest_nid, error);
553 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
555 list_add (&fmb->fmb_list, &fmb->fmb_pool->fmp_idle_fmbs);
557 if (!list_empty (&fmb->fmb_pool->fmp_blocked_conns))
559 conn = list_entry (fmb->fmb_pool->fmp_blocked_conns.next, ksock_conn_t, ksnc_rx_list);
560 list_del (&conn->ksnc_rx_list);
562 CDEBUG (D_NET, "Scheduling conn %p\n", conn);
563 LASSERT (conn->ksnc_rx_scheduled);
564 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_FMB_SLEEP);
566 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB;
567 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
569 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
570 wake_up (&ktoenal_data.ksnd_sched_waitq);
573 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
/* Pick an idle forwarding buffer sized for conn's pending packet.
 * Called with the scheduler lock held.  Chooses the small or large fmb
 * pool by total packet size (header + payload).  If the pool is empty the
 * conn is descheduled onto the pool's blocked list (SOCKNAL_RX_FMB_SLEEP)
 * until ktoenal_fmb_callback frees one; in that case NULL is returned. */
577 ktoenal_get_idle_fmb (ksock_conn_t *conn)
579 /* NB called with sched lock held */
580 int payload_nob = conn->ksnc_rx_nob_left;
581 int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
582 ksock_fmb_pool_t *pool;
585 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
587 if (packet_nob <= SOCKNAL_SMALL_FWD_PAGES * PAGE_SIZE)
588 pool = &ktoenal_data.ksnd_small_fmp;
590 pool = &ktoenal_data.ksnd_large_fmp;
592 if (!list_empty (&pool->fmp_idle_fmbs))
594 fmb = list_entry (pool->fmp_idle_fmbs.next, ksock_fmb_t, fmb_list);
595 list_del (&fmb->fmb_list);
599 /* deschedule until fmb free */
601 conn->ksnc_rx_state = SOCKNAL_RX_FMB_SLEEP;
603 list_add_tail (&conn->ksnc_rx_list,
604 &pool->fmp_blocked_conns);
610 ktoenal_init_fmb (ksock_conn_t *conn, ksock_fmb_t *fmb)
612 int payload_nob = conn->ksnc_rx_nob_left;
613 int packet_nob = sizeof (ptl_hdr_t) + payload_nob;
614 int niov; /* at least the header */
617 LASSERT (conn->ksnc_rx_scheduled);
618 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_GET_FMB);
619 LASSERT (conn->ksnc_rx_nob_wanted == conn->ksnc_rx_nob_left);
620 LASSERT (payload_nob >= 0);
621 LASSERT (packet_nob <= fmb->fmb_npages * PAGE_SIZE);
622 LASSERT (sizeof (ptl_hdr_t) < PAGE_SIZE);
624 /* Got a forwarding buffer; copy the header we just read into the
625 * forwarding buffer. If there's payload start reading reading it
626 * into the buffer, otherwise the forwarding buffer can be kicked
629 * NB fmb->fmb_iov spans the WHOLE packet.
630 * conn->ksnc_rx_iov spans just the payload.
633 fmb->fmb_iov[0].iov_base = page_address (fmb->fmb_pages[0]);
635 memcpy (fmb->fmb_iov[0].iov_base, &conn->ksnc_hdr, sizeof (ptl_hdr_t)); /* copy header */
637 if (payload_nob == 0) /* got complete packet already */
639 atomic_inc (&ktoenal_packets_received);
641 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (immediate)\n", conn,
642 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, packet_nob);
644 fmb->fmb_iov[0].iov_len = sizeof (ptl_hdr_t);
646 kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid,
647 packet_nob, 1, fmb->fmb_iov,
648 ktoenal_fmb_callback, fmb);
650 kpr_fwd_start (&ktoenal_data.ksnd_router, &fmb->fmb_fwd); /* forward it now */
652 ktoenal_new_packet (conn, 0); /* on to next packet */
657 if (packet_nob <= PAGE_SIZE) /* whole packet fits in first page */
658 fmb->fmb_iov[0].iov_len = packet_nob;
661 fmb->fmb_iov[0].iov_len = PAGE_SIZE;
662 nob = packet_nob - PAGE_SIZE;
666 LASSERT (niov < fmb->fmb_npages);
667 fmb->fmb_iov[niov].iov_base = page_address (fmb->fmb_pages[niov]);
668 fmb->fmb_iov[niov].iov_len = MIN (PAGE_SIZE, nob);
674 kpr_fwd_init (&fmb->fmb_fwd, conn->ksnc_hdr.dest_nid,
675 packet_nob, niov, fmb->fmb_iov,
676 ktoenal_fmb_callback, fmb);
678 /* stash router's descriptor ready for call to kpr_fwd_start */
679 conn->ksnc_cookie = &fmb->fmb_fwd;
681 conn->ksnc_rx_state = SOCKNAL_RX_BODY_FWD; /* read in the payload */
683 /* payload is desc's iov-ed buffer, but skipping the hdr */
684 LASSERT (niov <= sizeof (conn->ksnc_rx_iov) / sizeof (conn->ksnc_rx_iov[0]));
686 conn->ksnc_rx_iov[0].iov_base = (void *)(((unsigned long)fmb->fmb_iov[0].iov_base) + sizeof (ptl_hdr_t));
687 conn->ksnc_rx_iov[0].iov_len = fmb->fmb_iov[0].iov_len - sizeof (ptl_hdr_t);
690 memcpy (&conn->ksnc_rx_iov[1], &fmb->fmb_iov[1], (niov - 1) * sizeof (struct iovec));
692 conn->ksnc_rx_niov = niov;
694 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d reading body\n", conn,
695 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, payload_nob);
/* Sanity-check a just-read header destined for another nid and decide the
 * conn's next rx state.  Drops the packet if the length is corrupt
 * (negative), too big to forward (> PTL_MTU), or if the destination is a
 * direct peer of ours (it should have gone direct, not via us); otherwise
 * switches to SOCKNAL_RX_GET_FMB to pick up a forwarding buffer. */
700 ktoenal_fwd_parse (ksock_conn_t *conn)
705 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d parsing header\n", conn,
706 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
708 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER);
709 LASSERT (conn->ksnc_rx_scheduled);
711 body_len = conn->ksnc_hdr.payload_length;
713 if (body_len < 0) /* length corrupt */
715 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d illegal\n",
716 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
717 ktoenal_new_packet (conn, 0); /* on to new packet */
721 if (body_len > PTL_MTU) /* too big to forward */
723 CERROR ("dropping packet from "LPX64" for "LPX64": packet size %d too big\n",
724 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, body_len);
725 ktoenal_new_packet (conn, body_len); /* on to new packet (skip this one's body) */
729 conn2 = ktoenal_get_conn (conn->ksnc_hdr.dest_nid); /* should have gone direct */
732 CERROR ("dropping packet from "LPX64" for "LPX64": target is a peer\n",
733 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid);
734 ktoenal_put_conn (conn2); /* drop ref from get above */
736 ktoenal_new_packet (conn, body_len); /* on to next packet (skip this one's body) */
740 conn->ksnc_rx_state = SOCKNAL_RX_GET_FMB; /* Getting FMB now */
741 conn->ksnc_rx_nob_left = body_len; /* stash packet size */
742 conn->ksnc_rx_nob_wanted = body_len; /* (no slop) */
/* Reset conn's rx machinery for the next incoming packet.
 * nob_to_skip == 0: arm to read the next header into ksnc_hdr
 * (SOCKNAL_RX_HEADER).  Otherwise: build an iov of slop-buffer entries to
 * discard up to nob_to_skip bytes (SOCKNAL_RX_SLOP); if the rx iov fills
 * before everything is covered, the remainder is handled on a later call.
 * All iov entries alias the single static slop buffer — fine, since the
 * data read into it is thrown away. */
746 ktoenal_new_packet (ksock_conn_t *conn, int nob_to_skip)
748 static char ktoenal_slop_buffer[4096];
754 if (nob_to_skip == 0) /* right at next packet boundary now */
756 conn->ksnc_rx_state = SOCKNAL_RX_HEADER;
757 conn->ksnc_rx_nob_wanted = sizeof (ptl_hdr_t);
758 conn->ksnc_rx_nob_left = sizeof (ptl_hdr_t);
760 conn->ksnc_rx_iov[0].iov_base = (char *)&conn->ksnc_hdr;
761 conn->ksnc_rx_iov[0].iov_len = sizeof (ptl_hdr_t);
762 conn->ksnc_rx_niov = 1;
766 /* set up to skip as much a possible now */
767 /* if there's more left (ran out of iov entries) we'll get called again */
769 conn->ksnc_rx_state = SOCKNAL_RX_SLOP;
770 conn->ksnc_rx_nob_left = nob_to_skip;
776 nob = MIN (nob_to_skip, sizeof (ktoenal_slop_buffer));
778 conn->ksnc_rx_iov[niov].iov_base = ktoenal_slop_buffer;
779 conn->ksnc_rx_iov[niov].iov_len = nob;
784 } while (nob_to_skip != 0 && /* mustn't overflow conn's rx iov */
785 niov < sizeof (conn->ksnc_rx_iov)/sizeof (conn->ksnc_rx_iov[0]));
787 conn->ksnc_rx_niov = niov;
788 conn->ksnc_rx_nob_wanted = skipped;
/* Scheduler worker: advance one connection's receive state machine.
 * Entered and exited with ksnd_sched_lock held (state in *irq_flags); the
 * lock is dropped around the socket read and any lib callbacks.
 * States: GET_FMB (acquire forwarding buffer) -> read via
 * ktoenal_recvmsg -> on a full read, dispatch on rx state:
 *   HEADER: either forward-parse (not for us) or lib_parse (for us);
 *   BODY:   complete a local delivery via lib_finalize;
 *   SLOP:   discard bytes of a skipped packet;
 *   BODY_FWD: kick the stashed router descriptor with kpr_fwd_start.
 * On exit, either deschedules the conn (dropping the scheduler's ref) or
 * requeues it on ksnd_rx_conns.
 * NOTE(review): the 'get_fmb:', 'try_read:' and 'out:' label lines (and
 * some braces) are missing from this extraction — targets of the visible
 * gotos; verify against the original. */
793 ktoenal_process_receive (ksock_conn_t *conn, unsigned long *irq_flags)
797 LASSERT (atomic_read (&conn->ksnc_refcount) > 0);
798 LASSERT (conn->ksnc_rx_scheduled);
799 LASSERT (conn->ksnc_rx_ready);
801 /* NB: sched lock held */
802 CDEBUG(D_NET, "conn %p\n", conn);
804 if (conn->ksnc_rx_state != SOCKNAL_RX_GET_FMB) /* doesn't need a forwarding buffer */
806 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
811 /* NB: sched lock held */
812 fmb = ktoenal_get_idle_fmb (conn);
813 if (fmb == NULL) /* conn descheduled waiting for idle fmb */
816 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, *irq_flags);
818 if (ktoenal_init_fmb (conn, fmb)) /* packet forwarded ? */
819 goto out; /* come back later for next packet */
822 /* NB: sched lock NOT held */
823 LASSERT (conn->ksnc_rx_state == SOCKNAL_RX_HEADER ||
824 conn->ksnc_rx_state == SOCKNAL_RX_BODY ||
825 conn->ksnc_rx_state == SOCKNAL_RX_BODY_FWD ||
826 conn->ksnc_rx_state == SOCKNAL_RX_SLOP);
828 LASSERT (conn->ksnc_rx_niov > 0);
829 LASSERT (conn->ksnc_rx_nob_wanted > 0);
831 conn->ksnc_rx_ready = 0; /* data ready may race with me and set ready */
832 mb(); /* => clear BEFORE trying to read */
834 /* NB ktoenal_recvmsg "consumes" the iov passed to it */
835 len = ktoenal_recvmsg(conn->ksnc_file,
836 conn->ksnc_rx_iov, conn->ksnc_rx_niov,
837 conn->ksnc_rx_nob_wanted);
838 CDEBUG (D_NET, "%p read(%d) %d\n", conn, conn->ksnc_rx_nob_wanted, len);
840 if (len <= 0) /* nothing ready (EAGAIN) or EOF or error */
842 if (len != -EAGAIN && /* ! nothing to read now */
843 len != 0) /* ! nothing to read ever */
845 // warning FIXME: handle socket errors properly
846 CERROR ("Error socknal read(%d) %p: %d\n",
847 conn->ksnc_rx_nob_wanted, conn, len);
849 goto out; /* come back when there's data ready */
852 LASSERT (len <= conn->ksnc_rx_nob_wanted);
853 conn->ksnc_rx_nob_wanted -= len;
854 conn->ksnc_rx_nob_left -= len;
856 if (conn->ksnc_rx_nob_wanted != 0) /* short read */
857 goto out; /* try again later */
859 conn->ksnc_rx_ready = 1; /* assume there's more to be had */
861 switch (conn->ksnc_rx_state)
863 case SOCKNAL_RX_HEADER:
864 if (conn->ksnc_hdr.dest_nid != ktoenal_lib.ni.nid) /* It's not for me */
866 ktoenal_fwd_parse (conn);
867 switch (conn->ksnc_rx_state)
869 case SOCKNAL_RX_HEADER: /* skipped this packet (zero payload) */
870 goto out; /* => come back later */
871 case SOCKNAL_RX_SLOP: /* skipping this packet's body */
872 goto try_read; /* => go read it */
873 case SOCKNAL_RX_GET_FMB: /* forwarding */
874 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
875 goto get_fmb; /* => go get a fwd msg buffer */
883 PROF_START(lib_parse);
884 lib_parse(&ktoenal_lib, &conn->ksnc_hdr, conn); /* sets wanted_len, iovs etc */
885 PROF_FINISH(lib_parse);
887 if (conn->ksnc_rx_nob_wanted != 0) /* need to get some payload? */
889 conn->ksnc_rx_state = SOCKNAL_RX_BODY;
890 goto try_read; /* go read the payload */
892 /* Fall through (completed packet for me) */
894 case SOCKNAL_RX_BODY:
895 atomic_inc (&ktoenal_packets_received);
896 lib_finalize(&ktoenal_lib, NULL, conn->ksnc_cookie); /* packet is done now */
899 case SOCKNAL_RX_SLOP:
900 if (ktoenal_new_packet (conn, conn->ksnc_rx_nob_left)) /* starting new packet? */
901 goto out; /* come back later */
902 goto try_read; /* try to finish reading slop now */
904 case SOCKNAL_RX_BODY_FWD:
905 CDEBUG (D_NET, "%p "LPX64"->"LPX64" %d fwd_start (got body)\n", conn,
906 conn->ksnc_hdr.src_nid, conn->ksnc_hdr.dest_nid, conn->ksnc_rx_nob_left);
908 atomic_inc (&ktoenal_packets_received);
910 /* ktoenal_init_fmb() stashed router descriptor in conn->ksnc_cookie */
911 kpr_fwd_start (&ktoenal_data.ksnd_router, (kpr_fwd_desc_t *)conn->ksnc_cookie);
913 LASSERT (conn->ksnc_rx_nob_left == 0); /* no slop in forwarded packets */
915 ktoenal_new_packet (conn, 0); /* on to next packet */
916 goto out; /* (later) */
926 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, *irq_flags);
928 if (!conn->ksnc_rx_ready) /* no data there to read? */
930 conn->ksnc_rx_scheduled = 0; /* let socket callback schedule again */
931 ktoenal_put_conn (conn); /* release scheduler's ref */
933 else /* let scheduler call me again */
934 list_add_tail (&conn->ksnc_rx_list, &ktoenal_data.ksnd_rx_conns);
/* lib callback: lib_parse tells us where the payload goes.  'private' is
 * the ksock_conn the header arrived on; stash the lib msg in ksnc_cookie
 * (used by lib_finalize when the body completes), copy the destination
 * iovs into conn's rx iov, and record wanted (mlen) vs total (rlen) so
 * any excess becomes slop. */
938 ktoenal_recv(nal_cb_t *nal, void *private, lib_msg_t *msg,
939 unsigned int niov, struct iovec *iov, size_t mlen, size_t rlen)
941 ksock_conn_t *conn = (ksock_conn_t *)private;
944 conn->ksnc_cookie = msg;
946 LASSERT (niov <= PTL_MD_MAX_IOV);
947 for (i = 0; i < niov; i++)
949 conn->ksnc_rx_iov[i].iov_len = iov[i].iov_len;
950 conn->ksnc_rx_iov[i].iov_base = iov[i].iov_base;
953 conn->ksnc_rx_niov = niov;
954 conn->ksnc_rx_nob_wanted = mlen;
955 conn->ksnc_rx_nob_left = rlen;
/* Main scheduler thread: round-robins between the rx and tx conn queues,
 * processing one conn from each per iteration for fairness.  Holds
 * ksnd_sched_lock across the loop; the process_* helpers drop and regain
 * it.  When idle (or after SOCKNAL_RESCHED iterations, to avoid hogging
 * the CPU) it releases the lock and sleeps on ksnd_sched_waitq until more
 * work arrives or shutdown is flagged. */
961 ktoenal_scheduler (void *arg)
968 kportal_daemonize ("ktoenal_sched");
969 kportal_blockallsigs ();
971 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
973 while (!ktoenal_data.ksnd_shuttingdown)
975 int did_something = 0;
977 /* Ensure I progress everything semi-fairly */
979 if (!list_empty (&ktoenal_data.ksnd_rx_conns))
982 conn = list_entry (ktoenal_data.ksnd_rx_conns.next,
983 ksock_conn_t, ksnc_rx_list);
984 list_del (&conn->ksnc_rx_list);
986 ktoenal_process_receive (conn, &flags); /* drops & regains ksnd_sched_lock */
989 if (!list_empty (&ktoenal_data.ksnd_tx_conns))
992 conn = list_entry (ktoenal_data.ksnd_tx_conns.next,
993 ksock_conn_t, ksnc_tx_list);
995 list_del (&conn->ksnc_tx_list);
996 ktoenal_process_transmit (conn, &flags); /* drops and regains ksnd_sched_lock */
999 if (!did_something || /* nothing to do */
1000 ++nloops == SOCKNAL_RESCHED) /* hogging CPU? */
1002 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1006 if (!did_something) { /* wait for something to do */
1007 rc = wait_event_interruptible (ktoenal_data.ksnd_sched_waitq,
1008 ktoenal_data.ksnd_shuttingdown ||
1009 !list_empty (&ktoenal_data.ksnd_rx_conns) ||
1010 !list_empty (&ktoenal_data.ksnd_tx_conns));
1015 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1019 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1020 ktoenal_thread_fini ();
/* Reaper thread: closes connections queued on ksnd_reaper_list.
 * Dequeues one conn at a time under ksnd_reaper_lock, drops the lock to
 * call ktoenal_close_conn (which may sleep), and sleeps on
 * ksnd_reaper_waitq when the list is empty, until work or shutdown. */
1026 ktoenal_reaper (void *arg)
1028 unsigned long flags;
1032 kportal_daemonize ("ktoenal_reaper");
1033 kportal_blockallsigs ();
1035 while (!ktoenal_data.ksnd_shuttingdown)
1037 spin_lock_irqsave (&ktoenal_data.ksnd_reaper_lock, flags);
1039 if (list_empty (&ktoenal_data.ksnd_reaper_list))
1043 conn = list_entry (ktoenal_data.ksnd_reaper_list.next,
1044 ksock_conn_t, ksnc_list);
1045 list_del (&conn->ksnc_list);
1048 spin_unlock_irqrestore (&ktoenal_data.ksnd_reaper_lock, flags);
1051 ktoenal_close_conn (conn);
1053 rc = wait_event_interruptible (ktoenal_data.ksnd_reaper_waitq,
1054 ktoenal_data.ksnd_shuttingdown ||
1055 !list_empty(&ktoenal_data.ksnd_reaper_list));
1060 ktoenal_thread_fini ();
1064 #define POLLREAD (POLLIN | POLLRDNORM | POLLRDBAND | POLLPRI)
1065 #define POLLWRITE (POLLOUT | POLLWRNORM | POLLWRBAND)
/* Poll thread: stands in for socket data_ready/write_space callbacks.
 * Repeatedly polls every socket on ksnd_socklist; readable sockets are
 * handed to ktoenal_data_ready, writable ones to ktoenal_write_space.
 * A conn ref is taken around each poll since ksnd_socklist_lock is
 * dropped while polling.  Wait-table registration (&ksnd_pwait) is only
 * re-armed when ksnd_slistchange indicates the socket list changed, in
 * which case the poll table is torn down and rebuilt after the sleep.
 * NOTE(review): POLLERR|POLLHUP handling is an empty TODO in the visible
 * code, and list iteration relocking mid-walk looks fragile — the lines
 * between unlock and relock are partially missing here; verify. */
1068 ktoenal_pollthread(void *arg)
1071 struct list_head *tmp;
1074 /* Save the task struct for waking it up */
1075 ktoenal_data.ksnd_pollthread_tsk = current;
1077 kportal_daemonize ("ktoenal_pollthread");
1078 kportal_blockallsigs ();
1080 poll_initwait(&ktoenal_data.ksnd_pwait);
1082 while(!ktoenal_data.ksnd_shuttingdown) {
1084 set_current_state(TASK_INTERRUPTIBLE);
1086 read_lock (&ktoenal_data.ksnd_socklist_lock);
1087 list_for_each(tmp, &ktoenal_data.ksnd_socklist) {
1089 conn = list_entry(tmp, ksock_conn_t, ksnc_list);
1090 atomic_inc(&conn->ksnc_refcount);
1091 read_unlock (&ktoenal_data.ksnd_socklist_lock);
1093 mask = conn->ksnc_file->f_op->poll(conn->ksnc_file,
1094 ktoenal_data.ksnd_slistchange ?
1095 &ktoenal_data.ksnd_pwait : NULL);
1097 if(mask & POLLREAD) {
1098 ktoenal_data_ready(conn);
1101 if (mask & POLLWRITE) {
1102 ktoenal_write_space(conn);
1105 if (mask & (POLLERR | POLLHUP)) {
1106 /* Do error processing */
1109 read_lock (&ktoenal_data.ksnd_socklist_lock);
1110 if(atomic_dec_and_test(&conn->ksnc_refcount))
1111 _ktoenal_put_conn(conn);
1113 ktoenal_data.ksnd_slistchange = 0;
1114 read_unlock (&ktoenal_data.ksnd_socklist_lock);
1116 schedule_timeout(MAX_SCHEDULE_TIMEOUT);
1117 if(ktoenal_data.ksnd_slistchange) {
1118 poll_freewait(&ktoenal_data.ksnd_pwait);
1119 poll_initwait(&ktoenal_data.ksnd_pwait);
1122 poll_freewait(&ktoenal_data.ksnd_pwait);
1123 ktoenal_thread_fini();
/* Readable-socket notification (called from the poll thread).
 * test_and_set_bit on ksnc_rx_ready makes repeated notifications cheap;
 * only the 0->1 transition takes the sched lock.  If the conn is not
 * already being progressed it is queued on ksnd_rx_conns with an extra
 * ref for the scheduler, rx_ready is re-asserted (defends against the
 * scheduler clearing it concurrently), and a scheduler is woken. */
1128 ktoenal_data_ready (ksock_conn_t *conn)
1130 unsigned long flags;
1133 if (!test_and_set_bit (0, &conn->ksnc_rx_ready)) {
1134 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1136 if (!conn->ksnc_rx_scheduled) { /* not being progressed */
1137 list_add_tail (&conn->ksnc_rx_list,
1138 &ktoenal_data.ksnd_rx_conns);
1139 conn->ksnc_rx_scheduled = 1;
1140 /* extra ref for scheduler */
1141 atomic_inc (&conn->ksnc_refcount);
1143 /* This is done to avoid the effects of a sequence
1144 * of events in which the rx_ready is lost
1146 conn->ksnc_rx_ready=1;
1148 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1149 wake_up (&ktoenal_data.ksnd_sched_waitq);
1152 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
/* Writable-socket notification (called from the poll thread) — tx twin of
 * ktoenal_data_ready.  On the 0->1 transition of ksnc_tx_ready, if the
 * conn has queued packets and is not already scheduled, queue it on
 * ksnd_tx_conns with an extra scheduler ref and wake a scheduler. */
1159 ktoenal_write_space (ksock_conn_t *conn)
1161 unsigned long flags;
1163 CDEBUG (D_NET, "conn %p%s%s%s\n",
1165 (conn == NULL) ? "" : (test_bit (0, &conn->ksnc_tx_ready) ? " ready" : " blocked"),
1166 (conn == NULL) ? "" : (conn->ksnc_tx_scheduled ? " scheduled" : " idle"),
1167 (conn == NULL) ? "" : (list_empty (&conn->ksnc_tx_queue) ? " empty" : " queued"));
1170 if (!test_and_set_bit (0, &conn->ksnc_tx_ready)) {
1171 spin_lock_irqsave (&ktoenal_data.ksnd_sched_lock, flags);
1173 if (!list_empty (&conn->ksnc_tx_queue) && /* packets to send */
1174 !conn->ksnc_tx_scheduled) { /* not being progressed */
1176 list_add_tail (&conn->ksnc_tx_list,
1177 &ktoenal_data.ksnd_tx_conns);
1178 conn->ksnc_tx_scheduled = 1;
1179 /* extra ref for scheduler */
1180 atomic_inc (&conn->ksnc_refcount);
1182 if (waitqueue_active (&ktoenal_data.ksnd_sched_waitq))
1183 wake_up (&ktoenal_data.ksnd_sched_waitq);
1185 spin_unlock_irqrestore (&ktoenal_data.ksnd_sched_lock, flags);
1189 nal_cb_t ktoenal_lib = {
1190 nal_data: &ktoenal_data, /* NAL private data */
1191 cb_send: ktoenal_send,
1192 cb_recv: ktoenal_recv,
1193 cb_read: ktoenal_read,
1194 cb_write: ktoenal_write,
1195 cb_callback: ktoenal_callback,
1196 cb_malloc: ktoenal_malloc,
1197 cb_free: ktoenal_free,
1198 cb_printf: ktoenal_printf,
1199 cb_cli: ktoenal_cli,
1200 cb_sti: ktoenal_sti,
1201 cb_dist: ktoenal_dist