1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
8 * W. Marcus Miller - Based on ksocknal
10 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
12 * Portals is free software; you can redistribute it and/or
13 * modify it under the terms of version 2 of the GNU General Public
14 * License as published by the Free Software Foundation.
16 * Portals is distributed in the hope that it will be useful,
17 * but WITHOUT ANY WARRANTY; without even the implied warranty of
18 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 * GNU General Public License for more details.
21 * You should have received a copy of the GNU General Public License
22 * along with Portals; if not, write to the Free Software
23 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Pre-built EKC status blocks handed to ep_complete_rpc() when finishing a
 * peer's optimized-GET RPC: one reports success, the other failure (see
 * kqswnal_dma_reply() and kqswnal_requeue_rx() below). */
29 EP_STATUSBLK kqswnal_rpc_success;
30 EP_STATUSBLK kqswnal_rpc_failed;
33 * LIB functions follow
/* lib 'read' callback: copy 'len' bytes from NAL address space (src_addr)
 * into the library's buffer (dst_addr).  Kernel NAL shares the kernel
 * address space, so this is a plain memcpy.
 * NOTE(review): this extract omits interior lines (braces/return). */
37 kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
40 CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
41 nal->ni.nid, len, src_addr, dst_addr );
42 memcpy( dst_addr, src_addr, len );
/* lib 'write' callback: mirror of kqswnal_read() — copy 'len' bytes from
 * the library's buffer (src_addr) into NAL address space (dst_addr).
 * NOTE(review): this extract omits interior lines (braces/return). */
48 kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
51 CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
52 nal->ni.nid, len, src_addr, dst_addr );
53 memcpy( dst_addr, src_addr, len );
/* lib 'malloc' callback: allocate 'len' bytes via the Portals allocation
 * macro (buf declaration and return are on lines missing from this extract). */
59 kqswnal_malloc(nal_cb_t *nal, size_t len)
63 PORTAL_ALLOC(buf, len);
/* lib 'free' callback: release a buffer obtained from kqswnal_malloc();
 * PORTAL_FREE needs the original allocation length. */
68 kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
70 PORTAL_FREE(buf, len);
/* lib 'printf' callback: format the varargs message into a bounded local
 * buffer and emit it through the debug log.  vsnprintf() bounds the write
 * and the explicit NUL guards against truncation leaving the buffer
 * unterminated (pre-C99 vsnprintf semantics). */
74 kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
80 vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
83 msg[sizeof (msg) - 1] = 0; /* ensure terminated */
85 CDEBUG (D_NET, "%s", msg);
/* lib 'cli' callback: enter the NAL critical section — take the state
 * spinlock with interrupts disabled; *flags receives the saved IRQ state
 * for the matching kqswnal_sti(). */
90 kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
92 kqswnal_data_t *data= nal->nal_data;
94 spin_lock_irqsave(&data->kqn_statelock, *flags);
/* lib 'sti' callback: leave the NAL critical section — drop the state
 * spinlock and restore the IRQ state saved by kqswnal_cli(). */
99 kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
101 kqswnal_data_t *data= nal->nal_data;
103 spin_unlock_irqrestore(&data->kqn_statelock, *flags);
/* lib 'dist' callback: report network distance to 'nid' —
 * 0 for myself, 1 for a directly-reachable Elan peer, 2 when the packet
 * must go via a router. */
108 kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
110 if (nid == nal->ni.nid)
111 *dist = 0; /* it's me */
112 else if (kqswnal_nid2elanid (nid) >= 0)
113 *dist = 1; /* it's my peer */
115 *dist = 2; /* via router */
/* Tell the router a peer is down.  Converts the tx launch time (recorded
 * in jiffies at ep_transmit time) into wall-clock seconds so the router
 * knows *when* the peer was last known alive. */
120 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
125 do_gettimeofday (&now);
126 then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
128 kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
/* Release the temporary Elan DMA mappings of a tx descriptor (the frags
 * from ktx_firsttmpfrag onward; earlier frags are pre-mapped and stay).
 * No-op when nothing is mapped.  Two EKC generations are visible here:
 * new EKC unloads per-frag via ep_dvma_unload(), old EKC unloads the
 * whole basepage range via elan3_dvma_unload() — presumably selected by
 * #ifdef on lines missing from this extract (TODO confirm). */
132 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
138 if (ktx->ktx_nmappedpages == 0)
142 CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
143 ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
145 for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
146 ep_dvma_unload(kqswnal_data.kqn_ep,
147 kqswnal_data.kqn_ep_tx_nmh,
150 CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
151 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
153 LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
154 LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
155 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
157 elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
158 kqswnal_data.kqn_eptxdmahandle,
159 ktx->ktx_basepage, ktx->ktx_nmappedpages);
/* mark everything unmapped so a second unmap is a no-op */
161 ktx->ktx_nmappedpages = 0;
/* Map a page-based (kiov) payload into Elan DMA space for transmit,
 * appending to ktx_frags[] starting at the current ktx_nfrag.
 * 'offset' bytes are skipped, then 'nob' bytes over at most 'niov'
 * kiov entries are mapped.  Pages are kmap'ed because EKC only offers a
 * vaddr (not page) mapping interface.  Adjacent mappings that turn out
 * contiguous in Elan VM are merged into a single frag (ep_nmd_merge for
 * new EKC, explicit Base+Len adjacency check for old EKC).
 * ktx_nmappedpages is updated inside the loop so a failure part-way
 * leaves an accurate count for kqswnal_unmap_tx().
 * NOTE(review): error-return lines, loop braces and page accounting are
 * partially missing from this extract — do not infer control flow beyond
 * what is shown. */
