/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 *   W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

/*
 * LIB functions follow
 */

kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)

        if (nid == nal->libnal_ni.ni_pid.nid)
                *dist = 0;                      /* it's me */
        else if (kqswnal_nid2elanid (nid) >= 0)
                *dist = 1;                      /* it's my peer */
        else
                *dist = 2;                      /* via router */
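
/* Tell the router this tx's peer went down, using the tx's launch time
 * (reconstructed in seconds from jiffies) as the time of death. */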
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)

        do_gettimeofday (&now);
        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
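
/* Release any temporary DVMA mappings held by this tx.  NB the paired
 * ep_dvma_unload()/elan3_dvma_unload() calls below look duplicated only
 * because they sit on opposite sides of an elided new-vs-old EKC
 * conditional (an assumption from the APIs used); the same pattern
 * recurs throughout this file. */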
kqswnal_unmap_tx (kqswnal_tx_t *ktx)

        ktx->ktx_rail = -1;                     /* unset rail */

        if (ktx->ktx_nmappedpages == 0)

        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);

        LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
        LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);

        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                          kqswnal_data.kqn_eptxdmahandle,
                          ktx->ktx_basepage, ktx->ktx_nmappedpages);

        ktx->ktx_nmappedpages = 0;
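
/* Map a paged (kiov) payload into Elan-addressable space, starting at
 * ktx_basepage.  Fragments that the EKC can merge (or that land
 * contiguous in Elan VM) extend the previous frag, so ktx_nfrag may end
 * up smaller than niov.  Pages must be kmap()ed since the mapping
 * interface takes vaddrs. */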
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)

        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;

        EP_RAILMASK railmask;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,

                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;

                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);

        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;

                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                       "%p[%d] loading %p for %d, page %d, %d total\n",
                        ktx, nfrags, ptr, fraglen, basepage, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */

                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ptr, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                       /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base ==      /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;

                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                       /* new frag */

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

        /* iov must not run out before end of data */
        LASSERT (nob == 0 || niov > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
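
/* The iovec twin of kqswnal_map_tx_kiov() above: same loading and
 * frag-merging logic, but payload vaddrs are already mapped (no kmap())
 * and a single iovec may span several pages (kqswnal_pages_spanned()). */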
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    int niov, struct iovec *iov)

        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;

        EP_RAILMASK railmask;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,

                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;

                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);

        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;

                int  fraglen = iov->iov_len - offset;

                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);

                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",

                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */

                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       iov->iov_base + offset, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                       /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base ==      /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;

                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                       /* new frag */

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

        /* iov must not run out before end of data */
        LASSERT (nob == 0 || niov > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
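
/* Return a tx descriptor to its idle list (the reserved list if
 * ktx_isnblk), waking anyone blocked waiting for one and rescheduling
 * any forwarded packet that was queued for want of a descriptor. */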
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)

        kpr_fwd_desc_t *fwd = NULL;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */

        if (ktx->ktx_isnblk) {
                /* reserved for non-blocking tx */
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        /* anything blocking for a tx descriptor? */
        if (!kqswnal_data.kqn_shuttingdown &&
            !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */

                CDEBUG(D_NET,"wakeup fwd\n");

                fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);

        wake_up (&kqswnal_data.kqn_idletxd_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* schedule packet for forwarding again */
        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
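
/* Allocate an idle tx descriptor.  Forwarded packets and callers that
 * may not block draw on the reserved non-blocking pool instead of
 * sleeping; with may_block set, the caller waits until a "normal"
 * descriptor frees up or shutdown starts. */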
kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)

        kqswnal_tx_t  *ktx = NULL;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown)

        /* "normal" descriptor is free */
        if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                                  kqswnal_tx_t, ktx_list);

        if (fwd != NULL)                        /* forwarded packet? */

        /* doing a local transmit */

        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
                CERROR ("intr tx desc pool exhausted\n");

        ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
                          kqswnal_tx_t, ktx_list);

        /* block for idle tx */

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        CDEBUG (D_NET, "blocking for tx desc\n");
        wait_event (kqswnal_data.kqn_idletxd_waitq,
                    !list_empty (&kqswnal_data.kqn_idletxds) ||
                    kqswnal_data.kqn_shuttingdown);

        if (ktx != NULL) {
                list_del (&ktx->ktx_list);
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                ktx->ktx_launcher = current->pid;
                atomic_inc(&kqswnal_data.kqn_pending_txs);
        } else if (fwd != NULL) {
                /* queue forwarded packet until idle txd available */
                CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
                list_add_tail (&fwd->kprfd_list,
                               &kqswnal_data.kqn_idletxd_fwdq);
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);
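
/* Transmit completion: what gets finalized depends on ktx_state.  NB a
 * failed KTX_GETTING still finalizes the GET itself with PTL_OK, since
 * the REPLY event was committed to at launch; only the REPLY carries
 * the error. */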
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)

        switch (ktx->ktx_state) {
        case KTX_FORWARDING:    /* router asked me to forward this packet */
                kpr_fwd_done (&kqswnal_data.kqn_router,
                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);

        case KTX_RDMAING:       /* optimized GET/PUT handled */
        case KTX_PUTTING:       /* optimized PUT sent */
        case KTX_SENDING:       /* normal send */
                lib_finalize (&kqswnal_lib, NULL,
                              (lib_msg_t *)ktx->ktx_args[1],
                              (error == 0) ? PTL_OK : PTL_FAIL);

        case KTX_GETTING:       /* optimized GET sent & REPLY received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lib_finalize (&kqswnal_lib, NULL,
                              (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
                lib_finalize (&kqswnal_lib, NULL,
                              (lib_msg_t *)ktx->ktx_args[2],
                              (error == 0) ? PTL_OK : PTL_FAIL);

        kqswnal_put_idle_tx (ktx);
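
/* EP transmit completion callback; may run in interrupt context.  For
 * RPC-style transmits (optimized GET/PUT) the peer's reply status is
 * dug out of the txd's status block. */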
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)

        kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        ktx->ktx_nid, status);

                kqswnal_notify_peer_down(ktx);

        } else switch (ktx->ktx_state) {

                /* RPC completed OK; but what did our peer put in the status

                status = ep_txd_statusblk(txd)->Data[0];

                status = ep_txd_statusblk(txd)->Status;

        kqswnal_tx_done (ktx, status);
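
/* Hand a tx to the EKC, selecting the transmit primitive by ktx_state
 * (RPC for optimized GET/PUT, plain message otherwise).  EP_ENOMEM is
 * not fatal: the tx is queued for the scheduler thread to relaunch. */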
kqswnal_launch (kqswnal_tx_t *ktx)

        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);

        ktx->ktx_launchtime = jiffies;

        if (kqswnal_data.kqn_shuttingdown)

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);

                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);

                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                       kqswnal_txhandler, ktx,
                                       ktx->ktx_frags, ktx->ktx_nfrag);

                rc = -EINVAL;                   /* no compiler warning please */

        case EP_SUCCESS:        /* success */

        case EP_ENOMEM:         /* can't allocate ep txd => queue for later */
                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

        default:                /* fatal error */
                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);

hdr_type_string (ptl_hdr_t *hdr)

        return ("<UNKNOWN>");

kqswnal_cerror_hdr(ptl_hdr_t * hdr)

        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR(" From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR(" To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {

                CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR(" offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);

                CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.get.match_bits));
                CERROR(" Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));

                CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));

                CERROR(" dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);

}                               /* end of kqswnal_cerror_hdr() */

kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)

        CDEBUG (how, "%s: %d\n", str, n);
        for (i = 0; i < n; i++) {
                CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len);
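
/* Pair source and destination EP_IOVECs into EP_DATAVEC entries for an
 * RDMA, advancing whichever side's fragment is exhausted first.  Both
 * sides must describe the same total number of bytes.  (Used with the
 * datavec-style RDMA calls, presumably the "old" EKC ones.) */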
kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
                     int nsrc, EP_IOVEC *src,
                     int ndst, EP_IOVEC *dst)

        for (count = 0; count < ndv; count++, dv++) {

                if (nsrc == 0 || ndst == 0) {

                        /* For now I'll barf on any left over entries */
                        CERROR ("mismatched src and dst iovs\n");

                nob = (src->Len < dst->Len) ? src->Len : dst->Len;

                dv->Source = src->Base;
                dv->Dest   = dst->Base;

                if (nob >= src->Len) {

                if (nob >= dst->Len) {

        CERROR ("DATAVEC too small\n");
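
/* The EKC RDMA primitives used below want local and remote descriptors
 * to match fragment-for-fragment, in both count and length. */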
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
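
/* Locate and sanity-check the remote memory descriptor that an
 * optimized GET/PUT sender places immediately after the portals header
 * in the message buffer. */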
kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)

        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        ptl_hdr_t          *hdr = (ptl_hdr_t *)buffer;
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
        ptl_nid_t           nid = kqswnal_rx_nid(krx);

        /* Note (1) lib_parse has already flipped hdr.
         * (2) RDMA addresses are sent in native endian-ness.  When
         * EKC copes with different endian nodes, I'll fix this (and

        LASSERT (krx->krx_nob >= sizeof(*hdr));

        if (hdr->type != type) {
                CERROR ("Unexpected optimized get/put type %d (%d expected) "
                        "from "LPX64"\n", hdr->type, type, nid);

        if (hdr->src_nid != nid) {
                CERROR ("Unexpected optimized get/put source NID "
                        LPX64" from "LPX64"\n", hdr->src_nid, nid);

        LASSERT (nid == expected_nid);

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
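
/* Completion callback for the RDMA that pushes GET reply data to the
 * peer (the ep_complete_rpc() path): drop the rx ref and finalize via
 * kqswnal_tx_done(). */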
kqswnal_rdma_store_complete (EP_RXD *rxd)

        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMAING);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lib_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
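
/* Completion callback for the RDMA that fetches PUT data from the peer
 * (the ep_rpc_get() path).  Unlike the store case, the RPC itself still
 * has to be completed afterwards; in interrupt context that is punted
 * to the scheduler thread. */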
kqswnal_rdma_fetch_complete (EP_RXD *rxd)

        /* Completed fetching the PUT data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMAING);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply_status != 0);

        if (status == EP_SUCCESS) {
                status = krx->krx_rpc_reply_status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;

        /* free ktx & finalize() its lib_msg_t */
        kqswnal_tx_done(ktx, status);

        if (!in_interrupt()) {
                /* OK to complete the RPC now (iff I had the last ref) */
                kqswnal_rx_decref (krx);

        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* Complete the RPC in thread context */
        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
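
/* Core of optimized GET/PUT: map the local buffers with a borrowed tx
 * descriptor, check they match the peer's RMD fragment-for-fragment,
 * then either push the data to the peer (GET reply) or pull it from
 * the peer (PUT). */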
kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
              int niov, struct iovec *iov, ptl_kiov_t *kiov,
              size_t offset, size_t len)

        kqswnal_remotemd_t *rmd;

        EP_DATAVEC          datav[EP_MAXFRAG];

        LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply_status != 0);

        rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);

                /* data got truncated to nothing. */
                lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply_status = 0;

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
           actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx(NULL, 0);
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with "LPX64"\n",
                        libmsg->ev.initiator.nid);

        ktx->ktx_state   = KTX_RDMAING;
        ktx->ktx_nid     = libmsg->ev.initiator.nid;
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = libmsg;

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

                CERROR ("Can't map local RDMA data: %d\n", rc);

        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);

                CERROR ("Incompatible RDMA descriptors\n");

        ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
                                     ktx->ktx_nfrag, ktx->ktx_frags,
                                     rmd->kqrmd_nfrag, rmd->kqrmd_frag);

        ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
                                     rmd->kqrmd_nfrag, rmd->kqrmd_frag,
                                     ktx->ktx_nfrag, ktx->ktx_frags);

                CERROR ("Can't create datavec: %d\n", ndatav);

        LASSERT (atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        atomic_inc(&krx->krx_refcount);

        eprc = ep_complete_rpc(krx->krx_rxd,
                               kqswnal_rdma_store_complete, ktx,
                               &kqswnal_data.kqn_rpc_success,
                               ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);

        eprc = ep_complete_rpc (krx->krx_rxd,
                                kqswnal_rdma_store_complete, ktx,
                                &kqswnal_data.kqn_rpc_success,

        if (eprc != EP_SUCCESS)         /* "old" EKC destroys rxd on failed completion */

        if (eprc != EP_SUCCESS) {
                CERROR("can't complete RPC: %d\n", eprc);
                /* don't re-attempt RPC completion */
                krx->krx_rpc_reply_needed = 0;

        eprc = ep_rpc_get (krx->krx_rxd,
                           kqswnal_rdma_fetch_complete, ktx,
                           rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);

        eprc = ep_rpc_get (krx->krx_rxd,
                           kqswnal_rdma_fetch_complete, ktx,

        if (eprc != EP_SUCCESS) {
                CERROR("ep_rpc_get failed: %d\n", eprc);
                /* Don't attempt RPC completion:
                 * EKC nuked it when the get failed */
                krx->krx_rpc_reply_needed = 0;

        kqswnal_rx_decref(krx);         /* drop callback's ref */
        kqswnal_put_idle_tx (ktx);

        atomic_dec(&kqswnal_data.kqn_pending_txs);
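
/* Common send path.  In order: a REPLY to an optimized GET turns into
 * an RDMA; non-peer NIDs get routed via a gateway; sufficiently large
 * GETs/PUTs are "optimized" by advertising an RMD for the peer to RDMA
 * against; small payloads are copied into the pre-mapped buffer; and
 * everything else is mapped as additional frags. */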
kqswnal_sendmsg (lib_nal_t *nal,

                 unsigned int payload_niov,
                 struct iovec *payload_iov,
                 ptl_kiov_t *payload_kiov,
                 size_t payload_offset,

        ptl_nid_t     targetnid;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
               " pid %u\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (payload_nob > KQSW_MAXPAYLOAD) {
                CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                        payload_nob, KQSW_MAXPAYLOAD);

        if (type == PTL_MSG_REPLY &&            /* can I look in 'private' */
            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
                /* Must be a REPLY for an optimized GET */
                rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
                                   payload_niov, payload_iov, payload_kiov,
                                   payload_offset, payload_nob);
                return ((rc == 0) ? PTL_OK : PTL_FAIL);

        if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                if (rc != 0) {
                        CERROR("Can't route to "LPX64": router error %d\n",

                if (kqswnal_nid2elanid (targetnid) < 0) {
                        CERROR("Bad gateway "LPX64" for "LPX64"\n",

        /* I may not block for a transmit descriptor if I might block the
         * receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
                                          type == PTL_MSG_REPLY ||
                                          in_interrupt()));
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for "LPX64"\n",
                        type, libmsg->ev.initiator.nid);
                return (PTL_NO_SPACE);

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = targetnid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = libmsg;
        ktx->ktx_args[2] = NULL;                /* set when a GET commits to REPLY */

        memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */

        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {

                if (payload_kiov != NULL) {
                        ptl_kiov_t *kiov = &payload_kiov[i];

                        if (sumoff >= kiov->kiov_len) {
                                sumoff -= kiov->kiov_len;
                        } else {
                                char *addr = ((char *)kmap (kiov->kiov_page)) +
                                             kiov->kiov_offset + sumoff;
                                int   fragnob = kiov->kiov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));

                                kunmap(kiov->kiov_page);
                        }
                } else {
                        struct iovec *iov = &payload_iov[i];

                        if (sumoff >= iov->iov_len) {
                                sumoff -= iov->iov_len;
                        } else {
                                char *addr = iov->iov_base + sumoff;
                                int   fragnob = iov->iov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));

        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));

        /* The first frag will be the pre-mapped buffer for (at least) the
         * portals header. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if (nid == targetnid &&                 /* not forwarding */
            ((type == PTL_MSG_GET &&            /* optimize GET? */
              kqswnal_tunables.kqn_optimized_gets != 0 &&
              le32_to_cpu(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
             (type == PTL_MSG_PUT &&            /* optimize PUT? */
              kqswnal_tunables.kqn_optimized_puts != 0 &&
              payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
                lib_md_t           *md = libmsg->md;
                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the GET sink frags.  I copy these into ktx_buffer,
                 * immediately after the header, and send that as my

                ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;

                if ((libmsg->md->options & PTL_MD_KIOV) != 0)
                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                 md->md_niov, md->md_iov.iov);

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;

                payload_nob = offsetof(kqswnal_remotemd_t,
                                       kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;

                if (type == PTL_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,

                        if (ktx->ktx_args[2] == NULL)

                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */

        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {

                /* small message: single frag copied into the pre-mapped buffer */

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;

                if (payload_nob > 0) {
                        if (payload_kiov != NULL)
                                lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_kiov,
                                                   payload_offset, payload_nob);
                        else
                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                  payload_niov, payload_iov,
                                                  payload_offset, payload_nob);

                /* large message: multiple frags: first is hdr in pre-mapped buffer */

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);

        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);

        CDEBUG(rc == 0 ? D_NET : D_ERROR,
               "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
               rc == 0 ? "Sent" : "Failed to send",
               payload_nob, nid, targetnid, rc);

        if (ktx->ktx_state == KTX_GETTING &&
            ktx->ktx_args[2] != NULL) {
                /* We committed to reply, but there was a problem
                 * launching the GET.  We can't avoid delivering a
                 * REPLY event since we committed above, so we
                 * pretend the GET succeeded but the REPLY

                lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
                lib_finalize (&kqswnal_lib, private,
                              (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);

        kqswnal_put_idle_tx (ktx);

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? PTL_OK : PTL_FAIL);
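
/* lib_nal send methods: thin wrappers selecting the iovec or kiov
 * flavour of kqswnal_sendmsg(). */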
kqswnal_send (lib_nal_t *nal,

              unsigned int payload_niov,
              struct iovec *payload_iov,
              size_t payload_offset,

        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, payload_iov, NULL,
                                 payload_offset, payload_nob));

kqswnal_send_pages (lib_nal_t *nal,

                    unsigned int payload_niov,
                    ptl_kiov_t *payload_kiov,
                    size_t payload_offset,

        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, NULL, payload_kiov,
                                 payload_offset, payload_nob));
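
/* The router calls this to push a forwarded packet out over Elan.  It
 * must not block, so a NULL from kqswnal_get_idle_tx() just leaves fwd
 * queued until a descriptor is freed. */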
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)

        ptl_kiov_t *kiov = fwd->kprfd_kiov;
        int         niov = fwd->kprfd_niov;
        int         nob  = fwd->kprfd_nob;
        ptl_nid_t   nid  = fwd->kprfd_gateway_nid;

        CERROR ("checksums for forwarded packets not implemented\n");

        /* The router wants this NAL to forward a packet */
        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                fwd, nid, niov, nob);

        ktx = kqswnal_get_idle_tx (fwd, 0);
        if (ktx == NULL)        /* can't get txd right now */
                return;         /* fwd will be scheduled when tx desc freed */

        if (nid == kqswnal_lib.libnal_ni.ni_pid.nid)    /* gateway is me */
                nid = fwd->kprfd_target_nid;            /* target is final dest */

        if (kqswnal_nid2elanid (nid) < 0) {
                CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);

        /* copy hdr into pre-mapped buffer */
        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));

        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        ktx->ktx_state   = KTX_FORWARDING;
        ktx->ktx_args[0] = fwd;
        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;

        if (nob <= KQSW_TX_MAXCONTIG)

                /* send payload from ktx's pre-mapped contiguous buffer */

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + nob);

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;

                lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
                                  niov, kiov, 0, nob);

                /* zero copy payload */

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;

                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);

        rc = kqswnal_launch (ktx);

        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);

        /* complete now (with failure) */
        kqswnal_tx_done (ktx, rc);

        atomic_dec(&kqswnal_data.kqn_pending_txs);

kqswnal_fwd_callback (void *arg, int error)

        kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;

        /* The router has finished forwarding this packet */

                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid), error);

        LASSERT (atomic_read(&krx->krx_refcount) == 1);
        kqswnal_rx_decref (krx);
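
/* Hand a receive buffer back to the EKC.  NB after a failed
 * ep_complete_rpc() "old" EKC has already destroyed the rxd, so a fresh
 * receive is queued instead of requeueing. */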
kqswnal_requeue_rx (kqswnal_rx_t *krx)

        LASSERT (atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);

        if (kqswnal_data.kqn_shuttingdown)

        if (krx->krx_rxd == NULL) {
                /* We had a failed ep_complete_rpc() which nukes the
                 * descriptor in "old" EKC */
                int eprc = ep_queue_receive(krx->krx_eprx,
                                            kqswnal_rxhandler, krx,
                                            krx->krx_elanbuffer,
                                            krx->krx_npages * PAGE_SIZE, 0);
                LASSERT (eprc == EP_SUCCESS);
                /* We don't handle failure here; it's incredibly rare
                 * (never reported?) and only happens with "old" EKC */
        } else {
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   krx->krx_elanbuffer,
                                   krx->krx_npages * PAGE_SIZE);

kqswnal_rpc_complete (EP_RXD *rxd)

        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
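
/* Last ref on a receive has been dropped.  If the peer's RPC is still
 * outstanding, complete it now with the accumulated reply status before
 * the buffer is reposted. */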
kqswnal_rx_done (kqswnal_rx_t *krx)

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                sblk = (krx->krx_rpc_reply_status == 0) ?
                       &kqswnal_data.kqn_rpc_success :
                       &kqswnal_data.kqn_rpc_failed;

                LASSERT (!in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     sblk, NULL, NULL, 0);
                if (rc == EP_SUCCESS)

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,

                if (rc == EP_SUCCESS)

                /* "old" EKC destroys rxd on failed completion */
                krx->krx_rxd = NULL;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;

        kqswnal_requeue_rx(krx);
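
/* Demultiplex a received packet: if it's addressed to me it goes to
 * lib_parse(); otherwise the kiov is pointed past the header and the
 * packet is handed to the router for forwarding. */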
kqswnal_parse (kqswnal_rx_t *krx)

        ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
        ptl_nid_t  dest_nid = le64_to_cpu(hdr->dest_nid);

        LASSERT (atomic_read(&krx->krx_refcount) == 1);

        if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
                /* I ignore parse errors since I'm not consuming a byte

                (void)lib_parse (&kqswnal_lib, hdr, krx);

                /* Drop my ref; any RDMA activity takes an additional ref */
                kqswnal_rx_decref(krx);

        LASSERTF (0, "checksums for forwarded packets not implemented\n");

        if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */

                CERROR("dropping packet from "LPX64" for "LPX64
                       ": target is peer\n", le64_to_cpu(hdr->src_nid), dest_nid);

                kqswnal_rx_decref (krx);

        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;

        krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
        krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);

        nob -= PAGE_SIZE - KQSW_HDR_SIZE;

                LASSERT (niov < krx->krx_npages);

                krx->krx_kiov[niov].kiov_offset = 0;
                krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);

        kpr_fwd_init (&krx->krx_fwd, dest_nid,
                      hdr, payload_nob, niov, krx->krx_kiov,
                      kqswnal_fwd_callback, krx);

        kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);

/* Receive Interrupt Handler: posts to schedulers */

kqswnal_rxhandler(EP_RXD *rxd)

        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);
        LASSERT (krx->krx_state == KRX_POSTED);

        krx->krx_state = KRX_PARSE;

        /* RPC reply iff rpc request received without error */
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
                                    (status == EP_SUCCESS ||
                                     status == EP_MSG_TOO_BIG);

        /* Default to failure if an RPC reply is requested but not handled */
        krx->krx_rpc_reply_status = -EPROTO;
        atomic_set (&krx->krx_refcount, 1);

        /* must receive a whole header to be able to parse */
        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))

                /* receives complete with failure when receiver is removed */

                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);

                if (!kqswnal_data.kqn_shuttingdown)
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);

                kqswnal_rx_decref(krx);

        if (!in_interrupt()) {

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)

        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

        CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                ", dpid %d, spid %d, type %d\n",
                ishdr ? "Header" : "Payload", krx,
                le64_to_cpu(hdr->dest_nid), le64_to_cpu(hdr->src_nid),
                le32_to_cpu(hdr->dest_pid), le32_to_cpu(hdr->src_pid),
                le32_to_cpu(hdr->type));

        switch (le32_to_cpu(hdr->type))

                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                       " len %u\n",
                       le32_to_cpu(hdr->msg.ack.mlength),
                       hdr->msg.ack.dst_wmd.handle_cookie,
                       hdr->msg.ack.dst_wmd.handle_idx,
                       le64_to_cpu(hdr->msg.ack.match_bits),
                       le32_to_cpu(hdr->msg.ack.length));

                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                       " len %u off %u data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.handle_cookie,
                       hdr->msg.put.ack_wmd.handle_idx,
                       le64_to_cpu(hdr->msg.put.match_bits),
                       le32_to_cpu(hdr->msg.put.length),
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);

                CERROR ("GET: <>\n");

                CERROR ("REPLY: <>\n");

                CERROR ("TYPE?: <>\n");
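
/* Common receive path: an optimized PUT becomes an RDMA fetch;
 * otherwise the payload is copied out of the receive pages into the
 * caller's iov or kiov, checksumming on the way if that's compiled in. */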
kqswnal_recvmsg (lib_nal_t *nal,

        kqswnal_rx_t *krx    = (kqswnal_rx_t *)private;
        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
        ptl_hdr_t    *hdr    = (ptl_hdr_t *)buffer;

        kqsw_csum_t   senders_csum;
        kqsw_csum_t   payload_csum = 0;
        kqsw_csum_t   hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
        size_t        csum_len = mlen;

        static atomic_t csum_counter;
        int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;

        atomic_inc (&csum_counter);

        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
        if (senders_csum != hdr_csum)
                kqswnal_csum_error (krx, 1);

        /* NB lib_parse() has already flipped *hdr */

        CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);

        if (krx->krx_rpc_reply_needed &&
            hdr->type == PTL_MSG_PUT) {
                /* This must be an optimized PUT */
                rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
                                   niov, iov, kiov, offset, mlen);
                return (rc == 0 ? PTL_OK : PTL_FAIL);

        /* What was actually received must be >= payload. */
        LASSERT (mlen <= rlen);
        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
                CERROR("Bad message size: have %d, need %d + %d\n",
                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);

        /* It must be OK to kmap() if required */
        LASSERT (kiov == NULL || !in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        page_ptr = buffer + KQSW_HDR_SIZE;
        page_nob = PAGE_SIZE - KQSW_HDR_SIZE;

                /* skip complete frags */
                while (offset >= kiov->kiov_len) {
                        offset -= kiov->kiov_len;

                iov_ptr = ((char *)kmap (kiov->kiov_page)) +
                          kiov->kiov_offset + offset;
                iov_nob = kiov->kiov_len - offset;

                /* skip complete frags */
                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;

                iov_ptr = iov->iov_base + offset;
                iov_nob = iov->iov_len - offset;

                        if (frag > page_nob)

                        memcpy (iov_ptr, page_ptr, frag);

                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);

                                LASSERT (page < krx->krx_npages);
                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                page_nob = PAGE_SIZE;

                        else if (kiov != NULL) {
                                kunmap (kiov->kiov_page);

                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                                iov_nob = kiov->kiov_len;

                                iov_ptr = iov->iov_base;
                                iov_nob = iov->iov_len;

                kunmap (kiov->kiov_page);

        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
                sizeof(kqsw_csum_t));

        if (csum_len != rlen)
                CERROR("Unable to checksum data in user's buffer\n");
        else if (senders_csum != payload_csum)
                kqswnal_csum_error (krx, 0);

        CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
               "csum_nob %d\n",
               hdr_csum, payload_csum, csum_frags, csum_nob);

        lib_finalize(nal, private, libmsg, PTL_OK);

kqswnal_recv(lib_nal_t *nal,

        return (kqswnal_recvmsg(nal, private, libmsg,
                                offset, mlen, rlen));

kqswnal_recv_pages (lib_nal_t *nal,

        return (kqswnal_recvmsg(nal, private, libmsg,
                                offset, mlen, rlen));

kqswnal_thread_start (int (*fn)(void *arg), void *arg)

        long pid = kernel_thread (fn, arg, 0);

        atomic_inc (&kqswnal_data.kqn_nthreads);

kqswnal_thread_fini (void)

        atomic_dec (&kqswnal_data.kqn_nthreads);
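
/* Scheduler thread: drains ready receives, delayed transmits and
 * delayed forwards, dropping the lock to reschedule every KQSW_RESCHED
 * iterations so it can't hog a CPU, and exiting only in stage 2 of
 * shutdown when there's nothing left to do. */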
kqswnal_scheduler (void *arg)

        kpr_fwd_desc_t *fwd;
        unsigned long   flags;

        kportal_daemonize ("kqswnal_sched");
        kportal_blockallsigs ();

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                if (!list_empty (&kqswnal_data.kqn_readyrxds))

                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
                                         kqswnal_rx_t, krx_list);
                        list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,

                        switch (krx->krx_state) {

                                kqswnal_parse (krx);

                        case KRX_COMPLETING:
                                kqswnal_rx_decref (krx);

                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

                if (!list_empty (&kqswnal_data.kqn_delayedtxds))

                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                         kqswnal_tx_t, ktx_list);
                        list_del_init (&ktx->ktx_delayed_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,

                        rc = kqswnal_launch (ktx);

                                CERROR("Failed delayed transmit to "LPX64
                                       ": %d\n", ktx->ktx_nid, rc);
                                kqswnal_tx_done (ktx, rc);

                        atomic_dec (&kqswnal_data.kqn_pending_txs);

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                if (!list_empty (&kqswnal_data.kqn_delayedfwds))

                        fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                        list_del (&fwd->kprfd_list);
                        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

                        /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
                        kqswnal_fwd_packet (NULL, fwd);

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,

                        if (!did_something) {
                                if (kqswnal_data.kqn_shuttingdown == 2) {
                                        /* We only exit in stage 2 of shutdown when
                                         * there's nothing left to do */

                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
                                                               kqswnal_data.kqn_shuttingdown == 2 ||
                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedfwds));

                        } else if (need_resched())

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        kqswnal_thread_fini ();

lib_nal_t kqswnal_lib =
{
        libnal_data:       &kqswnal_data,       /* NAL private data */
        libnal_send:       kqswnal_send,
        libnal_send_pages: kqswnal_send_pages,
        libnal_recv:       kqswnal_recv,
        libnal_recv_pages: kqswnal_recv_pages,
        libnal_dist:       kqswnal_dist
};