/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 *   W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
EP_STATUSBLK  kqswnal_rpc_success;
EP_STATUSBLK  kqswnal_rpc_failed;
/*
 *  LIB functions follow
 */
kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
             size_t len)
{
        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );
        return (0);
}
kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
              size_t len)
{
        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );
        return (0);
}
kqswnal_malloc(nal_cb_t *nal, size_t len)
{
        void *buf;

        PORTAL_ALLOC(buf, len);
        return (buf);
}
kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
{
        PORTAL_FREE(buf, len);
}
kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
{
        va_list ap;
        char    msg[256];

        va_start (ap, fmt);
        vsnprintf (msg, sizeof (msg), fmt, ap);         /* sprint safely */
        va_end (ap);

        msg[sizeof (msg) - 1] = 0;                      /* ensure terminated */

        CDEBUG (D_NET, "%s", msg);
}
kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_lock_irqsave(&data->kqn_statelock, *flags);
}
kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
}
kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        if (nid == nal->ni.nid)
                *dist = 0;                      /* it's me */
        else if (kqswnal_nid2elanid (nid) >= 0)
                *dist = 1;                      /* it's my peer */
        else
                *dist = 2;                      /* via router */
        return (0);
}
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        struct timeval now;
        time_t         then;

        do_gettimeofday (&now);
        /* 'then' is when this tx was launched, in wall-clock seconds */
        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
}
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
        int   i;

        if (ktx->ktx_nmappedpages == 0)
                return;

        /* newer EKC only: unload each temporarily-mapped frag */
        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        /* older Elan3 EKC only: unload the run of DVMA pages in one go */
        CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);

        LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
        LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);

        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                          kqswnal_data.kqn_eptxdmahandle,
                          ktx->ktx_basepage, ktx->ktx_nmappedpages);

        ktx->ktx_nmappedpages = 0;
}
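/* Map a paged (kiov) payload into Elan DVMA space so it can be the source
 * of a transmit.  NB two load paths appear below: ep_dvma_load() with
 * ep_nmd_merge() for the newer multi-rail EKC, and elan3_dvma_kaddr_load()
 * with hand coalescing for the older Elan3 EKC; the build uses whichever
 * matches the installed EKC. */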
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;
        /* newer EKC only: pick a rail for these mappings */
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);
                /* each frag fits in a page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */
                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                       ktx, nfrags, ptr, fraglen, basepage, nmapped);

                /* newer EKC only: DVMA-load this frag and try to merge it
                 * with the previous one */
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* older Elan3 EKC only: load by kernel vaddr and coalesce
                 * contiguous frags by hand */
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ptr, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                       /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base ==      /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                       /* new frag */
                }

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
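/* As above, but the payload is described by struct iovec (kernel vaddrs)
 * rather than pages, so a single fragment may span several DVMA pages
 * (kqswnal_pages_spanned() counts them). */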
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
        /* newer EKC only: pick a rail for these mappings */
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                /* newer EKC only */
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* older Elan3 EKC only */
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       iov->iov_base + offset, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                       /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base ==      /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                       /* new frag */
                }

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
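/* Return a tx descriptor to its idle pool (normal or reserved non-blocking),
 * and if a forwarded packet was blocked waiting for a descriptor, hand it
 * back to the scheduler to be sent now. */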
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        kpr_fwd_desc_t   *fwd = NULL;
        unsigned long     flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */

        if (ktx->ktx_isnblk) {
                /* reserved for non-blocking tx */
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
                return;
        }

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        /* anything blocking for a tx descriptor? */
        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) {  /* forwarded packet? */
                CDEBUG(D_NET,"wakeup fwd\n");

                fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
        }

        wake_up (&kqswnal_data.kqn_idletxd_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        if (fwd == NULL)
                return;

        /* schedule packet for forwarding again */
        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
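/* Grab an idle tx descriptor.  Forwarded packets (fwd != NULL) queue on
 * kqn_idletxd_fwdq rather than blocking; local senders either take a
 * descriptor from the reserved non-blocking pool or, when may_block is set,
 * sleep until a normal one frees up. */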
kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx = NULL;

        for (;;) {
                spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                        ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* "normal" descriptor pool is empty */

                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
                        list_add_tail (&fwd->kprfd_list,
                                       &kqswnal_data.kqn_idletxd_fwdq);
                        break;
                }

                /* doing a local transmit */
                if (!may_block) {
                        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
                                CERROR ("intr tx desc pool exhausted\n");
                                break;
                        }

                        ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

                CDEBUG (D_NET, "blocking for tx desc\n");
                wait_event (kqswnal_data.kqn_idletxd_waitq,
                            !list_empty (&kqswnal_data.kqn_idletxds));
        }

        if (ktx != NULL) {
                list_del (&ktx->ktx_list);
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                ktx->ktx_launcher = current->pid;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);

        return (ktx);
}
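/* Transmit completion: finalize the message (or tell the router the
 * forwarding attempt is done) according to how the descriptor was being
 * used, then recycle it. */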
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
        lib_msg_t     *msg;
        lib_msg_t     *repmsg = NULL;

        switch (ktx->ktx_state) {
        case KTX_FORWARDING:       /* router asked me to forward this packet */
                kpr_fwd_done (&kqswnal_data.kqn_router,
                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                break;

        case KTX_SENDING:          /* packet sourced locally */
                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                              (lib_msg_t *)ktx->ktx_args[1],
                              (error == 0) ? PTL_OK :
                              (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
                break;

        case KTX_GETTING:          /* Peer has DMA-ed direct? */
                msg = (lib_msg_t *)ktx->ktx_args[1];

                if (error == 0) {
                        repmsg = lib_fake_reply_msg (&kqswnal_lib,
                                                     ktx->ktx_nid, msg->md);
                        if (repmsg == NULL)
                                error = -ENOMEM;
                }

                if (error == 0) {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                                      msg, PTL_OK);
                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
                } else {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
                                      (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
                }
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);
}
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        ktx->ktx_nid, status);

                kqswnal_notify_peer_down(ktx);
                status = -EHOSTDOWN;

        } else if (ktx->ktx_state == KTX_GETTING) {
                /* RPC completed OK; what did our peer put in the status
                 * block? */
                /* newer EKC */
                status = ep_txd_statusblk(txd)->Data[0];
                /* older EKC */
                status = ep_txd_statusblk(txd)->Status;
        } else {
                status = 0;
        }

        kqswnal_tx_done (ktx, status);
}
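/* Hand a prepared tx descriptor to EKC.  GETs go out as RPCs (the peer
 * replies by DMA-ing straight into the sink buffers); everything else is a
 * plain message.  EP_ENOMEM is not fatal: the descriptor is queued for the
 * scheduler thread to retry later. */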
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        LASSERT (dest >= 0);                    /* must be a peer */
        if (ktx->ktx_state == KTX_GETTING) {
                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
                 * other frags are the GET sink which we obviously don't
                 * send here */
                /* newer EKC */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                /* older EKC */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr, kqswnal_txhandler,
                                     ktx, NULL, ktx->ktx_frags, 1);
        } else {
                /* newer EKC */
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                /* older EKC */
                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                       ktx->ktx_port, attr,
                                       kqswnal_txhandler, ktx,
                                       ktx->ktx_frags, ktx->ktx_nfrag);
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                LASSERT (in_interrupt());

                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}
hdr_type_string (ptl_hdr_t *hdr)
{
        switch (NTOH__u32(hdr->type)) {
        default:
                return ("<UNKNOWN>");
        }
}
kqswnal_cerror_hdr(ptl_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               NTOH__u32(hdr->payload_length));
        CERROR(" From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
               NTOH__u32(hdr->src_pid));
        CERROR(" To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
               NTOH__u32(hdr->dest_pid));

        switch (NTOH__u32(hdr->type)) {
        case PTL_MSG_PUT:
                CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       NTOH__u64 (hdr->msg.put.match_bits));
                CERROR(" offset %d, hdr data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR(" Length %d, src offset %d\n",
                       NTOH__u32 (hdr->msg.get.sink_length),
                       NTOH__u32 (hdr->msg.get.src_offset));
                break;

        case PTL_MSG_ACK:
                CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       NTOH__u32 (hdr->msg.ack.mlength));
                break;

        case PTL_MSG_REPLY:
                CERROR(" dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
                break;
        }
}                               /* end of kqswnal_cerror_hdr() */
kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
{
        int   i;

        CDEBUG (how, "%s: %d\n", str, n);
        for (i = 0; i < n; i++) {
                CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len);
        }
}
kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
                     int nsrc, EP_IOVEC *src,
                     int ndst, EP_IOVEC *dst)
{
        int        count;
        int        nob;

        for (count = 0; count < ndv; count++, dv++) {

                if (nsrc == 0 || ndst == 0) {
                        if (nsrc != ndst) {
                                /* For now I'll barf on any left over entries */
                                CERROR ("mismatched src and dst iovs\n");
                                return (-EINVAL);
                        }
                        return (count);
                }

                nob = (src->Len < dst->Len) ? src->Len : dst->Len;
                dv->Len    = nob;
                dv->Source = src->Base;
                dv->Dest   = dst->Base;

                if (nob >= src->Len) {          /* used up this source frag */
                        src++;
                        nsrc--;
                } else {
                        src->Len  -= nob;
                        src->Base += nob;
                }

                if (nob >= dst->Len) {          /* used up this dest frag */
                        dst++;
                        ndst--;
                } else {
                        dst->Len  -= nob;
                        dst->Base += nob;
                }
        }

        CERROR ("DATAVEC too small\n");
        return (-E2BIG);
}
kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                   struct iovec *iov, ptl_kiov_t *kiov,
                   int offset, int nob)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
        int                 rc;
        int                 i;
        int                 ndatav;
        EP_DATAVEC          datav[EP_MAXFRAG];

        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT ((iov == NULL) != (kiov == NULL));

        /* see kqswnal_sendmsg comment regarding endian-ness */
        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (-EINVAL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (-EINVAL);
        }

        /* Map the source data... */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
        else
                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);

        if (rc != 0) {
                CERROR ("Can't map source data: %d\n", rc);
                return (rc);
        }

        /* newer EKC: local and remote frag lists must correspond exactly */
        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
                return (-EINVAL);
        }

        for (i = 0; i < rmd->kqrmd_nfrag; i++)
                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, rmd->kqrmd_nfrag,
                               ktx->ktx_frags[i].nmd_len,
                               rmd->kqrmd_frag[i].nmd_len);
                        return (-EINVAL);
                }

        /* older EKC: build a datavec pairing local source with remote sink */
        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
                                      ktx->ktx_nfrag, ktx->ktx_frags,
                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (ndatav < 0) {
                CERROR ("Can't create datavec: %d\n", ndatav);
                return (ndatav);
        }

        /* Our caller will start to race with kqswnal_dma_reply_complete... */
        LASSERT (atomic_read (&krx->krx_refcount) == 1);
        atomic_set (&krx->krx_refcount, 2);

        /* newer EKC */
        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                             &kqswnal_rpc_success,
                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
        if (rc == EP_SUCCESS)
                return (0);

        /* Well we tried... */
        krx->krx_rpc_reply_needed = 0;

        /* older EKC */
        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                              &kqswnal_rpc_success, datav, ndatav);
        if (rc == EP_SUCCESS)
                return (0);

        /* "old" EKC destroys rxd on failed completion */
        krx->krx_rxd = NULL;

        CERROR("can't complete RPC: %d\n", rc);

        /* reset refcount back to 1: we're not going to be racing with
         * kqswnal_dma_reply_complete. */
        atomic_set (&krx->krx_refcount, 1);

        return (-ECONNABORTED);
}
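/* Common send path.  Depending on the message this either (a) completes a
 * peer's optimized GET via kqswnal_dma_reply(), (b) sends an optimized GET
 * request carrying the sink buffer addresses, (c) copies a small payload
 * into the pre-mapped contiguous buffer, or (d) maps a large payload and
 * sends it as multiple fragments. */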
kqswnal_sendmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 ptl_hdr_t    *hdr,
                 int           type,
                 ptl_nid_t     nid,
                 ptl_pid_t     pid,
                 unsigned int  payload_niov,
                 struct iovec *payload_iov,
                 ptl_kiov_t   *payload_kiov,
                 size_t        payload_offset,
                 size_t        payload_nob)
{
        kqswnal_tx_t *ktx;
        int           rc;
        ptl_nid_t     targetnid;
        /* only used when checksumming is configured in */
        kqsw_csum_t   csum;
        int           i;
        int           sumoff;
        int           sumnob;

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
               " pid %u\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (payload_nob > KQSW_MAXPAYLOAD) {
                CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                        payload_nob, KQSW_MAXPAYLOAD);
                return (PTL_FAIL);
        }

        targetnid = nid;
        if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                if (rc != 0) {
                        CERROR("Can't route to "LPX64": router error %d\n",
                               nid, rc);
                        return (PTL_FAIL);
                }
                if (kqswnal_nid2elanid (targetnid) < 0) {
                        CERROR("Bad gateway "LPX64" for "LPX64"\n",
                               targetnid, nid);
                        return (PTL_FAIL);
                }
        }

        /* I may not block for a transmit descriptor if I might block the
         * receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
                                          type == PTL_MSG_REPLY ||
                                          in_interrupt()));
        if (ktx == NULL) {
                kqswnal_cerror_hdr (hdr);
                return (PTL_NOSPACE);
        }
        ktx->ktx_nid     = targetnid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = libmsg;

        if (type == PTL_MSG_REPLY &&
            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                if (nid != targetnid ||
                    kqswnal_nid2elanid(nid) !=
                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
                        CERROR("Optimized reply nid conflict: "
                               "nid "LPX64" via "LPX64" elanID %d\n",
                               nid, targetnid,
                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
                        return (PTL_FAIL);
                }

                /* peer expects RPC completion with GET data */
                rc = kqswnal_dma_reply (ktx, payload_niov,
                                        payload_iov, payload_kiov,
                                        payload_offset, payload_nob);
                if (rc == 0)
                        return (PTL_OK);

                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

        /* checksum the hdr and payload into the pre-mapped buffer (only
         * when checksumming is configured in) */
        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
                if (payload_kiov != NULL) {
                        ptl_kiov_t *kiov = &payload_kiov[i];

                        if (sumoff >= kiov->kiov_len) {
                                sumoff -= kiov->kiov_len;
                        } else {
                                char *addr = ((char *)kmap (kiov->kiov_page)) +
                                             kiov->kiov_offset + sumoff;
                                int   fragnob = kiov->kiov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumoff = 0;
                                sumnob -= fragnob;
                                kunmap(kiov->kiov_page);
                        }
                } else {
                        struct iovec *iov = &payload_iov[i];

                        if (sumoff >= iov->iov_len) {
                                sumoff -= iov->iov_len;
                        } else {
                                char *addr = iov->iov_base + sumoff;
                                int   fragnob = iov->iov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumoff = 0;
                                sumnob -= fragnob;
                        }
                }
        }
        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
        if (kqswnal_data.kqn_optimized_gets &&
            type == PTL_MSG_GET &&              /* doing a GET */
            nid == targetnid) {                 /* not forwarding */
                lib_md_t           *md = libmsg->md;
                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);

                /* Optimised path: I send over the Elan vaddrs of the get
                 * sink buffers, and my peer DMAs directly into them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the GET sink frags.  I copy these into ktx_buffer,
                 * immediately after the header, and send that as my GET
                 * message.
                 *
                 * Note that the addresses are sent in native endian-ness.
                 * When EKC copes with different endian nodes, I'll fix
                 * this (and eat my hat :) */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_GETTING;

                if ((libmsg->md->options & PTL_MD_KIOV) != 0)
                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                 md->md_niov, md->md_iov.iov);

                if (rc != 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;

                payload_nob = offsetof(kqswnal_remotemd_t,
                                       kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);

                /* newer EKC */
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);

                /* older EKC */
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;

        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {

                /* small message: single frag copied into the pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;

                /* newer EKC */
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
                /* older EKC */
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;

                if (payload_nob > 0) {
                        if (payload_kiov != NULL)
                                lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_kiov,
                                                   payload_offset, payload_nob);
                        else
                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                  payload_niov, payload_iov,
                                                  payload_offset, payload_nob);
                }
        } else {

                /* large message: multiple frags: first is hdr in pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;

                /* newer EKC */
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
                /* older EKC */
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);

                if (rc != 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }
        }

        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);
        if (rc != 0) {                  /* failed? */
                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
               payload_nob, nid, targetnid);
        return (PTL_OK);
}
kqswnal_send (nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              ptl_hdr_t    *hdr,
              int           type,
              ptl_nid_t     nid,
              ptl_pid_t     pid,
              unsigned int  payload_niov,
              struct iovec *payload_iov,
              size_t        payload_offset,
              size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, payload_iov, NULL,
                                 payload_offset, payload_nob));
}
kqswnal_send_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    ptl_hdr_t    *hdr,
                    int           type,
                    ptl_nid_t     nid,
                    ptl_pid_t     pid,
                    unsigned int  payload_niov,
                    ptl_kiov_t   *payload_kiov,
                    size_t        payload_offset,
                    size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, NULL, payload_kiov,
                                 payload_offset, payload_nob));
}
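/* The router calls this to have the NAL forward a packet to (or towards)
 * its final destination; the payload stays in the receive buffers and is
 * either copied into the contiguous tx buffer or mapped for zero-copy. */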
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
        int             rc;
        kqswnal_tx_t   *ktx;
        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
        int             niov = fwd->kprfd_niov;
        int             nob  = fwd->kprfd_nob;
        ptl_nid_t       nid  = fwd->kprfd_gateway_nid;

        /* only when checksumming is configured in */
        CERROR ("checksums for forwarded packets not implemented\n");

        /* The router wants this NAL to forward a packet */
        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                fwd, nid, niov, nob);

        ktx = kqswnal_get_idle_tx (fwd, 0);
        if (ktx == NULL)        /* can't get txd right now */
                return;         /* fwd will be scheduled when tx desc freed */

        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
                nid = fwd->kprfd_target_nid;    /* target is final dest */

        if (kqswnal_nid2elanid (nid) < 0) {
                CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                rc = -EHOSTUNREACH;
                goto failed;
        }

        /* copy hdr into pre-mapped buffer */
        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
        ktx->ktx_nid     = nid;
        ktx->ktx_state   = KTX_FORWARDING;
        ktx->ktx_args[0] = fwd;
        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;

        if (nob <= KQSW_TX_MAXCONTIG) {
                /* send payload from ktx's pre-mapped contiguous buffer */
                /* newer EKC */
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + nob);
                /* older EKC */
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;

                lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
                                  niov, kiov, 0, nob);
        } else {
                /* zero copy payload */
                /* newer EKC */
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
                /* older EKC */
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;

                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                if (rc != 0)
                        goto failed;
        }

        rc = kqswnal_launch (ktx);
        if (rc == 0)
                return;

 failed:
        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);

        kqswnal_put_idle_tx (ktx);
        /* complete now (with failure) */
        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
}
kqswnal_fwd_callback (void *arg, int error)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;

        /* The router has finished forwarding this packet */

        if (error != 0) {
                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid), error);
        }

        kqswnal_requeue_rx (krx);
}
kqswnal_dma_reply_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_done (krx);

        lib_finalize (&kqswnal_lib, NULL, msg,
                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
        kqswnal_put_idle_tx (ktx);
}
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}
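/* Hand a receive buffer back to EKC.  If the peer is still waiting for an
 * optimized-GET RPC completion that we never managed to do, fail the RPC
 * first so the peer isn't left hanging. */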
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        int   rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {

                /* We failed to complete the peer's optimized GET (e.g. we
                 * couldn't map the source buffers).  We complete the
                 * peer's EKC rpc now with failure. */

                /* newer EKC */
                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
                                     &kqswnal_rpc_failed, NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);

                /* older EKC */
                if (krx->krx_rxd != NULL) {
                        /* We didn't try (and fail) to complete earlier... */
                        rc = ep_complete_rpc(krx->krx_rxd,
                                             kqswnal_rpc_complete, krx,
                                             &kqswnal_rpc_failed, NULL, 0);
                        if (rc == EP_SUCCESS)
                                return;

                        CERROR("can't complete RPC: %d\n", rc);
                }

                /* NB the old ep_complete_rpc() frees rxd on failure, so we
                 * have to requeue from scratch here, unless we're shutting
                 * down */
                if (kqswnal_data.kqn_shuttingdown)
                        return;

                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
                LASSERT (rc == EP_SUCCESS);
                /* We don't handle failure here; it's incredibly rare
                 * (never reported?) and only happens with "old" EKC */
                return;
        }

        /* newer EKC */
        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }

        /* older EKC */
        /* don't actually requeue on shutdown */
        if (!kqswnal_data.kqn_shuttingdown)
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
}
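/* Dispatch a completed receive: parse it locally if it's addressed to me,
 * otherwise build a kiov describing the payload pages and pass it to the
 * router for forwarding. */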
kqswnal_rx (kqswnal_rx_t *krx)
{
        ptl_hdr_t  *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
        ptl_nid_t   dest_nid = NTOH__u64 (hdr->dest_nid);
        int         payload_nob;
        int         nob;
        int         niov;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (dest_nid == kqswnal_lib.ni.nid) {   /* It's for me :) */
                atomic_set(&krx->krx_refcount, 1);
                lib_parse (&kqswnal_lib, hdr, krx);
                kqswnal_rx_done(krx);
                return;
        }

        /* only when checksumming is configured in */
        CERROR ("checksums for forwarded packets not implemented\n");

        if (kqswnal_nid2elanid (dest_nid) >= 0) {  /* should have gone direct to peer */
                CERROR("dropping packet from "LPX64" for "LPX64
                       ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);

                kqswnal_requeue_rx (krx);
                return;
        }

        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
        niov = 0;
        if (nob > 0) {
                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
                niov = 1;
                nob -= PAGE_SIZE - KQSW_HDR_SIZE;

                while (nob > 0) {
                        LASSERT (niov < krx->krx_npages);

                        krx->krx_kiov[niov].kiov_offset = 0;
                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
                        niov++;
                        nob -= PAGE_SIZE;
                }
        }

        kpr_fwd_init (&krx->krx_fwd, dest_nid,
                      hdr, payload_nob, niov, krx->krx_kiov,
                      kqswnal_fwd_callback, krx);

        kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
}
/* Receive Interrupt Handler: posts to schedulers */
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);

        krx->krx_rxd = rxd;
        krx->krx_nob = nob;
        /* newer EKC */
        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
        /* older EKC */
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);

        /* must receive a whole header to be able to parse */
        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t)) {
                /* receives complete with failure when receiver is removed */
                /* newer EKC */
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
                /* older EKC */
                if (!kqswnal_data.kqn_shuttingdown)
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);

                kqswnal_requeue_rx (krx);
                return;
        }

        if (!in_interrupt()) {
                kqswnal_rx (krx);
                return;
        }

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
{
        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

        CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                ", dpid %d, spid %d, type %d\n",
                ishdr ? "Header" : "Payload", krx,
                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
                NTOH__u32(hdr->type));

        switch (NTOH__u32 (hdr->type)) {
        case PTL_MSG_ACK:
                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                       " len %u\n",
                       NTOH__u32(hdr->msg.ack.mlength),
                       hdr->msg.ack.dst_wmd.handle_cookie,
                       hdr->msg.ack.dst_wmd.handle_idx,
                       NTOH__u64(hdr->msg.ack.match_bits),
                       NTOH__u32(hdr->msg.ack.length));
                break;
        case PTL_MSG_PUT:
                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                       " len %u off %u data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.handle_cookie,
                       hdr->msg.put.ack_wmd.handle_idx,
                       NTOH__u64(hdr->msg.put.match_bits),
                       NTOH__u32(hdr->msg.put.length),
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;
        case PTL_MSG_GET:
                CERROR ("GET: <>\n");
                break;
        case PTL_MSG_REPLY:
                CERROR ("REPLY: <>\n");
                break;
        default:
                CERROR ("TYPE?: <>\n");
        }
}
kqswnal_recvmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 unsigned int  niov,
                 struct iovec *iov,
                 ptl_kiov_t   *kiov,
                 size_t        offset,
                 size_t        mlen,
                 size_t        rlen)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
        int           page;
        char         *page_ptr;
        int           page_nob;
        char         *iov_ptr;
        int           iov_nob;
        int           frag;
        /* checksum state; only used when checksumming is configured in */
        kqsw_csum_t   senders_csum;
        kqsw_csum_t   payload_csum = 0;
        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
        size_t        csum_len = mlen;
        int           csum_frags = 0;
        int           csum_nob = 0;
        static atomic_t csum_counter;
        int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;

        atomic_inc (&csum_counter);

        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
        if (senders_csum != hdr_csum)
                kqswnal_csum_error (krx, 1);

        CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);

        /* What was actually received must be >= payload. */
        LASSERT (mlen <= rlen);
        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
                CERROR("Bad message size: have %d, need %d + %d\n",
                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
                return (PTL_FAIL);
        }

        /* It must be OK to kmap() if required */
        LASSERT (kiov == NULL || !in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (mlen != 0) {
                page     = 0;
                page_ptr = buffer + KQSW_HDR_SIZE;
                page_nob = PAGE_SIZE - KQSW_HDR_SIZE;

                LASSERT (niov > 0);

                if (kiov != NULL) {
                        /* skip complete frags */
                        while (offset >= kiov->kiov_len) {
                                offset -= kiov->kiov_len;
                                kiov++;
                                niov--;
                        }
                        iov_ptr = ((char *)kmap (kiov->kiov_page)) +
                                  kiov->kiov_offset + offset;
                        iov_nob = kiov->kiov_len - offset;
                } else {
                        /* skip complete frags */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                        }
                        iov_ptr = iov->iov_base + offset;
                        iov_nob = iov->iov_len - offset;
                }

                for (;;) {
                        frag = mlen;
                        if (frag > page_nob)
                                frag = page_nob;
                        if (frag > iov_nob)
                                frag = iov_nob;

                        memcpy (iov_ptr, page_ptr, frag);
                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
                        csum_nob += frag;
                        csum_frags++;

                        mlen -= frag;
                        if (mlen == 0)
                                break;

                        page_nob -= frag;
                        if (page_nob != 0)
                                page_ptr += frag;
                        else {
                                page++;
                                LASSERT (page < krx->krx_npages);
                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                page_nob = PAGE_SIZE;
                        }

                        iov_nob -= frag;
                        if (iov_nob != 0)
                                iov_ptr += frag;
                        else if (kiov != NULL) {
                                kunmap (kiov->kiov_page);
                                kiov++;
                                niov--;
                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                                iov_nob = kiov->kiov_len;
                        } else {
                                iov++;
                                niov--;
                                iov_ptr = iov->iov_base;
                                iov_nob = iov->iov_len;
                        }
                }

                if (kiov != NULL)
                        kunmap (kiov->kiov_page);
        }

        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
                sizeof(kqsw_csum_t));

        if (csum_len != rlen)
                CERROR("Unable to checksum data in user's buffer\n");
        else if (senders_csum != payload_csum)
                kqswnal_csum_error (krx, 0);

        if (csum_verbose)
                CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
                       "csum_nob %d\n",
                       hdr_csum, payload_csum, csum_frags, csum_nob);

        lib_finalize(nal, private, libmsg, PTL_OK);

        return (rlen);
}
kqswnal_recv(nal_cb_t     *nal,
             void         *private,
             lib_msg_t    *libmsg,
             unsigned int  niov,
             struct iovec *iov,
             size_t        offset,
             size_t        mlen,
             size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, iov, NULL,
                                offset, mlen, rlen));
}

kqswnal_recv_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    unsigned int  niov,
                    ptl_kiov_t   *kiov,
                    size_t        offset,
                    size_t        mlen,
                    size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, NULL, kiov,
                                offset, mlen, rlen));
}
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kqswnal_data.kqn_nthreads);
        atomic_inc (&kqswnal_data.kqn_nthreads_running);
        return (0);
}

kqswnal_thread_fini (void)
{
        atomic_dec (&kqswnal_data.kqn_nthreads);
}
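/* Scheduler thread: drains receives, delayed transmits and delayed forwards
 * that could not be handled in interrupt context, yielding periodically so
 * it doesn't hog the CPU. */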
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t    *krx;
        kqswnal_tx_t    *ktx;
        kpr_fwd_desc_t  *fwd;
        unsigned long    flags;
        int              rc;
        int              counter = 0;
        int              shuttingdown = 0;
        int              did_something;

        kportal_daemonize ("kqswnal_sched");
        kportal_blockallsigs ();

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        for (;;) {
                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {

                        if (kqswnal_data.kqn_shuttingdown == 2)
                                break;

                        /* During stage 1 of shutdown we are still responsive
                         * to receives */

                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
                        shuttingdown = kqswnal_data.kqn_shuttingdown;
                }

                did_something = 0;

                if (!list_empty (&kqswnal_data.kqn_readyrxds)) {
                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
                                         kqswnal_rx_t, krx_list);
                        list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        kqswnal_rx (krx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedtxds)) {
                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                         kqswnal_tx_t, ktx_list);
                        list_del_init (&ktx->ktx_delayed_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0) {          /* failed: ktx_nid down? */
                                CERROR("Failed delayed transmit to "LPX64
                                       ": %d\n", ktx->ktx_nid, rc);
                                kqswnal_tx_done (ktx, rc);
                        }

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedfwds)) {
                        fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                        list_del (&fwd->kprfd_list);
                        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

                        kqswnal_fwd_packet (NULL, fwd);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        counter = 0;

                        if (!did_something) {
                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedfwds));
                        } else if (current->need_resched)
                                schedule ();

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

        kqswnal_thread_fini ();
        return (0);
}
nal_cb_t kqswnal_lib =
{
        nal_data:       &kqswnal_data,          /* NAL private data */
        cb_send:        kqswnal_send,
        cb_send_pages:  kqswnal_send_pages,
        cb_recv:        kqswnal_recv,
        cb_recv_pages:  kqswnal_recv_pages,
        cb_read:        kqswnal_read,
        cb_write:       kqswnal_write,
        cb_malloc:      kqswnal_malloc,
        cb_free:        kqswnal_free,
        cb_printf:      kqswnal_printf,
        cb_cli:         kqswnal_cli,
        cb_sti:         kqswnal_sti,
        cb_dist:        kqswnal_dist
};