165 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
167 int nfrags = ktx->ktx_nfrag;
168 int nmapped = ktx->ktx_nmappedpages;
169 int maxmapped = ktx->ktx_npages;
170 uint32_t basepage = ktx->ktx_basepage + nmapped;
173 EP_RAILMASK railmask;
174 int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
176 kqswnal_nid2elanid(ktx->ktx_nid));
179 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
182 railmask = 1 << rail;
184 LASSERT (nmapped <= maxmapped);
185 LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
186 LASSERT (nfrags <= EP_MAXFRAG);
190 /* skip complete frags before 'offset' */
191 while (offset >= kiov->kiov_len) {
192 offset -= kiov->kiov_len;
199 int fraglen = kiov->kiov_len - offset;
201 /* nob exactly spans the iovs */
202 LASSERT (fraglen <= nob);
203 /* each frag fits in a page */
204 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
207 if (nmapped > maxmapped) {
208 CERROR("Can't map message in %d pages (max %d)\n",
213 if (nfrags == EP_MAXFRAG) {
214 CERROR("Message too fragmented in Elan VM (max %d frags)\n",
219 /* XXX this is really crap, but we'll have to kmap until
220 * EKC has a page (rather than vaddr) mapping interface */
222 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
225 "%p[%d] loading %p for %d, page %d, %d total\n",
226 ktx, nfrags, ptr, fraglen, basepage, nmapped);
229 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
231 kqswnal_data.kqn_ep_tx_nmh, basepage,
232 &railmask, &ktx->ktx_frags[nfrags]);
234 if (nfrags == ktx->ktx_firsttmpfrag ||
235 !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
236 &ktx->ktx_frags[nfrags - 1],
237 &ktx->ktx_frags[nfrags])) {
238 /* new frag if this is the first or can't merge */
242 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
243 kqswnal_data.kqn_eptxdmahandle,
245 basepage, &ktx->ktx_frags[nfrags].Base);
247 if (nfrags > 0 && /* previous frag mapped */
248 ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
249 (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
250 /* just extend previous */
251 ktx->ktx_frags[nfrags - 1].Len += fraglen;
253 ktx->ktx_frags[nfrags].Len = fraglen;
254 nfrags++; /* new frag */
258 kunmap (kiov->kiov_page);
260 /* keep in loop for failure case */
261 ktx->ktx_nmappedpages = nmapped;
269 /* iov must not run out before end of data */
270 LASSERT (nob == 0 || niov > 0);
274 ktx->ktx_nfrag = nfrags;
275 CDEBUG (D_NET, "%p got %d frags over %d pages\n",
276 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
/* Map a vaddr-based (struct iovec) payload into Elan DMA space for
 * transmit — the iovec twin of kqswnal_map_tx_kiov() above, minus the
 * kmap/kunmap (the data is already kernel-addressable).  Frags may span
 * multiple pages (kqswnal_pages_spanned), and contiguous mappings are
 * merged exactly as in the kiov path.
 * NOTE(review): error returns and loop braces are partially missing from
 * this extract. */
282 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
283 int niov, struct iovec *iov)
285 int nfrags = ktx->ktx_nfrag;
286 int nmapped = ktx->ktx_nmappedpages;
287 int maxmapped = ktx->ktx_npages;
288 uint32_t basepage = ktx->ktx_basepage + nmapped;
290 EP_RAILMASK railmask;
291 int rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
293 kqswnal_nid2elanid(ktx->ktx_nid));
296 CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
299 railmask = 1 << rail;
301 LASSERT (nmapped <= maxmapped);
302 LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
303 LASSERT (nfrags <= EP_MAXFRAG);
307 /* skip complete frags before offset */
308 while (offset >= iov->iov_len) {
309 offset -= iov->iov_len;
316 int fraglen = iov->iov_len - offset;
317 long npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
319 /* nob exactly spans the iovs */
320 LASSERT (fraglen <= nob);
323 if (nmapped > maxmapped) {
324 CERROR("Can't map message in %d pages (max %d)\n",
329 if (nfrags == EP_MAXFRAG) {
330 CERROR("Message too fragmented in Elan VM (max %d frags)\n",
336 "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
337 ktx, nfrags, iov->iov_base + offset, fraglen,
338 basepage, npages, nmapped);
341 ep_dvma_load(kqswnal_data.kqn_ep, NULL,
342 iov->iov_base + offset, fraglen,
343 kqswnal_data.kqn_ep_tx_nmh, basepage,
344 &railmask, &ktx->ktx_frags[nfrags]);
346 if (nfrags == ktx->ktx_firsttmpfrag ||
347 !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
348 &ktx->ktx_frags[nfrags - 1],
349 &ktx->ktx_frags[nfrags])) {
350 /* new frag if this is the first or can't merge */
354 elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
355 kqswnal_data.kqn_eptxdmahandle,
356 iov->iov_base + offset, fraglen,
357 basepage, &ktx->ktx_frags[nfrags].Base);
359 if (nfrags > 0 && /* previous frag mapped */
360 ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
361 (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
362 /* just extend previous */
363 ktx->ktx_frags[nfrags - 1].Len += fraglen;
365 ktx->ktx_frags[nfrags].Len = fraglen;
366 nfrags++; /* new frag */
370 /* keep in loop for failure case */
371 ktx->ktx_nmappedpages = nmapped;
379 /* iov must not run out before end of data */
380 LASSERT (nob == 0 || niov > 0);
384 ktx->ktx_nfrag = nfrags;
385 CDEBUG (D_NET, "%p got %d frags over %d pages\n",
386 ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
/* Return a tx descriptor to its idle pool.  Temporary DMA mappings are
 * released first.  Non-blocking ("nblk") descriptors go straight back to
 * their reserved pool; normal ones go to kqn_idletxds, and if a forwarded
 * packet was queued waiting for a descriptor it is popped off the fwd
 * queue and rescheduled on the scheduler's delayed-forwards list (done
 * outside the idletxd lock, under the sched lock). */
393 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
395 kpr_fwd_desc_t *fwd = NULL;
398 kqswnal_unmap_tx (ktx); /* release temporary mappings */
399 ktx->ktx_state = KTX_IDLE;
401 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
403 list_del (&ktx->ktx_list); /* take off active list */
405 if (ktx->ktx_isnblk) {
406 /* reserved for non-blocking tx */
407 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
408 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
412 list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
414 /* anything blocking for a tx descriptor? */
415 if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
417 CDEBUG(D_NET,"wakeup fwd\n");
419 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
420 kpr_fwd_desc_t, kprfd_list);
421 list_del (&fwd->kprfd_list);
424 wake_up (&kqswnal_data.kqn_idletxd_waitq);
426 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
431 /* schedule packet for forwarding again */
432 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
434 list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
435 wake_up (&kqswnal_data.kqn_sched_waitq);
437 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
/* Obtain a tx descriptor.  Preference order:
 *  1. a free "normal" descriptor;
 *  2. for a forwarded packet (fwd != NULL): queue the fwd to be retried
 *     when a descriptor is freed (returns NULL);
 *  3. if the caller can't block: dip into the reserved non-blocking pool
 *     (error if exhausted);
 *  4. otherwise: sleep on the idletxd waitqueue until one frees up.
 * On success the descriptor is moved to the active list and stamped with
 * the caller's pid.  Idle descriptors must have no temporary mappings. */
441 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
444 kqswnal_tx_t *ktx = NULL;
447 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
449 /* "normal" descriptor is free */
450 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
451 ktx = list_entry (kqswnal_data.kqn_idletxds.next,
452 kqswnal_tx_t, ktx_list);
456 /* "normal" descriptor pool is empty */
458 if (fwd != NULL) { /* forwarded packet => queue for idle txd */
459 CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
460 list_add_tail (&fwd->kprfd_list,
461 &kqswnal_data.kqn_idletxd_fwdq);
465 /* doing a local transmit */
467 if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
468 CERROR ("intr tx desc pool exhausted\n");
472 ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
473 kqswnal_tx_t, ktx_list);
477 /* block for idle tx */
479 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
481 CDEBUG (D_NET, "blocking for tx desc\n");
482 wait_event (kqswnal_data.kqn_idletxd_waitq,
483 !list_empty (&kqswnal_data.kqn_idletxds));
487 list_del (&ktx->ktx_list);
488 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
489 ktx->ktx_launcher = current->pid;
492 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
494 /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
495 LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
/* Complete a transmit, dispatching on what the descriptor was doing:
 *  - KTX_FORWARDING: tell the router the forward finished;
 *  - KTX_SENDING: finalize the locally-sourced message (errno mapped to
 *    PTL_OK / PTL_NOSPACE / PTL_FAIL);
 *  - KTX_GETTING: an optimized GET where the peer DMAed the data directly;
 *    fabricate the REPLY event the library expects via
 *    lib_fake_reply_msg() and finalize both GET and reply.
 * Finally the descriptor is returned to its idle pool. */
501 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
504 lib_msg_t *repmsg = NULL;
506 switch (ktx->ktx_state) {
507 case KTX_FORWARDING: /* router asked me to forward this packet */
508 kpr_fwd_done (&kqswnal_data.kqn_router,
509 (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
512 case KTX_SENDING: /* packet sourced locally */
513 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
514 (lib_msg_t *)ktx->ktx_args[1],
515 (error == 0) ? PTL_OK :
516 (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
519 case KTX_GETTING: /* Peer has DMA-ed direct? */
520 msg = (lib_msg_t *)ktx->ktx_args[1];
523 repmsg = lib_fake_reply_msg (&kqswnal_lib,
524 ktx->ktx_nid, msg->md);
530 lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
532 lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
534 lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
535 (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
543 kqswnal_put_idle_tx (ktx);
/* EKC transmit-completion callback.  On failure, notify the router the
 * peer may be down.  For a successfully-completed optimized GET
 * (KTX_GETTING) the peer's RPC status block carries the real completion
 * status — pull it out of the txd (field differs between EKC versions).
 * Then hand off to kqswnal_tx_done(). */
547 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
549 kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;
551 LASSERT (txd != NULL);
552 LASSERT (ktx != NULL);
554 CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
556 if (status != EP_SUCCESS) {
558 CERROR ("Tx completion to "LPX64" failed: %d\n",
559 ktx->ktx_nid, status);
561 kqswnal_notify_peer_down(ktx);
564 } else if (ktx->ktx_state == KTX_GETTING) {
565 /* RPC completed OK; what did our peer put in the status
568 status = ep_txd_statusblk(txd)->Data[0];
570 status = ep_txd_statusblk(txd)->Status;
576 kqswnal_tx_done (ktx, status);
/* Launch a prepared tx descriptor onto the Elan network.  In interrupt
 * context the EP calls must neither sleep nor allocate.  GETs go out as
 * EKC RPCs (only frag 0 — the hdr + remote MD — is transmitted; the
 * remaining frags describe the GET sink for the peer to DMA into);
 * everything else is a plain message.  EP_ENOMEM (no ep txd, only
 * expected in interrupt context) queues the tx for the scheduler thread
 * to retry; any other failure marks the peer down.
 * NOTE(review): duplicated ep_transmit_* calls here are alternate EKC-
 * version branches whose #ifdef lines are missing from this extract. */
580 kqswnal_launch (kqswnal_tx_t *ktx)
582 /* Don't block for transmit descriptor if we're in interrupt context */
583 int attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
584 int dest = kqswnal_nid2elanid (ktx->ktx_nid);
588 ktx->ktx_launchtime = jiffies;
590 LASSERT (dest >= 0); /* must be a peer */
591 if (ktx->ktx_state == KTX_GETTING) {
592 /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The
593 * other frags are the GET sink which we obviously don't
596 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
598 kqswnal_txhandler, ktx,
599 NULL, ktx->ktx_frags, 1);
601 rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
602 ktx->ktx_port, attr, kqswnal_txhandler,
603 ktx, NULL, ktx->ktx_frags, 1);
607 rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
609 kqswnal_txhandler, ktx,
610 NULL, ktx->ktx_frags, ktx->ktx_nfrag);
612 rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
614 kqswnal_txhandler, ktx,
615 ktx->ktx_frags, ktx->ktx_nfrag);
620 case EP_SUCCESS: /* success */
623 case EP_ENOMEM: /* can't allocate ep txd => queue for later */
624 LASSERT (in_interrupt());
626 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
628 list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
629 wake_up (&kqswnal_data.kqn_sched_waitq);
631 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
634 default: /* fatal error */
635 CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
636 kqswnal_notify_peer_down(ktx);
637 return (-EHOSTUNREACH);
/* Map a Portals header's message type to a printable name for error
 * reporting.  NOTE(review): the per-type cases are missing from this
 * extract; only the signature and fallback return are visible. */
642 hdr_type_string (ptl_hdr_t *hdr)
654 return ("<UNKNOWN>");
/* Dump a Portals header to the error log for diagnostics: common fields
 * (type, length, src/dest nid+pid) first, then type-specific detail for
 * PUT / GET / ACK / REPLY.  NTOH__* conversions are applied to wire-
 * endian fields; the wmd cookies are printed raw. */
659 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
661 char *type_str = hdr_type_string (hdr);
663 CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
664 NTOH__u32(hdr->payload_length));
665 CERROR(" From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
666 NTOH__u32(hdr->src_pid));
667 CERROR(" To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
668 NTOH__u32(hdr->dest_pid));
670 switch (NTOH__u32(hdr->type)) {
672 CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
673 "match bits "LPX64"\n",
674 NTOH__u32 (hdr->msg.put.ptl_index),
675 hdr->msg.put.ack_wmd.wh_interface_cookie,
676 hdr->msg.put.ack_wmd.wh_object_cookie,
677 NTOH__u64 (hdr->msg.put.match_bits));
678 CERROR(" offset %d, hdr data "LPX64"\n",
679 NTOH__u32(hdr->msg.put.offset),
680 hdr->msg.put.hdr_data);
684 CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
685 "match bits "LPX64"\n",
686 NTOH__u32 (hdr->msg.get.ptl_index),
687 hdr->msg.get.return_wmd.wh_interface_cookie,
688 hdr->msg.get.return_wmd.wh_object_cookie,
689 hdr->msg.get.match_bits);
690 CERROR(" Length %d, src offset %d\n",
691 NTOH__u32 (hdr->msg.get.sink_length),
692 NTOH__u32 (hdr->msg.get.src_offset));
696 CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
697 hdr->msg.ack.dst_wmd.wh_interface_cookie,
698 hdr->msg.ack.dst_wmd.wh_object_cookie,
699 NTOH__u32 (hdr->msg.ack.mlength));
703 CERROR(" dst md "LPX64"."LPX64"\n",
704 hdr->msg.reply.dst_wmd.wh_interface_cookie,
705 hdr->msg.reply.dst_wmd.wh_object_cookie);
708 } /* end of print_hdr() */
/* Debug helper: log an EP_IOVEC array ('n' entries) under debug mask
 * 'how', one Base/Len pair per line, prefixed by 'str'. */
712 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
716 CDEBUG (how, "%s: %d\n", str, n);
717 for (i = 0; i < n; i++) {
718 CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len);
/* Zip a source EP_IOVEC list and a destination EP_IOVEC list into a
 * single EP_DATAVEC array of at most 'ndv' entries, each entry covering
 * the overlap (min length) of the current src/dst pair.  Mismatched
 * total lengths or overflowing 'ndv' are reported as errors.
 * NOTE(review): the iov-advance lines inside the min-length branches are
 * missing from this extract. */
723 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
724 int nsrc, EP_IOVEC *src,
725 int ndst, EP_IOVEC *dst)
734 for (count = 0; count < ndv; count++, dv++) {
736 if (nsrc == 0 || ndst == 0) {
738 /* For now I'll barf on any left over entries */
739 CERROR ("mismatched src and dst iovs\n");
745 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
747 dv->Source = src->Base;
748 dv->Dest = dst->Base;
750 if (nob >= src->Len) {
758 if (nob >= dst->Len) {
767 CERROR ("DATAVEC too small\n");
/* Complete a peer's optimized GET: the peer sent its sink buffers'
 * Elan network addresses (kqswnal_remotemd_t, immediately after the
 * header in the receive buffer); we map our reply data and complete the
 * EKC RPC so the network DMAs it straight into the peer's sink.
 * Validates that the rmd actually fits in the received message, that
 * local and remote frag counts/lengths line up, then bumps krx_refcount
 * to 2 (caller + kqswnal_dma_reply_complete race) before issuing
 * ep_complete_rpc().  On failure the refcount is reset and
 * -ECONNABORTED returned.  NB addresses are in native endian-ness (see
 * comment in kqswnal_sendmsg).  Two EKC-version call forms of
 * ep_complete_rpc are visible; selecting #ifdefs are missing from this
 * extract. */
773 kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
774 struct iovec *iov, ptl_kiov_t *kiov,
777 kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
778 char *buffer = (char *)page_address(krx->krx_pages[0]);
779 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
784 EP_DATAVEC datav[EP_MAXFRAG];
787 LASSERT (krx->krx_rpc_reply_needed);
788 LASSERT ((iov == NULL) != (kiov == NULL));
790 /* see kqswnal_sendmsg comment regarding endian-ness */
791 if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
792 /* msg too small to discover rmd size */
793 CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
794 krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
798 if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
799 /* rmd doesn't fit in the incoming message */
800 CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
801 krx->krx_nob, rmd->kqrmd_nfrag,
802 (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
806 /* Map the source data... */
807 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
809 rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
811 rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);
814 CERROR ("Can't map source data: %d\n", rc);
819 if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
820 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
821 ktx->ktx_nfrag, rmd->kqrmd_nfrag);
825 for (i = 0; i < rmd->kqrmd_nfrag; i++)
826 if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
827 CERROR("Can't cope with unequal frags %d(%d):"
828 " %d local %d remote\n",
830 ktx->ktx_frags[i].nmd_len,
831 rmd->kqrmd_frag[i].nmd_len);
835 ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
836 ktx->ktx_nfrag, ktx->ktx_frags,
837 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
839 CERROR ("Can't create datavec: %d\n", ndatav);
844 /* Our caller will start to race with kqswnal_dma_reply_complete... */
845 LASSERT (atomic_read (&krx->krx_refcount) == 1);
846 atomic_set (&krx->krx_refcount, 2);
849 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
850 &kqswnal_rpc_success,
851 ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
852 if (rc == EP_SUCCESS)
855 /* Well we tried... */
856 krx->krx_rpc_reply_needed = 0;
858 rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
859 &kqswnal_rpc_success, datav, ndatav);
860 if (rc == EP_SUCCESS)
863 /* "old" EKC destroys rxd on failed completion */
867 CERROR("can't complete RPC: %d\n", rc);
869 /* reset refcount back to 1: we're not going to be racing with
870 * kqswnal_dma_reply_complete. */
871 atomic_set (&krx->krx_refcount, 1);
873 return (-ECONNABORTED);
/* Core send path, shared by kqswnal_send() (iovec payload) and
 * kqswnal_send_pages() (kiov payload).  Outline:
 *  1. validate size / iov invariants; look up a gateway if the dest is
 *     not a direct Elan peer;
 *  2. grab a tx descriptor (non-blocking for ACK/REPLY or interrupt
 *     context, to avoid deadlocking the receiver);
 *  3. if this REPLY answers an optimized GET, hand off to
 *     kqswnal_dma_reply() instead of sending a message;
 *  4. copy the header into the pre-mapped tx buffer, optionally compute
 *     and append header + payload checksums;
 *  5. pick one of three transmit shapes: optimized GET (send the sink's
 *     Elan addresses as a kqswnal_remotemd_t after the header, peer DMAs
 *     directly — native endian, see comment below), small contiguous
 *     (payload copied in after the header), or large multi-frag (header
 *     frag + mapped payload frags);
 *  6. choose small/large service port and launch.
 * NOTE(review): several branch/return lines are missing from this
 * extract.  Also note the checksum loop's iovec branch tests
 * 'sumoff > iov->iov_len' where the kiov branch uses '>=' — looks like
 * an off-by-one inconsistency; verify against the full source. */
877 kqswnal_sendmsg (nal_cb_t *nal,
884 unsigned int payload_niov,
885 struct iovec *payload_iov,
886 ptl_kiov_t *payload_kiov,
887 size_t payload_offset,
900 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
901 " pid %u\n", payload_nob, payload_niov, nid, pid);
903 LASSERT (payload_nob == 0 || payload_niov > 0);
904 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
906 /* It must be OK to kmap() if required */
907 LASSERT (payload_kiov == NULL || !in_interrupt ());
908 /* payload is either all vaddrs or all pages */
909 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
911 if (payload_nob > KQSW_MAXPAYLOAD) {
912 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
913 payload_nob, KQSW_MAXPAYLOAD);
918 if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */
919 rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
920 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
922 CERROR("Can't route to "LPX64": router error %d\n",
926 if (kqswnal_nid2elanid (targetnid) < 0) {
927 CERROR("Bad gateway "LPX64" for "LPX64"\n",
933 /* I may not block for a transmit descriptor if I might block the
934 * receiver, or an interrupt handler. */
935 ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
936 type == PTL_MSG_REPLY ||
939 kqswnal_cerror_hdr (hdr);
940 return (PTL_NOSPACE);
943 ktx->ktx_nid = targetnid;
944 ktx->ktx_args[0] = private;
945 ktx->ktx_args[1] = libmsg;
947 if (type == PTL_MSG_REPLY &&
948 ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
949 if (nid != targetnid ||
950 kqswnal_nid2elanid(nid) !=
951 ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
952 CERROR("Optimized reply nid conflict: "
953 "nid "LPX64" via "LPX64" elanID %d\n",
955 ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
959 /* peer expects RPC completion with GET data */
960 rc = kqswnal_dma_reply (ktx, payload_niov,
961 payload_iov, payload_kiov,
962 payload_offset, payload_nob);
966 CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
967 kqswnal_put_idle_tx (ktx);
971 memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
972 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
975 csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
976 memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
977 for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
979 if (payload_kiov != NULL) {
980 ptl_kiov_t *kiov = &payload_kiov[i];
982 if (sumoff >= kiov->kiov_len) {
983 sumoff -= kiov->kiov_len;
985 char *addr = ((char *)kmap (kiov->kiov_page)) +
986 kiov->kiov_offset + sumoff;
987 int fragnob = kiov->kiov_len - sumoff;
989 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
992 kunmap(kiov->kiov_page);
995 struct iovec *iov = &payload_iov[i];
997 if (sumoff > iov->iov_len) {
998 sumoff -= iov->iov_len;
1000 char *addr = iov->iov_base + sumoff;
1001 int fragnob = iov->iov_len - sumoff;
1003 csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
1009 memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
1012 if (kqswnal_data.kqn_optimized_gets &&
1013 type == PTL_MSG_GET && /* doing a GET */
1014 nid == targetnid) { /* not forwarding */
1015 lib_md_t *md = libmsg->md;
1016 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1018 /* Optimised path: I send over the Elan vaddrs of the get
1019 * sink buffers, and my peer DMAs directly into them.
1021 * First I set up ktx as if it was going to send this
1022 * payload, (it needs to map it anyway). This fills
1023 * ktx_frags[1] and onward with the network addresses
1024 * of the GET sink frags. I copy these into ktx_buffer,
1025 * immediately after the header, and send that as my GET
1028 * Note that the addresses are sent in native endian-ness.
1029 * When EKC copes with different endian nodes, I'll fix
1030 * this (and eat my hat :) */
1032 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1033 ktx->ktx_state = KTX_GETTING;
1035 if ((libmsg->md->options & PTL_MD_KIOV) != 0)
1036 rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1037 md->md_niov, md->md_iov.kiov);
1039 rc = kqswnal_map_tx_iov (ktx, 0, md->length,
1040 md->md_niov, md->md_iov.iov);
1043 kqswnal_put_idle_tx (ktx);
1047 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1049 payload_nob = offsetof(kqswnal_remotemd_t,
1050 kqrmd_frag[rmd->kqrmd_nfrag]);
1051 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
1054 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1055 rmd->kqrmd_nfrag * sizeof(EP_NMD));
1057 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1058 0, KQSW_HDR_SIZE + payload_nob);
1060 memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
1061 rmd->kqrmd_nfrag * sizeof(EP_IOVEC));
1063 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1064 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1066 } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1068 /* small message: single frag copied into the pre-mapped buffer */
1070 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1071 ktx->ktx_state = KTX_SENDING;
1073 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1074 0, KQSW_HDR_SIZE + payload_nob);
1076 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1077 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
1079 if (payload_nob > 0) {
1080 if (payload_kiov != NULL)
1081 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1082 payload_niov, payload_kiov,
1083 payload_offset, payload_nob);
1085 lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1086 payload_niov, payload_iov,
1087 payload_offset, payload_nob);
1091 /* large message: multiple frags: first is hdr in pre-mapped buffer */
1093 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1094 ktx->ktx_state = KTX_SENDING;
1096 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1099 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1100 ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
1102 if (payload_kiov != NULL)
1103 rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
1104 payload_niov, payload_kiov);
1106 rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
1107 payload_niov, payload_iov);
1109 kqswnal_put_idle_tx (ktx);
1114 ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1115 EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1117 rc = kqswnal_launch (ktx);
1118 if (rc != 0) { /* failed? */
1119 CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
1120 kqswnal_put_idle_tx (ktx);
1124 CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
1125 payload_nob, nid, targetnid);
/* lib 'send' callback for iovec (kernel vaddr) payloads: thin wrapper
 * delegating to kqswnal_sendmsg() with kiov == NULL. */
1130 kqswnal_send (nal_cb_t *nal,
1137 unsigned int payload_niov,
1138 struct iovec *payload_iov,
1139 size_t payload_offset,
1142 return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1143 payload_niov, payload_iov, NULL,
1144 payload_offset, payload_nob));
/* lib 'send_pages' callback for page-based (kiov) payloads: thin wrapper
 * delegating to kqswnal_sendmsg() with iov == NULL. */
1148 kqswnal_send_pages (nal_cb_t *nal,
1155 unsigned int payload_niov,
1156 ptl_kiov_t *payload_kiov,
1157 size_t payload_offset,
1160 return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1161 payload_niov, NULL, payload_kiov,
1162 payload_offset, payload_nob));
/* Router callback: forward a received packet out over Elan.  Gets a tx
 * descriptor without blocking (a NULL return means the fwd was queued
 * and will be retried when a descriptor frees — see kqswnal_put_idle_tx).
 * Validates the destination is an Elan peer and the packet fits, then
 * either copies small/multi-frag payloads into the pre-mapped contiguous
 * tx buffer or maps the router's iov directly.  On launch failure the
 * fwd is completed immediately with the error. */
1166 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1170 struct iovec *iov = fwd->kprfd_iov;
1171 int niov = fwd->kprfd_niov;
1172 int nob = fwd->kprfd_nob;
1173 ptl_nid_t nid = fwd->kprfd_gateway_nid;
1176 CERROR ("checksums for forwarded packets not implemented\n");
1179 /* The router wants this NAL to forward a packet */
1180 CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
1181 fwd, nid, niov, nob);
1185 ktx = kqswnal_get_idle_tx (fwd, 0);
1186 if (ktx == NULL) /* can't get txd right now */
1187 return; /* fwd will be scheduled when tx desc freed */
1189 if (nid == kqswnal_lib.ni.nid) /* gateway is me */
1190 nid = fwd->kprfd_target_nid; /* target is final dest */
1192 if (kqswnal_nid2elanid (nid) < 0) {
1193 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1198 if (nob > KQSW_NRXMSGBYTES_LARGE) {
1199 CERROR ("Can't forward [%p] to "LPX64
1200 ": size %d bigger than max packet size %ld\n",
1201 fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
1206 ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
1207 EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1209 ktx->ktx_state = KTX_FORWARDING;
1210 ktx->ktx_args[0] = fwd;
1212 if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
1213 nob <= KQSW_TX_BUFFER_SIZE)
1215 /* send from ktx's pre-mapped contiguous buffer? */
1216 lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob);
1218 ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
1221 ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
1222 ktx->ktx_frags[0].Len = nob;
1224 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
1225 ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
1230 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
1231 rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov);
1235 ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
1238 rc = kqswnal_launch (ktx);
1244 CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);
1246 kqswnal_put_idle_tx (ktx);
1247 /* complete now (with failure) */
1248 kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
/* Router callback invoked when forwarding of a received packet has
 * finished (successfully or not): log routing failures, then recycle
 * the receive descriptor. */
1252 kqswnal_fwd_callback (void *arg, int error)
1254 kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1256 /* The router has finished forwarding this packet */
1260 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1262 CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
1263 NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error);
1266 kqswnal_requeue_rx (krx);
/* EKC callback: the RPC completion that DMAed our reply into the peer's
 * optimized-GET sink has finished.  Drop the rx reference taken in
 * kqswnal_dma_reply(), finalize the reply message with the DMA status,
 * and free the tx descriptor. */
1270 kqswnal_dma_reply_complete (EP_RXD *rxd)
1272 int status = ep_rxd_status(rxd);
1273 kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
1274 kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
1275 lib_msg_t *msg = (lib_msg_t *)ktx->ktx_args[1];
1277 CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1278 "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
1280 LASSERT (krx->krx_rxd == rxd);
1281 LASSERT (krx->krx_rpc_reply_needed);
1283 krx->krx_rpc_reply_needed = 0;
1284 kqswnal_rx_done (krx);
1286 lib_finalize (&kqswnal_lib, NULL, msg,
1287 (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
1288 kqswnal_put_idle_tx (ktx);
/* EKC callback: a failure-status RPC completion (issued from
 * kqswnal_requeue_rx when we couldn't service the peer's optimized GET)
 * has finished; clear the pending-reply flag and requeue the receive
 * descriptor. */
1292 kqswnal_rpc_complete (EP_RXD *rxd)
1294 int status = ep_rxd_status(rxd);
1295 kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1297 CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1298 "rxd %p, krx %p, status %d\n", rxd, krx, status);
1300 LASSERT (krx->krx_rxd == rxd);
1301 LASSERT (krx->krx_rpc_reply_needed);
1303 krx->krx_rpc_reply_needed = 0;
1304 kqswnal_requeue_rx (krx);
/* Recycle a receive descriptor back to EKC.  If the peer is still
 * waiting on an RPC reply we never managed to send (failed optimized
 * GET), complete the RPC with the failure status block first —
 * kqswnal_rpc_complete() will then call back in here with the flag
 * cleared.  Old EKC destroys the rxd on failed completion, so the
 * receive must be requeued from scratch; new EKC just requeues.  On
 * shutdown the rxd is released instead of requeued.
 * NOTE(review): this function interleaves old/new EKC variants whose
 * selecting #ifdef lines are missing from this extract. */
1308 kqswnal_requeue_rx (kqswnal_rx_t *krx)
1312 LASSERT (atomic_read(&krx->krx_refcount) == 0);
1314 if (krx->krx_rpc_reply_needed) {
1316 /* We failed to complete the peer's optimized GET (e.g. we
1317 * couldn't map the source buffers). We complete the
1318 * peer's EKC rpc now with failure. */
1320 rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
1321 &kqswnal_rpc_failed, NULL, NULL, 0);
1322 if (rc == EP_SUCCESS)
1325 CERROR("can't complete RPC: %d\n", rc);
1327 if (krx->krx_rxd != NULL) {
1328 /* We didn't try (and fail) to complete earlier... */
1329 rc = ep_complete_rpc(krx->krx_rxd,
1330 kqswnal_rpc_complete, krx,
1331 &kqswnal_rpc_failed, NULL, 0);
1332 if (rc == EP_SUCCESS)
1335 CERROR("can't complete RPC: %d\n", rc);
1338 /* NB the old ep_complete_rpc() frees rxd on failure, so we
1339 * have to requeue from scratch here, unless we're shutting
1341 if (kqswnal_data.kqn_shuttingdown)
1344 rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
1345 krx->krx_elanbuffer,
1346 krx->krx_npages * PAGE_SIZE, 0);
1347 LASSERT (rc == EP_SUCCESS);
1348 /* We don't handle failure here; it's incredibly rare
1349 * (never reported?) and only happens with "old" EKC */
1355 if (kqswnal_data.kqn_shuttingdown) {
1356 /* free EKC rxd on shutdown */
1357 ep_complete_receive(krx->krx_rxd);
1359 /* repost receive */
1360 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1361 &krx->krx_elanbuffer, 0);
1364 /* don't actually requeue on shutdown */
1365 if (!kqswnal_data.kqn_shuttingdown)
1366 ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
1367 krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
/* Process a received packet (called from scheduler/thread context).
 * If it is addressed to me, hand it to the Portals library for parsing
 * (taking a single rx reference for the duration).  Otherwise it needs
 * forwarding: refuse packets whose target is a direct Elan peer (they
 * should never route through us), rebuild the page iov (forwarding may
 * have destroyed it), and hand off to the router. */
1372 kqswnal_rx (kqswnal_rx_t *krx)
1374 ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
1375 ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid);
1379 LASSERT (atomic_read(&krx->krx_refcount) == 0);
1381 if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
1382 atomic_set(&krx->krx_refcount, 1);
1383 lib_parse (&kqswnal_lib, hdr, krx);
1384 kqswnal_rx_done(krx);
1389 CERROR ("checksums for forwarded packets not implemented\n");
1392 if (kqswnal_nid2elanid (dest_nid) >= 0) /* should have gone direct to peer */
1394 CERROR("dropping packet from "LPX64" for "LPX64
1395 ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);
1397 kqswnal_requeue_rx (krx);
1401 /* NB forwarding may destroy iov; rebuild every time */
1402 for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
1404 LASSERT (niov < krx->krx_npages);
1405 krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
1406 krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
1409 kpr_fwd_init (&krx->krx_fwd, dest_nid,
1410 krx->krx_nob, niov, krx->krx_iov,
1411 kqswnal_fwd_callback, krx);
1413 kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
1416 /* Receive Interrupt Handler: posts to schedulers */
/* Called by the EKC layer when a receive completes (possibly in
 * interrupt context).  Validates the completion, then queues the rx
 * descriptor for a scheduler thread to deliver.
 * NOTE(review): elided chunk -- both #if variants of the
 * krx_rpc_reply_needed assignment appear below; the surrounding
 * preprocessor lines are not visible. */
1418 kqswnal_rxhandler(EP_RXD *rxd)
1421         int           nob    = ep_rxd_len (rxd);
1422         int           status = ep_rxd_status (rxd);
/* our descriptor was stashed as the EKC callback arg when posted */
1423         kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1425         CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1426                rxd, krx, nob, status);
1428         LASSERT (krx != NULL);
/* remember whether the sender expects an RPC-style completion reply
 * (two EKC-version variants of this line -- #if context elided) */
1433         krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
1435         krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
1438         /* must receive a whole header to be able to parse */
1439         if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
1441                 /* receives complete with failure when receiver is removed */
1443                 if (status == EP_SHUTDOWN)
1444                         LASSERT (kqswnal_data.kqn_shuttingdown);
1446                 CERROR("receive status failed with status %d nob %d\n",
1447                        ep_rxd_status(rxd), nob);
/* alternate (other #if branch, elided): only complain when not
 * shutting down */
1449                 if (!kqswnal_data.kqn_shuttingdown)
1450                         CERROR("receive status failed with status %d nob %d\n",
1451                                ep_rxd_status(rxd), nob);
/* bad completion: just repost the buffer */
1453                 kqswnal_requeue_rx (krx);
/* process context path elided; here we punt to a scheduler thread */
1457         if (!in_interrupt()) {
1462         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1464         list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1465         wake_up (&kqswnal_data.kqn_sched_waitq);
1467         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
/* Report a header or payload checksum mismatch on a received packet.
 * @krx   rx descriptor holding the offending packet (header in page 0)
 * @ishdr non-zero if the header checksum failed, zero for the payload.
 * Dumps the common header fields, then type-specific detail.
 * FIX: the CERROR argument list was missing the comma after
 * NTOH__u64(hdr->src_nid), which is a syntax error in the varargs list.
 * NOTE(review): elided chunk -- the switch's case labels and breaks
 * are not visible here. */
1472 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1474         ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);
1476         CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1477                 ", dpid %d, spid %d, type %d\n",
1478                 ishdr ? "Header" : "Payload", krx,
1479                 NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
1480                 NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
1481                 NTOH__u32(hdr->type));
/* dump message-type-specific fields (case labels elided) */
1483         switch (NTOH__u32 (hdr->type))
1486                 CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
1488                        NTOH__u32(hdr->msg.ack.mlength),
1489                        hdr->msg.ack.dst_wmd.handle_cookie,
1490                        hdr->msg.ack.dst_wmd.handle_idx,
1491                        NTOH__u64(hdr->msg.ack.match_bits),
1492                        NTOH__u32(hdr->msg.ack.length));
1495                 CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
1496                        " len %u off %u data "LPX64"\n",
1497                        NTOH__u32(hdr->msg.put.ptl_index),
1498                        hdr->msg.put.ack_wmd.handle_cookie,
1499                        hdr->msg.put.ack_wmd.handle_idx,
1500                        NTOH__u64(hdr->msg.put.match_bits),
1501                        NTOH__u32(hdr->msg.put.length),
1502                        NTOH__u32(hdr->msg.put.offset),
1503                        hdr->msg.put.hdr_data);
1506                 CERROR ("GET: <>\n");
1509                 CERROR ("REPLY: <>\n");
1512                 CERROR ("TYPE?: <>\n");
/* Common receive path for kqswnal_recv() and kqswnal_recv_pages():
 * copies up to mlen bytes of payload out of the rx descriptor's pages
 * into either a virtual-address iov or a page kiov (never both), with
 * optional checksum verification, then finalizes the message.
 * NOTE(review): heavily elided chunk -- the parameter list, the copy
 * loop's control flow and several #if KQSW_CHECKSUM blocks are only
 * partially visible; comments are best-effort. */
1518 kqswnal_recvmsg (nal_cb_t *nal,
1528         kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
/* checksum bookkeeping (compiled only with checksums enabled) */
1536         kqsw_csum_t   senders_csum;
1537         kqsw_csum_t   payload_csum = 0;
1538         kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
1540         size_t        csum_len = mlen;
/* rate-limit verbose checksum diagnostics to ~1 in a million */
1543         static atomic_t csum_counter;
1544         int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;
1546         atomic_inc (&csum_counter);
/* sender's header checksum is stored immediately after the header */
1548         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1549                                 sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
1550         if (senders_csum != hdr_csum)
1551                 kqswnal_csum_error (krx, 1);
1553         CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1555         /* What was actually received must be >= payload. */
1556         LASSERT (mlen <= rlen);
1557         if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
1558                 CERROR("Bad message size: have %d, need %d + %d\n",
1559                        krx->krx_nob, KQSW_HDR_SIZE, mlen);
1563         /* It must be OK to kmap() if required */
1564         LASSERT (kiov == NULL || !in_interrupt ());
1565         /* Either all pages or all vaddrs */
1566         LASSERT (!(kiov != NULL && iov != NULL));
/* source cursor: payload starts after the header in rx page 0 */
1571                 page_ptr = ((char *) page_address(krx->krx_pages[0])) +
1573                 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
/* destination cursor: kiov variant -- burn off 'offset' first */
1578                         /* skip complete frags */
1579                         while (offset >= kiov->kiov_len) {
1580                                 offset -= kiov->kiov_len;
1585                         iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
1586                         iov_nob = kiov->kiov_len - offset;
/* destination cursor: plain iov variant */
1588                         /* skip complete frags */
1589                         while (offset >= iov->iov_len) {
1590                                 offset -= iov->iov_len;
1595                         iov_ptr = iov->iov_base + offset;
1596                         iov_nob = iov->iov_len - offset;
/* copy loop: each pass moves min(src frag, dst frag) bytes */
1602                         if (frag > page_nob)
1607                         memcpy (iov_ptr, page_ptr, frag);
1609                         payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
/* source fragment exhausted: advance to the next rx page */
1623                                 LASSERT (page < krx->krx_npages);
1624                                 page_ptr = page_address(krx->krx_pages[page]);
1625                                 page_nob = PAGE_SIZE;
/* destination fragment exhausted: unmap old page, map next frag */
1631                         else if (kiov != NULL) {
1632                                 kunmap (kiov->kiov_page);
1636                                 iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
1637                                 iov_nob = kiov->kiov_len;
1642                                 iov_ptr = iov->iov_base;
1643                                 iov_nob = iov->iov_len;
/* leave no page mapped on exit from the copy loop */
1648                 kunmap (kiov->kiov_page);
/* sender's payload checksum follows the header checksum */
1652         memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
1653                 sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));
/* can only verify when the whole payload was delivered (mlen==rlen) */
1655         if (csum_len != rlen)
1656                 CERROR("Unable to checksum data in user's buffer\n");
1657         else if (senders_csum != payload_csum)
1658                 kqswnal_csum_error (krx, 0);
1661                 CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
1663                        hdr_csum, payload_csum, csum_frags, csum_nob);
/* tell the portals library the message is complete */
1665         lib_finalize(nal, private, libmsg, PTL_OK);
/* lib_nal receive callback for virtual-address iovecs: thin wrapper
 * that delegates to kqswnal_recvmsg() (parameter list elided here). */
1671 kqswnal_recv(nal_cb_t *nal,
1680         return (kqswnal_recvmsg(nal, private, libmsg,
1682                                 offset, mlen, rlen));
/* lib_nal receive callback for page-based kiovecs: thin wrapper that
 * delegates to kqswnal_recvmsg() (parameter list elided here). */
1686 kqswnal_recv_pages (nal_cb_t *nal,
1695         return (kqswnal_recvmsg(nal, private, libmsg,
1697                                 offset, mlen, rlen));
/* Spawn a kernel thread running fn(arg) and account for it in the
 * NAL's thread counters (the error path for a failed kernel_thread()
 * is elided from this view). */
1701 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
1703         long    pid = kernel_thread (fn, arg, 0);
1708         atomic_inc (&kqswnal_data.kqn_nthreads);
1709         atomic_inc (&kqswnal_data.kqn_nthreads_running);
/* Called by an exiting NAL thread to drop its count; shutdown waits
 * for kqn_nthreads to reach zero. */
1714 kqswnal_thread_fini (void)
1716         atomic_dec (&kqswnal_data.kqn_nthreads);
/* Scheduler thread main loop.  Services three work queues under
 * kqn_sched_lock -- ready receives, delayed transmits and delayed
 * forwards -- dropping the lock around each work item, sleeping when
 * idle and yielding periodically to avoid hogging the CPU.  Exits on
 * stage-2 shutdown.
 * NOTE(review): elided chunk -- the outer loop construct, several
 * braces and the did_something/counter resets are not visible. */
1720 kqswnal_scheduler (void *arg)
1724         kpr_fwd_desc_t  *fwd;
1728         int              shuttingdown = 0;
1731         kportal_daemonize ("kqswnal_sched");
1732         kportal_blockallsigs ();
1734         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
/* react to a shutdown-state change since we last looked */
1738                 if (kqswnal_data.kqn_shuttingdown != shuttingdown) {
/* stage 2: stop completely (exit path elided) */
1740                         if (kqswnal_data.kqn_shuttingdown == 2)
1743                         /* During stage 1 of shutdown we are still responsive
/* no longer counted as 'running'; remember the new stage */
1746                         atomic_dec (&kqswnal_data.kqn_nthreads_running);
1747                         shuttingdown = kqswnal_data.kqn_shuttingdown;
/* 1: deliver a completed receive */
1752                 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1754                         krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1755                                          kqswnal_rx_t, krx_list);
1756                         list_del (&krx->krx_list);
1757                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
/* (kqswnal_rx(krx) call elided) then retake the lock */
1763                         spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
/* 2: launch a delayed transmit (skipped once shutting down) */
1766                 if (!shuttingdown &&
1767                     !list_empty (&kqswnal_data.kqn_delayedtxds))
1769                         ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1770                                          kqswnal_tx_t, ktx_list);
1771                         list_del_init (&ktx->ktx_delayed_list);
1772                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
1775                         rc = kqswnal_launch (ktx);
1776                         if (rc != 0)          /* failed: ktx_nid down? */
1778                                 CERROR("Failed delayed transmit to "LPX64
1779                                        ": %d\n", ktx->ktx_nid, rc);
1780                                 kqswnal_tx_done (ktx, rc);
1784                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
/* 3: push a delayed forward to its destination */
1788                     !list_empty (&kqswnal_data.kqn_delayedfwds))
1790                         fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1791                         list_del (&fwd->kprfd_list);
1792                         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1794                         kqswnal_fwd_packet (NULL, fwd);
1797                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1800                 /* nothing to do or hogging CPU */
1801                 if (!did_something || counter++ == KQSW_RESCHED) {
1802                         spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
/* idle: sleep until new work arrives or shutdown stage changes */
1807                         if (!did_something) {
1808                                 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1809                                                                kqswnal_data.kqn_shuttingdown != shuttingdown ||
1810                                                                !list_empty(&kqswnal_data.kqn_readyrxds) ||
1811                                                                !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1812                                                                !list_empty(&kqswnal_data.kqn_delayedfwds));
/* busy too long: voluntarily yield the CPU */
1814                         } else if (current->need_resched)
1817                         spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1821         spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1823         kqswnal_thread_fini ();
1827 nal_cb_t kqswnal_lib =
1829 nal_data: &kqswnal_data, /* NAL private data */
1830 cb_send: kqswnal_send,
1831 cb_send_pages: kqswnal_send_pages,
1832 cb_recv: kqswnal_recv,
1833 cb_recv_pages: kqswnal_recv_pages,
1834 cb_read: kqswnal_read,
1835 cb_write: kqswnal_write,
1836 cb_malloc: kqswnal_malloc,
1837 cb_free: kqswnal_free,
1838 cb_printf: kqswnal_printf,
1839 cb_cli: kqswnal_cli,
1840 cb_sti: kqswnal_sti,
1841 cb_dist: kqswnal_dist