1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2002 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Portals, http://www.lustre.org
9 * Portals is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Portals is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Portals; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 * LIB functions follow
 */
31 kqswnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
33 if (nid == nal->libnal_ni.ni_pid.nid)
34 *dist = 0; /* it's me */
35 else if (kqswnal_nid2elanid (nid) >= 0)
36 *dist = 1; /* it's my peer */
        else
                *dist = 2;                      /* via router */

        return (0);
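/* A minimal usage sketch for the distance metric above (hypothetical
 * caller; error handling elided):
 *
 *      unsigned long d;
 *
 *      kqswnal_dist(&kqswnal_lib, nid, &d);
 *      if (d <= 1)
 *              ... 'nid' is me or a directly-connected Elan peer ...
 *      else
 *              ... sends to 'nid' must go via a gateway ...
 */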
43 kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
48 do_gettimeofday (&now);
49 then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;
51 kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
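/* Worked example of the timestamp arithmetic above: with HZ == 100 and a tx
 * launched 500 jiffies before 'now', (jiffies - ktx_launchtime)/HZ == 5, so
 * 'then' is the wall-clock second at which the tx was launched.  Passing the
 * launch time (rather than 'now') to kpr_notify() presumably lets the router
 * discard notifications that are older than its last known-good event. */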
55 kqswnal_unmap_tx (kqswnal_tx_t *ktx)
60 ktx->ktx_rail = -1; /* unset rail */
        if (ktx->ktx_nmappedpages == 0)
                return;
67 CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
68 ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
70 for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
71 ep_dvma_unload(kqswnal_data.kqn_ep,
72 kqswnal_data.kqn_ep_tx_nmh,
75 CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
76 ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);
78 LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
79 LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
80 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);
82 elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
83 kqswnal_data.kqn_eptxdmahandle,
84 ktx->ktx_basepage, ktx->ktx_nmappedpages);
86 ktx->ktx_nmappedpages = 0;
90 kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
92 int nfrags = ktx->ktx_nfrag;
93 int nmapped = ktx->ktx_nmappedpages;
94 int maxmapped = ktx->ktx_npages;
95 uint32_t basepage = ktx->ktx_basepage + nmapped;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
112 LASSERT (nmapped <= maxmapped);
113 LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
114 LASSERT (nfrags <= EP_MAXFRAG);
118 /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }
127 int fraglen = kiov->kiov_len - offset;
129 /* each page frag is contained in one page */
130 LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
136 if (nmapped > maxmapped) {
137 CERROR("Can't map message in %d pages (max %d)\n",
142 if (nfrags == EP_MAXFRAG) {
143 CERROR("Message too fragmented in Elan VM (max %d frags)\n",
148 /* XXX this is really crap, but we'll have to kmap until
149 * EKC has a page (rather than vaddr) mapping interface */
151 ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
154 "%p[%d] loading %p for %d, page %d, %d total\n",
155 ktx, nfrags, ptr, fraglen, basepage, nmapped);
#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ptr, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                  /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                /* new frag */
                }
#endif
187 kunmap (kiov->kiov_page);
189 /* keep in loop for failure case */
190 ktx->ktx_nmappedpages = nmapped;
198 /* iov must not run out before end of data */
199 LASSERT (nob == 0 || niov > 0);
203 ktx->ktx_nfrag = nfrags;
204 CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
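/* Illustration of the frag merging above (made-up addresses): two 4096-byte
 * pages that map to contiguous Elan addresses 0x10000 and 0x11000 collapse
 * into a single frag {Base 0x10000, Len 8192}; a page mapped somewhere
 * non-adjacent starts a new frag.  So nfrags counts contiguous runs, not
 * pages, and the EP_MAXFRAG limit only bites when the mapping is badly
 * scattered. */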
211 kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
212 int niov, struct iovec *iov)
214 int nfrags = ktx->ktx_nfrag;
215 int nmapped = ktx->ktx_nmappedpages;
216 int maxmapped = ktx->ktx_npages;
217 uint32_t basepage = ktx->ktx_basepage + nmapped;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
233 LASSERT (nmapped <= maxmapped);
234 LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
235 LASSERT (nfrags <= EP_MAXFRAG);
239 /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }
248 int fraglen = iov->iov_len - offset;
253 npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
256 if (nmapped > maxmapped) {
257 CERROR("Can't map message in %d pages (max %d)\n",
262 if (nfrags == EP_MAXFRAG) {
263 CERROR("Message too fragmented in Elan VM (max %d frags)\n",
269 "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
270 ktx, nfrags, iov->iov_base + offset, fraglen,
271 basepage, npages, nmapped);
#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       iov->iov_base + offset, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                  /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                /* new frag */
                }
#endif
303 /* keep in loop for failure case */
304 ktx->ktx_nmappedpages = nmapped;
312 /* iov must not run out before end of data */
313 LASSERT (nob == 0 || niov > 0);
317 ktx->ktx_nfrag = nfrags;
318 CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
326 kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
328 kpr_fwd_desc_t *fwd = NULL;
331 kqswnal_unmap_tx (ktx); /* release temporary mappings */
332 ktx->ktx_state = KTX_IDLE;
334 spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
336 list_del (&ktx->ktx_list); /* take off active list */
338 if (ktx->ktx_isnblk) {
339 /* reserved for non-blocking tx */
340 list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
                return;
        }
345 list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
347 /* anything blocking for a tx descriptor? */
348 if (!kqswnal_data.kqn_shuttingdown &&
            !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
        {
351 CDEBUG(D_NET,"wakeup fwd\n");
353 fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
354 kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
        }
358 wake_up (&kqswnal_data.kqn_idletxd_waitq);
360 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        if (fwd != NULL) {
                /* schedule packet for forwarding again */
                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
        }
375 kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
378 kqswnal_tx_t *ktx = NULL;
        for (;;) {
                spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

                if (kqswnal_data.kqn_shuttingdown)
                        break;
386 /* "normal" descriptor is free */
387 if (!list_empty (&kqswnal_data.kqn_idletxds)) {
388 ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }
                if (fwd != NULL)          /* forwarded packet? */
                        break;
                /* doing a local transmit */
                if (!may_block) {
                        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
                                CERROR ("intr tx desc pool exhausted\n");
                                break;
                        }
403 ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }
408 /* block for idle tx */
410 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
412 CDEBUG (D_NET, "blocking for tx desc\n");
413 wait_event (kqswnal_data.kqn_idletxd_waitq,
414 !list_empty (&kqswnal_data.kqn_idletxds) ||
                            kqswnal_data.kqn_shuttingdown);
        }

        if (ktx != NULL) {
419 list_del (&ktx->ktx_list);
420 list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
421 ktx->ktx_launcher = current->pid;
422 atomic_inc(&kqswnal_data.kqn_pending_txs);
423 } else if (fwd != NULL) {
424 /* queue forwarded packet until idle txd available */
425 CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
426 list_add_tail (&fwd->kprfd_list,
                               &kqswnal_data.kqn_idletxd_fwdq);
        }
430 spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
432 /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);

        return (ktx);
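/* Sketch of the descriptor lifecycle around the two functions above
 * (illustrative only; error handling elided):
 *
 *      kqswnal_tx_t *ktx = kqswnal_get_idle_tx(NULL, !in_interrupt());
 *
 *      if (ktx != NULL) {
 *              ... fill in ktx_state, ktx_nid, ktx_frags ...
 *              rc = kqswnal_launch (ktx);
 *              if (rc != 0)
 *                      kqswnal_tx_done (ktx, rc); ... frees via put_idle_tx ...
 *      }
 *
 * Note the asymmetry: a successful launch is completed (and the descriptor
 * returned to an idle list) by kqswnal_txhandler(), not by the caller. */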
439 kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
441 switch (ktx->ktx_state) {
442 case KTX_FORWARDING: /* router asked me to forward this packet */
443 kpr_fwd_done (&kqswnal_data.kqn_router,
                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                break;
447 case KTX_RDMAING: /* optimized GET/PUT handled */
448 case KTX_PUTTING: /* optimized PUT sent */
449 case KTX_SENDING: /* normal send */
450 lib_finalize (&kqswnal_lib, NULL,
451 (lib_msg_t *)ktx->ktx_args[1],
                              (error == 0) ? PTL_OK : PTL_FAIL);
                break;
455 case KTX_GETTING: /* optimized GET sent & REPLY received */
456 /* Complete the GET with success since we can't avoid
457 * delivering a REPLY event; we committed to it when we
458 * launched the GET */
459 lib_finalize (&kqswnal_lib, NULL,
460 (lib_msg_t *)ktx->ktx_args[1], PTL_OK);
461 lib_finalize (&kqswnal_lib, NULL,
462 (lib_msg_t *)ktx->ktx_args[2],
                              (error == 0) ? PTL_OK : PTL_FAIL);
                break;

        default:
                LBUG();
        }
470 kqswnal_put_idle_tx (ktx);
474 kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
476 kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;
478 LASSERT (txd != NULL);
479 LASSERT (ktx != NULL);
481 CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
483 if (status != EP_SUCCESS) {
485 CERROR ("Tx completion to "LPX64" failed: %d\n",
486 ktx->ktx_nid, status);
                kqswnal_notify_peer_down(ktx);
                status = -EHOSTUNREACH;
491 } else switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC completed OK; but what did our peer put in the status
                 * block? */
#if MULTIRAIL_EKC
                status = ep_txd_statusblk(txd)->Data[0];
#else
                status = ep_txd_statusblk(txd)->Status;
#endif
                break;

        default:
                LBUG();
        }
514 kqswnal_tx_done (ktx, status);
518 kqswnal_launch (kqswnal_tx_t *ktx)
520 /* Don't block for transmit descriptor if we're in interrupt context */
521 int attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
522 int dest = kqswnal_nid2elanid (ktx->ktx_nid);
526 ktx->ktx_launchtime = jiffies;
        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);
531 LASSERT (dest >= 0); /* must be a peer */
#if MULTIRAIL_EKC
        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
#endif
538 switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_FORWARDING:
        case KTX_SENDING:
#if MULTIRAIL_EKC
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
#else
                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                       ktx->ktx_port, attr,
                                       kqswnal_txhandler, ktx,
                                       ktx->ktx_frags, ktx->ktx_nfrag);
#endif
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
                break;
        }
        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
592 hdr_type_string (ptl_hdr_t *hdr)
604 return ("<UNKNOWN>");
609 kqswnal_cerror_hdr(ptl_hdr_t * hdr)
611 char *type_str = hdr_type_string (hdr);
613 CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
614 le32_to_cpu(hdr->payload_length));
615 CERROR(" From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
616 le32_to_cpu(hdr->src_pid));
617 CERROR(" To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
618 le32_to_cpu(hdr->dest_pid));
        switch (le32_to_cpu(hdr->type)) {
        case PTL_MSG_PUT:
                CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR(" offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.get.match_bits));
                CERROR(" Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case PTL_MSG_ACK:
                CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case PTL_MSG_REPLY:
                CERROR(" dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
                break;
        }
}  /* end of kqswnal_cerror_hdr() */
663 kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
667 CDEBUG (how, "%s: %d\n", str, n);
668 for (i = 0; i < n; i++) {
669 CDEBUG (how, " %08x for %d\n", iov[i].Base, iov[i].Len);
674 kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
675 int nsrc, EP_IOVEC *src,
676 int ndst, EP_IOVEC *dst)
685 for (count = 0; count < ndv; count++, dv++) {
687 if (nsrc == 0 || ndst == 0) {
                        if (nsrc != 0 || ndst != 0) {
                                /* For now I'll barf on any left over entries */
                                CERROR ("mismatched src and dst iovs\n");
                                return (-EPROTO);
                        }
                        return (count);
                }
696 nob = (src->Len < dst->Len) ? src->Len : dst->Len;
698 dv->Source = src->Base;
699 dv->Dest = dst->Base;
                if (nob >= src->Len) {
                        src++;
                        nsrc--;
                } else {
                        src->Len -= nob;
                        src->Base += nob;
                }

                if (nob >= dst->Len) {
                        dst++;
                        ndst--;
                } else {
                        dst->Len -= nob;
                        dst->Base += nob;
                }
        }

        CERROR ("DATAVEC too small\n");
        return (-E2BIG);
723 kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
724 int nrfrag, EP_NMD *rfrag)
728 if (nlfrag != nrfrag) {
729 CERROR("Can't cope with unequal # frags: %d local %d remote\n",
734 for (i = 0; i < nlfrag; i++)
735 if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
736 CERROR("Can't cope with unequal frags %d(%d):"
737 " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
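/* Example of the shape check above: local frags of {4096, 4096, 2048} bytes
 * are only compatible with a remote descriptor of exactly {4096, 4096, 2048};
 * {8192, 2048} fails the per-frag length test even though the totals agree,
 * because each frag pair is RDMAed as a unit. */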
747 kqswnal_parse_rmd (kqswnal_rx_t *krx, int type, ptl_nid_t expected_nid)
749 char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
750 ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
751 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
752 ptl_nid_t nid = kqswnal_rx_nid(krx);
754 /* Note (1) lib_parse has already flipped hdr.
755 * (2) RDMA addresses are sent in native endian-ness. When
         * EKC copes with different endian nodes, I'll fix this (and
         * eat my hat :) */
759 LASSERT (krx->krx_nob >= sizeof(*hdr));
        if (hdr->type != type) {
                CERROR ("Unexpected optimized get/put type %d (%d expected) "
                        "from "LPX64"\n", hdr->type, type, nid);
                return (NULL);
        }
        if (hdr->src_nid != nid) {
                CERROR ("Unexpected optimized get/put source NID "
                        LPX64" from "LPX64"\n", hdr->src_nid, nid);
                return (NULL);
        }
773 LASSERT (nid == expected_nid);
        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }
        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
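/* Rough wire layout assumed by the size checks above:
 *
 *      +---------------------+--------------------------------------+
 *      | header region       | kqswnal_remotemd_t                   |
 *      | (KQSW_HDR_SIZE)     | kqrmd_nfrag, kqrmd_frag[kqrmd_nfrag] |
 *      +---------------------+--------------------------------------+
 *
 * The first test guarantees enough bytes arrived to read kqrmd_nfrag at
 * all; the second re-validates the length once kqrmd_nfrag is known. */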
794 kqswnal_rdma_store_complete (EP_RXD *rxd)
796 int status = ep_rxd_status(rxd);
797 kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
798 kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
800 CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
801 "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
803 LASSERT (ktx->ktx_state == KTX_RDMAING);
804 LASSERT (krx->krx_rxd == rxd);
805 LASSERT (krx->krx_rpc_reply_needed);
807 krx->krx_rpc_reply_needed = 0;
808 kqswnal_rx_decref (krx);
810 /* free ktx & finalize() its lib_msg_t */
811 kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
815 kqswnal_rdma_fetch_complete (EP_RXD *rxd)
817 /* Completed fetching the PUT data */
818 int status = ep_rxd_status(rxd);
819 kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
820 kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
823 CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
824 "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
826 LASSERT (ktx->ktx_state == KTX_RDMAING);
827 LASSERT (krx->krx_rxd == rxd);
828 /* RPC completes with failure by default */
829 LASSERT (krx->krx_rpc_reply_needed);
830 LASSERT (krx->krx_rpc_reply_status != 0);
        if (status == EP_SUCCESS) {
                status = krx->krx_rpc_reply_status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }
840 /* free ktx & finalize() its lib_msg_t */
841 kqswnal_tx_done(ktx, status);
        if (!in_interrupt()) {
                /* OK to complete the RPC now (iff I had the last ref) */
                kqswnal_rx_decref (krx);
                return;
        }
849 LASSERT (krx->krx_state == KRX_PARSE);
850 krx->krx_state = KRX_COMPLETING;
852 /* Complete the RPC in thread context */
853 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
855 list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
856 wake_up (&kqswnal_data.kqn_sched_waitq);
858 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
862 kqswnal_rdma (kqswnal_rx_t *krx, lib_msg_t *libmsg, int type,
863 int niov, struct iovec *iov, ptl_kiov_t *kiov,
864 size_t offset, size_t len)
866 kqswnal_remotemd_t *rmd;
#if !MULTIRAIL_EKC
        EP_DATAVEC          datav[EP_MAXFRAG];
        int                 ndatav;
#endif
875 LASSERT (type == PTL_MSG_GET || type == PTL_MSG_PUT);
876 /* Not both mapped and paged payload */
877 LASSERT (iov == NULL || kiov == NULL);
878 /* RPC completes with failure by default */
879 LASSERT (krx->krx_rpc_reply_needed);
880 LASSERT (krx->krx_rpc_reply_status != 0);
882 rmd = kqswnal_parse_rmd(krx, type, libmsg->ev.initiator.nid);
        if (rmd == NULL)
                return (-EPROTO);

        if (len == 0) {
                /* data got truncated to nothing. */
                lib_finalize(&kqswnal_lib, krx, libmsg, PTL_OK);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply_status = 0;
                return (0);
        }
894 /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
895 actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx(NULL, 0);
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with "LPX64"\n",
                        libmsg->ev.initiator.nid);
                return (-ENOMEM);
        }
903 ktx->ktx_state = KTX_RDMAING;
904 ktx->ktx_nid = libmsg->ev.initiator.nid;
905 ktx->ktx_args[0] = krx;
906 ktx->ktx_args[1] = libmsg;
#if MULTIRAIL_EKC
        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));
#endif
914 /* Start mapping at offset 0 (we're not mapping any headers) */
915 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }
#if MULTIRAIL_EKC
        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }
#else
        switch (type) {
        default:
                LBUG();

        case PTL_MSG_GET:
                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
                                             ktx->ktx_nfrag, ktx->ktx_frags,
                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag);
                break;

        case PTL_MSG_PUT:
                ndatav = kqswnal_eiovs2datav(EP_MAXFRAG, datav,
                                             rmd->kqrmd_nfrag, rmd->kqrmd_frag,
                                             ktx->ktx_nfrag, ktx->ktx_frags);
                break;
        }

        if (ndatav < 0) {
                CERROR ("Can't create datavec: %d\n", ndatav);
                rc = ndatav;
                goto out;
        }
#endif
959 LASSERT (atomic_read(&krx->krx_refcount) > 0);
960 /* Take an extra ref for the completion callback */
961 atomic_inc(&krx->krx_refcount);
        switch (type) {
        default:
                LBUG();

        case PTL_MSG_GET:
#if MULTIRAIL_EKC
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &kqswnal_data.kqn_rpc_success,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
#else
                eprc = ep_complete_rpc (krx->krx_rxd,
                                        kqswnal_rdma_store_complete, ktx,
                                        &kqswnal_data.kqn_rpc_success,
                                        datav, ndatav);
                if (eprc != EP_SUCCESS) /* "old" EKC destroys rxd on failed completion */
                        krx->krx_rxd = NULL;
#endif
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        case PTL_MSG_PUT:
#if MULTIRAIL_EKC
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
#else
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   datav, ndatav);
#endif
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                kqswnal_rx_decref(krx);         /* drop callback's ref */
                kqswnal_put_idle_tx (ktx);
        }
        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
1020 kqswnal_sendmsg (lib_nal_t *nal,
1027 unsigned int payload_niov,
1028 struct iovec *payload_iov,
1029 ptl_kiov_t *payload_kiov,
1030 size_t payload_offset,
1035 ptl_nid_t targetnid;
1042 /* NB 1. hdr is in network byte order */
1043 /* 2. 'private' depends on the message type */
1045 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
1046 " pid %u\n", payload_nob, payload_niov, nid, pid);
1048 LASSERT (payload_nob == 0 || payload_niov > 0);
1049 LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1051 /* It must be OK to kmap() if required */
1052 LASSERT (payload_kiov == NULL || !in_interrupt ());
1053 /* payload is either all vaddrs or all pages */
1054 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1056 if (payload_nob > KQSW_MAXPAYLOAD) {
1057 CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                        payload_nob, KQSW_MAXPAYLOAD);
                return (PTL_FAIL);
        }
1062 if (type == PTL_MSG_REPLY && /* can I look in 'private' */
1063 ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) { /* is it an RPC */
1064 /* Must be a REPLY for an optimized GET */
1065 rc = kqswnal_rdma ((kqswnal_rx_t *)private, libmsg, PTL_MSG_GET,
1066 payload_niov, payload_iov, payload_kiov,
1067 payload_offset, payload_nob);
1068 return ((rc == 0) ? PTL_OK : PTL_FAIL);
        }

        targetnid = nid;
        if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
1073 rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
1074 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
1076 CERROR("Can't route to "LPX64": router error %d\n",
1080 if (kqswnal_nid2elanid (targetnid) < 0) {
1081 CERROR("Bad gateway "LPX64" for "LPX64"\n",
1087 /* I may not block for a transmit descriptor if I might block the
1088 * receiver, or an interrupt handler. */
1089 ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
                                          type == PTL_MSG_REPLY ||
                                          in_interrupt()));
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for "LPX64"\n",
                        type, libmsg->ev.initiator.nid);
                return (PTL_NO_SPACE);
        }
1098 ktx->ktx_state = KTX_SENDING;
1099 ktx->ktx_nid = targetnid;
1100 ktx->ktx_args[0] = private;
1101 ktx->ktx_args[1] = libmsg;
1102 ktx->ktx_args[2] = NULL; /* set when a GET commits to REPLY */
1104 memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
#if KQSW_CHECKSUM
        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
                LASSERT (i < payload_niov);
                if (payload_kiov != NULL) {
                        ptl_kiov_t *kiov = &payload_kiov[i];

                        if (sumoff >= kiov->kiov_len) {
                                sumoff -= kiov->kiov_len;
                        } else {
                                char *addr = ((char *)kmap (kiov->kiov_page)) +
                                             kiov->kiov_offset + sumoff;
                                int   fragnob = kiov->kiov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                                kunmap(kiov->kiov_page);
                        }
                } else {
                        struct iovec *iov = &payload_iov[i];

                        if (sumoff >= iov->iov_len) {
                                sumoff -= iov->iov_len;
                        } else {
                                char *addr = iov->iov_base + sumoff;
                                int   fragnob = iov->iov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                        }
                }
        }
        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
1144 /* The first frag will be the pre-mapped buffer for (at least) the
1145 * portals header. */
1146 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
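        /* Sketch of the pre-mapped tx buffer being set up here (region
         * sizes are the module's constants, not literal values):
         *
         *      ktx_buffer: [ ptl_hdr_t | (checksums) | payload or RMD ]
         *                  |<------ KQSW_HDR_SIZE ------>|
         *
         * frag[0] always covers at least the header region; any temporary
         * DVMA mappings for the payload start at ktx_firsttmpfrag == 1. */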
1148 if (nid == targetnid && /* not forwarding */
1149 ((type == PTL_MSG_GET && /* optimize GET? */
1150 kqswnal_tunables.kqn_optimized_gets != 0 &&
1151 le32_to_cpu(hdr->msg.get.sink_length) >= kqswnal_tunables.kqn_optimized_gets) ||
1152 (type == PTL_MSG_PUT && /* optimize PUT? */
1153 kqswnal_tunables.kqn_optimized_puts != 0 &&
1154 payload_nob >= kqswnal_tunables.kqn_optimized_puts))) {
1155 lib_md_t *md = libmsg->md;
1156 kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);
1158 /* Optimised path: I send over the Elan vaddrs of the local
1159 * buffers, and my peer DMAs directly to/from them.
1161 * First I set up ktx as if it was going to send this
1162 * payload, (it needs to map it anyway). This fills
1163 * ktx_frags[1] and onward with the network addresses
1164 * of the GET sink frags. I copy these into ktx_buffer,
1165 * immediately after the header, and send that as my
1168 ktx->ktx_state = (type == PTL_MSG_PUT) ? KTX_PUTTING : KTX_GETTING;
1170 if ((libmsg->md->options & PTL_MD_KIOV) != 0)
1171 rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
1172 md->md_niov, md->md_iov.kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                 md->md_niov, md->md_iov.iov);

                if (rc != 0)
                        goto out;
1179 rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
1181 payload_nob = offsetof(kqswnal_remotemd_t,
1182 kqrmd_frag[rmd->kqrmd_nfrag]);
1183 LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);
#if MULTIRAIL_EKC
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
1198 if (type == PTL_MSG_GET) {
1199 /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lib_create_reply_msg (&kqswnal_lib,
                                                                 nid, libmsg);
                        if (ktx->ktx_args[2] == NULL)
                                goto out;
1205 /* NB finalizing the REPLY message is my
1206 * responsibility now, whatever happens. */
1209 } else if (payload_nob <= KQSW_TX_MAXCONTIG) {
1211 /* small message: single frag copied into the pre-mapped buffer */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
1220 if (payload_nob > 0) {
1221 if (payload_kiov != NULL)
1222 lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1223 payload_niov, payload_kiov,
1224 payload_offset, payload_nob);
                        else
                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
1227 payload_niov, payload_iov,
1228 payload_offset, payload_nob);
1232 /* large message: multiple frags: first is hdr in pre-mapped buffer */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
1241 if (payload_kiov != NULL)
1242 rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
1243 payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);

                if (rc != 0)
                        goto out;
1251 ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
1252 EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1254 rc = kqswnal_launch (ktx);
 out:
        CDEBUG(rc == 0 ? D_NET : D_ERROR,
1258 "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
1259 rc == 0 ? "Sent" : "Failed to send",
               payload_nob, nid, targetnid, rc);

        if (rc != 0) {
1263 if (ktx->ktx_state == KTX_GETTING &&
1264 ktx->ktx_args[2] != NULL) {
1265 /* We committed to reply, but there was a problem
1266 * launching the GET. We can't avoid delivering a
1267 * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
1271 lib_finalize (&kqswnal_lib, private, libmsg, PTL_OK);
1272 lib_finalize (&kqswnal_lib, private,
                                      (lib_msg_t *)ktx->ktx_args[2], PTL_FAIL);
                }
                kqswnal_put_idle_tx (ktx);
        }
1279 atomic_dec(&kqswnal_data.kqn_pending_txs);
1280 return (rc == 0 ? PTL_OK : PTL_FAIL);
1284 kqswnal_send (lib_nal_t *nal,
1291 unsigned int payload_niov,
1292 struct iovec *payload_iov,
1293 size_t payload_offset,
1296 return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1297 payload_niov, payload_iov, NULL,
1298 payload_offset, payload_nob));
1302 kqswnal_send_pages (lib_nal_t *nal,
1309 unsigned int payload_niov,
1310 ptl_kiov_t *payload_kiov,
1311 size_t payload_offset,
1314 return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
1315 payload_niov, NULL, payload_kiov,
1316 payload_offset, payload_nob));
1320 kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
1324 ptl_kiov_t *kiov = fwd->kprfd_kiov;
1325 int niov = fwd->kprfd_niov;
1326 int nob = fwd->kprfd_nob;
1327 ptl_nid_t nid = fwd->kprfd_gateway_nid;
1330 CERROR ("checksums for forwarded packets not implemented\n");
1333 /* The router wants this NAL to forward a packet */
1334 CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
1335 fwd, nid, niov, nob);
1337 ktx = kqswnal_get_idle_tx (fwd, 0);
1338 if (ktx == NULL) /* can't get txd right now */
1339 return; /* fwd will be scheduled when tx desc freed */
1341 if (nid == kqswnal_lib.libnal_ni.ni_pid.nid) /* gateway is me */
1342 nid = fwd->kprfd_target_nid; /* target is final dest */
1344 if (kqswnal_nid2elanid (nid) < 0) {
1345 CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
1350 /* copy hdr into pre-mapped buffer */
1351 memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
1353 ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ?
1354 EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
1356 ktx->ktx_state = KTX_FORWARDING;
1357 ktx->ktx_args[0] = fwd;
1358 ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
        if (nob <= KQSW_TX_MAXCONTIG)
        {
                /* send payload from ktx's pre-mapped contiguous buffer */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
#endif
                if (nob > 0)
                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
                                          niov, kiov, 0, nob);
        }
        else
        {
1376 /* zero copy payload */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                if (rc != 0)
                        goto failed;
        }
        rc = kqswnal_launch (ktx);
        if (rc == 0)
                goto out;

 failed:
        LASSERT (rc != 0);
        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);

        /* complete now (with failure) */
        kqswnal_tx_done (ktx, rc);

 out:
1398 atomic_dec(&kqswnal_data.kqn_pending_txs);
1402 kqswnal_fwd_callback (void *arg, int error)
1404 kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;
1406 /* The router has finished forwarding this packet */
        if (error != 0) {
                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       le64_to_cpu(hdr->src_nid), le64_to_cpu(hdr->dest_nid), error);
        }
1416 LASSERT (atomic_read(&krx->krx_refcount) == 1);
1417 kqswnal_rx_decref (krx);
1421 kqswnal_requeue_rx (kqswnal_rx_t *krx)
1423 LASSERT (atomic_read(&krx->krx_refcount) == 0);
1424 LASSERT (!krx->krx_rpc_reply_needed);
1426 krx->krx_state = KRX_POSTED;
#if MULTIRAIL_EKC
        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
#else
        if (kqswnal_data.kqn_shuttingdown)
                return;

        if (krx->krx_rxd == NULL) {
                /* We had a failed ep_complete_rpc() which nukes the
                 * descriptor in "old" EKC */
                int eprc = ep_queue_receive(krx->krx_eprx,
                                            kqswnal_rxhandler, krx,
                                            krx->krx_elanbuffer,
                                            krx->krx_npages * PAGE_SIZE, 0);
                LASSERT (eprc == EP_SUCCESS);
                /* We don't handle failure here; it's incredibly rare
                 * (never reported?) and only happens with "old" EKC */
        } else {
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   krx->krx_elanbuffer,
                                   krx->krx_npages * PAGE_SIZE);
        }
#endif
1461 kqswnal_rpc_complete (EP_RXD *rxd)
1463 int status = ep_rxd_status(rxd);
1464 kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg(rxd);
1466 CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
1467 "rxd %p, krx %p, status %d\n", rxd, krx, status);
1469 LASSERT (krx->krx_rxd == rxd);
1470 LASSERT (krx->krx_rpc_reply_needed);
1472 krx->krx_rpc_reply_needed = 0;
1473 kqswnal_requeue_rx (krx);
1477 kqswnal_rx_done (kqswnal_rx_t *krx)
1482 LASSERT (atomic_read(&krx->krx_refcount) == 0);
1484 if (krx->krx_rpc_reply_needed) {
1485 /* We've not completed the peer's RPC yet... */
1486 sblk = (krx->krx_rpc_reply_status == 0) ?
1487 &kqswnal_data.kqn_rpc_success :
1488 &kqswnal_data.kqn_rpc_failed;
1490 LASSERT (!in_interrupt());
#if MULTIRAIL_EKC
                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     sblk, NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;
#else
                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     sblk);
                if (rc == EP_SUCCESS)
                        return;

                /* "old" EKC destroys rxd on failed completion */
                krx->krx_rxd = NULL;
#endif
                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }
1511 kqswnal_requeue_rx(krx);
1515 kqswnal_parse (kqswnal_rx_t *krx)
1517 ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
1518 ptl_nid_t dest_nid = le64_to_cpu(hdr->dest_nid);
1523 LASSERT (atomic_read(&krx->krx_refcount) == 1);
1525 if (dest_nid == kqswnal_lib.libnal_ni.ni_pid.nid) { /* It's for me :) */
                /* I ignore parse errors since I'm not consuming a byte
                 * stream */
1528 (void)lib_parse (&kqswnal_lib, hdr, krx);
1530 /* Drop my ref; any RDMA activity takes an additional ref */
                kqswnal_rx_decref(krx);
                return;
        }
1536 LASSERTF (0, "checksums for forwarded packets not implemented\n");
        if (kqswnal_nid2elanid (dest_nid) >= 0) {
                /* should have gone direct to peer */
                CERROR("dropping packet from "LPX64" for "LPX64
                       ": target is peer\n", le64_to_cpu(hdr->src_nid), dest_nid);
                kqswnal_rx_decref (krx);
                return;
        }
        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
        niov = 0;
        if (nob > 0) {
                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
                niov = 1;
                nob -= PAGE_SIZE - KQSW_HDR_SIZE;

                while (nob > 0) {
                        LASSERT (niov < krx->krx_npages);

                        krx->krx_kiov[niov].kiov_offset = 0;
                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
                        niov++;
                        nob -= PAGE_SIZE;
                }
        }
1566 kpr_fwd_init (&krx->krx_fwd, dest_nid,
1567 hdr, payload_nob, niov, krx->krx_kiov,
1568 kqswnal_fwd_callback, krx);
1570 kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
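/* Worked example of the kiov construction above, assuming PAGE_SIZE == 4096
 * and (say) KQSW_HDR_SIZE == 256: krx_nob == 10000 gives payload_nob == 9744;
 * kiov[0] covers the 3840 bytes of page 0 after the header, then the loop
 * adds kiov[1] (4096 bytes) and kiov[2] (1808 bytes), so niov == 3 when the
 * descriptor is handed to kpr_fwd_start(). */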
1573 /* Receive Interrupt Handler: posts to schedulers */
1575 kqswnal_rxhandler(EP_RXD *rxd)
1577 unsigned long flags;
1578 int nob = ep_rxd_len (rxd);
1579 int status = ep_rxd_status (rxd);
1580 kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg (rxd);
1582 CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
1583 rxd, krx, nob, status);
1585 LASSERT (krx != NULL);
        LASSERT (krx->krx_state == KRX_POSTED);
1588 krx->krx_state = KRX_PARSE;
1592 /* RPC reply iff rpc request received without error */
1593 krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
1594 (status == EP_SUCCESS ||
1595 status == EP_MSG_TOO_BIG);
1597 /* Default to failure if an RPC reply is requested but not handled */
1598 krx->krx_rpc_reply_status = -EPROTO;
1599 atomic_set (&krx->krx_refcount, 1);
        /* must receive a whole header to be able to parse */
        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
        {
                /* receives complete with failure when receiver is removed */
#if MULTIRAIL_EKC
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#else
                if (!kqswnal_data.kqn_shuttingdown)
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#endif
                kqswnal_rx_decref(krx);
                return;
        }
        if (!in_interrupt()) {
                kqswnal_parse(krx);
                return;
        }
1625 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1627 list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
1628 wake_up (&kqswnal_data.kqn_sched_waitq);
1630 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1635 kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
1637 ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);
1639 CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
1640 ", dpid %d, spid %d, type %d\n",
1641 ishdr ? "Header" : "Payload", krx,
                le64_to_cpu(hdr->dest_nid), le64_to_cpu(hdr->src_nid),
1643 le32_to_cpu(hdr->dest_pid), le32_to_cpu(hdr->src_pid),
1644 le32_to_cpu(hdr->type));
        switch (le32_to_cpu(hdr->type))
        {
        case PTL_MSG_ACK:
                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                       " len %u\n",
                       le32_to_cpu(hdr->msg.ack.mlength),
                       hdr->msg.ack.dst_wmd.handle_cookie,
                       hdr->msg.ack.dst_wmd.handle_idx,
                       le64_to_cpu(hdr->msg.ack.match_bits),
                       le32_to_cpu(hdr->msg.ack.length));
                break;

        case PTL_MSG_PUT:
                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                       " len %u off %u data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.handle_cookie,
                       hdr->msg.put.ack_wmd.handle_idx,
                       le64_to_cpu(hdr->msg.put.match_bits),
                       le32_to_cpu(hdr->msg.put.length),
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR ("GET: <>\n");
                break;

        case PTL_MSG_REPLY:
                CERROR ("REPLY: <>\n");
                break;

        default:
                CERROR ("TYPE?: <>\n");
        }
1681 kqswnal_recvmsg (lib_nal_t *nal,
1691 kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
1692 char *buffer = page_address(krx->krx_kiov[0].kiov_page);
1693 ptl_hdr_t *hdr = (ptl_hdr_t *)buffer;
#if KQSW_CHECKSUM
        kqsw_csum_t     senders_csum;
        kqsw_csum_t     payload_csum = 0;
        kqsw_csum_t     hdr_csum = kqsw_csum(0, hdr, sizeof(*hdr));
        size_t          csum_len = mlen;
        int             csum_frags = 0;
        int             csum_nob = 0;
        static atomic_t csum_counter;
        int             csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;

        atomic_inc (&csum_counter);

        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
        if (senders_csum != hdr_csum)
                kqswnal_csum_error (krx, 1);
#endif
1717 /* NB lib_parse() has already flipped *hdr */
1719 CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);
1721 if (libmsg == NULL) { /* portals is discarding. */
1722 LASSERT (mlen == 0);
                return PTL_OK;          /* ignored by caller! */
        }
1726 if (krx->krx_rpc_reply_needed &&
1727 hdr->type == PTL_MSG_PUT) {
1728 /* This must be an optimized PUT */
1729 rc = kqswnal_rdma (krx, libmsg, PTL_MSG_PUT,
1730 niov, iov, kiov, offset, mlen);
                return (rc == 0 ? PTL_OK : PTL_FAIL);
        }
1734 /* What was actually received must be >= payload. */
1735 LASSERT (mlen <= rlen);
        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
                CERROR("Bad message size: have %d, need %d + %d\n",
                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
                return (PTL_FAIL);
        }
1742 /* It must be OK to kmap() if required */
1743 LASSERT (kiov == NULL || !in_interrupt ());
1744 /* Either all pages or all vaddrs */
1745 LASSERT (!(kiov != NULL && iov != NULL));
1749 page_ptr = buffer + KQSW_HDR_SIZE;
1750 page_nob = PAGE_SIZE - KQSW_HDR_SIZE;
1755 /* skip complete frags */
                        while (offset >= kiov->kiov_len) {
                                offset -= kiov->kiov_len;
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
1762 iov_ptr = ((char *)kmap (kiov->kiov_page)) +
1763 kiov->kiov_offset + offset;
1764 iov_nob = kiov->kiov_len - offset;
1766 /* skip complete frags */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
1773 iov_ptr = iov->iov_base + offset;
1774 iov_nob = iov->iov_len - offset;
                        frag = mlen;
                        if (frag > page_nob)
                                frag = page_nob;
                        if (frag > iov_nob)
                                frag = iov_nob;
1785 memcpy (iov_ptr, page_ptr, frag);
#if KQSW_CHECKSUM
                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
                        csum_nob += frag;
                        csum_frags++;
#endif
                        mlen -= frag;
                        if (mlen == 0)
                                break;

                        page_nob -= frag;
                        if (page_nob != 0)
                                page_ptr += frag;
                        else {
                                page++;
                                LASSERT (page < krx->krx_npages);
                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                page_nob = PAGE_SIZE;
                        }

                        iov_nob -= frag;
                        if (iov_nob != 0)
                                iov_ptr += frag;
1809 else if (kiov != NULL) {
                                kunmap (kiov->kiov_page);
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                                iov_nob = kiov->kiov_len;
                        } else {
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = iov->iov_base;
                                iov_nob = iov->iov_len;
                        }
                if (kiov != NULL)
                        kunmap (kiov->kiov_page);
#if KQSW_CHECKSUM
        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
                sizeof(kqsw_csum_t));

        if (csum_len != rlen)
                CERROR("Unable to checksum data in user's buffer\n");
        else if (senders_csum != payload_csum)
                kqswnal_csum_error (krx, 0);

        if (csum_verbose)
                CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
                       "csum_nob %d\n",
                       hdr_csum, payload_csum, csum_frags, csum_nob);
#endif
        lib_finalize(nal, private, libmsg, PTL_OK);

        return (PTL_OK);
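/* The copy loop above advances two independent cursors: (page_ptr, page_nob)
 * walks the contiguous receive pages while (iov_ptr, iov_nob) walks the
 * caller's fragments, moving min(mlen, page_nob, iov_nob) bytes per pass.
 * For example, a 4096-byte receive page against 1000-byte caller frags
 * satisfies four whole frags plus 96 bytes of the fifth before the page
 * cursor advances. */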
1849 kqswnal_recv(lib_nal_t *nal,
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, iov, NULL,
                                offset, mlen, rlen));
1864 kqswnal_recv_pages (lib_nal_t *nal,
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, NULL, kiov,
                                offset, mlen, rlen));
1879 kqswnal_thread_start (int (*fn)(void *arg), void *arg)
        long pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);
        atomic_inc (&kqswnal_data.kqn_nthreads);
        return (0);
1891 kqswnal_thread_fini (void)
1893 atomic_dec (&kqswnal_data.kqn_nthreads);
1897 kqswnal_scheduler (void *arg)
1901 kpr_fwd_desc_t *fwd;
1902 unsigned long flags;
1907 kportal_daemonize ("kqswnal_sched");
1908 kportal_blockallsigs ();
1910 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1916 if (!list_empty (&kqswnal_data.kqn_readyrxds))
1918 krx = list_entry(kqswnal_data.kqn_readyrxds.next,
1919 kqswnal_rx_t, krx_list);
1920 list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        switch (krx->krx_state) {
                        case KRX_PARSE:
                                kqswnal_parse (krx);
                                break;
                        case KRX_COMPLETING:
                                kqswnal_rx_decref (krx);
                                break;
                        default:
                                LBUG();
                        }
1936 spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
1939 if (!list_empty (&kqswnal_data.kqn_delayedtxds))
1941 ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
1942 kqswnal_tx_t, ktx_list);
1943 list_del_init (&ktx->ktx_delayed_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);
1947 rc = kqswnal_launch (ktx);
1949 CERROR("Failed delayed transmit to "LPX64
1950 ": %d\n", ktx->ktx_nid, rc);
1951 kqswnal_tx_done (ktx, rc);
1953 atomic_dec (&kqswnal_data.kqn_pending_txs);
1956 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1959 if (!list_empty (&kqswnal_data.kqn_delayedfwds))
1961 fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
1962 list_del (&fwd->kprfd_list);
1963 spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
1965 /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
1966 kqswnal_fwd_packet (NULL, fwd);
1969 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
1972 /* nothing to do or hogging CPU */
1973 if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);
1979 if (!did_something) {
1980 if (kqswnal_data.kqn_shuttingdown == 2) {
1981 /* We only exit in stage 2 of shutdown when
                                         * there's nothing left to do */
                                        break;
                                }
1985 rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
1986 kqswnal_data.kqn_shuttingdown == 2 ||
1987 !list_empty(&kqswnal_data.kqn_readyrxds) ||
1988 !list_empty(&kqswnal_data.kqn_delayedtxds) ||
1989 !list_empty(&kqswnal_data.kqn_delayedfwds));
                        } else if (need_resched())
                                schedule ();
1994 spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
        kqswnal_thread_fini ();
        return (0);
2002 lib_nal_t kqswnal_lib =
2004 libnal_data: &kqswnal_data, /* NAL private data */
2005 libnal_send: kqswnal_send,
2006 libnal_send_pages: kqswnal_send_pages,
2007 libnal_recv: kqswnal_recv,
2008 libnal_recv_pages: kqswnal_recv_pages,
        libnal_dist:            kqswnal_dist
};
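/* A sketch of how this dispatch table is consumed (hypothetical call site):
 * the portals library never calls the kqswnal_* entry points directly, it
 * indirects through lib_nal_t, e.g.
 *
 *      rc = kqswnal_lib.libnal_dist(&kqswnal_lib, nid, &d);
 *
 * The 'field:' initializer syntax is the old GNU extension used throughout
 * this codebase; the C99 spelling would be '.libnal_data = &kqswnal_data'. */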