1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8 * W. Marcus Miller - Based on ksocknal
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Global traffic counters (NB: partial listing -- residual original line
 * numbers retained throughout this file). */
29 atomic_t kqswnal_packets_launched;    /* txs handed to the Elan driver */
30 atomic_t kqswnal_packets_transmitted; /* txs that completed EP_SUCCESS */
31 atomic_t kqswnal_packets_received;    /* rxs delivered by the Elan driver */
35 * LIB functions follow
/* LIB callback: copy 'len' bytes from a NAL-space source into 'dst_addr'.
 * Both spaces are kernel-virtual on this NAL, so a memcpy suffices.
 * NOTE(review): listing incomplete here -- return-type/brace lines missing. */
39 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
42         CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
43                 nal->ni.nid, len, src_addr, dst_addr );
44         memcpy( dst_addr, src_addr, len );
/* LIB callback: mirror of kqswnal_read -- copy 'len' bytes from 'src_addr'
 * to a NAL-space destination (kernel-virtual, plain memcpy). */
50 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
53         CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
54                 nal->ni.nid, len, src_addr, dst_addr );
55         memcpy( dst_addr, src_addr, len );
/* LIB callback: allocate 'len' bytes via the Portals allocation macro. */
61 kqswnal_malloc(nal_cb_t *nal, size_t len)
65         PORTAL_ALLOC(buf, len);
/* LIB callback: release a buffer obtained from kqswnal_malloc().
 * PORTAL_FREE needs the original length for accounting. */
70 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
72         PORTAL_FREE(buf, len);
/* LIB callback: printf-style diagnostics routed into the CDEBUG log.
 * Formats into a bounded stack buffer and force-terminates it. */
76 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
82         vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
85         msg[sizeof (msg) - 1] = 0;              /* ensure terminated */
87         CDEBUG (D_NET, "%s", msg);
/* LIB callback: enter the NAL critical section -- take the state lock,
 * saving interrupt flags through the caller-provided pointer. */
92 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
94         kqswnal_data_t *data= nal->nal_data;
96         spin_lock_irqsave(&data->kqn_statelock, *flags);
/* LIB callback: leave the NAL critical section -- release the state lock
 * and restore the interrupt flags saved by kqswnal_cli(). */
101 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
103         kqswnal_data_t *data= nal->nal_data;
105         spin_unlock_irqrestore(&data->kqn_statelock, *flags);
/* LIB callback: report network "distance" to 'nid':
 * 0 = self, 1 = directly-reachable Elan peer, 2 = reachable via router. */
110 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
112         if (nid == nal->ni.nid)
113                 *dist = 0;                      /* it's me */
114         else if (kqswnal_nid2elanid (nid) >= 0)
115                 *dist = 1;                      /* it's my peer */
117                 *dist = 2;                      /* via router */
/* Tear down the temporary Elan DVMA mappings made for a tx descriptor.
 * No-op when nothing was mapped; resets ktx_nmappedpages afterwards so the
 * descriptor can be reused.  The asserts bound the unload to this ktx's
 * reserved DVMA window. */
122 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
124         if (ktx->ktx_nmappedpages == 0)
127         CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
128                 ktx, ktx->ktx_niov, ktx->ktx_basepage, ktx->ktx_nmappedpages);
130         LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
131         LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
132                  kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
134         elan3_dvma_unload(kqswnal_data.kqn_epdev->DmaState,
135                           kqswnal_data.kqn_eptxdmahandle,
136                           ktx->ktx_basepage, ktx->ktx_nmappedpages);
137         ktx->ktx_nmappedpages = 0;
/* Map a page-based (kiov) payload into this tx descriptor's Elan DVMA window.
 * Walks the kiov, kmap()ing each page (EKC only takes vaddrs), loading it at
 * the next free DVMA page, and coalescing Elan-contiguous frags into one iov
 * entry.  Fails (see CERROR branches) when the message needs more DVMA pages
 * or more frags than the descriptor owns.  ktx_nmappedpages is updated inside
 * the loop so a failure mid-way still unmaps cleanly via kqswnal_unmap_tx().
 * NOTE(review): listing incomplete -- loop header, error returns and frag
 * base/len setup lines are missing from this view. */
