/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 *   W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswnal.h"

EP_STATUSBLK  kqswnal_rpc_success;
EP_STATUSBLK  kqswnal_rpc_failed;

/*
 *  LIB functions follow
 */

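/* The functions below implement the nal_cb_t callbacks registered in the
 * kqswnal_lib table at the bottom of this file; the Portals library calls
 * into the NAL through that table. */
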
static int
kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
             size_t len)
{
        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );

        return (0);
}

static int
kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
              size_t len)
{
        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );

        return (0);
}

static void *
kqswnal_malloc(nal_cb_t *nal, size_t len)
{
        void *buf;

        PORTAL_ALLOC(buf, len);
        return (buf);
}

static void
kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
{
        PORTAL_FREE(buf, len);
}

static void
kqswnal_printf (nal_cb_t *nal, const char *fmt, ...)
{
        va_list  ap;
        char     msg[256];

        va_start (ap, fmt);
        vsnprintf (msg, sizeof (msg), fmt, ap);  /* sprint safely */
        va_end (ap);

        msg[sizeof (msg) - 1] = 0;               /* ensure terminated */

        CDEBUG (D_NET, "%s", msg);
}

static void
kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_lock_irqsave(&data->kqn_statelock, *flags);
}

static void
kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
}

static int
kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        if (nid == nal->ni.nid)
                *dist = 0;                      /* it's me */
        else if (kqswnal_nid2elanid (nid) >= 0)
                *dist = 1;                      /* it's my peer */
        else
                *dist = 2;                      /* via router */

        return (0);
}

void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        struct timeval now;
        time_t         then;

        do_gettimeofday (&now);
        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
}

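/* Undo the temporary DVMA mappings set up by kqswnal_map_tx_kiov/iov()
 * below; frags before ktx_firsttmpfrag describe the pre-mapped tx buffer
 * and are left alone. */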
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
#if MULTIRAIL_EKC      /* NB conditional name assumed: "new" multirail EKC API */
        int      i;
#endif

        if (ktx->ktx_nmappedpages == 0)
                return;

#if MULTIRAIL_EKC
        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);
#else                  /* "old" single-rail EKC */
        CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);

        LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
        LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);

        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                          kqswnal_data.kqn_eptxdmahandle,
                          ktx->ktx_basepage, ktx->ktx_nmappedpages);
#endif
        ktx->ktx_nmappedpages = 0;
}

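/* Map the pages of a kiov payload into the Elan network address space so
 * the NIC can DMA them, appending the resulting mappings to ktx_frags[].
 * Mappings that turn out contiguous are merged into a single frag. */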
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int nob, int niov, ptl_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            EP_RAILMASK_ALL, /* assumed mask arg */
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        do {
                int  fraglen = kiov->kiov_len;

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);
                /* each frag fits in a page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                       ktx, nfrags, ptr, fraglen, basepage, nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ptr, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                /* new frag */
                }
#endif
                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

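/* As kqswnal_map_tx_kiov() above, but for a struct iovec payload of
 * kernel virtual ranges, each of which may span several pages. */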
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int nob, int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            EP_RAILMASK_ALL, /* assumed mask arg */
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        do {
                int  fraglen = iov->iov_len;
                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base, fraglen, basepage, npages,
                       nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       iov->iov_base, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                /* new frag */
                }
#endif
                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

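/* Return a tx descriptor to its idle pool (the reserved non-blocking pool
 * if it came from there).  If a forwarded packet was queued waiting for a
 * descriptor, hand it back to the scheduler for another attempt. */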
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        kpr_fwd_desc_t   *fwd = NULL;
        unsigned long     flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */

        if (ktx->ktx_isnblk) {
                /* reserved for non-blocking tx */
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
                return;
        }

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        /* anything blocking for a tx descriptor? */
        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
        {
                CDEBUG(D_NET,"wakeup fwd\n");

                fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
        }

        wake_up (&kqswnal_data.kqn_idletxd_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        if (fwd == NULL)
                return;

        /* schedule packet for forwarding again */
        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}

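/* Get an idle tx descriptor.  Forwarded packets queue rather than block;
 * local senders may block unless may_block is 0, in which case they dip
 * into the reserved non-blocking pool. */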
kqswnal_tx_t *
kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx = NULL;

        for (;;) {
                spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                        ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* "normal" descriptor pool is empty */

                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
                        list_add_tail (&fwd->kprfd_list,
                                       &kqswnal_data.kqn_idletxd_fwdq);
                        break;
                }

                /* doing a local transmit */
                if (!may_block) {
                        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
                                CERROR ("intr tx desc pool exhausted\n");
                                break;
                        }

                        ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* block for idle tx */

                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

                CDEBUG (D_NET, "blocking for tx desc\n");
                wait_event (kqswnal_data.kqn_idletxd_waitq,
                            !list_empty (&kqswnal_data.kqn_idletxds));
        }

        if (ktx != NULL) {
                list_del (&ktx->ktx_list);
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                ktx->ktx_launcher = current->pid;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);

        return (ktx);
}

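/* Transmit completion: tell the router (forwarded packet), the library
 * (local send), or both sides of an optimized GET (where the transmit
 * doubles as the REPLY), then recycle the descriptor. */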
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
        lib_msg_t     *msg;
        lib_msg_t     *repmsg;

        switch (ktx->ktx_state) {
        case KTX_FORWARDING:       /* router asked me to forward this packet */
                kpr_fwd_done (&kqswnal_data.kqn_router,
                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                break;

        case KTX_SENDING:          /* packet sourced locally */
                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                              (lib_msg_t *)ktx->ktx_args[1]);
                break;

        case KTX_GETTING:          /* Peer has DMA-ed direct? */
                msg = (lib_msg_t *)ktx->ktx_args[1];
                repmsg = NULL;

                if (error == 0)
                        repmsg = lib_fake_reply_msg (&kqswnal_lib,
                                                     ktx->ktx_nid, msg->md);

                lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg);

                if (repmsg != NULL)
                        lib_finalize (&kqswnal_lib, NULL, repmsg);
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);
}

static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        ktx->ktx_nid, status);

                kqswnal_notify_peer_down(ktx);
                status = -EHOSTDOWN;

        } else if (ktx->ktx_state == KTX_GETTING) {
                /* RPC completed OK; what did our peer put in the status
                 * block? */
#if MULTIRAIL_EKC
                status = ep_txd_statusblk(txd)->Data[0];
#else
                status = ep_txd_statusblk(txd)->Status;
#endif
        } else {
                status = 0;
        }

        kqswnal_tx_done (ktx, status);
}

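/* Hand a tx descriptor to the EKC transmitter.  An optimized GET goes out
 * as an RPC so the peer can DMA the reply straight into the sink buffers;
 * everything else is a plain message.  EP_ENOMEM is not fatal: the tx is
 * queued for the scheduler thread to retry. */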
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        LASSERT (dest >= 0);                    /* must be a peer */
        if (ktx->ktx_state == KTX_GETTING) {
                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
                 * other frags are the GET sink which we obviously don't
                 * send */
#if MULTIRAIL_EKC
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
#else
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr, kqswnal_txhandler,
                                     ktx, NULL, ktx->ktx_frags, 1);
#endif
        } else {
#if MULTIRAIL_EKC
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
#else
                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                       ktx->ktx_port, attr,
                                       kqswnal_txhandler, ktx,
                                       ktx->ktx_frags, ktx->ktx_nfrag);
#endif
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                LASSERT (in_interrupt());

                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}

static char *
hdr_type_string (ptl_hdr_t *hdr)
{
        switch (NTOH__u32(hdr->type)) {
        case PTL_MSG_ACK:
                return ("ACK");
        case PTL_MSG_PUT:
                return ("PUT");
        case PTL_MSG_GET:
                return ("GET");
        case PTL_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

static void
kqswnal_cerror_hdr(ptl_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               NTOH__u32(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
               NTOH__u32(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
               NTOH__u32(hdr->dest_pid));

        switch (NTOH__u32(hdr->type)) {
        case PTL_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       NTOH__u64 (hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       NTOH__u32 (hdr->msg.get.sink_length),
                       NTOH__u32 (hdr->msg.get.src_offset));
                break;

        case PTL_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       NTOH__u32 (hdr->msg.ack.mlength));
                break;

        case PTL_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }
}                               /* end of print_hdr() */

#if !MULTIRAIL_EKC              /* EP_IOVEC/EP_DATAVEC only exist in "old" EKC */
void
kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
{
        int          i;

        CDEBUG (how, "%s: %d\n", str, n);
        for (i = 0; i < n; i++) {
                CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
        }
}

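/* Pair source and destination EP_IOVECs into EP_DATAVEC DMA descriptors,
 * splitting whichever side is longer so every entry transfers the overlap
 * ("old" single-rail EKC only). */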
int
kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
                     int nsrc, EP_IOVEC *src,
                     int ndst, EP_IOVEC *dst)
{
        int        count;
        int        nob;

        LASSERT (ndv > 0);
        LASSERT (nsrc > 0);
        LASSERT (ndst > 0);

        for (count = 0; count < ndv; count++, dv++) {

                if (nsrc == 0 || ndst == 0) {
                        if (nsrc != ndst) {
                                /* For now I'll barf on any left over entries */
                                CERROR ("mismatched src and dst iovs\n");
                                return (-EINVAL);
                        }
                        return (count);
                }

                nob = (src->Len < dst->Len) ? src->Len : dst->Len;
                dv->Len    = nob;
                dv->Source = src->Base;
                dv->Dest   = dst->Base;

                if (nob >= src->Len) {
                        src++;
                        nsrc--;
                } else {
                        src->Len -= nob;
                        src->Base += nob;
                }

                if (nob >= dst->Len) {
                        dst++;
                        ndst--;
                } else {
                        dst->Len -= nob;
                        dst->Base += nob;
                }
        }

        CERROR ("DATAVEC too small\n");
        return (-E2BIG);
}
#endif /* !MULTIRAIL_EKC */

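/* Complete a peer's optimized GET: map the local source data and complete
 * the peer's EKC RPC so the payload DMAs straight into the remote sink
 * frags described by the kqswnal_remotemd_t following the header. */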
int
kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                   struct iovec *iov, ptl_kiov_t *kiov, int nob)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        char               *buffer = (char *)page_address(krx->krx_pages[0]);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
        int                 rc;
        int                 i;
#if !MULTIRAIL_EKC
        EP_DATAVEC          datav[EP_MAXFRAG];
        int                 ndatav;
#endif
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT ((iov == NULL) != (kiov == NULL));

        /* see kqswnal_sendmsg comment regarding endian-ness */
        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (-EINVAL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (-EINVAL);
        }

        /* Map the source data... */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov (ktx, nob, nfrag, kiov);
        else
                rc = kqswnal_map_tx_iov (ktx, nob, nfrag, iov);

        if (rc != 0) {
                CERROR ("Can't map source data: %d\n", rc);
                return (rc);
        }

#if MULTIRAIL_EKC
        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
                return (-EINVAL);
        }

        for (i = 0; i < rmd->kqrmd_nfrag; i++)
                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, rmd->kqrmd_nfrag,
                               ktx->ktx_frags[i].nmd_len,
                               rmd->kqrmd_frag[i].nmd_len);
                        return (-EINVAL);
                }
#else
        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
                                      ktx->ktx_nfrag, ktx->ktx_frags,
                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (ndatav < 0) {
                CERROR ("Can't create datavec: %d\n", ndatav);
                return (ndatav);
        }
#endif

        /* Our caller will start to race with kqswnal_dma_reply_complete... */
        LASSERT (atomic_read (&krx->krx_refcount) == 1);
        atomic_set (&krx->krx_refcount, 2);

#if MULTIRAIL_EKC
        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                             &kqswnal_rpc_success,
                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
        if (rc == EP_SUCCESS)
                return (0);

        /* Well we tried... */
        krx->krx_rpc_reply_needed = 0;
#else
        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                              &kqswnal_rpc_success, datav, ndatav);
        if (rc == EP_SUCCESS)
                return (0);

        /* "old" EKC destroys rxd on failed completion */
        krx->krx_rxd = NULL;
#endif

        CERROR("can't complete RPC: %d\n", rc);

        /* reset refcount back to 1: we're not going to be racing with
         * kqswnal_dma_reply_complete. */
        atomic_set (&krx->krx_refcount, 1);

        return (-ECONNABORTED);
}

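/* Common send path for iovec and kiov payloads.  Three wire formats: an
 * optimized GET (an RPC carrying the sink's network addresses), a small
 * message copied whole into the pre-mapped tx buffer, and a large message
 * with the header in the pre-mapped buffer and the payload mapped in
 * place. */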
static int
kqswnal_sendmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 ptl_hdr_t    *hdr,
                 int           type,
                 ptl_nid_t     nid,
                 ptl_pid_t     pid,
                 unsigned int  payload_niov,
                 struct iovec *payload_iov,
                 ptl_kiov_t   *payload_kiov,
                 size_t        payload_nob)
{
        kqswnal_tx_t *ktx;
        int           rc;
        ptl_nid_t     targetnid;
#if KQSW_CHECKSUM
        int           i;
        kqsw_csum_t   csum;
        int           sumnob;
#endif

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
               " pid %u\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (payload_nob > KQSW_MAXPAYLOAD) {
                CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                        payload_nob, KQSW_MAXPAYLOAD);
                return (PTL_FAIL);
        }

        targetnid = nid;
        if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                if (rc != 0) {
                        CERROR("Can't route to "LPX64": router error %d\n",
                               nid, rc);
                        return (PTL_FAIL);
                }
                if (kqswnal_nid2elanid (targetnid) < 0) {
                        CERROR("Bad gateway "LPX64" for "LPX64"\n",
                               targetnid, nid);
                        return (PTL_FAIL);
                }
        }

        /* I may not block for a transmit descriptor if I might block the
         * receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
                                          type == PTL_MSG_REPLY ||
                                          in_interrupt()));
        if (ktx == NULL) {
                kqswnal_cerror_hdr (hdr);
                return (PTL_NOSPACE);
        }

        ktx->ktx_nid     = targetnid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = libmsg;

        if (type == PTL_MSG_REPLY &&
            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                if (nid != targetnid ||
                    kqswnal_nid2elanid(nid) !=
                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
                        CERROR("Optimized reply nid conflict: "
                               "nid "LPX64" via "LPX64" elanID %d\n",
                               nid, targetnid,
                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
                        return (PTL_FAIL);
                }

                /* peer expects RPC completion with GET data */
                rc = kqswnal_dma_reply (ktx,
                                        payload_niov, payload_iov,
                                        payload_kiov, payload_nob);
                if (rc == 0)
                        return (PTL_OK);

                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

#if KQSW_CHECKSUM
        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
        for (csum = 0, i = 0, sumnob = payload_nob; sumnob > 0; i++) {
                if (payload_kiov != NULL) {
                        ptl_kiov_t *kiov = &payload_kiov[i];
                        char       *addr = ((char *)kmap (kiov->kiov_page)) +
                                           kiov->kiov_offset;

                        csum = kqsw_csum (csum, addr, MIN (sumnob, kiov->kiov_len));
                        sumnob -= kiov->kiov_len;
                        kunmap (kiov->kiov_page); /* balance the kmap() above */
                } else {
                        struct iovec *iov = &payload_iov[i];

                        csum = kqsw_csum (csum, iov->iov_base, MIN (sumnob, iov->iov_len));
                        sumnob -= iov->iov_len;
                }
        }
        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif

        if (kqswnal_data.kqn_optimized_gets &&
            type == PTL_MSG_GET &&              /* doing a GET */
            nid == targetnid) {                 /* not forwarding */
                lib_md_t           *md = libmsg->md;
                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);

                /* Optimised path: I send over the Elan vaddrs of the get
                 * sink buffers, and my peer DMAs directly into them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the GET sink frags.  I copy these into ktx_buffer,
                 * immediately after the header, and send that as my GET
                 * message.
                 *
                 * Note that the addresses are sent in native endian-ness.
                 * When EKC copes with different endian nodes, I'll fix
                 * this (and eat my hat :) */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_GETTING;

                if ((libmsg->md->options & PTL_MD_KIOV) != 0)
                        rc = kqswnal_map_tx_kiov (ktx, md->length,
                                                  md->md_niov, md->md_iov.kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, md->length,
                                                 md->md_niov, md->md_iov.iov);

                if (rc < 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;

                payload_nob = offsetof(kqswnal_remotemd_t,
                                       kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);

#if MULTIRAIL_EKC
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {

                /* small message: single frag copied into the pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
                if (payload_nob > 0) {
                        if (payload_kiov != NULL)
                                lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_kiov, payload_nob);
                        else
                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                  payload_niov, payload_iov, payload_nob);
                }
        } else {

                /* large message: multiple frags: first is hdr in pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }
        }

        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);
        if (rc != 0) {                  /* failed? */
                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
               payload_nob, nid, targetnid);
        return (PTL_OK);
}

static int
kqswnal_send (nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              ptl_hdr_t    *hdr,
              int           type,
              ptl_nid_t     nid,
              ptl_pid_t     pid,
              unsigned int  payload_niov,
              struct iovec *payload_iov,
              size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, payload_iov, NULL, payload_nob));
}

static int
kqswnal_send_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    ptl_hdr_t    *hdr,
                    int           type,
                    ptl_nid_t     nid,
                    ptl_pid_t     pid,
                    unsigned int  payload_niov,
                    ptl_kiov_t   *payload_kiov,
                    size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, NULL, payload_kiov, payload_nob));
}

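/* The router calls this to send a packet out over Elan: grab a tx
 * descriptor (queueing the fwd if none is free), then either copy the
 * packet into the pre-mapped buffer or map the router's iovec in place. */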
void
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
        int             rc;
        kqswnal_tx_t   *ktx;
        struct iovec   *iov = fwd->kprfd_iov;
        int             niov = fwd->kprfd_niov;
        int             nob = fwd->kprfd_nob;
        ptl_nid_t       nid = fwd->kprfd_gateway_nid;

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        /* The router wants this NAL to forward a packet */
        CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n",
                fwd, nid, niov, nob);

        LASSERT (niov > 0);

        ktx = kqswnal_get_idle_tx (fwd, 0);
        if (ktx == NULL)        /* can't get txd right now */
                return;         /* fwd will be scheduled when tx desc freed */

        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
                nid = fwd->kprfd_target_nid;    /* target is final dest */

        if (kqswnal_nid2elanid (nid) < 0) {
                CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                rc = -EHOSTUNREACH;
                goto failed;
        }

        if (nob > KQSW_NRXMSGBYTES_LARGE) {
                CERROR ("Can't forward [%p] to "LPX64
                        ": size %d bigger than max packet size %ld\n",
                        fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE);
                rc = -EMSGSIZE;
                goto failed;
        }

        ktx->ktx_port    = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ?
                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
        ktx->ktx_nid     = nid;
        ktx->ktx_state   = KTX_FORWARDING;
        ktx->ktx_args[0] = fwd;

        if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) &&
            nob <= KQSW_TX_BUFFER_SIZE)
        {
                /* send from ktx's pre-mapped contiguous buffer? */
                lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, nob);
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = nob;
#endif
                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;
        }
        else
        {
                /* zero copy: map the router's iovec in place */
                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
                rc = kqswnal_map_tx_iov (ktx, nob, niov, iov);
                if (rc != 0)
                        goto failed;

                ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base;
        }

        rc = kqswnal_launch (ktx);
        if (rc == 0)
                return;

 failed:
        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);

        kqswnal_put_idle_tx (ktx);
        /* complete now (with failure) */
        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
}

static void
kqswnal_fwd_callback (void *arg, int error)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;

        /* The router has finished forwarding this packet */

        if (error != 0)
        {
                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);

                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid), error);
        }

        kqswnal_requeue_rx (krx);
}

void
kqswnal_dma_reply_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_done (krx);

        lib_finalize (&kqswnal_lib, NULL, msg);
        kqswnal_put_idle_tx (ktx);
}

void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

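/* Put a receive descriptor back on the wire.  If the peer is still
 * waiting for an RPC completion that we never delivered, complete it now
 * with failure before reposting the receive. */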
void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        int   rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {

                /* We failed to complete the peer's optimized GET (e.g. we
                 * couldn't map the source buffers).  We complete the
                 * peer's EKC rpc now with failure. */
#if MULTIRAIL_EKC
                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
                                     &kqswnal_rpc_failed, NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
#else
                if (krx->krx_rxd != NULL) {
                        /* We didn't try (and fail) to complete earlier... */
                        rc = ep_complete_rpc(krx->krx_rxd,
                                             kqswnal_rpc_complete, krx,
                                             &kqswnal_rpc_failed, NULL, 0);
                        if (rc == EP_SUCCESS)
                                return;

                        CERROR("can't complete RPC: %d\n", rc);
                }

                /* NB the old ep_complete_rpc() frees rxd on failure, so we
                 * have to requeue from scratch here, unless we're shutting
                 * down */
                if (kqswnal_data.kqn_shuttingdown)
                        return;

                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
                LASSERT (rc == EP_SUCCESS);
                /* We don't handle failure here; it's incredibly rare
                 * (never reported?) and only happens with "old" EKC */
                return;
#endif
        }

#if MULTIRAIL_EKC
        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
#else
        /* don't actually requeue on shutdown */
        if (!kqswnal_data.kqn_shuttingdown)
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
#endif
}

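/* Dispatch a received packet: deliver it to the local Portals library if
 * it is addressed to us, otherwise rebuild the iovec over the receive
 * pages and hand the packet to the router. */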
void
kqswnal_rx (kqswnal_rx_t *krx)
{
        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]);
        ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
        int             nob;
        int             niov;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
                atomic_set(&krx->krx_refcount, 1);
                lib_parse (&kqswnal_lib, hdr, krx);
                kqswnal_rx_done(krx);
                return;
        }

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
        {
                CERROR("dropping packet from "LPX64" for "LPX64
                       ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);

                kqswnal_requeue_rx (krx);
                return;
        }

        /* NB forwarding may destroy iov; rebuild every time */
        for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++)
        {
                LASSERT (niov < krx->krx_npages);
                krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]);
                krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob);
        }

        kpr_fwd_init (&krx->krx_fwd, dest_nid,
                      krx->krx_nob, niov, krx->krx_iov,
                      kqswnal_fwd_callback, krx);

        kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
}

/* Receive Interrupt Handler: posts to schedulers */
void
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);

        krx->krx_rxd = rxd;
        krx->krx_nob = nob;
#if MULTIRAIL_EKC
        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
#else
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
#endif

        /* must receive a whole header to be able to parse */
        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
        {
                /* receives complete with failure when receiver is removed */
#if MULTIRAIL_EKC
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#else
                if (!kqswnal_data.kqn_shuttingdown)
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#endif
                kqswnal_requeue_rx (krx);
                return;
        }

        if (!in_interrupt()) {
                kqswnal_rx (krx);
                return;
        }

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}

#if KQSW_CHECKSUM
void
kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
{
        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]);

        CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                ", dpid %d, spid %d, type %d\n",
                ishdr ? "Header" : "Payload", krx,
                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
                NTOH__u32(hdr->type));

        switch (NTOH__u32 (hdr->type))
        {
        case PTL_MSG_ACK:
                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                       " len %u\n",
                       NTOH__u32(hdr->msg.ack.mlength),
                       hdr->msg.ack.dst_wmd.handle_cookie,
                       hdr->msg.ack.dst_wmd.handle_idx,
                       NTOH__u64(hdr->msg.ack.match_bits),
                       NTOH__u32(hdr->msg.ack.length));
                break;
        case PTL_MSG_PUT:
                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                       " len %u off %u data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.handle_cookie,
                       hdr->msg.put.ack_wmd.handle_idx,
                       NTOH__u64(hdr->msg.put.match_bits),
                       NTOH__u32(hdr->msg.put.length),
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;
        case PTL_MSG_GET:
                CERROR ("GET: <>\n");
                break;
        case PTL_MSG_REPLY:
                CERROR ("REPLY: <>\n");
                break;
        default:
                CERROR ("TYPE?: <>\n");
        }
}
#endif /* KQSW_CHECKSUM */

static int
kqswnal_recvmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 unsigned int  niov,
                 struct iovec *iov,
                 ptl_kiov_t   *kiov,
                 size_t        mlen,
                 size_t        rlen)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
        int           page;
        char         *page_ptr;
        int           page_nob;
        char         *iov_ptr;
        int           iov_nob;
        int           frag;
#if KQSW_CHECKSUM
        kqsw_csum_t   senders_csum;
        kqsw_csum_t   payload_csum = 0;
        kqsw_csum_t   hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]),
                                           sizeof(ptl_hdr_t));
        size_t        csum_len = mlen;
        int           csum_frags = 0;
        int           csum_nob = 0;
        static atomic_t csum_counter;
        int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;

        atomic_inc (&csum_counter);

        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
                                sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
        if (senders_csum != hdr_csum)
                kqswnal_csum_error (krx, 1);
#endif
        CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);

        /* What was actually received must be >= payload.
         * This is an LASSERT, as lib_finalize() doesn't have a completion status. */
        LASSERT (krx->krx_nob >= KQSW_HDR_SIZE + mlen);
        LASSERT (mlen <= rlen);

        /* It must be OK to kmap() if required */
        LASSERT (kiov == NULL || !in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (mlen != 0)
        {
                page     = 0;
                page_ptr = ((char *) page_address(krx->krx_pages[0])) +
                           KQSW_HDR_SIZE;
                page_nob = PAGE_SIZE - KQSW_HDR_SIZE;

                LASSERT (niov > 0);
                if (kiov != NULL) {
                        iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                        iov_nob = kiov->kiov_len;
                } else {
                        iov_ptr = iov->iov_base;
                        iov_nob = iov->iov_len;
                }

                for (;;)
                {
                        /* We expect the iov to exactly match mlen */
                        LASSERT (iov_nob <= mlen);

                        frag = MIN (page_nob, iov_nob);
                        memcpy (iov_ptr, page_ptr, frag);
#if KQSW_CHECKSUM
                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
                        csum_nob += frag;
                        csum_frags++;
#endif
                        mlen -= frag;
                        if (mlen == 0)
                                break;

                        page_nob -= frag;
                        if (page_nob != 0)
                                page_ptr += frag;
                        else
                        {
                                page++;
                                LASSERT (page < krx->krx_npages);
                                page_ptr = page_address(krx->krx_pages[page]);
                                page_nob = PAGE_SIZE;
                        }

                        iov_nob -= frag;
                        if (iov_nob != 0)
                                iov_ptr += frag;
                        else if (kiov != NULL) {
                                kunmap (kiov->kiov_page);
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                                iov_nob = kiov->kiov_len;
                        } else {
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = iov->iov_base;
                                iov_nob = iov->iov_len;
                        }
                }

                if (kiov != NULL)
                        kunmap (kiov->kiov_page);
        }

#if KQSW_CHECKSUM
        memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) +
                sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t));

        if (csum_len != rlen)
                CERROR("Unable to checksum data in user's buffer\n");
        else if (senders_csum != payload_csum)
                kqswnal_csum_error (krx, 0);

        if (csum_verbose)
                CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
                       "csum_nob %d\n",
                       hdr_csum, payload_csum, csum_frags, csum_nob);
#endif
        lib_finalize(nal, private, libmsg);

        return (rlen);
}

static int
kqswnal_recv(nal_cb_t     *nal,
             void         *private,
             lib_msg_t    *libmsg,
             unsigned int  niov,
             struct iovec *iov,
             size_t        mlen,
             size_t        rlen)
{
        return (kqswnal_recvmsg (nal, private, libmsg, niov, iov, NULL, mlen, rlen));
}

static int
kqswnal_recv_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    unsigned int  niov,
                    ptl_kiov_t   *kiov,
                    size_t        mlen,
                    size_t        rlen)
{
        return (kqswnal_recvmsg (nal, private, libmsg, niov, NULL, kiov, mlen, rlen));
}

int
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kqswnal_data.kqn_nthreads);
        atomic_inc (&kqswnal_data.kqn_nthreads_running);
        return (0);
}

void
kqswnal_thread_fini (void)
{
        atomic_dec (&kqswnal_data.kqn_nthreads);
}

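/* Scheduler thread: drains the ready-rx, delayed-tx and delayed-fwd
 * queues, dropping the lock to service each item.  Shutdown is presumably
 * two-stage: in stage 1 the thread stops counting itself as "running" but
 * keeps servicing receives; stage 2 makes it exit. */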
int
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t    *krx;
        kqswnal_tx_t    *ktx;
        kpr_fwd_desc_t  *fwd;
        unsigned long    flags;
        int              rc;
        int              counter = 0;
        int              shuttingdown = 0;
        int              did_something;

        kportal_daemonize ("kqswnal_sched");
        kportal_blockallsigs ();

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        for (;;)
        {
                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {

                        if (kqswnal_data.kqn_shuttingdown == 2)
                                break;

                        /* During stage 1 of shutdown we are still responsive
                         * to receives */

                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
                        shuttingdown = kqswnal_data.kqn_shuttingdown;
                }

                did_something = 0;

                if (!list_empty (&kqswnal_data.kqn_readyrxds))
                {
                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
                                         kqswnal_rx_t, krx_list);
                        list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        kqswnal_rx (krx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                {
                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                         kqswnal_tx_t, ktx_list);
                        list_del_init (&ktx->ktx_delayed_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0)          /* failed: ktx_nid down? */
                        {
                                CERROR("Failed delayed transmit to "LPX64
                                       ": %d\n", ktx->ktx_nid, rc);
                                kqswnal_tx_done (ktx, rc);
                        }

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                {
                        fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                        list_del (&fwd->kprfd_list);
                        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

                        kqswnal_fwd_packet (NULL, fwd);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        counter = 0;

                        if (!did_something) {
                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedfwds));
                                LASSERT (rc == 0);
                        } else if (current->need_resched)
                                schedule ();

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

        kqswnal_thread_fini ();
        return (0);
}

nal_cb_t kqswnal_lib =
{
        nal_data:       &kqswnal_data,          /* NAL private data */
        cb_send:        kqswnal_send,
        cb_send_pages:  kqswnal_send_pages,
        cb_recv:        kqswnal_recv,
        cb_recv_pages:  kqswnal_recv_pages,
        cb_read:        kqswnal_read,
        cb_write:       kqswnal_write,
        cb_malloc:      kqswnal_malloc,
        cb_free:        kqswnal_free,
        cb_printf:      kqswnal_printf,
        cb_cli:         kqswnal_cli,
        cb_sti:         kqswnal_sti,
        cb_dist:        kqswnal_dist
};