/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 *   W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswnal.h"
EP_STATUSBLK  kqswnal_rpc_success;
EP_STATUSBLK  kqswnal_rpc_failed;

/*
 *  LIB functions follow
 *
 */
static ptl_err_t
kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
             size_t len)
{
        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );

        return (PTL_OK);
}

static ptl_err_t
kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
              size_t len)
{
        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );
        memcpy( dst_addr, src_addr, len );

        return (PTL_OK);
}

static void *
kqswnal_malloc(nal_cb_t *nal, size_t len)
{
        void *buf;

        PORTAL_ALLOC(buf, len);
        return (buf);
}

static void
kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
{
        PORTAL_FREE(buf, len);
}

static void
kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
{
        va_list  ap;
        char     msg[256];

        va_start (ap, fmt);
        vsnprintf (msg, sizeof (msg), fmt, ap);         /* sprintf safely */
        va_end (ap);

        msg[sizeof (msg) - 1] = 0;                      /* ensure terminated */

        CDEBUG (D_NET, "%s", msg);
}

#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
# error "Can't save/restore irq contexts in different procedures"
#endif

static void
kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_lock_irqsave(&data->kqn_statelock, *flags);
}

static void
kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data = nal->nal_data;

        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
}

static void
kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
{
        /* holding kqn_statelock */

        if (eq->event_callback != NULL)
                eq->event_callback(ev);

        if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
                wake_up_all(&kqswnal_data.kqn_yield_waitq);
}

static int
kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        if (nid == nal->ni.nid)
                *dist = 0;                      /* it's me */
        else if (kqswnal_nid2elanid (nid) >= 0)
                *dist = 1;                      /* it's my peer */
        else
                *dist = 2;                      /* via router */
        return (0);
}

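/* A minimal usage sketch (an editor's illustration, not code from this
 * file): the portals library uses cb_dist to cost routes, so a
 * hypothetical caller could prefer the direct Elan path whenever the
 * metric is <= 1:
 *
 *      unsigned long dist;
 *
 *      if (kqswnal_dist (nal, nid, &dist) == 0 && dist <= 1)
 *              ... transmit directly over Elan ...
 *      else
 *              ... hand the packet to the portals router ...
 */
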
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        struct timeval now;
        time_t         then;

        do_gettimeofday (&now);

        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
}

void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
#if MULTIRAIL_EKC
        int      i;
#endif

        if (ktx->ktx_nmappedpages == 0)
                return;

#if MULTIRAIL_EKC
        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);
#else
        CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);

        LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
        LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);

        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                          kqswnal_data.kqn_eptxdmahandle,
                          ktx->ktx_basepage, ktx->ktx_nmappedpages);
#endif
        ktx->ktx_nmappedpages = 0;
}

int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
        char     *ptr;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            EP_RAILMASK_ALL,
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);
                /* each frag fits in a page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                       ktx, nfrags, ptr, fraglen, basepage, nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ptr, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                  /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                  /* new frag */
                }
#endif
                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

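/* Worked example (illustrative, not from the original source): mapping
 * three kiov pages whose Elan vaddrs come out at 0x1000, 0x2000 and
 * 0x3000 produces a single frag {Base 0x1000, Len 3 * PAGE_SIZE}, since
 * each page is contiguous with (and so merged into, or ep_nmd_merge()d
 * with) the previous frag; any discontiguity starts a new frag, up to
 * EP_MAXFRAG of them. */
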
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    int niov, struct iovec *iov)
{
        int       nfrags    = ktx->ktx_nfrag;
        int       nmapped   = ktx->ktx_nmappedpages;
        int       maxmapped = ktx->ktx_npages;
        uint32_t  basepage  = ktx->ktx_basepage + nmapped;
#if MULTIRAIL_EKC
        EP_RAILMASK railmask;
        int         rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                            EP_RAILMASK_ALL,
                                            kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       iov->iov_base + offset, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                  /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                  /* new frag */
                }
#endif
                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}

void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        kpr_fwd_desc_t *fwd = NULL;
        unsigned long   flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */

        if (ktx->ktx_isnblk) {
                /* reserved for non-blocking tx */
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
                return;
        }

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        /* anything blocking for a tx descriptor? */
        if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
        {
                CDEBUG(D_NET,"wakeup fwd\n");

                fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
        }

        wake_up (&kqswnal_data.kqn_idletxd_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        if (fwd == NULL)
                return;

        /* schedule packet for forwarding again */
        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}

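/* NB freeing a descriptor does double duty (descriptive note): it wakes
 * any thread blocked in kqswnal_get_idle_tx(), and if a forwarded packet
 * was queued waiting for a descriptor it reschedules that packet on the
 * scheduler thread rather than forwarding it from the current (possibly
 * atomic) context. */
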
kqswnal_tx_t *
kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx = NULL;

        for (;;) {
                spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                        ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* "normal" descriptor pool is empty */

                if (fwd != NULL) { /* forwarded packet => queue for idle txd */
                        CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
                        list_add_tail (&fwd->kprfd_list,
                                       &kqswnal_data.kqn_idletxd_fwdq);
                        break;
                }

                /* doing a local transmit */
                if (!may_block) {
                        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
                                CERROR ("intr tx desc pool exhausted\n");
                                break;
                        }

                        ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* block for idle tx */

                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

                CDEBUG (D_NET, "blocking for tx desc\n");
                wait_event (kqswnal_data.kqn_idletxd_waitq,
                            !list_empty (&kqswnal_data.kqn_idletxds));
        }

        if (ktx != NULL) {
                list_del (&ktx->ktx_list);
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                ktx->ktx_launcher = current->pid;
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);

        return (ktx);
}

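/* Caller-side contract (summarizing the logic above): with may_block == 0
 * the descriptor comes from the reserved non-blocking pool and may be
 * NULL, so interrupt-context senders must cope with exhaustion; a fwd !=
 * NULL caller never blocks either - its descriptor request is parked on
 * kqn_idletxd_fwdq and replayed later by kqswnal_put_idle_tx(). */
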
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
        lib_msg_t     *msg;
        lib_msg_t     *repmsg = NULL;

        switch (ktx->ktx_state) {
        case KTX_FORWARDING:       /* router asked me to forward this packet */
                kpr_fwd_done (&kqswnal_data.kqn_router,
                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                break;

        case KTX_SENDING:          /* packet sourced locally */
                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                              (lib_msg_t *)ktx->ktx_args[1],
                              (error == 0) ? PTL_OK :
                              (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
                break;

        case KTX_GETTING:          /* Peer has DMA-ed direct? */
                msg = (lib_msg_t *)ktx->ktx_args[1];

                if (error == 0) {
                        repmsg = lib_create_reply_msg (&kqswnal_lib,
                                                       ktx->ktx_nid, msg);
                        if (repmsg == NULL)
                                error = -ENOMEM;
                }

                if (error == 0) {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                                      msg, PTL_OK);
                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
                } else {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
                                      (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
                }
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);
}

static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        ktx->ktx_nid, status);

                kqswnal_notify_peer_down(ktx);
                status = -EHOSTDOWN;

        } else if (ktx->ktx_state == KTX_GETTING) {
                /* RPC completed OK; what did our peer put in the status
                 * block? */
#if MULTIRAIL_EKC
                status = ep_txd_statusblk(txd)->Data[0];
#else
                status = ep_txd_statusblk(txd)->Status;
#endif
        } else {
                status = 0;
        }

        kqswnal_tx_done (ktx, status);
}

int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        LASSERT (dest >= 0);                    /* must be a peer */
        if (ktx->ktx_state == KTX_GETTING) {
                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
                 * other frags are the GET sink which we obviously don't
                 * send here :) */
#if MULTIRAIL_EKC
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
#else
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr, kqswnal_txhandler,
                                     ktx, NULL, ktx->ktx_frags, 1);
#endif
        } else {
#if MULTIRAIL_EKC
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
#else
                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                       ktx->ktx_port, attr,
                                       kqswnal_txhandler, ktx,
                                       ktx->ktx_frags, ktx->ktx_nfrag);
#endif
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                LASSERT (in_interrupt());

                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}

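/* Note on the EP_ENOMEM case above: EKC could not allocate a transmit
 * descriptor in interrupt context, so the ktx is parked on
 * kqn_delayedtxds and the scheduler thread retries kqswnal_launch()
 * later from process context, where EKC is allowed to sleep. */
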
static char *
hdr_type_string (ptl_hdr_t *hdr)
{
        switch (hdr->type) {
        case PTL_MSG_ACK:
                return ("ACK");
        case PTL_MSG_PUT:
                return ("PUT");
        case PTL_MSG_GET:
                return ("GET");
        case PTL_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

static void
kqswnal_cerror_hdr(ptl_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               NTOH__u32(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
               NTOH__u32(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
               NTOH__u32(hdr->dest_pid));

        switch (NTOH__u32(hdr->type)) {
        case PTL_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       NTOH__u64 (hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       NTOH__u32 (hdr->msg.get.sink_length),
                       NTOH__u32 (hdr->msg.get.src_offset));
                break;

        case PTL_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       NTOH__u32 (hdr->msg.ack.mlength));
                break;

        case PTL_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }

}                               /* end of print_hdr() */

#if !MULTIRAIL_EKC
void
kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
{
        int   i;

        CDEBUG (how, "%s: %d\n", str, n);
        for (i = 0; i < n; i++) {
                CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
        }
}

int
kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
                     int nsrc, EP_IOVEC *src,
                     int ndst, EP_IOVEC *dst)
{
        int   count;
        int   nob;

        LASSERT (ndv > 0);
        LASSERT (nsrc > 0);
        LASSERT (ndst > 0);

        for (count = 0; count < ndv; count++, dv++) {
                if (nsrc == 0 || ndst == 0) {
                        if (nsrc != ndst) {
                                /* For now I'll barf on any left over entries */
                                CERROR ("mismatched src and dst iovs\n");
                                return (-EINVAL);
                        }
                        return (count);
                }

                nob = (src->Len < dst->Len) ? src->Len : dst->Len;
                dv->Len    = nob;
                dv->Source = src->Base;
                dv->Dest   = dst->Base;

                if (nob >= src->Len) {
                        src++;
                        nsrc--;
                } else {
                        src->Len  -= nob;
                        src->Base += nob;
                }

                if (nob >= dst->Len) {
                        dst++;
                        ndst--;
                } else {
                        dst->Len  -= nob;
                        dst->Base += nob;
                }
        }

        CERROR ("DATAVEC too small\n");
        return (-E2BIG);
}
#endif

int
kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                   struct iovec *iov, ptl_kiov_t *kiov,
                   int offset, int nob)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
        int                 rc;
#if MULTIRAIL_EKC
        int                 i;
#else
        EP_DATAVEC          datav[EP_MAXFRAG];
        int                 ndatav;
#endif
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT ((iov == NULL) != (kiov == NULL));

        /* see kqswnal_sendmsg comment regarding endian-ness */
        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (-EINVAL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (-EINVAL);
        }

        /* Map the source data... */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
        else
                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);

        if (rc != 0) {
                CERROR ("Can't map source data: %d\n", rc);
                return (rc);
        }

#if MULTIRAIL_EKC
        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
                return (-EINVAL);
        }

        for (i = 0; i < rmd->kqrmd_nfrag; i++)
                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, rmd->kqrmd_nfrag,
                               ktx->ktx_frags[i].nmd_len,
                               rmd->kqrmd_frag[i].nmd_len);
                        return (-EINVAL);
                }
#else
        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
                                      ktx->ktx_nfrag, ktx->ktx_frags,
                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (ndatav < 0) {
                CERROR ("Can't create datavec: %d\n", ndatav);
                return (ndatav);
        }
#endif

        /* Our caller will start to race with kqswnal_dma_reply_complete... */
        LASSERT (atomic_read (&krx->krx_refcount) == 1);
        atomic_set (&krx->krx_refcount, 2);

#if MULTIRAIL_EKC
        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                             &kqswnal_rpc_success,
                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
        if (rc == EP_SUCCESS)
                return (0);

        /* Well we tried... */
        krx->krx_rpc_reply_needed = 0;
#else
        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                              &kqswnal_rpc_success, datav, ndatav);
        if (rc == EP_SUCCESS)
                return (0);

        /* "old" EKC destroys rxd on failed completion */
        krx->krx_rxd = NULL;
#endif

        CERROR("can't complete RPC: %d\n", rc);

        /* reset refcount back to 1: we're not going to be racing with
         * kqswnal_dma_reply_complete. */
        atomic_set (&krx->krx_refcount, 1);

        return (-ECONNABORTED);
}

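/* Descriptive note on the refcount dance above: the count is bumped to 2
 * just before ep_complete_rpc() because two parties must then finish with
 * krx - our caller (which drops one reference) and
 * kqswnal_dma_reply_complete() (which drops the other).  On failure it is
 * reset to 1, since the completion callback will never run. */
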
static ptl_err_t
kqswnal_sendmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 ptl_hdr_t    *hdr,
                 int           type,
                 ptl_nid_t     nid,
                 ptl_pid_t     pid,
                 unsigned int  payload_niov,
                 struct iovec *payload_iov,
                 ptl_kiov_t   *payload_kiov,
                 size_t        payload_offset,
                 size_t        payload_nob)
{
        kqswnal_tx_t *ktx;
        int           rc;
        ptl_nid_t     targetnid;
#if KQSW_CHECKSUM
        int           i;
        kqsw_csum_t   csum;
        int           sumoff;
        int           sumnob;
#endif

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
               " pid %u\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (payload_nob > KQSW_MAXPAYLOAD) {
                CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                        payload_nob, KQSW_MAXPAYLOAD);
                return (PTL_FAIL);
        }

        targetnid = nid;
        if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                if (rc != 0) {
                        CERROR("Can't route to "LPX64": router error %d\n",
                               nid, rc);
                        return (PTL_FAIL);
                }
                if (kqswnal_nid2elanid (targetnid) < 0) {
                        CERROR("Bad gateway "LPX64" for "LPX64"\n",
                               targetnid, nid);
                        return (PTL_FAIL);
                }
        }

        /* I may not block for a transmit descriptor if I might block the
         * receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
                                          type == PTL_MSG_REPLY ||
                                          in_interrupt()));
        if (ktx == NULL) {
                kqswnal_cerror_hdr (hdr);
                return (PTL_NO_SPACE);
        }

        ktx->ktx_nid     = targetnid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = libmsg;

        if (type == PTL_MSG_REPLY &&
            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                if (nid != targetnid ||
                    kqswnal_nid2elanid(nid) !=
                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
                        CERROR("Optimized reply nid conflict: "
                               "nid "LPX64" via "LPX64" elanID %d\n",
                               nid, targetnid,
                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
                        return (PTL_FAIL);
                }

                /* peer expects RPC completion with GET data */
                rc = kqswnal_dma_reply (ktx, payload_niov,
                                        payload_iov, payload_kiov,
                                        payload_offset, payload_nob);
                if (rc == 0)
                        return (PTL_OK);

                CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

#if KQSW_CHECKSUM
        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
                if (payload_kiov != NULL) {
                        ptl_kiov_t *kiov = &payload_kiov[i];

                        if (sumoff >= kiov->kiov_len) {
                                sumoff -= kiov->kiov_len;
                        } else {
                                char *addr = ((char *)kmap (kiov->kiov_page)) +
                                             kiov->kiov_offset + sumoff;
                                int   fragnob = kiov->kiov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                                kunmap(kiov->kiov_page);
                        }
                } else {
                        struct iovec *iov = &payload_iov[i];

                        if (sumoff >= iov->iov_len) {
                                sumoff -= iov->iov_len;
                        } else {
                                char *addr = iov->iov_base + sumoff;
                                int   fragnob = iov->iov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                        }
                }
        }
        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif

        if (kqswnal_data.kqn_optimized_gets &&
            type == PTL_MSG_GET &&              /* doing a GET */
            nid == targetnid) {                 /* not forwarding */
                lib_md_t           *md = libmsg->md;
                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);

                /* Optimised path: I send over the Elan vaddrs of the get
                 * sink buffers, and my peer DMAs directly into them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the GET sink frags.  I copy these into ktx_buffer,
                 * immediately after the header, and send that as my GET
                 * message.
                 *
                 * Note that the addresses are sent in native endian-ness.
                 * When EKC copes with different endian nodes, I'll fix
                 * this (and eat my hat :) */
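
                /* Illustrative layout of the resulting GET message (an
                 * editor's sketch of the scheme described above, not a
                 * definition taken from this file):
                 *
                 *   ktx_buffer: [ ptl_hdr_t | kqswnal_remotemd_t:
                 *                 kqrmd_nfrag, kqrmd_frag[0..n-1] ]
                 *
                 * frag[0] covers hdr + RMD; the peer DMAs the GET data
                 * straight into the sink frags the RMD describes and
                 * completes the RPC with a status block rather than a
                 * separate REPLY message. */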

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_GETTING;

                if ((md->options & PTL_MD_KIOV) != 0)
                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                 md->md_niov, md->md_iov.iov);

                if (rc < 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;

                payload_nob = offsetof(kqswnal_remotemd_t,
                                       kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);

#if MULTIRAIL_EKC
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {

                /* small message: single frag copied into the pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
                if (payload_nob > 0) {
                        if (payload_kiov != NULL)
                                lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_kiov,
                                                   payload_offset, payload_nob);
                        else
                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                  payload_niov, payload_iov,
                                                  payload_offset, payload_nob);
                }
        } else {

                /* large message: multiple frags: first is hdr in pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0) {
                        kqswnal_put_idle_tx (ktx);
                        return (PTL_FAIL);
                }
        }

        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);
        if (rc != 0) {                          /* failed? */
                CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc);
                kqswnal_put_idle_tx (ktx);
                return (PTL_FAIL);
        }

        CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n",
               payload_nob, nid, targetnid);
        return (PTL_OK);
}

static ptl_err_t
kqswnal_send (nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              ptl_hdr_t    *hdr,
              int           type,
              ptl_nid_t     nid,
              ptl_pid_t     pid,
              unsigned int  payload_niov,
              struct iovec *payload_iov,
              size_t        payload_offset,
              size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, payload_iov, NULL,
                                 payload_offset, payload_nob));
}

static ptl_err_t
kqswnal_send_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    ptl_hdr_t    *hdr,
                    int           type,
                    ptl_nid_t     nid,
                    ptl_pid_t     pid,
                    unsigned int  payload_niov,
                    ptl_kiov_t   *payload_kiov,
                    size_t        payload_offset,
                    size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, NULL, payload_kiov,
                                 payload_offset, payload_nob));
}

void
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
        int             rc;
        kqswnal_tx_t   *ktx;
        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
        int             niov = fwd->kprfd_niov;
        int             nob  = fwd->kprfd_nob;
        ptl_nid_t       nid  = fwd->kprfd_gateway_nid;

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        /* The router wants this NAL to forward a packet */
        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                fwd, nid, niov, nob);

        ktx = kqswnal_get_idle_tx (fwd, 0);
        if (ktx == NULL)        /* can't get txd right now */
                return;         /* fwd will be scheduled when tx desc freed */

        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
                nid = fwd->kprfd_target_nid;    /* target is final dest */

        if (kqswnal_nid2elanid (nid) < 0) {
                CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                rc = -EHOSTUNREACH;
                goto failed;
        }

        /* copy hdr into pre-mapped buffer */
        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
        ktx->ktx_nid     = nid;
        ktx->ktx_state   = KTX_FORWARDING;
        ktx->ktx_args[0] = fwd;
        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;

        if (nob <= KQSW_TX_MAXCONTIG)
        {
                /* send payload from ktx's pre-mapped contiguous buffer */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
#endif
                if (nob > 0)
                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
                                          niov, kiov, 0, nob);
        }
        else
        {
                /* zero copy payload */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                if (rc != 0)
                        goto failed;
        }

        rc = kqswnal_launch (ktx);
        if (rc == 0)
                return;

 failed:
        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);

        kqswnal_put_idle_tx (ktx);
        /* complete now (with failure) */
        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);
}

void
kqswnal_fwd_callback (void *arg, int error)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;

        /* The router has finished forwarding this packet */

        if (error != 0)
        {
                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid), error);
        }

        kqswnal_requeue_rx (krx);
}

void
kqswnal_dma_reply_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        lib_msg_t    *msg = (lib_msg_t *)ktx->ktx_args[1];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_done (krx);

        lib_finalize (&kqswnal_lib, NULL, msg,
                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
        kqswnal_put_idle_tx (ktx);
}

void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        int   rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {

                /* We failed to complete the peer's optimized GET (e.g. we
                 * couldn't map the source buffers).  We complete the
                 * peer's EKC rpc now with failure. */
#if MULTIRAIL_EKC
                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
                                     &kqswnal_rpc_failed, NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
#else
                if (krx->krx_rxd != NULL) {
                        /* We didn't try (and fail) to complete earlier... */
                        rc = ep_complete_rpc(krx->krx_rxd,
                                             kqswnal_rpc_complete, krx,
                                             &kqswnal_rpc_failed, NULL, 0);
                        if (rc == EP_SUCCESS)
                                return;

                        CERROR("can't complete RPC: %d\n", rc);
                }

                /* NB the old ep_complete_rpc() frees rxd on failure, so we
                 * have to requeue from scratch here, unless we're shutting
                 * down */
                if (kqswnal_data.kqn_shuttingdown)
                        return;

                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
                LASSERT (rc == EP_SUCCESS);
                /* We don't handle failure here; it's incredibly rare
                 * (never reported?) and only happens with "old" EKC */
                return;
#endif
        }

#if MULTIRAIL_EKC
        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
#else
        /* don't actually requeue on shutdown */
        if (!kqswnal_data.kqn_shuttingdown)
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
#endif
}

void
kqswnal_rx (kqswnal_rx_t *krx)
{
        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
        ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
        int             payload_nob;
        int             nob;
        int             niov;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
                atomic_set(&krx->krx_refcount, 1);
                lib_parse (&kqswnal_lib, hdr, krx);
                kqswnal_rx_done(krx);
                return;
        }

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
        {
                CERROR("dropping packet from "LPX64" for "LPX64
                       ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);

                kqswnal_requeue_rx (krx);
                return;
        }

        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
        niov = 0;
        if (nob > 0) {
                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
                niov = 1;
                nob -= PAGE_SIZE - KQSW_HDR_SIZE;

                while (nob > 0) {
                        LASSERT (niov < krx->krx_npages);

                        krx->krx_kiov[niov].kiov_offset = 0;
                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
                        niov++;
                        nob -= PAGE_SIZE;
                }
        }

        kpr_fwd_init (&krx->krx_fwd, dest_nid,
                      hdr, payload_nob, niov, krx->krx_kiov,
                      kqswnal_fwd_callback, krx);

        kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
}

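/* Dispatch summary (descriptive note): a received packet is either
 * consumed locally via lib_parse(), dropped if it is addressed to another
 * Elan peer (the sender should have transmitted to that peer directly),
 * or wrapped in a kpr_fwd_desc_t and handed to the portals router, which
 * eventually calls kqswnal_fwd_callback() above to return the buffers. */
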
/* Receive Interrupt Handler: posts to schedulers */
void
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);

        krx->krx_rxd = rxd;
        krx->krx_nob = nob;
#if MULTIRAIL_EKC
        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
#else
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
#endif

        /* must receive a whole header to be able to parse */
        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
        {
                /* receives complete with failure when receiver is removed */
#if MULTIRAIL_EKC
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#else
                if (!kqswnal_data.kqn_shuttingdown)
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#endif
                kqswnal_requeue_rx (krx);
                return;
        }

        if (!in_interrupt()) {
                kqswnal_rx (krx);
                return;
        }

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}

#if KQSW_CHECKSUM
void
kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
{
        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

        CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                ", dpid %d, spid %d, type %d\n",
                ishdr ? "Header" : "Payload", krx,
                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
                NTOH__u32(hdr->type));

        switch (NTOH__u32 (hdr->type))
        {
        case PTL_MSG_ACK:
                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                       " len %u\n",
                       NTOH__u32(hdr->msg.ack.mlength),
                       hdr->msg.ack.dst_wmd.handle_cookie,
                       hdr->msg.ack.dst_wmd.handle_idx,
                       NTOH__u64(hdr->msg.ack.match_bits),
                       NTOH__u32(hdr->msg.ack.length));
                break;
        case PTL_MSG_PUT:
                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                       " len %u off %u data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.handle_cookie,
                       hdr->msg.put.ack_wmd.handle_idx,
                       NTOH__u64(hdr->msg.put.match_bits),
                       NTOH__u32(hdr->msg.put.length),
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;
        case PTL_MSG_GET:
                CERROR ("GET: <>\n");
                break;
        case PTL_MSG_REPLY:
                CERROR ("REPLY: <>\n");
                break;
        default:
                CERROR ("TYPE?: <>\n");
        }
}
#endif

static ptl_err_t
kqswnal_recvmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 unsigned int  niov,
                 struct iovec *iov,
                 ptl_kiov_t   *kiov,
                 size_t        offset,
                 size_t        mlen,
                 size_t        rlen)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
        int           page;
        char         *page_ptr;
        int           page_nob;
        char         *iov_ptr;
        int           iov_nob;
        int           frag;
#if KQSW_CHECKSUM
        kqsw_csum_t   senders_csum;
        kqsw_csum_t   payload_csum = 0;
        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
        size_t        csum_len = mlen;
        int           csum_frags = 0;
        int           csum_nob = 0;
        static atomic_t csum_counter;
        int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;

        atomic_inc (&csum_counter);

        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
        if (senders_csum != hdr_csum)
                kqswnal_csum_error (krx, 1);
#endif
        CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);

        /* What was actually received must be >= payload. */
        LASSERT (mlen <= rlen);
        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
                CERROR("Bad message size: have %d, need %d + %d\n",
                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
                return (PTL_FAIL);
        }

        /* It must be OK to kmap() if required */
        LASSERT (kiov == NULL || !in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (mlen != 0) {
                page     = 0;
                page_ptr = buffer + KQSW_HDR_SIZE;
                page_nob = PAGE_SIZE - KQSW_HDR_SIZE;

                LASSERT (niov > 0);

                if (kiov != NULL) {
                        /* skip complete frags */
                        while (offset >= kiov->kiov_len) {
                                offset -= kiov->kiov_len;
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        iov_ptr = ((char *)kmap (kiov->kiov_page)) +
                                  kiov->kiov_offset + offset;
                        iov_nob = kiov->kiov_len - offset;
                } else {
                        /* skip complete frags */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        iov_ptr = iov->iov_base + offset;
                        iov_nob = iov->iov_len - offset;
                }

                for (;;)
                {
                        frag = mlen;
                        if (frag > page_nob)
                                frag = page_nob;
                        if (frag > iov_nob)
                                frag = iov_nob;

                        memcpy (iov_ptr, page_ptr, frag);
#if KQSW_CHECKSUM
                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
                        csum_nob += frag;
                        csum_frags++;
#endif
                        mlen -= frag;
                        if (mlen == 0)
                                break;

                        page_nob -= frag;
                        if (page_nob != 0)
                                page_ptr += frag;
                        else
                        {
                                page++;
                                LASSERT (page < krx->krx_npages);
                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                page_nob = PAGE_SIZE;
                        }

                        iov_nob -= frag;
                        if (iov_nob != 0)
                                iov_ptr += frag;
                        else if (kiov != NULL) {
                                kunmap (kiov->kiov_page);
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                                iov_nob = kiov->kiov_len;
                        } else {
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = iov->iov_base;
                                iov_nob = iov->iov_len;
                        }
                }

                if (kiov != NULL)
                        kunmap (kiov->kiov_page);
        }

#if KQSW_CHECKSUM
        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
                sizeof(kqsw_csum_t));

        if (csum_len != rlen)
                CERROR("Unable to checksum data in user's buffer\n");
        else if (senders_csum != payload_csum)
                kqswnal_csum_error (krx, 0);

        if (csum_verbose)
                CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
                       "csum_nob %d\n",
                       hdr_csum, payload_csum, csum_frags, csum_nob);
#endif
        lib_finalize(nal, private, libmsg, PTL_OK);

        return (PTL_OK);
}

static ptl_err_t
kqswnal_recv(nal_cb_t     *nal,
             void         *private,
             lib_msg_t    *libmsg,
             unsigned int  niov,
             struct iovec *iov,
             size_t        offset,
             size_t        mlen,
             size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, iov, NULL,
                                offset, mlen, rlen));
}

static ptl_err_t
kqswnal_recv_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    unsigned int  niov,
                    ptl_kiov_t   *kiov,
                    size_t        offset,
                    size_t        mlen,
                    size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, NULL, kiov,
                                offset, mlen, rlen));
}

int
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kqswnal_data.kqn_nthreads);
        atomic_inc (&kqswnal_data.kqn_nthreads_running);
        return (0);
}

void
kqswnal_thread_fini (void)
{
        atomic_dec (&kqswnal_data.kqn_nthreads);
}

int
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t    *krx;
        kqswnal_tx_t    *ktx;
        kpr_fwd_desc_t  *fwd;
        unsigned long    flags;
        int              rc;
        int              counter = 0;
        int              shuttingdown = 0;
        int              did_something;

        kportal_daemonize ("kqswnal_sched");
        kportal_blockallsigs ();

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        for (;;)
        {
                if (kqswnal_data.kqn_shuttingdown != shuttingdown) {

                        if (kqswnal_data.kqn_shuttingdown == 2)
                                break;

                        /* During stage 1 of shutdown we are still responsive
                         * to receives */

                        atomic_dec (&kqswnal_data.kqn_nthreads_running);
                        shuttingdown = kqswnal_data.kqn_shuttingdown;
                }

                did_something = 0;

                if (!list_empty (&kqswnal_data.kqn_readyrxds))
                {
                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
                                         kqswnal_rx_t, krx_list);
                        list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        kqswnal_rx (krx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedtxds))
                {
                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                         kqswnal_tx_t, ktx_list);
                        list_del_init (&ktx->ktx_delayed_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0)            /* failed: ktx_nid down? */
                        {
                                CERROR("Failed delayed transmit to "LPX64
                                       ": %d\n", ktx->ktx_nid, rc);
                                kqswnal_tx_done (ktx, rc);
                        }

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!shuttingdown &&
                    !list_empty (&kqswnal_data.kqn_delayedfwds))
                {
                        fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                        list_del (&fwd->kprfd_list);
                        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

                        kqswnal_fwd_packet (NULL, fwd);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        counter = 0;

                        if (!did_something) {
                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
                                                               kqswnal_data.kqn_shuttingdown != shuttingdown ||
                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedfwds));

                        } else if (current->need_resched)
                                schedule ();

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

        kqswnal_thread_fini ();
        return (0);
}

nal_cb_t kqswnal_lib =
{
        nal_data:       &kqswnal_data,          /* NAL private data */
        cb_send:        kqswnal_send,
        cb_send_pages:  kqswnal_send_pages,
        cb_recv:        kqswnal_recv,
        cb_recv_pages:  kqswnal_recv_pages,
        cb_read:        kqswnal_read,
        cb_write:       kqswnal_write,
        cb_malloc:      kqswnal_malloc,
        cb_free:        kqswnal_free,
        cb_printf:      kqswnal_printf,
        cb_cli:         kqswnal_cli,
        cb_sti:         kqswnal_sti,
        cb_callback:    kqswnal_callback,
        cb_dist:        kqswnal_dist
};