/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 *   Author: Eric Barton <eric@bartonsoftware.com>
 *
 * Copyright (C) 2002, Lawrence Livermore National Labs (LLNL)
 * W. Marcus Miller - Based on ksocknal
 *
 * This file is part of Portals, http://www.sf.net/projects/sandiaportals/
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswnal.h"
EP_STATUSBLK  kqswnal_rpc_success;
EP_STATUSBLK  kqswnal_rpc_failed;

/*
 *  LIB functions follow
 */
static int
kqswnal_read(nal_cb_t *nal, void *private, void *dst_addr, user_ptr src_addr,
             size_t len)
{
        CDEBUG (D_NET, LPX64": reading "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );

        memcpy( dst_addr, src_addr, len );

        return (0);
}
static int
kqswnal_write(nal_cb_t *nal, void *private, user_ptr dst_addr, void *src_addr,
              size_t len)
{
        CDEBUG (D_NET, LPX64": writing "LPSZ" bytes from %p -> %p\n",
                nal->ni.nid, len, src_addr, dst_addr );

        memcpy( dst_addr, src_addr, len );

        return (0);
}
static void *
kqswnal_malloc(nal_cb_t *nal, size_t len)
{
        void *buf;

        PORTAL_ALLOC(buf, len);
        return (buf);
}
static void
kqswnal_free(nal_cb_t *nal, void *buf, size_t len)
{
        PORTAL_FREE(buf, len);
}
static void
kqswnal_printf (nal_cb_t * nal, const char *fmt, ...)
{
        va_list ap;
        char    msg[256];

        va_start (ap, fmt);
        vsnprintf (msg, sizeof (msg), fmt, ap);        /* sprint safely */
        va_end (ap);

        msg[sizeof (msg) - 1] = 0;                     /* ensure terminated */

        CDEBUG (D_NET, "%s", msg);
}
#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64))
# error "Can't save/restore irq contexts in different procedures"
#endif
static void
kqswnal_cli(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data= nal->nal_data;

        spin_lock_irqsave(&data->kqn_statelock, *flags);
}
static void
kqswnal_sti(nal_cb_t *nal, unsigned long *flags)
{
        kqswnal_data_t *data= nal->nal_data;

        spin_unlock_irqrestore(&data->kqn_statelock, *flags);
}
static void
kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev)
{
        /* holding kqn_statelock */

        if (eq->event_callback != NULL)
                eq->event_callback(ev);

        if (waitqueue_active(&kqswnal_data.kqn_yield_waitq))
                wake_up_all(&kqswnal_data.kqn_yield_waitq);
}
static int
kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        if (nid == nal->ni.nid)
                *dist = 0;                      /* it's me */
        else if (kqswnal_nid2elanid (nid) >= 0)
                *dist = 1;                      /* it's my peer */
        else
                *dist = 2;                      /* via router */

        return (0);
}
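
/* Tell the router this peer has failed.  The time of failure is
 * estimated by converting ktx_launchtime from jiffies to wall-clock
 * seconds: e.g. with HZ == 100, a tx launched 500 jiffies ago
 * backdates the failure 5 seconds before "now". */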
static void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        struct timeval     now;
        time_t             then;

        do_gettimeofday (&now);
        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        kpr_notify(&kqswnal_data.kqn_router, ktx->ktx_nid, 0, then);
}
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
#if MULTIRAIL_EKC
        int      i;
#endif

        if (ktx->ktx_nmappedpages == 0)
                return;

#if MULTIRAIL_EKC
        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);
#else
        CDEBUG (D_NET, "%p[%d] unloading pages %d for %d\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_basepage, ktx->ktx_nmappedpages);

        LASSERT (ktx->ktx_nmappedpages <= ktx->ktx_npages);
        LASSERT (ktx->ktx_basepage + ktx->ktx_nmappedpages <=
                 kqswnal_data.kqn_eptxdmahandle->NumDvmaPages);

        elan3_dvma_unload(kqswnal_data.kqn_ep->DmaState,
                          kqswnal_data.kqn_eptxdmahandle,
                          ktx->ktx_basepage, ktx->ktx_nmappedpages);
#endif
        ktx->ktx_nmappedpages = 0;
}
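
/* Map a tx payload described by page fragments (ptl_kiov_t) into Elan
 * DVMA space.  Newly loaded fragments that turn out to be virtually
 * contiguous with the previous one are merged rather than consuming
 * another of the EP_MAXFRAG slots; kqswnal_unmap_tx() undoes the
 * mapping. */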
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob, int niov, ptl_kiov_t *kiov)
{
        int             nfrags    = ktx->ktx_nfrag;
        int             nmapped   = ktx->ktx_nmappedpages;
        int             maxmapped = ktx->ktx_npages;
        uint32_t        basepage  = ktx->ktx_basepage + nmapped;
        char           *ptr;
#if MULTIRAIL_EKC
        EP_RAILMASK     railmask;
        int             rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                EP_RAILMASK_ALL,
                                                kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);
                /* each frag fits in a page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                        ktx, nfrags, ptr, fraglen, basepage, nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       ptr, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                /* new frag */
                }
#endif
                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
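
/* The struct iovec twin of kqswnal_map_tx_kiov() above: payload frags
 * are kernel virtual addresses, may each span several pages, and so
 * need no kmap(). */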
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    int niov, struct iovec *iov)
{
        int             nfrags    = ktx->ktx_nfrag;
        int             nmapped   = ktx->ktx_nmappedpages;
        int             maxmapped = ktx->ktx_npages;
        uint32_t        basepage  = ktx->ktx_basepage + nmapped;
#if MULTIRAIL_EKC
        EP_RAILMASK     railmask;
        int             rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                EP_RAILMASK_ALL,
                                                kqswnal_nid2elanid(ktx->ktx_nid));

        if (rail < 0) {
                CERROR("No rails available for "LPX64"\n", ktx->ktx_nid);
                return (-ENETDOWN);
        }
        railmask = 1 << rail;
#endif
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages  = kqswnal_pages_spanned (iov->iov_base, fraglen);

                /* nob exactly spans the iovs */
                LASSERT (fraglen <= nob);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                        ktx, nfrags, iov->iov_base + offset, fraglen,
                        basepage, npages, nmapped);

#if MULTIRAIL_EKC
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }
#else
                elan3_dvma_kaddr_load (kqswnal_data.kqn_ep->DmaState,
                                       kqswnal_data.kqn_eptxdmahandle,
                                       iov->iov_base + offset, fraglen,
                                       basepage, &ktx->ktx_frags[nfrags].Base);

                if (nfrags > 0 &&                /* previous frag mapped */
                    ktx->ktx_frags[nfrags].Base == /* contiguous with this one */
                    (ktx->ktx_frags[nfrags-1].Base + ktx->ktx_frags[nfrags-1].Len))
                        /* just extend previous */
                        ktx->ktx_frags[nfrags - 1].Len += fraglen;
                else {
                        ktx->ktx_frags[nfrags].Len = fraglen;
                        nfrags++;                /* new frag */
                }
#endif
                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
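
/* Return a tx descriptor to its idle pool.  There are two pools: the
 * general kqn_idletxds pool that senders may block on, and a small
 * kqn_nblk_idletxds reserve for senders that must not block (ACKs,
 * replies, interrupt context).  Freeing a descriptor may also unblock
 * a forwarded packet queued waiting for one. */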
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        kpr_fwd_desc_t   *fwd = NULL;
        unsigned long     flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */

        if (ktx->ktx_isnblk) {
                /* reserved for non-blocking tx */
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_nblk_idletxds);
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
                return;
        }

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        /* anything blocking for a tx descriptor? */
        if (!kqswnal_data.kqn_shuttingdown &&
            !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */
        {
                CDEBUG(D_NET,"wakeup fwd\n");

                fwd = list_entry (kqswnal_data.kqn_idletxd_fwdq.next,
                                  kpr_fwd_desc_t, kprfd_list);
                list_del (&fwd->kprfd_list);
        }

        wake_up (&kqswnal_data.kqn_idletxd_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        if (fwd == NULL)
                return;

        /* schedule packet for forwarding again */
        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&fwd->kprfd_list, &kqswnal_data.kqn_delayedfwds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
kqswnal_tx_t *
kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx = NULL;

        for (;;) {
                spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

                if (kqswnal_data.kqn_shuttingdown)
                        break;

                /* "normal" descriptor is free */
                if (!list_empty (&kqswnal_data.kqn_idletxds)) {
                        ktx = list_entry (kqswnal_data.kqn_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                if (fwd != NULL)                /* forwarded packet? */
                        break;

                /* doing a local transmit */
                if (!may_block) {
                        if (list_empty (&kqswnal_data.kqn_nblk_idletxds)) {
                                CERROR ("intr tx desc pool exhausted\n");
                                break;
                        }

                        ktx = list_entry (kqswnal_data.kqn_nblk_idletxds.next,
                                          kqswnal_tx_t, ktx_list);
                        break;
                }

                /* block for idle tx */

                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

                CDEBUG (D_NET, "blocking for tx desc\n");
                wait_event (kqswnal_data.kqn_idletxd_waitq,
                            !list_empty (&kqswnal_data.kqn_idletxds) ||
                            kqswnal_data.kqn_shuttingdown);
        }

        if (ktx != NULL) {
                list_del (&ktx->ktx_list);
                list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
                ktx->ktx_launcher = current->pid;
                atomic_inc(&kqswnal_data.kqn_pending_txs);
        } else if (fwd != NULL) {
                /* queue forwarded packet until idle txd available */
                CDEBUG (D_NET, "blocked fwd [%p]\n", fwd);
                list_add_tail (&fwd->kprfd_list,
                               &kqswnal_data.kqn_idletxd_fwdq);
        }

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx == NULL || ktx->ktx_nmappedpages == 0);

        return (ktx);
}
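
/* Finalise a completed tx according to ktx_state: forwarded packets
 * go back to the router, local sends are finalized against the
 * Portals library, and an optimized GET (KTX_GETTING) additionally
 * creates and finalizes the REPLY event that the peer's direct DMA
 * satisfied. */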
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int error)
{
        lib_msg_t     *msg;
        lib_msg_t     *repmsg = NULL;

        switch (ktx->ktx_state) {
        case KTX_FORWARDING:       /* router asked me to forward this packet */
                kpr_fwd_done (&kqswnal_data.kqn_router,
                              (kpr_fwd_desc_t *)ktx->ktx_args[0], error);
                break;

        case KTX_SENDING:          /* packet sourced locally */
                lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                              (lib_msg_t *)ktx->ktx_args[1],
                              (error == 0) ? PTL_OK :
                              (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
                break;

        case KTX_GETTING:          /* Peer has DMA-ed direct? */
                msg = (lib_msg_t *)ktx->ktx_args[1];

                if (error == 0) {
                        repmsg = lib_create_reply_msg (&kqswnal_lib,
                                                       ktx->ktx_nid, msg);
                        if (repmsg == NULL)
                                error = -ENOMEM;
                }

                if (error == 0) {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0],
                                      msg, PTL_OK);
                        lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK);
                } else {
                        lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg,
                                      (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL);
                }
                break;

        default:
                LASSERT (0);
        }

        kqswnal_put_idle_tx (ktx);
}
static void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t      *ktx = (kqswnal_tx_t *)arg;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CERROR ("Tx completion to "LPX64" failed: %d\n",
                        ktx->ktx_nid, status);

                kqswnal_notify_peer_down(ktx);
                status = -EHOSTDOWN;

        } else if (ktx->ktx_state == KTX_GETTING) {
                /* RPC completed OK; what did our peer put in the status
                 * block? */
#if MULTIRAIL_EKC
                status = ep_txd_statusblk(txd)->Data[0];
#else
                status = ep_txd_statusblk(txd)->Status;
#endif
        } else {
                status = 0;
        }

        kqswnal_tx_done (ktx, status);
}
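
/* Hand a tx to the EKC transmitter.  In interrupt context we may
 * neither sleep nor allocate (EP_NO_SLEEP | EP_NO_ALLOC), so if EKC
 * can't get a txd right now (EP_ENOMEM) the tx is queued on
 * kqn_delayedtxds for the scheduler thread to retry, rather than
 * being dropped. */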
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */
        if (ktx->ktx_state == KTX_GETTING) {
                /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t.  The
                 * other frags are the GET sink which we obviously don't
                 * send */
#if MULTIRAIL_EKC
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
#else
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr, kqswnal_txhandler,
                                     ktx, NULL, ktx->ktx_frags, 1);
#endif
        } else {
#if MULTIRAIL_EKC
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
#else
                rc = ep_transmit_large(kqswnal_data.kqn_eptx, dest,
                                       ktx->ktx_port, attr,
                                       kqswnal_txhandler, ktx,
                                       ktx->ktx_frags, ktx->ktx_nfrag);
#endif
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
                return (0);

        default: /* fatal error */
                CERROR ("Tx to "LPX64" failed: %d\n", ktx->ktx_nid, rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}
static char *
hdr_type_string (ptl_hdr_t *hdr)
{
        switch (hdr->type) {
        case PTL_MSG_ACK:
                return ("ACK");
        case PTL_MSG_PUT:
                return ("PUT");
        case PTL_MSG_GET:
                return ("GET");
        case PTL_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}
static void
kqswnal_cerror_hdr(ptl_hdr_t * hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               NTOH__u32(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", NTOH__u64(hdr->src_nid),
               NTOH__u32(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", NTOH__u64(hdr->dest_nid),
               NTOH__u32(hdr->dest_pid));

        switch (NTOH__u32(hdr->type)) {
        case PTL_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       NTOH__u64 (hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       NTOH__u32 (hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       hdr->msg.get.match_bits);
                CERROR("    Length %d, src offset %d\n",
                       NTOH__u32 (hdr->msg.get.sink_length),
                       NTOH__u32 (hdr->msg.get.src_offset));
                break;

        case PTL_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       NTOH__u32 (hdr->msg.ack.mlength));
                break;

        case PTL_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }
}                               /* end of kqswnal_cerror_hdr() */
#if !MULTIRAIL_EKC
void
kqswnal_print_eiov (int how, char *str, int n, EP_IOVEC *iov)
{
        int          i;

        CDEBUG (how, "%s: %d\n", str, n);
        for (i = 0; i < n; i++) {
                CDEBUG (how, "   %08x for %d\n", iov[i].Base, iov[i].Len);
        }
}
int
kqswnal_eiovs2datav (int ndv, EP_DATAVEC *dv,
                     int nsrc, EP_IOVEC *src,
                     int ndst, EP_IOVEC *dst)
{
        int        count;
        int        nob;

        LASSERT (ndv > 0);
        LASSERT (nsrc > 0);
        LASSERT (ndst > 0);

        for (count = 0; count < ndv; count++, dv++) {

                if (nsrc == 0 || ndst == 0) {
                        if (nsrc != ndst) {
                                /* For now I'll barf on any left over entries */
                                CERROR ("mismatched src and dst iovs\n");
                                return (-EINVAL);
                        }
                        return (count);
                }

                nob = (src->Len < dst->Len) ? src->Len : dst->Len;
                dv->Len    = nob;
                dv->Source = src->Base;
                dv->Dest   = dst->Base;

                if (nob >= src->Len) {
                        src++;
                        nsrc--;
                } else {
                        src->Len -= nob;
                        src->Base += nob;
                }

                if (nob >= dst->Len) {
                        dst++;
                        ndst--;
                } else {
                        dst->Len -= nob;
                        dst->Base += nob;
                }
        }

        CERROR ("DATAVEC too small\n");
        return (-E2BIG);
}
#endif
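
/* Service a peer's optimized GET: map the local source data and
 * complete the peer's EKC RPC so the data is DMAed directly into the
 * sink buffers described by the kqswnal_remotemd_t the peer sent. */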
int
kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag,
                   struct iovec *iov, ptl_kiov_t *kiov,
                   int offset, int nob)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE);
        int                 rc;
        int                 i;
#if !MULTIRAIL_EKC
        EP_DATAVEC          datav[EP_MAXFRAG];
        int                 ndatav;
#endif

        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT ((iov == NULL) != (kiov == NULL));

        /* see kqswnal_sendmsg comment regarding endian-ness */
        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (-EINVAL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (-EINVAL);
        }

        /* Map the source data... */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov (ktx, offset, nob, nfrag, kiov);
        else
                rc = kqswnal_map_tx_iov (ktx, offset, nob, nfrag, iov);

        if (rc != 0) {
                CERROR ("Can't map source data: %d\n", rc);
                return (rc);
        }

#if MULTIRAIL_EKC
        if (ktx->ktx_nfrag != rmd->kqrmd_nfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       ktx->ktx_nfrag, rmd->kqrmd_nfrag);
                return (-EINVAL);
        }

        for (i = 0; i < rmd->kqrmd_nfrag; i++)
                if (ktx->ktx_frags[i].nmd_len != rmd->kqrmd_frag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, rmd->kqrmd_nfrag,
                               ktx->ktx_frags[i].nmd_len,
                               rmd->kqrmd_frag[i].nmd_len);
                        return (-EINVAL);
                }
#else
        ndatav = kqswnal_eiovs2datav (EP_MAXFRAG, datav,
                                      ktx->ktx_nfrag, ktx->ktx_frags,
                                      rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (ndatav < 0) {
                CERROR ("Can't create datavec: %d\n", ndatav);
                return (ndatav);
        }
#endif

        /* Our caller will start to race with kqswnal_dma_reply_complete... */
        LASSERT (atomic_read (&krx->krx_refcount) == 1);
        atomic_set (&krx->krx_refcount, 2);

#if MULTIRAIL_EKC
        rc = ep_complete_rpc(krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                             &kqswnal_rpc_success,
                             ktx->ktx_frags, rmd->kqrmd_frag, rmd->kqrmd_nfrag);
        if (rc == EP_SUCCESS)
                return (0);

        /* Well we tried... */
        krx->krx_rpc_reply_needed = 0;
#else
        rc = ep_complete_rpc (krx->krx_rxd, kqswnal_dma_reply_complete, ktx,
                              &kqswnal_rpc_success, datav, ndatav);
        if (rc == EP_SUCCESS)
                return (0);

        /* "old" EKC destroys rxd on failed completion */
        krx->krx_rxd = NULL;
#endif

        CERROR("can't complete RPC: %d\n", rc);

        /* reset refcount back to 1: we're not going to be racing with
         * kqswnal_dma_reply_complete. */
        atomic_set (&krx->krx_refcount, 1);

        return (-ECONNABORTED);
}
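
/* Common send path for both iovec and kiov payloads.  Three cases:
 * an optimized GET sends just the header plus a descriptor of the
 * sink buffers (the peer DMAs the data back itself); payloads up to
 * KQSW_TX_MAXCONTIG are copied into the pre-mapped contiguous buffer
 * and sent as a single frag; anything bigger is DVMA-mapped in place
 * and sent as header frag + payload frags. */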
static ptl_err_t
kqswnal_sendmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 ptl_hdr_t    *hdr,
                 int           type,
                 ptl_nid_t     nid,
                 ptl_pid_t     pid,
                 unsigned int  payload_niov,
                 struct iovec *payload_iov,
                 ptl_kiov_t   *payload_kiov,
                 size_t        payload_offset,
                 size_t        payload_nob)
{
        kqswnal_tx_t      *ktx;
        int                rc;
        ptl_nid_t          targetnid;
#if KQSW_CHECKSUM
        int                i;
        kqsw_csum_t        csum;
        int                sumoff;
        int                sumnob;
#endif

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid: "LPX64
               " pid %u\n", payload_nob, payload_niov, nid, pid);

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (payload_nob > KQSW_MAXPAYLOAD) {
                CERROR ("request exceeds MTU size "LPSZ" (max %u).\n",
                        payload_nob, KQSW_MAXPAYLOAD);
                return (PTL_FAIL);
        }

        targetnid = nid;
        if (kqswnal_nid2elanid (nid) < 0) {     /* Can't send direct: find gateway? */
                rc = kpr_lookup (&kqswnal_data.kqn_router, nid,
                                 sizeof (ptl_hdr_t) + payload_nob, &targetnid);
                if (rc != 0) {
                        CERROR("Can't route to "LPX64": router error %d\n",
                               nid, rc);
                        return (PTL_FAIL);
                }
                if (kqswnal_nid2elanid (targetnid) < 0) {
                        CERROR("Bad gateway "LPX64" for "LPX64"\n",
                               targetnid, nid);
                        return (PTL_FAIL);
                }
        }

        /* I may not block for a transmit descriptor if I might block the
         * receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx(NULL, !(type == PTL_MSG_ACK ||
                                          type == PTL_MSG_REPLY ||
                                          in_interrupt()));
        if (ktx == NULL) {
                kqswnal_cerror_hdr (hdr);
                return (PTL_NO_SPACE);
        }

        ktx->ktx_nid     = targetnid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = libmsg;

        if (type == PTL_MSG_REPLY &&
            ((kqswnal_rx_t *)private)->krx_rpc_reply_needed) {
                if (nid != targetnid ||
                    kqswnal_nid2elanid(nid) !=
                    ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)) {
                        CERROR("Optimized reply nid conflict: "
                               "nid "LPX64" via "LPX64" elanID %d\n",
                               nid, targetnid,
                               ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd));
                        rc = -EINVAL;
                } else {
                        /* peer expects RPC completion with GET data */
                        rc = kqswnal_dma_reply (ktx, payload_niov,
                                                payload_iov, payload_kiov,
                                                payload_offset, payload_nob);
                        if (rc != 0)
                                CERROR ("Can't DMA reply to "LPX64": %d\n",
                                        nid, rc);
                }

                if (rc != 0)
                        kqswnal_put_idle_tx (ktx);
                atomic_dec(&kqswnal_data.kqn_pending_txs);
                return (rc == 0 ? PTL_OK : PTL_FAIL);
        }
        memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

#if KQSW_CHECKSUM
        csum = kqsw_csum (0, (char *)hdr, sizeof (*hdr));
        memcpy (ktx->ktx_buffer + sizeof (*hdr), &csum, sizeof (csum));
        for (csum = 0, i = 0, sumoff = payload_offset, sumnob = payload_nob; sumnob > 0; i++) {
                LASSERT (i < payload_niov);
                if (payload_kiov != NULL) {
                        ptl_kiov_t *kiov = &payload_kiov[i];

                        if (sumoff >= kiov->kiov_len) {
                                sumoff -= kiov->kiov_len;
                        } else {
                                char *addr = ((char *)kmap (kiov->kiov_page)) +
                                             kiov->kiov_offset + sumoff;
                                int   fragnob = kiov->kiov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                                kunmap(kiov->kiov_page);
                        }
                } else {
                        struct iovec *iov = &payload_iov[i];

                        if (sumoff >= iov->iov_len) {
                                sumoff -= iov->iov_len;
                        } else {
                                char *addr = iov->iov_base + sumoff;
                                int   fragnob = iov->iov_len - sumoff;

                                csum = kqsw_csum(csum, addr, MIN(sumnob, fragnob));
                                sumnob -= fragnob;
                                sumoff = 0;
                        }
                }
        }
        memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum));
#endif
        if (kqswnal_data.kqn_optimized_gets &&
            type == PTL_MSG_GET &&              /* doing a GET */
            nid == targetnid) {                 /* not forwarding */
                lib_md_t           *md = libmsg->md;
                kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(ktx->ktx_buffer + KQSW_HDR_SIZE);

                /* Optimised path: I send over the Elan vaddrs of the get
                 * sink buffers, and my peer DMAs directly into them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the GET sink frags.  I copy these into ktx_buffer,
                 * immediately after the header, and send that as my GET
                 * message.
                 *
                 * Note that the addresses are sent in native endian-ness.
                 * When EKC copes with different endian nodes, I'll fix
                 * this (and eat my hat :) */
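
                /* Sketch of the resulting GET wire message laid out in
                 * ktx_buffer (the sink data itself never travels):
                 *
                 *  +-----------+- - - - - - -+-------------------------+
                 *  | ptl_hdr_t | (checksums) | kqswnal_remotemd_t:     |
                 *  |           |             | kqrmd_nfrag, frag[0..n] |
                 *  +-----------+- - - - - - -+-------------------------+
                 *  |<------ KQSW_HDR_SIZE -->|
                 */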
                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_GETTING;

                if ((libmsg->md->options & PTL_MD_KIOV) != 0)
                        rc = kqswnal_map_tx_kiov (ktx, 0, md->length,
                                                  md->md_niov, md->md_iov.kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, 0, md->length,
                                                 md->md_niov, md->md_iov.iov);
                if (rc != 0) {
                        kqswnal_put_idle_tx (ktx);
                        atomic_dec(&kqswnal_data.kqn_pending_txs);
                        return (PTL_FAIL);
                }

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;

                payload_nob = offsetof(kqswnal_remotemd_t,
                                       kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (KQSW_HDR_SIZE + payload_nob <= KQSW_TX_BUFFER_SIZE);

#if MULTIRAIL_EKC
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_IOVEC));

                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
        } else if (payload_nob <= KQSW_TX_MAXCONTIG) {

                /* small message: single frag copied into the pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + payload_nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + payload_nob;
#endif
                if (payload_nob > 0) {
                        if (payload_kiov != NULL)
                                lib_copy_kiov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                   payload_niov, payload_kiov,
                                                   payload_offset, payload_nob);
                        else
                                lib_copy_iov2buf (ktx->ktx_buffer + KQSW_HDR_SIZE,
                                                  payload_niov, payload_iov,
                                                  payload_offset, payload_nob);
                }
        } else {

                /* large message: multiple frags: first is hdr in pre-mapped buffer */

                ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
                ktx->ktx_state = KTX_SENDING;
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0) {
                        kqswnal_put_idle_tx (ktx);
                        atomic_dec(&kqswnal_data.kqn_pending_txs);
                        return (PTL_FAIL);
                }
        }

        ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
        rc = kqswnal_launch (ktx);

        CDEBUG(rc == 0 ? D_NET : D_ERROR,
               "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n",
               rc == 0 ? "Sent" : "Failed to send",
               payload_nob, nid, targetnid, rc);

        if (rc != 0)
                kqswnal_put_idle_tx (ktx);

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? PTL_OK : PTL_FAIL);
}
static ptl_err_t
kqswnal_send (nal_cb_t     *nal,
              void         *private,
              lib_msg_t    *libmsg,
              ptl_hdr_t    *hdr,
              int           type,
              ptl_nid_t     nid,
              ptl_pid_t     pid,
              unsigned int  payload_niov,
              struct iovec *payload_iov,
              size_t        payload_offset,
              size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, payload_iov, NULL,
                                 payload_offset, payload_nob));
}
static ptl_err_t
kqswnal_send_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    ptl_hdr_t    *hdr,
                    int           type,
                    ptl_nid_t     nid,
                    ptl_pid_t     pid,
                    unsigned int  payload_niov,
                    ptl_kiov_t   *payload_kiov,
                    size_t        payload_offset,
                    size_t        payload_nob)
{
        return (kqswnal_sendmsg (nal, private, libmsg, hdr, type, nid, pid,
                                 payload_niov, NULL, payload_kiov,
                                 payload_offset, payload_nob));
}
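
/* The router calls this to send a packet it wants forwarded over the
 * Elan.  Payloads up to KQSW_TX_MAXCONTIG are copied into the tx's
 * pre-mapped buffer; bigger ones are DVMA-mapped in place for
 * zero-copy transmission.  If no tx descriptor is free, the fwd is
 * queued and re-scheduled when one is released. */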
void
kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd)
{
        int             rc;
        kqswnal_tx_t   *ktx;
        ptl_kiov_t     *kiov = fwd->kprfd_kiov;
        int             niov = fwd->kprfd_niov;
        int             nob  = fwd->kprfd_nob;
        ptl_nid_t       nid  = fwd->kprfd_gateway_nid;

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        /* The router wants this NAL to forward a packet */
        CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n",
                fwd, nid, niov, nob);

        ktx = kqswnal_get_idle_tx (fwd, 0);
        if (ktx == NULL)        /* can't get txd right now */
                return;         /* fwd will be scheduled when tx desc freed */

        if (nid == kqswnal_lib.ni.nid)          /* gateway is me */
                nid = fwd->kprfd_target_nid;    /* target is final dest */

        if (kqswnal_nid2elanid (nid) < 0) {
                CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid);
                rc = -EHOSTUNREACH;
                goto failed;
        }

        /* copy hdr into pre-mapped buffer */
        memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t));
        ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer;

        ktx->ktx_port    = (nob <= KQSW_SMALLPAYLOAD) ?
                           EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
        ktx->ktx_nid     = nid;
        ktx->ktx_state   = KTX_FORWARDING;
        ktx->ktx_args[0] = fwd;
        ktx->ktx_nfrag   = ktx->ktx_firsttmpfrag = 1;

        if (nob <= KQSW_TX_MAXCONTIG)
        {
                /* send payload from ktx's pre-mapped contiguous buffer */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE + nob);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob;
#endif
                if (nob > 0)
                        lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE,
                                          niov, kiov, 0, nob);
        }
        else
        {
                /* zero copy payload */
#if MULTIRAIL_EKC
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer,
                              0, KQSW_HDR_SIZE);
#else
                ktx->ktx_frags[0].Base = ktx->ktx_ebuffer;
                ktx->ktx_frags[0].Len = KQSW_HDR_SIZE;
#endif
                rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov);
                if (rc != 0)
                        goto failed;
        }

        rc = kqswnal_launch (ktx);
        if (rc == 0) {
                atomic_dec(&kqswnal_data.kqn_pending_txs);
                return;
        }

 failed:
        CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc);

        kqswnal_put_idle_tx (ktx);
        /* complete now (with failure) */
        kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc);

        atomic_dec(&kqswnal_data.kqn_pending_txs);
}
static void
kqswnal_fwd_callback (void *arg, int error)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)arg;

        /* The router has finished forwarding this packet */

        if (error != 0)
        {
                ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

                CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n",
                       NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid), error);
        }

        kqswnal_requeue_rx (krx);
}
void
kqswnal_dma_reply_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx    = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ktx->ktx_args[0];
        lib_msg_t    *msg    = (lib_msg_t *)ktx->ktx_args[1];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_done (krx);

        lib_finalize (&kqswnal_lib, NULL, msg,
                      (status == EP_SUCCESS) ? PTL_OK : PTL_FAIL);
        kqswnal_put_idle_tx (ktx);
}
void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}
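
/* Hand an rx buffer back to EKC to receive into again.  If the peer
 * is still waiting on an optimized-GET RPC we could not satisfy, that
 * RPC is first completed here with a failure status block.  NB "old"
 * (non-MULTIRAIL) EKC frees the rxd when RPC completion fails, so in
 * that case the buffer must be queued again from scratch. */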
void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        int   rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {

                /* We failed to complete the peer's optimized GET (e.g. we
                 * couldn't map the source buffers).  We complete the
                 * peer's EKC rpc now with failure. */
#if MULTIRAIL_EKC
                rc = ep_complete_rpc(krx->krx_rxd, kqswnal_rpc_complete, krx,
                                     &kqswnal_rpc_failed, NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
#else
                if (krx->krx_rxd != NULL) {
                        /* We didn't try (and fail) to complete earlier... */
                        rc = ep_complete_rpc(krx->krx_rxd,
                                             kqswnal_rpc_complete, krx,
                                             &kqswnal_rpc_failed, NULL, 0);
                        if (rc == EP_SUCCESS)
                                return;

                        CERROR("can't complete RPC: %d\n", rc);
                }

                /* NB the old ep_complete_rpc() frees rxd on failure, so we
                 * have to requeue from scratch here, unless we're shutting
                 * down */
                if (kqswnal_data.kqn_shuttingdown)
                        return;

                rc = ep_queue_receive(krx->krx_eprx, kqswnal_rxhandler, krx,
                                      krx->krx_elanbuffer,
                                      krx->krx_npages * PAGE_SIZE, 0);
                LASSERT (rc == EP_SUCCESS);
                /* We don't handle failure here; it's incredibly rare
                 * (never reported?) and only happens with "old" EKC */
                return;
#endif
        }

#if MULTIRAIL_EKC
        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
#else
        /* don't actually requeue on shutdown */
        if (!kqswnal_data.kqn_shuttingdown)
                ep_requeue_receive(krx->krx_rxd, kqswnal_rxhandler, krx,
                                   krx->krx_elanbuffer, krx->krx_npages * PAGE_SIZE);
#endif
}
void
kqswnal_rx (kqswnal_rx_t *krx)
{
        ptl_hdr_t      *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page);
        ptl_nid_t       dest_nid = NTOH__u64 (hdr->dest_nid);
        int             payload_nob;
        int             nob;
        int             niov;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (dest_nid == kqswnal_lib.ni.nid) { /* It's for me :) */
                atomic_set(&krx->krx_refcount, 1);
                lib_parse (&kqswnal_lib, hdr, krx);
                kqswnal_rx_done(krx);
                return;
        }

#if KQSW_CHECKSUM
        CERROR ("checksums for forwarded packets not implemented\n");
        LBUG ();
#endif
        if (kqswnal_nid2elanid (dest_nid) >= 0)  /* should have gone direct to peer */
        {
                CERROR("dropping packet from "LPX64" for "LPX64
                       ": target is peer\n", NTOH__u64(hdr->src_nid), dest_nid);

                kqswnal_requeue_rx (krx);
                return;
        }

        nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE;
        niov = 0;
        if (nob > 0) {
                krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE;
                krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob);
                niov = 1;
                nob -= PAGE_SIZE - KQSW_HDR_SIZE;

                while (nob > 0) {
                        LASSERT (niov < krx->krx_npages);

                        krx->krx_kiov[niov].kiov_offset = 0;
                        krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob);
                        niov++;
                        nob -= PAGE_SIZE;
                }
        }

        kpr_fwd_init (&krx->krx_fwd, dest_nid,
                      hdr, payload_nob, niov, krx->krx_kiov,
                      kqswnal_fwd_callback, krx);

        kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd);
}
/* Receive Interrupt Handler: posts to schedulers */
void
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);

        krx->krx_rxd = rxd;
        krx->krx_nob = nob;
#if MULTIRAIL_EKC
        krx->krx_rpc_reply_needed = (status != EP_SHUTDOWN) && ep_rxd_isrpc(rxd);
#else
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd);
#endif

        /* must receive a whole header to be able to parse */
        if (status != EP_SUCCESS || nob < sizeof (ptl_hdr_t))
        {
                /* receives complete with failure when receiver is removed */
#if MULTIRAIL_EKC
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#else
                if (!kqswnal_data.kqn_shuttingdown)
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
#endif
                kqswnal_requeue_rx (krx);
                return;
        }

        if (!in_interrupt()) {
                kqswnal_rx (krx);
                return;
        }

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}
#if KQSW_CHECKSUM
void
kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr)
{
        ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page);

        CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64
                ", dpid %d, spid %d, type %d\n",
                ishdr ? "Header" : "Payload", krx,
                NTOH__u64(hdr->dest_nid), NTOH__u64(hdr->src_nid),
                NTOH__u32(hdr->dest_pid), NTOH__u32(hdr->src_pid),
                NTOH__u32(hdr->type));

        switch (NTOH__u32 (hdr->type))
        {
        case PTL_MSG_ACK:
                CERROR("ACK: mlen %d dmd "LPX64"."LPX64" match "LPX64
                       " len %u\n",
                       NTOH__u32(hdr->msg.ack.mlength),
                       hdr->msg.ack.dst_wmd.handle_cookie,
                       hdr->msg.ack.dst_wmd.handle_idx,
                       NTOH__u64(hdr->msg.ack.match_bits),
                       NTOH__u32(hdr->msg.ack.length));
                break;

        case PTL_MSG_PUT:
                CERROR("PUT: ptl %d amd "LPX64"."LPX64" match "LPX64
                       " len %u off %u data "LPX64"\n",
                       NTOH__u32(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.handle_cookie,
                       hdr->msg.put.ack_wmd.handle_idx,
                       NTOH__u64(hdr->msg.put.match_bits),
                       NTOH__u32(hdr->msg.put.length),
                       NTOH__u32(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case PTL_MSG_GET:
                CERROR ("GET: <>\n");
                break;

        case PTL_MSG_REPLY:
                CERROR ("REPLY: <>\n");
                break;

        default:
                CERROR ("TYPE?: <>\n");
        }
}
#endif
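
/* Deliver a received message's payload into the caller's buffers.
 * The payload sits in krx's page array just after the header, so this
 * walks source pages and destination iov/kiov in step, copying a
 * fragment at a time (and, when checksums are configured, summing the
 * payload as it goes). */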
static ptl_err_t
kqswnal_recvmsg (nal_cb_t     *nal,
                 void         *private,
                 lib_msg_t    *libmsg,
                 unsigned int  niov,
                 struct iovec *iov,
                 ptl_kiov_t   *kiov,
                 size_t        offset,
                 size_t        mlen,
                 size_t        rlen)
{
        kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
        char         *buffer = page_address(krx->krx_kiov[0].kiov_page);
        int           page;
        char         *page_ptr;
        int           page_nob;
        char         *iov_ptr;
        int           iov_nob;
        int           frag;
#if KQSW_CHECKSUM
        kqsw_csum_t   senders_csum;
        kqsw_csum_t   payload_csum = 0;
        kqsw_csum_t   hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t));
        size_t        csum_len = mlen;
        int           csum_frags = 0;
        int           csum_nob = 0;
        static atomic_t csum_counter;
        int           csum_verbose = (atomic_read(&csum_counter)%1000001) == 0;

        atomic_inc (&csum_counter);

        memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t));
        if (senders_csum != hdr_csum)
                kqswnal_csum_error (krx, 1);
#endif
        CDEBUG(D_NET,"kqswnal_recv, mlen="LPSZ", rlen="LPSZ"\n", mlen, rlen);

        /* What was actually received must be >= payload. */
        LASSERT (mlen <= rlen);
        if (krx->krx_nob < KQSW_HDR_SIZE + mlen) {
                CERROR("Bad message size: have %d, need %d + %d\n",
                       krx->krx_nob, (int)KQSW_HDR_SIZE, (int)mlen);
                return (PTL_FAIL);
        }

        /* It must be OK to kmap() if required */
        LASSERT (kiov == NULL || !in_interrupt ());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (mlen != 0) {
                page     = 0;
                page_ptr = buffer + KQSW_HDR_SIZE;
                page_nob = PAGE_SIZE - KQSW_HDR_SIZE;

                LASSERT (niov > 0);

                if (kiov != NULL) {
                        /* skip complete frags */
                        while (offset >= kiov->kiov_len) {
                                offset -= kiov->kiov_len;
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        iov_ptr = ((char *)kmap (kiov->kiov_page)) +
                                  kiov->kiov_offset + offset;
                        iov_nob = kiov->kiov_len - offset;
                } else {
                        /* skip complete frags */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }
                        iov_ptr = iov->iov_base + offset;
                        iov_nob = iov->iov_len - offset;
                }

                for (;;)
                {
                        frag = mlen;
                        if (frag > page_nob)
                                frag = page_nob;
                        if (frag > iov_nob)
                                frag = iov_nob;

                        memcpy (iov_ptr, page_ptr, frag);
#if KQSW_CHECKSUM
                        payload_csum = kqsw_csum (payload_csum, iov_ptr, frag);
                        csum_nob += frag;
                        csum_frags++;
#endif
                        mlen -= frag;
                        if (mlen == 0)
                                break;

                        page_nob -= frag;
                        if (page_nob != 0)
                                page_ptr += frag;
                        else
                        {
                                page++;
                                LASSERT (page < krx->krx_npages);
                                page_ptr = page_address(krx->krx_kiov[page].kiov_page);
                                page_nob = PAGE_SIZE;
                        }

                        iov_nob -= frag;
                        if (iov_nob != 0)
                                iov_ptr += frag;
                        else if (kiov != NULL) {
                                kunmap (kiov->kiov_page);
                                kiov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset;
                                iov_nob = kiov->kiov_len;
                        } else {
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                                iov_ptr = iov->iov_base;
                                iov_nob = iov->iov_len;
                        }
                }

                if (kiov != NULL)
                        kunmap (kiov->kiov_page);
        }

#if KQSW_CHECKSUM
        memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t),
                sizeof(kqsw_csum_t));

        if (csum_len != rlen)
                CERROR("Unable to checksum data in user's buffer\n");
        else if (senders_csum != payload_csum)
                kqswnal_csum_error (krx, 0);

        if (csum_verbose)
                CERROR("hdr csum %lx, payload_csum %lx, csum_frags %d, "
                       "csum_nob %d\n",
                       hdr_csum, payload_csum, csum_frags, csum_nob);
#endif
        lib_finalize(nal, private, libmsg, PTL_OK);

        return (PTL_OK);
}
static ptl_err_t
kqswnal_recv(nal_cb_t     *nal,
             void         *private,
             lib_msg_t    *libmsg,
             unsigned int  niov,
             struct iovec *iov,
             size_t        offset,
             size_t        mlen,
             size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, iov, NULL,
                                offset, mlen, rlen));
}
static ptl_err_t
kqswnal_recv_pages (nal_cb_t     *nal,
                    void         *private,
                    lib_msg_t    *libmsg,
                    unsigned int  niov,
                    ptl_kiov_t   *kiov,
                    size_t        offset,
                    size_t        mlen,
                    size_t        rlen)
{
        return (kqswnal_recvmsg(nal, private, libmsg,
                                niov, NULL, kiov,
                                offset, mlen, rlen));
}
int
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kqswnal_data.kqn_nthreads);
        return (0);
}
void
kqswnal_thread_fini (void)
{
        atomic_dec (&kqswnal_data.kqn_nthreads);
}
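
/* The scheduler thread does the work that can't be done in interrupt
 * context: it drains the ready-rx queue, retries transmits EKC
 * refused for want of resources, and launches delayed forwards.  It
 * yields the CPU every KQSW_RESCHED iterations and exits only in
 * stage 2 of shutdown, once all three queues are empty. */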
int
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t    *krx;
        kqswnal_tx_t    *ktx;
        kpr_fwd_desc_t  *fwd;
        unsigned long    flags;
        int              rc;
        int              counter = 0;
        int              did_something;

        kportal_daemonize ("kqswnal_sched");
        kportal_blockallsigs ();

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        for (;;)
        {
                did_something = 0;

                if (!list_empty (&kqswnal_data.kqn_readyrxds))
                {
                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
                                         kqswnal_rx_t, krx_list);
                        list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        kqswnal_rx (krx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
                {
                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                         kqswnal_tx_t, ktx_list);
                        list_del_init (&ktx->ktx_delayed_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0) {
                                CERROR("Failed delayed transmit to "LPX64
                                       ": %d\n", ktx->ktx_nid, rc);
                                kqswnal_tx_done (ktx, rc);
                        }
                        atomic_dec (&kqswnal_data.kqn_pending_txs);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!list_empty (&kqswnal_data.kqn_delayedfwds))
                {
                        fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list);
                        list_del (&fwd->kprfd_list);
                        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);

                        /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */
                        kqswnal_fwd_packet (NULL, fwd);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        counter = 0;

                        if (!did_something) {
                                if (kqswnal_data.kqn_shuttingdown == 2) {
                                        /* We only exit in stage 2 of shutdown when
                                         * there's nothing left to do */
                                        break;
                                }
                                rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq,
                                                               kqswnal_data.kqn_shuttingdown == 2 ||
                                                               !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedtxds) ||
                                                               !list_empty(&kqswnal_data.kqn_delayedfwds));
                                LASSERT (rc == 0);
                        } else if (current->need_resched)
                                schedule ();

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }
        }

        kqswnal_thread_fini ();
        return (0);
}
nal_cb_t kqswnal_lib =
{
        nal_data:       &kqswnal_data,          /* NAL private data */
        cb_send:        kqswnal_send,
        cb_send_pages:  kqswnal_send_pages,
        cb_recv:        kqswnal_recv,
        cb_recv_pages:  kqswnal_recv_pages,
        cb_read:        kqswnal_read,
        cb_write:       kqswnal_write,
        cb_malloc:      kqswnal_malloc,
        cb_free:        kqswnal_free,
        cb_printf:      kqswnal_printf,
        cb_cli:         kqswnal_cli,
        cb_sti:         kqswnal_sti,
        cb_callback:    kqswnal_callback,
        cb_dist:        kqswnal_dist
};