141 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
143         int       nfrags  = ktx->ktx_niov;
144         const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
145         int       nmapped = ktx->ktx_nmappedpages;
146         int       maxmapped = ktx->ktx_npages;
147         uint32_t  basepage = ktx->ktx_basepage + nmapped;
150         LASSERT (nmapped <= maxmapped);
151         LASSERT (nfrags <= maxfrags);
156                 int  fraglen = kiov->kiov_len;
158                 /* nob exactly spans the iovs */
159                 LASSERT (fraglen <= nob);
160                 /* each frag fits in a page */
161                 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
164                 if (nmapped > maxmapped) {
165                         CERROR("Can't map message in %d pages (max %d)\n",
170                 if (nfrags == maxfrags) {
171                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
176                 /* XXX this is really crap, but we'll have to kmap until
177                  * EKC has a page (rather than vaddr) mapping interface */
179                 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
182                         "%p[%d] loading %p for %d, page %d, %d total\n",
183                         ktx, nfrags, ptr, fraglen, basepage, nmapped);
185                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
186                                        kqswnal_data.kqn_eptxdmahandle,
188                                        basepage, &ktx->ktx_iov[nfrags].Base);
190                 kunmap (kiov->kiov_page);
192                 /* keep in loop for failure case */
193                 ktx->ktx_nmappedpages = nmapped;
195                 if (nfrags > 0 &&                /* previous frag mapped */
196                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
197                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
198                         /* just extend previous */
199                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
201                         ktx->ktx_iov[nfrags].Len = fraglen;
202                         nfrags++;                /* new frag */
210         /* iov must not run out before end of data */
211         LASSERT (nob == 0 || niov > 0);
215         ktx->ktx_niov = nfrags;
216         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
217                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
/* Map a vaddr-based (struct iovec) payload into this tx descriptor's Elan
 * DVMA window.  Same structure as kqswnal_map_tx_kiov() but no kmap is needed
 * since iov_base is already a kernel vaddr; each frag may span several pages
 * (kqswnal_pages_spanned).  Frags contiguous in Elan space are coalesced.
 * NOTE(review): listing incomplete -- loop header and error returns missing. */
223 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
225         int       nfrags  = ktx->ktx_niov;
226         const int maxfrags = sizeof (ktx->ktx_iov)/sizeof (ktx->ktx_iov[0]);
227         int       nmapped = ktx->ktx_nmappedpages;
228         int       maxmapped = ktx->ktx_npages;
229         uint32_t  basepage = ktx->ktx_basepage + nmapped;
231         LASSERT (nmapped <= maxmapped);
232         LASSERT (nfrags <= maxfrags);
237                 int  fraglen = iov->iov_len;
238                 long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);
240                 /* nob exactly spans the iovs */
241                 LASSERT (fraglen <= nob);
244                 if (nmapped > maxmapped) {
245                         CERROR("Can't map message in %d pages (max %d)\n",
250                 if (nfrags == maxfrags) {
251                         CERROR("Message too fragmented in Elan VM (max %d frags)\n",
257                         "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
258                         ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
261                 elan3_dvma_kaddr_load (kqswnal_data.kqn_epdev->DmaState,
262                                        kqswnal_data.kqn_eptxdmahandle,
263                                        iov->iov_base, fraglen,
264                                        basepage, &ktx->ktx_iov[nfrags].Base);
265                 /* keep in loop for failure case */
266                 ktx->ktx_nmappedpages = nmapped;
268                 if (nfrags > 0 &&                /* previous frag mapped */
269                     ktx->ktx_iov[nfrags].Base == /* contiguous with this one */
270                     (ktx->ktx_iov[nfrags-1].Base + ktx->ktx_iov[nfrags-1].Len))
271                         /* just extend previous */
272                         ktx->ktx_iov[nfrags - 1].Len += fraglen;
274                         ktx->ktx_iov[nfrags].Len = fraglen;
275                         nfrags++;                /* new frag */
283         /* iov must not run out before end of data */
284         LASSERT (nob == 0 || niov > 0);
288         ktx->ktx_niov = nfrags;
289         CDEBUG (D_NET, "%p got %d frags over %d pages\n",
290                 ktx, ktx->ktx_niov, ktx->ktx_nmappedpages);
/* Return a tx descriptor to its idle pool (reserved non-blocking pool or the
 * normal pool, per ktx_isnblk), after releasing any temporary DVMA mappings.
 * If a forwarded packet was queued waiting for a descriptor, dequeue it here
 * (under the idle-txd lock) and reschedule it on the delayed-forwards list
 * (under the scheduler lock) for the scheduler thread to retry; also wake any
 * local sender sleeping for a descriptor. */
296 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
298         kpr_fwd_desc_t *fwd = NULL;
301         kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
303         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
305         list_del (&ktx->ktx_list);              /* take off active list */
307         if (ktx->ktx_isnblk) {
308                 /* reserved for non-blocking tx */
309                 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
310                 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
314         list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
316         /* anything blocking for a tx descriptor? */
317         if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
319                 CDEBUG(D_NET,"wakeup fwd\n");
321                 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
322                                   kpr_fwd_desc_t, kprfd_list);
323                 list_del (&fwd->kprfd_list);
326         if (waitqueue_active (&kqswnal_data.kqn_idletxd_waitq))  /* process? */
328                 /* local sender waiting for tx desc */
329                 CDEBUG(D_NET,"wakeup process\n");
330                 wake_up (&kqswnal_data.kqn_idletxd_waitq);
333         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
338         /* schedule packet for forwarding again */
339         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
341         list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
342         if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
343                 wake_up (&kqswnal_data.kqn_sched_waitq);
345         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
/* Obtain a tx descriptor.  Priority order:
 *  1. the "normal" idle pool;
 *  2. for a forwarded packet (fwd != NULL): queue the fwd to wait for a
 *     descriptor and return NULL;
 *  3. if !may_block: the reserved non-blocking pool (NULL if exhausted);
 *  4. otherwise sleep until a normal descriptor is freed.
 * On success the descriptor is moved to the active list and stamped with the
 * caller's pid.  Idle descriptors never have leftover DVMA mappings. */
349 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
352         kqswnal_tx_t  *ktx = NULL;
355         spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
357         /* "normal" descriptor is free */
358         if (!list_empty (&kqswnal_data.kqn_idletxds)) {
359                 ktx = list_entry (kqswnal_data.kqn_idletxds.next,
360                                   kqswnal_tx_t, ktx_list);
364         /* "normal" descriptor pool is empty */
366         if (fwd != NULL) { /* forwarded packet => queue for idle txd */
367                 CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
368                 list_add_tail (&fwd->kprfd_list,
369                                &kqswnal_data.kqn_idletxd_fwdq);
373         /* doing a local transmit */
375         if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
376                 CERROR ("intr tx desc pool exhausted\n");
380         ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
381                           kqswnal_tx_t, ktx_list);
385         /* block for idle tx */
387         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
389         CDEBUG (D_NET, "blocking for tx desc\n");
390         wait_event (kqswnal_data.kqn_idletxd_waitq,
391                     !list_empty (&kqswnal_data.kqn_idletxds));
395         list_del (&ktx->ktx_list);
396         list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
397         ktx->ktx_launcher = current->pid;
400         spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
402         /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
403         LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
/* Complete a transmit: notify the router for a forwarded packet, or finalize
 * the local Portals message, then recycle the descriptor. */
408 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
410         if (ktx->ktx_forwarding)            /* router asked me to forward this packet */
411                 kpr_fwd_done (&kqswnal_data.kqn_router,
412                               (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
413         else                                /* packet sourced locally */
414                 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
415                               (lib_msg_t *)ktx->ktx_args[1]);
417         kqswnal_put_idle_tx (ktx);
/* EP transmit-completion callback.  Bumps the transmitted counter on success;
 * on failure, tells the router when ("then", derived from the launch time)
 * the destination was last known alive so it can mark the route down.
 * Finishes by completing the tx via kqswnal_tx_done(). */
421 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
423         kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;
427         LASSERT (txd != NULL);
428         LASSERT (ktx != NULL);
430         CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
432         if (status == EP_SUCCESS)
433                 atomic_inc (&kqswnal_packets_transmitted);
435         if (status != EP_SUCCESS)
437                 CERROR ("Tx completion to "LPX64" failed: %d\n",
438                         ktx->ktx_nid, status);
440                 do_gettimeofday (&now);
441                 then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
443                 kpr_notify (&kqswnal_data.kqn_router,
444                             ktx->ktx_nid, 0, then);
449         kqswnal_tx_done (ktx, status);
/* Hand a prepared tx descriptor to the Elan EP layer.  In interrupt context
 * the transmit must not sleep or allocate; if EP then runs out of txds
 * (ENOMEM) the descriptor is queued on the delayed-txds list for the
 * scheduler thread to retry.  Any other failure notifies the router that the
 * destination may be down.
 * NOTE(review): the switch(rc) header line is missing from this view. */
453 kqswnal_launch (kqswnal_tx_t *ktx)
455         /* Don't block for transmit descriptor if we're in interrupt context */
456         int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
457         int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
461         ktx->ktx_launchtime = jiffies;
463         LASSERT (dest >= 0);                    /* must be a peer */
464         rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
465                                ktx->ktx_port, attr, kqswnal_txhandler,
466                                ktx, ktx->ktx_iov, ktx->ktx_niov);
468         case 0: /* success */
469                 atomic_inc (&kqswnal_packets_launched);
472         case ENOMEM: /* can't allocate ep txd => queue for later */
473                 LASSERT (in_interrupt());
475                 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
477                 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
478                 if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
479                         wake_up (&kqswnal_data.kqn_sched_waitq);
481                 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
484         default: /* fatal error */
485                 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
487                 /* Tell router I think a node is down */
488                 kpr_notify (&kqswnal_data.kqn_router, ktx->ktx_nid,
489                             0, ktx->ktx_launchtime);
/* Return a printable name for a Portals header's message type; "<UNKNOWN>"
 * for unrecognized types.  NOTE(review): the per-type cases are missing from
 * this view. */
495 hdr_type_string (ptl_hdr_t *hdr)
507                 return ("<UNKNOWN>");
/* Dump a Portals header to the error log for diagnosis: common src/dst
 * nid/pid fields, then type-specific detail (PUT/GET/ACK/REPLY).  Fields sent
 * in network order are converted with the NTOH__* macros before printing.
 * NOTE(review): the case labels of the switch are missing from this view. */
512 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
514         char *type_str = hdr_type_string (hdr);
516         CERROR("P3 Header at %p of type %s\n", hdr, type_str);
517         CERROR("    From nid/pid "LPU64"/%u", NTOH__u64(hdr->src_nid),
518                NTOH__u32(hdr->src_pid));
519         CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
520                NTOH__u32(hdr->dest_pid));
522         switch (NTOH__u32(hdr->type)) {
524                 CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
525                        "match bits "LPX64"\n",
526                        NTOH__u32 (hdr->msg.put.ptl_index),
527                        hdr->msg.put.ack_wmd.wh_interface_cookie,
528                        hdr->msg.put.ack_wmd.wh_object_cookie,
529                        NTOH__u64 (hdr->msg.put.match_bits));
530                 CERROR("    Length %d, offset %d, hdr data "LPX64"\n",
531                        NTOH__u32(PTL_HDR_LENGTH(hdr)),
532                        NTOH__u32(hdr->msg.put.offset),
533                        hdr->msg.put.hdr_data);
537                 CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
538                        "match bits "LPX64"\n",
539                        NTOH__u32 (hdr->msg.get.ptl_index),
540                        hdr->msg.get.return_wmd.wh_interface_cookie,
541                        hdr->msg.get.return_wmd.wh_object_cookie,
542                        hdr->msg.get.match_bits);
543                 CERROR("    Length %d, src offset %d\n",
544                        NTOH__u32 (hdr->msg.get.sink_length),
545                        NTOH__u32 (hdr->msg.get.src_offset));
549                 CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
550                        hdr->msg.ack.dst_wmd.wh_interface_cookie,
551                        hdr->msg.ack.dst_wmd.wh_object_cookie,
552                        NTOH__u32 (hdr->msg.ack.mlength));
556                 CERROR("    dst md "LPX64"."LPX64", length %d\n",
557                        hdr->msg.reply.dst_wmd.wh_interface_cookie,
558                        hdr->msg.reply.dst_wmd.wh_object_cookie,
559                        NTOH__u32 (PTL_HDR_LENGTH(hdr)));
562 }                               /* end of print_hdr() */
/* Common send path for kqswnal_send()/kqswnal_send_pages().  Validates size
 * and addressing, resolves a gateway when the destination is not a direct
 * Elan peer, grabs a tx descriptor (non-blocking for ACK/REPLY/interrupt
 * context), copies the header (and optionally checksums header+payload) into
 * the pre-mapped buffer, then either copies a small payload contiguously or
 * DVMA-maps a large one, and launches the transmit.
 * NOTE(review): listing incomplete -- several declarations, returns and the
 * checksum #ifdef lines are missing from this view.
 *
 * FIX(review): in the iovec branch of the checksum loop, the MIN() used
 * 'kiov->iov_len'; 'kiov' is declared only in the sibling page-based branch
 * (and ptl_kiov_t has no iov_len member).  It must be 'iov->iov_len', to
 * match the 'sumnob -= iov->iov_len' that follows. */
565 kqswnal_sendmsg (nal_cb_t     *nal,
572                  unsigned int  payload_niov,
573                  struct iovec *payload_iov,
574                  ptl_kiov_t   *payload_kiov,
579         ptl_nid_t     gatewaynid;
586         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
587                " pid %u\n", payload_nob, payload_niov, nid, pid);
589         LASSERT (payload_nob == 0 || payload_niov > 0);
590         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
592         /* It must be OK to kmap() if required */
593         LASSERT (payload_kiov == NULL || !in_interrupt ());
594         /* payload is either all vaddrs or all pages */
595         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
597         if (payload_nob > KQSW_MAXPAYLOAD) {
598                 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
599                         payload_nob, KQSW_MAXPAYLOAD);
603         if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
604                 rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
605                                  sizeof (ptl_hdr_t) + payload_nob, &gatewaynid);
607                         CERROR("Can't route to "LPX64": router error %d\n",
611                 if (kqswnal_nid2elanid (gatewaynid) < 0) {
612                         CERROR("Bad gateway "LPX64" for "LPX64"\n",
619         /* I may not block for a transmit descriptor if I might block the
620          * receiver, or an interrupt handler. */
621         ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
622                                           type == PTL_MSG_REPLY ||
625                 kqswnal_cerror_hdr (hdr);
626                 return (PTL_NOSPACE);
629         memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
630         ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
633         csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
634         memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
635         for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
636                 if (payload_kiov != NULL) {
637                         ptl_kiov_t *kiov = &payload_kiov[i];
638                         char *addr = ((char *)kmap (kiov->kiov_page)) +
641                         csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
642                         sumnob -= kiov->kiov_len;
644                         struct iovec *iov = &payload_iov[i];
646                         csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, iov->iov_len));
647                         sumnob -= iov->iov_len;
650         memcpy(ktx->ktx_buffer +sizeof(*hdr) +sizeof(csum), &csum,sizeof(csum));
653         /* Set up first frag from pre-mapped buffer (it's at least the
655         ktx->ktx_iov[0].Base = ktx->ktx_ebuffer;
656         ktx->ktx_iov[0].Len = KQSW_HDR_SIZE;
659         if (payload_nob > 0) { /* got some payload (something more to do) */
660                 /* make a single contiguous message? */
661                 if (payload_nob <= KQSW_TX_MAXCONTIG) {
662                         /* copy payload to ktx_buffer, immediately after hdr */
663                         if (payload_kiov != NULL)
664                                 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
665                                                    payload_niov, payload_kiov, payload_nob);
667                                 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
668                                                   payload_niov, payload_iov, payload_nob);
669                         /* first frag includes payload */
670                         ktx->ktx_iov[0].Len += payload_nob;
672                         if (payload_kiov != NULL)
673                                 rc = kqswnal_map_tx_kiov (ktx, payload_nob,
674                                                           payload_niov, payload_kiov);
676                                 rc = kqswnal_map_tx_iov (ktx, payload_nob,
677                                                          payload_niov, payload_iov);
679                                 kqswnal_put_idle_tx (ktx);
685         ktx->ktx_port    = (payload_nob <= KQSW_SMALLPAYLOAD) ?
686                         EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
688         ktx->ktx_forwarding = 0;   /* => lib_finalize() on completion */
689         ktx->ktx_args[0] = private;
690         ktx->ktx_args[1] = cookie;
692         rc = kqswnal_launch (ktx);
693         if (rc != 0) {                    /* failed? */
694                 CERROR ("Failed to send packet to "LPX64": %d\n", nid, rc);
695                 kqswnal_put_idle_tx (ktx);
699         CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64"\n", payload_nob, nid);
/* LIB callback: send with a vaddr-based (struct iovec) payload -- thin
 * wrapper over kqswnal_sendmsg() with payload_kiov == NULL. */
704 kqswnal_send (nal_cb_t     *nal,
711               unsigned int  payload_niov,
712               struct iovec *payload_iov,
715         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
716                                  payload_niov, payload_iov, NULL, payload_nob));
/* LIB callback: send with a page-based (kiov) payload -- thin wrapper over
 * kqswnal_sendmsg() with payload_iov == NULL. */
720 kqswnal_send_pages (nal_cb_t     *nal,
727                     unsigned int  payload_niov,
728                     ptl_kiov_t   *payload_kiov,
731         return (kqswnal_sendmsg (nal, private, cookie, hdr, type, nid, pid,
732                                  payload_niov, NULL, payload_kiov, payload_nob));
/* Tunable: when non-zero, single-frag forwarded packets are also copied into
 * the tx's pre-mapped contiguous buffer instead of being DVMA-mapped. */
735 int kqswnal_fwd_copy_contig = 0;
/* Router callback: transmit a packet the router asked this NAL to forward.
 * Gets a tx descriptor without blocking (if none, the fwd is queued and will
 * be rescheduled when a descriptor frees up), resolves the final target when
 * this node is the gateway, sanity-checks peer/size, then either copies the
 * data into the pre-mapped contiguous buffer or DVMA-maps the router's iov.
 * On launch failure the fwd is completed immediately with the error.
 * NOTE(review): listing incomplete -- error-path returns and the else branch
 * header are missing from this view. */
738 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
742         struct iovec   *iov = fwd->kprfd_iov;
743         int             niov = fwd->kprfd_niov;
744         int             nob = fwd->kprfd_nob;
745         ptl_nid_t       nid = fwd->kprfd_gateway_nid;
748         CERROR ("checksums for forwarded packets not implemented\n");
751         /* The router wants this NAL to forward a packet */
752         CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
753                 fwd, nid, niov, nob);
757         ktx = kqswnal_get_idle_tx (fwd, FALSE);
758         if (ktx == NULL)        /* can't get txd right now */
759                 return;         /* fwd will be scheduled when tx desc freed */
761         if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
762                 nid = fwd->kprfd_target_nid;    /* target is final dest */
764         if (kqswnal_nid2elanid (nid) < 0) {
765                 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
770         if (nob > KQSW_NRXMSGBYTES_LARGE) {
771                 CERROR ("Can't forward [%p] to "LPX64
772                         ": size %d bigger than max packet size %ld\n",
773                         fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
778         if ((kqswnal_fwd_copy_contig || niov > 1) &&
779             nob <= KQSW_TX_BUFFER_SIZE)
781                 /* send from ktx's pre-allocated/mapped contiguous buffer? */
782                 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
783                 ktx->ktx_iov[0].Base = ktx->ktx_ebuffer; /* already mapped */
784                 ktx->ktx_iov[0].Len = nob;
787                 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
792                 ktx->ktx_niov = 0;              /* no frags mapped yet */
793                 rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
797                 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
800         ktx->ktx_port    = (nob <= (sizeof (ptl_hdr_t) + KQSW_SMALLPAYLOAD)) ?
801                         EP_SVC_LARGE_PORTALS_SMALL : EP_SVC_LARGE_PORTALS_LARGE;
803         ktx->ktx_forwarding = 1;
804         ktx->ktx_args[0] = fwd;
806         rc = kqswnal_launch (ktx);
812         CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
814         kqswnal_put_idle_tx (ktx);
815         /* complete now (with failure) */
816         kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
/* Router callback: forwarding of a received packet has completed.  Logs the
 * src/dst route on error and reposts the rx buffer to the Elan driver. */
820 kqswnal_fwd_callback (void *arg, int error)
822         kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
824         /* The router has finished forwarding this packet */
828                 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
830                 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
831                        NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
834         kqswnal_requeue_rx (krx);
/* Dispatch a received packet: parse locally when addressed to this nid
 * (lib_parse() calls back kqswnal_recv*, which requeues krx), drop it if the
 * "remote" destination is actually a directly-reachable peer (should never
 * route through us), otherwise rebuild the rx iov over the receive pages and
 * hand the packet to the router for forwarding. */
838 kqswnal_rx (kqswnal_rx_t *krx)
840         ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
841         ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
845         if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
846                 /* NB krx requeued when lib_parse() calls back kqswnal_recv */
847                 lib_parse (&kqswnal_lib, hdr, krx);
852         CERROR ("checksums for forwarded packets not implemented\n");
855         if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
857                 CERROR("dropping packet from "LPX64" for "LPX64
858                        ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
859                 kqswnal_requeue_rx (krx);
863         /* NB forwarding may destroy iov; rebuild every time */
864         for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
866                 LASSERT (niov < krx->krx_npages);
867                 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
868                 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
871         kpr_fwd_init (&krx->krx_fwd, dest_nid,
872                       krx->krx_nob, niov, krx->krx_iov,
873                       kqswnal_fwd_callback, krx);
875         kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
878 /* Receive Interrupt Handler: posts to schedulers */
/* EP receive-completion callback (interrupt context).  Rejects failed or
 * too-short receives (silently during shutdown) by requeueing the buffer;
 * otherwise queues the krx on the ready list and wakes a scheduler thread,
 * which will call kqswnal_rx() in process context. */
880 kqswnal_rxhandler(EP_RXD *rxd)
883         int           nob    = ep_rxd_len (rxd);
884         int           status = ep_rxd_status (rxd);
885         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
887         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
888                rxd, krx, nob, status);
890         LASSERT (krx != NULL);
895         /* must receive a whole header to be able to parse */
896         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
898                 /* receives complete with failure when receiver is removed */
899                 if (kqswnal_data.kqn_shuttingdown)
902                 CERROR("receive status failed with status %d nob %d\n",
903                        ep_rxd_status(rxd), nob);
904                 kqswnal_requeue_rx (krx);
908         atomic_inc (&kqswnal_packets_received);
910         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
912         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
913         if (waitqueue_active (&kqswnal_data.kqn_sched_waitq))
914                 wake_up (&kqswnal_data.kqn_sched_waitq);
916         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
/* Log a header (ishdr != 0) or payload checksum mismatch on a received
 * packet, dumping the common header fields and type-specific detail.
 * NOTE(review): listing incomplete -- the case labels/breaks of the switch
 * are missing from this view.
 *
 * FIX(review): the first CERROR's argument list was missing the comma after
 * NTOH__u64(hdr->src_nid), juxtaposing two expressions (syntax error). */
921 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
923         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
925         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
926                 ", dpid %d, spid %d, type %d\n",
927                 ishdr ? "Header" : "Payload", krx,
928                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
929                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
930                 NTOH__u32(hdr->type));
932         switch (NTOH__u32 (hdr->type))
935                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
937                        NTOH__u32(hdr->msg.ack.mlength),
938                        hdr->msg.ack.dst_wmd.handle_cookie,
939                        hdr->msg.ack.dst_wmd.handle_idx,
940                        NTOH__u64(hdr->msg.ack.match_bits),
941                        NTOH__u32(hdr->msg.ack.length));
944                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
945                        " len %u off %u data "LPX64"\n",
946                        NTOH__u32(hdr->msg.put.ptl_index),
947                        hdr->msg.put.ack_wmd.handle_cookie,
948                        hdr->msg.put.ack_wmd.handle_idx,
949                        NTOH__u64(hdr->msg.put.match_bits),
950                        NTOH__u32(hdr->msg.put.length),
951                        NTOH__u32(hdr->msg.put.offset),
952                        hdr->msg.put.hdr_data);
955                 CERROR ("GET: <>\n");
958                 CERROR ("REPLY: <>\n");
961                 CERROR ("TYPE?: <>\n");
/* Common receive path for kqswnal_recv()/kqswnal_recv_pages().  Copies up to
 * 'mlen' payload bytes from the krx receive pages (payload starts after the
 * KQSW_HDR_SIZE header area of page 0) into the caller's iov or kiov,
 * advancing page-fragment and iov-fragment cursors in lock-step.  kiov pages
 * are kmap()ed per fragment (hence the !in_interrupt assertion).  When
 * checksumming is compiled in, verifies the sender's header checksum up
 * front and the payload checksum after the copy (only possible when the whole
 * payload was consumed, i.e. csum_len == rlen).  Finalizes the lib message
 * and reposts the rx buffer.
 * NOTE(review): listing incomplete -- loop headers, #ifdef markers and
 * fragment-advance lines are missing from this view. */
967 kqswnal_recvmsg (nal_cb_t     *nal,
976         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
984         kqsw_csum_t   senders_csum;
985         kqsw_csum_t   payload_csum = 0;
986         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
988         size_t        csum_len = mlen;
991         static atomic_t csum_counter;
992         int csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
994         atomic_inc (&csum_counter);
996         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
997                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
998         if (senders_csum != hdr_csum)
999                 kqswnal_csum_error (krx, 1);
1001         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1003         /* What was actually received must be >= payload.
1004          * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
1005         LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
1006         LASSERT (mlen <= rlen);
1008         /* It must be OK to kmap() if required */
1009         LASSERT (kiov == NULL || !in_interrupt ());
1010         /* Either all pages or all vaddrs */
1011         LASSERT (!(kiov != NULL && iov != NULL));
1016                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1018                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1022                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1023                         iov_nob = kiov->kiov_len;
1025                         iov_ptr = iov->iov_base;
1026                         iov_nob = iov->iov_len;
1031                         /* We expect the iov to exactly match mlen */
1032                         LASSERT (iov_nob <= mlen);
1034                         frag = MIN (page_nob, iov_nob);
1035                         memcpy (iov_ptr, page_ptr, frag);
1037                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
1051                                 LASSERT (page < krx->krx_npages);
1052                                 page_ptr = page_address(krx->krx_pages[page]);
1053                                 page_nob = PAGE_SIZE;
1059                         else if (kiov != NULL) {
1060                                 kunmap (kiov->kiov_page);
1064                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1065                                 iov_nob = kiov->kiov_len;
1070                                 iov_ptr = iov->iov_base;
1071                                 iov_nob = iov->iov_len;
1076                         kunmap (kiov->kiov_page);
1080         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1081                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
1083         if (csum_len != rlen)
1084                 CERROR("Unable to checksum data in user's buffer\n");
1085         else if (senders_csum != payload_csum)
1086                 kqswnal_csum_error (krx, 0);
1089                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1091                        hdr_csum, payload_csum, csum_frags, csum_nob);
1093         lib_finalize(nal, private, cookie);
1095         kqswnal_requeue_rx (krx);
/* LIB callback: receive into a vaddr-based (struct iovec) sink -- thin
 * wrapper over kqswnal_recvmsg() with kiov == NULL. */
1101 kqswnal_recv(nal_cb_t     *nal,
1109         return (kqswnal_recvmsg (nal, private, cookie, niov, iov, NULL, mlen, rlen));
/* LIB callback: receive into a page-based (kiov) sink -- thin wrapper over
 * kqswnal_recvmsg() with iov == NULL. */
1113 kqswnal_recv_pages (nal_cb_t     *nal,
1121         return (kqswnal_recvmsg(nal, private, cookie, niov, NULL, kiov, mlen, rlen));
/* Spawn a NAL kernel thread running 'fn(arg)' and account for it in
 * kqn_nthreads (matched by kqswnal_thread_fini() at thread exit).
 * NOTE(review): the error-check on 'pid' is missing from this view. */
1125 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1127         long pid = kernel_thread (fn, arg, 0);
1132         atomic_inc (&kqswnal_data.kqn_nthreads);
/* Per-thread teardown: drop the running-threads count taken at start. */
1137 kqswnal_thread_fini (void)
1139         atomic_dec (&kqswnal_data.kqn_nthreads);
/* NAL scheduler thread: loops until shutdown servicing, under the scheduler
 * lock, three work queues -- ready receives (-> kqswnal_rx), delayed
 * transmits (retry kqswnal_launch; a second failure completes the tx with the
 * error), and delayed forwards (-> kqswnal_fwd_packet).  The lock is dropped
 * around each work item and re-taken afterwards.  When idle (or after
 * KQSW_RESCHED iterations, to avoid hogging the CPU) it releases the lock
 * and sleeps interruptibly / reschedules before continuing.
 * NOTE(review): listing incomplete -- the 'counter' reset and some brace
 * lines are missing from this view. */
1143 kqswnal_scheduler (void *arg)
1147         kpr_fwd_desc_t  *fwd;
1153         kportal_daemonize ("kqswnal_sched");
1154         kportal_blockallsigs ();
1156         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1158         while (!kqswnal_data.kqn_shuttingdown)
1160                 did_something = FALSE;
1162                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1164                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1165                                          kqswnal_rx_t, krx_list);
1166                         list_del (&krx->krx_list);
1167                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1172                         did_something = TRUE;
1173                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1176                 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1178                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1179                                          kqswnal_tx_t, ktx_list);
1180                         list_del_init (&ktx->ktx_delayed_list);
1181                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1184                         rc = kqswnal_launch (ktx);
1185                         if (rc != 0)          /* failed: ktx_nid down? */
1187                                 CERROR("Failed delayed transmit to "LPX64
1188                                        ": %d\n", ktx->ktx_nid, rc);
1189                                 kqswnal_tx_done (ktx, rc);
1192                         did_something = TRUE;
1193                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1196                 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1198                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1199                         list_del (&fwd->kprfd_list);
1200                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1202                         kqswnal_fwd_packet (NULL, fwd);
1204                         did_something = TRUE;
1205                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1208                 /* nothing to do or hogging CPU */
1209                 if (!did_something || counter++ == KQSW_RESCHED) {
1210                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1215                         if (!did_something) {
1216                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1217                                                                kqswnal_data.kqn_shuttingdown ||
1218                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1219                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1220                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
1222                         } else if (current->need_resched)
1225                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1229         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1231         kqswnal_thread_fini ();
/* The nal_cb_t dispatch table that wires this NAL's callbacks into the
 * Portals LIB (GCC designated-initializer "label:" syntax).
 * NOTE(review): the closing brace is missing from this view. */
1235 nal_cb_t kqswnal_lib =
1237         nal_data:       &kqswnal_data,          /* NAL private data */
1238         cb_send:        kqswnal_send,
1239         cb_send_pages:  kqswnal_send_pages,
1240         cb_recv:        kqswnal_recv,
1241         cb_recv_pages:  kqswnal_recv_pages,
1242         cb_read:        kqswnal_read,
1243         cb_write:       kqswnal_write,
1244         cb_malloc:      kqswnal_malloc,
1245         cb_free:        kqswnal_free,
1246         cb_printf:      kqswnal_printf,
1247         cb_cli:         kqswnal_cli,
1248         cb_sti:         kqswnal_sti,
1249         cb_dist:        kqswnal_dist