/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 *   W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswnal.h"
EP_STATUSBLK  kqswnal_rpc_success;
EP_STATUSBLK  kqswnal_rpc_failed;

/*
 *  LIB functions follow
 */
static int
kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
             size_t len)
{
        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );

        return (0);
}
static int
kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
              size_t len)
{
        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );

        return (0);
}
static void *
kqswnal_malloc(nal_cb_t *nal, size_t len)
{
        void *buf;

        PORTAL_ALLOC(buf, len);
        return (buf);
}
static void
kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
{
        PORTAL_FREE(buf, len);
}
static void
kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
{
        va_list  ap;
        char     msg[256];

        va_start (ap, fmt);
        vsnprintf (msg, sizeof (msg), fmt, ap); /* sprint safely */
        va_end (ap);

        msg[sizeof (msg) - 1] = 0;              /* ensure terminated */

        CDEBUG (D_NET, "%s", msg);
}
#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
# error "Can't save/restore irq contexts in different procedures"
#endif
static void
kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_lock_irqsave(&data->kqn_statelock, *flags);
}
static void
kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
}
static int
kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        if (nid == nal->ni.nid)
                *dist = 0;                      /* it's me */
        else if (kqswnal_nid2elanid (nid) >= 0)
                *dist = 1;                      /* it's my peer */
        else
                *dist = 2;                      /* via router */

        return (0);
}
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        struct timeval     now;
        time_t             then;

        do_gettimeofday (&now);
        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
}
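
/* Release any temporary DVMA mappings this tx descriptor picked up while
 * its payload was being mapped; pre-mapped (permanent) pages are left
 * untouched. */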
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
#if MULTIRAIL_EKC
        int      i;
#endif
        if (ktx->ktx_nmappedpages == 0)
                return;

#if MULTIRAIL_EKC
        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);
#else
        CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);

        LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
        LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);

        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                          kqswnal_data.kqn_eptxdmahandle,
                          ktx->ktx_basepage, ktx->ktx_nmappedpages);
#endif
        ktx->ktx_nmappedpages = 0;
}
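
/* Map the payload pages described by 'kiov' into Elan DVMA space, starting
 * 'offset' bytes in and covering 'nob' bytes.  Each page is kmapped and
 * loaded individually (EKC only has a vaddr mapping interface), and frags
 * that turn out to be contiguous in Elan space are merged so the final
 * frag list stays within EP_MAXFRAG. */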
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            EP_RAILMASK_ALL,
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);
                /* each frag fits in a page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                       ktx, nfrags, ptr, fraglen, basepage, nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ptr, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                       /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base ==      /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                       /* new frag */
                }
#endif
                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
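
/* As kqswnal_map_tx_kiov(), but for plain virtual-address iovecs; a single
 * iovec fragment may span several pages, so pages are counted with
 * kqswnal_pages_spanned() rather than one per fragment. */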
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            EP_RAILMASK_ALL,
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       iov->iov_base + offset, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                       /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base ==      /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                       /* new frag */
                }
#endif
                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        kpr_fwd_desc_t   *fwd = NULL;
        unsigned long     flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */

        if (ktx->ktx_isnblk) {
                /* reserved for non-blocking tx */
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
                return;
        }

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        /* anything blocking for a tx descriptor? */
        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
        {
                CDEBUG(D_NET,"wakeup fwd\n");

                fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
        }

        wake_up (&kqswnal_data.kqn_idletxd_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        if (fwd == NULL)
                return;

        /* schedule packet for forwarding again */
        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
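
/* Grab an idle tx descriptor.  There are two pools: the "normal" pool,
 * which callers may block on, and a reserve ('nblk') pool for senders
 * that may not block (e.g. in interrupt context).  A forwarded packet
 * ('fwd' != NULL) is never blocked; it is queued on kqn_idletxd_fwdq and
 * rescheduled by kqswnal_put_idle_tx() when a descriptor frees up. */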
kqswnal_tx_t *
kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx = NULL;

        for (;;) {
                spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                        ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* "normal" descriptor pool is empty */

                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
                        list_add_tail (&fwd->kprfd_list,
                                       &kqswnal_data.kqn_idletxd_fwdq);
                        break;
                }

                /* doing a local transmit */
                if (!may_block) {
                        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
                                CERROR ("intr tx desc pool exhausted\n");
                                break;
                        }

                        ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* block for idle tx */

                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

                CDEBUG (D_NET, "blocking for tx desc\n");
                wait_event (kqswnal_data.kqn_idletxd_waitq,
                            !list_empty (&kqswnal_data.kqn_idletxds));
        }

        if (ktx != NULL) {
                list_del (&ktx->ktx_list);
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                ktx->ktx_launcher = current->pid;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);

        return (ktx);
}
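
/* Transmit completion: finalize with the router or the lib according to
 * how the descriptor was launched (forwarding, plain send, or an
 * optimized GET awaiting the peer's direct DMA). */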
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
        lib_msg_t     *msg;
        lib_msg_t     *repmsg = NULL;

        switch (ktx->ktx_state) {
        case KTX_FORWARDING:       /* router asked me to forward this packet */
                kpr_fwd_done (&kqswnal_data.kqn_router,
                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                break;

        case KTX_SENDING:          /* packet sourced locally */
                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                              (lib_msg_t *)ktx->ktx_args[1],
                              (error == 0) ? PTL_OK :
                              (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
                break;

        case KTX_GETTING:          /* Peer has DMA-ed direct? */
                msg = (lib_msg_t *)ktx->ktx_args[1];

                if (error == 0)
                        repmsg = lib_fake_reply_msg (&kqswnal_lib,
                                                     ktx->ktx_nid, msg->md);
                if (repmsg == NULL)
                        error = -ENOMEM;

                if (error == 0) {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                                      msg, PTL_OK);
                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
                } else {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
                                      (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL);
                }
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);
}
static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        ktx->ktx_nid, status);

                kqswnal_notify_peer_down(ktx);
                status = -EHOSTDOWN;

        } else if (ktx->ktx_state == KTX_GETTING) {
                /* RPC completed OK; what did our peer put in the status
                 * block? */
#if MULTIRAIL_EKC
                status = ep_txd_statusblk(txd)->Data[0];
#else
                status = ep_txd_statusblk(txd)->Status;
#endif
        } else
                status = 0;

        kqswnal_tx_done (ktx, status);
}
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        LASSERT (dest >= 0);                    /* must be a peer */
        if (ktx->ktx_state == KTX_GETTING) {
                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
                 * other frags are the GET sink which we obviously don't
                 * send here :) */
#if MULTIRAIL_EKC
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
#else
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr, kqswnal_txhandler,
                                     ktx, NULL, ktx->ktx_frags, 1);
#endif
        } else {
#if MULTIRAIL_EKC
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
#else
                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                       ktx->ktx_port, attr,
                                       kqswnal_txhandler, ktx,
                                       ktx->ktx_frags, ktx->ktx_nfrag);
#endif
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                LASSERT (in_interrupt());

                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}
static char *
hdr_type_string (ptl_hdr_t *hdr)
{
        switch (hdr->type) {
        case PTL_MSG_ACK:
                return ("ACK");
        case PTL_MSG_PUT:
                return ("PUT");
        case PTL_MSG_GET:
                return ("GET");
        case PTL_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}
static void
kqswnal_cerror_hdr(ptl_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               NTOH__u32(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
               NTOH__u32(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
               NTOH__u32(hdr->dest_pid));

        switch (NTOH__u32(hdr->type)) {
        case PTL_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       NTOH__u64 (hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       NTOH__u32 (hdr->msg.get.sink_length),
                       NTOH__u32 (hdr->msg.get.src_offset));
                break;

        case PTL_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       NTOH__u32 (hdr->msg.ack.mlength));
                break;

        case PTL_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }

}                               /* end of print_hdr() */
#if !MULTIRAIL_EKC
void
kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
{
        int          i;

        CDEBUG (how, "%s: %d\n", str, n);
        for (i = 0; i < n; i++) {
                CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
        }
}
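
/* Zip a source iovec list and a destination iovec list into a single list
 * of (source, dest, length) DATAVECs, splitting whichever side has the
 * longer fragment.  For example, src frags of 4K+4K against dst frags of
 * 2K+6K yield three DATAVECs of 2K, 2K and 4K. */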
int
kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
                     int nsrc, EP_IOVEC *src,
                     int ndst, EP_IOVEC *dst)
{
        int        count;
        int        nob;

        LASSERT (ndv > 0);
        LASSERT (nsrc > 0);
        LASSERT (ndst > 0);

        for (count = 0; count < ndv; count++, dv++) {

                if (nsrc == 0 || ndst == 0) {
                        if (nsrc != ndst) {
                                /* For now I'll barf on any left over entries */
                                CERROR ("mismatched src and dst iovs\n");
                                return (-EINVAL);
                        }
                        return (count);
                }

                nob = (src->Len < dst->Len) ? src->Len : dst->Len;

                dv->Source = src->Base;
                dv->Dest   = dst->Base;
                dv->Len    = nob;

                if (nob >= src->Len) {
                        src++;
                        nsrc--;
                } else {
                        src->Base += nob;
                        src->Len -= nob;
                }

                if (nob >= dst->Len) {
                        dst++;
                        ndst--;
                } else {
                        dst->Base += nob;
                        dst->Len -= nob;
                }
        }

        CERROR ("DATAVEC too small\n");
        return (-E2BIG);
}
#endif
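
/* Send the payload of an optimized GET reply: the peer's GET carried the
 * Elan network addresses of its sink buffers (a kqswnal_remotemd_t just
 * after the header), so after sanity-checking the RMD we map our source
 * data and complete the peer's RPC with a direct DMA into those sinks. */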
int
kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                   struct iovec *iov, ptl_kiov_t *kiov,
                   int offset, int nob)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
        int                 rc;
#if MULTIRAIL_EKC
        int                 i;
#else
        EP_DATAVEC          datav[EP_MAXFRAG];
        int                 ndatav;
#endif
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT ((iov == NULL) != (kiov == NULL));

        /* see kqswnal_sendmsg comment regarding endian-ness */
        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (-EINVAL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (-EINVAL);
        }

        /* Map the source data... */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
        else
                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);

        if (rc != 0) {
                CERROR ("Can't map source data: %d\n", rc);
                return (rc);
        }

#if MULTIRAIL_EKC
        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
                return (-EINVAL);
        }

        for (i = 0; i < rmd->kqrmd_nfrag; i++)
                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, rmd->kqrmd_nfrag,
                               ktx->ktx_frags[i].nmd_len,
                               rmd->kqrmd_frag[i].nmd_len);
                        return (-EINVAL);
                }
#else
        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
                                      ktx->ktx_nfrag, ktx->ktx_frags,
                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (ndatav < 0) {
                CERROR ("Can't create datavec: %d\n", ndatav);
                return (ndatav);
        }
#endif

        /* Our caller will start to race with kqswnal_dma_reply_complete... */
        LASSERT (atomic_read (&krx->krx_refcount) == 1);
        atomic_set (&krx->krx_refcount, 2);

#if MULTIRAIL_EKC
        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                             &kqswnal_rpc_success,
                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
        if (rc == EP_SUCCESS)
                return (0);

        /* Well we tried... */
        krx->krx_rpc_reply_needed = 0;
#else
        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                              &kqswnal_rpc_success, datav, ndatav);
        if (rc == EP_SUCCESS)
                return (0);

        /* "old" EKC destroys rxd on failed completion */
        krx->krx_rxd = NULL;
#endif

        CERROR("can't complete RPC: %d\n", rc);

        /* reset refcount back to 1: we're not going to be racing with
         * kqswnal_dma_reply_complete. */
        atomic_set (&krx->krx_refcount, 1);

        return (-ECONNABORTED);
}
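
/* NB when KQSW_CHECKSUM is set, the wire format (presumably what
 * KQSW_HDR_SIZE accounts for) is the portals header followed by two
 * kqsw_csum_t's: the header checksum then the payload checksum, computed
 * below and verified fragment-by-fragment in kqswnal_recvmsg(). */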
static ptl_err_t
kqswnal_sendmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 ptl_hdr_t    *hdr,
                 int           type,
                 ptl_nid_t     nid,
                 ptl_pid_t     pid,
                 unsigned int  payload_niov,
                 struct iovec *payload_iov,
                 ptl_kiov_t   *payload_kiov,
                 size_t        payload_offset,
                 size_t        payload_nob)
{
        kqswnal_tx_t      *ktx;
        int                rc;
        ptl_nid_t          targetnid;
#if KQSW_CHECKSUM
        int                i;
        kqsw_csum_t        csum;
        int                sumoff;
        int                sumnob;
#endif

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
               " pid %u\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (payload_nob > KQSW_MAXPAYLOAD) {
                CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                        payload_nob, KQSW_MAXPAYLOAD);
                return (PTL_FAIL);
        }

        targetnid = nid;
        if (kqswnal_nid2elanid (nid) < 0) { /* Can't send direct: find gateway? */
                rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                if (rc != 0) {
                        CERROR("Can't route to "LPX64": router error %d\n",
                               nid, rc);
                        return (PTL_FAIL);
                }
                if (kqswnal_nid2elanid (targetnid) < 0) {
                        CERROR("Bad gateway "LPX64" for "LPX64"\n",
                               targetnid, nid);
                        return (PTL_FAIL);
                }
        }

        /* I may not block for a transmit descriptor if I might block the
         * receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
                                          type == PTL_MSG_REPLY ||
                                          in_interrupt()));
        if (ktx == NULL) {
                kqswnal_cerror_hdr (hdr);
                return (PTL_NOSPACE);
        }
        ktx->ktx_nid     = targetnid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = libmsg;

        if (type == PTL_MSG_REPLY &&
            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                if (nid != targetnid ||
                    kqswnal_nid2elanid(nid) !=
                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
                        CERROR("Optimized reply nid conflict: "
                               "nid "LPX64" via "LPX64" elanID %d\n",
                               nid, targetnid,
                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }

                /* peer expects RPC completion with GET data */
                rc = kqswnal_dma_reply (ktx, payload_niov,
                                        payload_iov, payload_kiov,
                                        payload_offset, payload_nob);
                if (rc == 0)
                        return (PTL_OK);

                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

#if KQSW_CHECKSUM
        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
                LASSERT (i < payload_niov);
                if (payload_kiov != NULL) {
                        ptl_kiov_t *kiov = &payload_kiov[i];

                        if (sumoff >= kiov->kiov_len) {
                                sumoff -= kiov->kiov_len;
                        } else {
                                char *addr = ((char *)kmap (kiov->kiov_page)) +
                                             kiov->kiov_offset + sumoff;
                                int   fragnob = kiov->kiov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                                kunmap(kiov->kiov_page);
                        }
                } else {
                        struct iovec *iov = &payload_iov[i];

                        if (sumoff >= iov->iov_len) {
                                sumoff -= iov->iov_len;
                        } else {
                                char *addr = iov->iov_base + sumoff;
                                int   fragnob = iov->iov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                        }
                }
        }
        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
        if (kqswnal_data.kqn_optimized_gets &&
            type == PTL_MSG_GET &&              /* doing a GET */
            nid == targetnid) {                 /* not forwarding */
                lib_md_t           *md = libmsg->md;
                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);

                /* Optimised path: I send over the Elan vaddrs of the get
                 * sink buffers, and my peer DMAs directly into them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the GET sink frags.  I copy these into ktx_buffer,
                 * immediately after the header, and send that as my GET
                 * message.
                 *
                 * Note that the addresses are sent in native endian-ness.
                 * When EKC copes with different endian nodes, I'll fix
                 * this (and eat my hat :) */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_GETTING;

                if ((libmsg->md->options & PTL_MD_KIOV) != 0)
                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                 md->md_niov, md->md_iov.iov);

                if (rc < 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;

                payload_nob = offsetof(kqswnal_remotemd_t,
                                       kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);

#if MULTIRAIL_EKC
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {

                /* small message: single frag copied into the pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
                if (payload_nob > 0) {
                        if (payload_kiov != NULL)
                                lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_kiov,
                                                   payload_offset, payload_nob);
                        else
                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                  payload_niov, payload_iov,
                                                  payload_offset, payload_nob);
                }
        } else {

                /* large message: multiple frags: first is hdr in pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }
        }

        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);
        if (rc != 0) {                    /* failed? */
                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
               payload_nob, nid, targetnid);
        return (PTL_OK);
}
static ptl_err_t
kqswnal_send (nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              ptl_hdr_t    *hdr,
              int           type,
              ptl_nid_t     nid,
              ptl_pid_t     pid,
              unsigned int  payload_niov,
              struct iovec *payload_iov,
              size_t        payload_offset,
              size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, payload_iov, NULL,
                                 payload_offset, payload_nob));
}
static ptl_err_t
kqswnal_send_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    ptl_hdr_t    *hdr,
                    int           type,
                    ptl_nid_t     nid,
                    ptl_pid_t     pid,
                    unsigned int  payload_niov,
                    ptl_kiov_t   *payload_kiov,
                    size_t        payload_offset,
                    size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, NULL, payload_kiov,
                                 payload_offset, payload_nob));
}
void
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
        int             rc;
        kqswnal_tx_t   *ktx;
        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
        int             niov = fwd->kprfd_niov;
        int             nob  = fwd->kprfd_nob;
        ptl_nid_t       nid  = fwd->kprfd_gateway_nid;

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        /* The router wants this NAL to forward a packet */
        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                fwd, nid, niov, nob);

        ktx = kqswnal_get_idle_tx (fwd, 0);
        if (ktx == NULL)        /* can't get txd right now */
                return;         /* fwd will be scheduled when tx desc freed */

        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
                nid = fwd->kprfd_target_nid;    /* target is final dest */

        if (kqswnal_nid2elanid (nid) < 0) {
                CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                rc = -EHOSTUNREACH;
                goto failed;
        }

        /* copy hdr into pre-mapped buffer */
        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
        ktx->ktx_nid     = nid;
        ktx->ktx_state   = KTX_FORWARDING;
        ktx->ktx_args[0] = fwd;
        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;

        if (nob <= KQSW_TX_MAXCONTIG)
        {
                /* send payload from ktx's pre-mapped contiguous buffer */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
#endif
                if (nob > 0)
                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
                                          niov, kiov, 0, nob);
        }
        else
        {
                /* zero copy payload */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                if (rc != 0)
                        goto failed;
        }

        rc = kqswnal_launch (ktx);
        if (rc == 0)
                return;

 failed:
        LASSERT (rc != 0);
        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);

        kqswnal_put_idle_tx (ktx);
        /* complete now (with failure) */
        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
}
void
kqswnal_fwd_callback (void *arg, int error)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;

        /* The router has finished forwarding this packet */

        if (error != 0)
        {
                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid), error);
        }

        kqswnal_requeue_rx (krx);
}
void
kqswnal_dma_reply_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_done (krx);

        lib_finalize (&kqswnal_lib, NULL, msg,
                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
        kqswnal_put_idle_tx (ktx);
}
void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}
void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        int   rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {

                /* We failed to complete the peer's optimized GET (e.g. we
                 * couldn't map the source buffers).  We complete the
                 * peer's EKC rpc now with failure. */
#if MULTIRAIL_EKC
                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
                                     &kqswnal_rpc_failed, NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
#else
                if (krx->krx_rxd != NULL) {
                        /* We didn't try (and fail) to complete earlier... */
                        rc = ep_complete_rpc(krx->krx_rxd,
                                             kqswnal_rpc_complete, krx,
                                             &kqswnal_rpc_failed, NULL, 0);
                        if (rc == EP_SUCCESS)
                                return;

                        CERROR("can't complete RPC: %d\n", rc);
                }

                /* NB the old ep_complete_rpc() frees rxd on failure, so we
                 * have to requeue from scratch here, unless we're shutting
                 * down */
                if (kqswnal_data.kqn_shuttingdown)
                        return;

                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
                LASSERT (rc == EP_SUCCESS);
                /* We don't handle failure here; it's incredibly rare
                 * (never reported?) and only happens with "old" EKC */
                return;
#endif
        }

#if MULTIRAIL_EKC
        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
#else
        /* don't actually requeue on shutdown */
        if (!kqswnal_data.kqn_shuttingdown)
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
#endif
}
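
/* Dispatch a completed receive: parse it locally if it is addressed to
 * this NID, otherwise build a kiov over the payload pages and hand the
 * packet to the router for forwarding. */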
void
kqswnal_rx (kqswnal_rx_t *krx)
{
        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
        ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
        int             payload_nob;
        int             nob;
        int             niov;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
                atomic_set(&krx->krx_refcount, 1);
                lib_parse (&kqswnal_lib, hdr, krx);
                kqswnal_rx_done(krx);
                return;
        }

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
        {
                CERROR("dropping packet from "LPX64" for "LPX64
                       ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);

                kqswnal_requeue_rx (krx);
                return;
        }

        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
        niov = 0;
        if (nob > 0) {
                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
                niov = 1;
                nob -= PAGE_SIZE - KQSW_HDR_SIZE;

                while (nob > 0) {
                        LASSERT (niov < krx->krx_npages);

                        krx->krx_kiov[niov].kiov_offset = 0;
                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
                        niov++;
                        nob -= PAGE_SIZE;
                }
        }

        kpr_fwd_init (&krx->krx_fwd, dest_nid,
                      hdr, payload_nob, niov, krx->krx_kiov,
                      kqswnal_fwd_callback, krx);

        kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
}
/* Receive Interrupt Handler: posts to schedulers */
void
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);

        krx->krx_rxd = rxd;
        krx->krx_nob = nob;
#if MULTIRAIL_EKC
        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
#else
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
#endif

        /* must receive a whole header to be able to parse */
        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
        {
                /* receives complete with failure when receiver is removed */
#if MULTIRAIL_EKC
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#else
                if (!kqswnal_data.kqn_shuttingdown)
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#endif
                kqswnal_requeue_rx (krx);
                return;
        }

        if (!in_interrupt()) {
                kqswnal_rx (krx);
                return;
        }

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
#if KQSW_CHECKSUM
void
kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
{
        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

        CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                ", dpid %d, spid %d, type %d\n",
                ishdr ? "Header" : "Payload", krx,
                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
                NTOH__u32(hdr->type));

        switch (NTOH__u32 (hdr->type))
        {
        case PTL_MSG_ACK:
                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                       " len %u\n",
                       NTOH__u32(hdr->msg.ack.mlength),
                       hdr->msg.ack.dst_wmd.handle_cookie,
                       hdr->msg.ack.dst_wmd.handle_idx,
                       NTOH__u64(hdr->msg.ack.match_bits),
                       NTOH__u32(hdr->msg.ack.length));
                break;
        case PTL_MSG_PUT:
                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                       " len %u off %u data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.handle_cookie,
                       hdr->msg.put.ack_wmd.handle_idx,
                       NTOH__u64(hdr->msg.put.match_bits),
                       NTOH__u32(hdr->msg.put.length),
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;
        case PTL_MSG_GET:
                CERROR ("GET: <>\n");
                break;
        case PTL_MSG_REPLY:
                CERROR ("REPLY: <>\n");
                break;
        default:
                CERROR ("TYPE?: <>\n");
        }
}
#endif
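
/* Copy the received payload out of the rx descriptor's pages into the
 * caller's iovec or kiov, walking both sides with a pair of
 * (pointer, bytes-remaining) cursors so fragment boundaries on either
 * side never have to line up. */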
static ptl_err_t
kqswnal_recvmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 unsigned int  niov,
                 struct iovec *iov,
                 ptl_kiov_t   *kiov,
                 size_t        offset,
                 size_t        mlen,
                 size_t        rlen)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
        int           page;
        char         *page_ptr;
        int           page_nob;
        char         *iov_ptr;
        int           iov_nob;
        int           frag;
#if KQSW_CHECKSUM
        kqsw_csum_t   senders_csum;
        kqsw_csum_t   payload_csum = 0;
        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
        size_t        csum_len = mlen;
        int           csum_frags = 0;
        int           csum_nob = 0;
        static atomic_t csum_counter;
        int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;

        atomic_inc (&csum_counter);

        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
        if (senders_csum != hdr_csum)
                kqswnal_csum_error (krx, 1);
#endif
        CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);

        /* What was actually received must be >= payload. */
        LASSERT (mlen <= rlen);
        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
                CERROR("Bad message size: have %d, need %d + %d\n",
                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
                return (PTL_FAIL);
        }

        /* It must be OK to kmap() if required */
        LASSERT (kiov == NULL || !in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (mlen != 0) {
                page     = 0;
                page_ptr = buffer + KQSW_HDR_SIZE;
                page_nob = PAGE_SIZE - KQSW_HDR_SIZE;

                LASSERT (niov > 0);

                if (kiov != NULL) {
                        /* skip complete frags */
                        while (offset >= kiov->kiov_len) {
                                offset -= kiov->kiov_len;
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        iov_ptr = ((char *)kmap (kiov->kiov_page)) +
                                  kiov->kiov_offset + offset;
                        iov_nob = kiov->kiov_len - offset;
                } else {
                        /* skip complete frags */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        iov_ptr = iov->iov_base + offset;
                        iov_nob = iov->iov_len - offset;
                }

                for (;;)
                {
                        frag = mlen;
                        if (frag > page_nob)
                                frag = page_nob;
                        if (frag > iov_nob)
                                frag = iov_nob;

                        memcpy (iov_ptr, page_ptr, frag);
#if KQSW_CHECKSUM
                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
                        csum_nob += frag;
                        csum_frags++;
#endif
                        mlen -= frag;
                        if (mlen == 0)
                                break;

                        page_nob -= frag;
                        if (page_nob != 0)
                                page_ptr += frag;
                        else {
                                page++;
                                LASSERT (page < krx->krx_npages);
                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                page_nob = PAGE_SIZE;
                        }

                        iov_nob -= frag;
                        if (iov_nob != 0)
                                iov_ptr += frag;
                        else if (kiov != NULL) {
                                kunmap (kiov->kiov_page);
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                                iov_nob = kiov->kiov_len;
                        } else {
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = iov->iov_base;
                                iov_nob = iov->iov_len;
                        }
                }

                if (kiov != NULL)
                        kunmap (kiov->kiov_page);
        }

#if KQSW_CHECKSUM
        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
                sizeof(kqsw_csum_t));

        if (csum_len != rlen)
                CERROR("Unable to checksum data in user's buffer\n");
        else if (senders_csum != payload_csum)
                kqswnal_csum_error (krx, 0);

        if (csum_verbose)
                CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
                       "csum_nob %d\n",
                       hdr_csum, payload_csum, csum_frags, csum_nob);
#endif
        lib_finalize(nal, private, libmsg, PTL_OK);

        return (PTL_OK);
}
static ptl_err_t
kqswnal_recv(nal_cb_t     *nal,
             void         *private,
             lib_msg_t    *libmsg,
             unsigned int  niov,
             struct iovec *iov,
             size_t        offset,
             size_t        mlen,
             size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, iov, NULL,
                                offset, mlen, rlen));
}
static ptl_err_t
kqswnal_recv_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    unsigned int  niov,
                    ptl_kiov_t   *kiov,
                    size_t        offset,
                    size_t        mlen,
                    size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, NULL, kiov,
                                offset, mlen, rlen));
}
int
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kqswnal_data.kqn_nthreads);
        atomic_inc (&kqswnal_data.kqn_nthreads_running);
        return (0);
}
void
kqswnal_thread_fini (void)
{
        atomic_dec (&kqswnal_data.kqn_nthreads);
}
int
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t    *krx;
        kqswnal_tx_t    *ktx;
        kpr_fwd_desc_t  *fwd;
        unsigned long    flags;
        int              rc;
        int              counter = 0;
        int              shuttingdown = 0;
        int              did_something;

        kportal_daemonize ("kqswnal_sched");
        kportal_blockallsigs ();

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        for (;;)
        {
                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {

                        if (kqswnal_data.kqn_shuttingdown == 2)
                                break;

                        /* During stage 1 of shutdown we are still responsive
                         * to receives */

                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
                        shuttingdown = kqswnal_data.kqn_shuttingdown;
                }

                did_something = 0;

                if (!list_empty (&kqswnal_data.kqn_readyrxds))
                {
                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
                                         kqswnal_rx_t, krx_list);
                        list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        kqswnal_rx (krx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                {
                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                         kqswnal_tx_t, ktx_list);
                        list_del_init (&ktx->ktx_delayed_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0)          /* failed: ktx_nid down? */
                        {
                                CERROR("Failed delayed transmit to "LPX64
                                       ": %d\n", ktx->ktx_nid, rc);
                                kqswnal_tx_done (ktx, rc);
                        }

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                {
                        fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                        list_del (&fwd->kprfd_list);
                        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

                        kqswnal_fwd_packet (NULL, fwd);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        counter = 0;

                        if (!did_something) {
                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedfwds));
                                LASSERT (rc == 0);
                        } else if (need_resched())
                                schedule ();

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

        kqswnal_thread_fini ();
        return (0);
}
nal_cb_t kqswnal_lib =
{
        nal_data:       &kqswnal_data,          /* NAL private data */
        cb_send:        kqswnal_send,
        cb_send_pages:  kqswnal_send_pages,
        cb_recv:        kqswnal_recv,
        cb_recv_pages:  kqswnal_recv_pages,
        cb_read:        kqswnal_read,
        cb_write:       kqswnal_write,
        cb_malloc:      kqswnal_malloc,
        cb_free:        kqswnal_free,
        cb_printf:      kqswnal_printf,
        cb_cli:         kqswnal_cli,
        cb_sti:         kqswnal_sti,
        cb_dist:        kqswnal_dist
};