/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2002 Cluster File Systems, Inc.
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswnal.h"

void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        struct timeval     now;
        time_t             then;

        do_gettimeofday (&now);
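
        /* Reconstruct the wall-clock second at which this tx was launched:
         * (jiffies - ktx_launchtime)/HZ is the elapsed time in seconds since
         * launch, subtracted from the current time of day. */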
        then = now.tv_sec - (jiffies - ktx->ktx_launchtime)/HZ;

        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
}

void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
        int    i;

        ktx->ktx_rail = -1;                     /* unset rail */

        if (ktx->ktx_nmappedpages == 0)
                return;

        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        ktx->ktx_nmappedpages = 0;
}

int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
                     unsigned int niov, lnet_kiov_t *kiov)
{
        int          nfrags    = ktx->ktx_nfrag;
        int          nmapped   = ktx->ktx_nmappedpages;
        int          maxmapped = ktx->ktx_npages;
        uint32_t     basepage  = ktx->ktx_basepage + nmapped;
        char        *ptr;
        EP_RAILMASK  railmask;
        int          rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */
                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                       ktx, nfrags, ptr, fraglen, basepage, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
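
#if 0
/* Illustrative sketch only, not part of the driver: the frag-accumulation
 * pattern used above, shown in isolation.  'nmd_example_t' and
 * 'nmd_example_contiguous()' are hypothetical stand-ins for EP_NMD and the
 * contiguity test ep_nmd_merge() performs before coalescing two network
 * mapping descriptors into one. */
typedef struct { unsigned addr; unsigned len; } nmd_example_t;

static int
nmd_example_contiguous (nmd_example_t *a, nmd_example_t *b)
{
        return a->addr + a->len == b->addr;
}

static int
nmd_example_append (nmd_example_t *frags, int nfrags, nmd_example_t *next)
{
        /* merge into the previous frag when contiguous, else start a new one */
        if (nfrags > 0 && nmd_example_contiguous(&frags[nfrags - 1], next)) {
                frags[nfrags - 1].len += next->len;
                return nfrags;
        }
        frags[nfrags] = *next;
        return nfrags + 1;
}
#endif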

__u32
kqswnal_csum_kiov (__u32 csum, int offset, int nob,
                   unsigned int niov, lnet_kiov_t *kiov)
{
        char *ptr;

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                csum = kqswnal_csum(csum, ptr, fraglen);

                kunmap (kiov->kiov_page);

                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
        } while (nob > 0);

        return (csum);
}
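
#if 0
/* Illustrative sketch only: kqswnal_csum() is assumed here to be a simple
 * byte-wise additive checksum with this calling convention (running checksum
 * in, updated checksum out); the real implementation may differ. */
static __u32
example_csum (__u32 csum, char *ptr, int nob)
{
        while (nob-- > 0)
                csum += *ptr++ & 0xff;
        return csum;
}
#endif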

int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    unsigned int niov, struct iovec *iov)
{
        int          nfrags    = ktx->ktx_nfrag;
        int          nmapped   = ktx->ktx_nmappedpages;
        int          maxmapped = ktx->ktx_npages;
        uint32_t     basepage  = ktx->ktx_basepage + nmapped;
        long         npages;
        EP_RAILMASK  railmask;
        int          rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;

                if (fraglen > nob)
                        fraglen = nob;

                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
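
#if 0
/* Illustrative sketch only: kqswnal_pages_spanned() is assumed to count how
 * many pages a virtual address range touches, i.e. the page holding the
 * first byte through the page holding the last byte, inclusive. */
static long
example_pages_spanned (void *base, int nob)
{
        unsigned long first = ((unsigned long)base) >> PAGE_SHIFT;
        unsigned long last  = (((unsigned long)base) + nob - 1) >> PAGE_SHIFT;

        return (long)(last - first + 1);
}
#endif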

__u32
kqswnal_csum_iov (__u32 csum, int offset, int nob,
                  unsigned int niov, struct iovec *iov)
{
        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;

                if (fraglen > nob)
                        fraglen = nob;

                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);

                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
        } while (nob > 0);

        return (csum);
}

void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        unsigned long flags;

        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        list_del (&ktx->ktx_list);              /* take off active list */
        list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
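
        /* NB a tx descriptor always sits on exactly one of the idle and
         * active lists; both moves above happen atomically under
         * kqn_idletxd_lock, so it can never be seen on both. */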

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
}

kqswnal_tx_t *
kqswnal_get_idle_tx (void)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx;

        spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown ||
            list_empty (&kqswnal_data.kqn_idletxds)) {
                spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
                return NULL;
        }

        ktx = list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t, ktx_list);
        list_del (&ktx->ktx_list);

        list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        atomic_inc(&kqswnal_data.kqn_pending_txs);

        spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
        return (ktx);
}

void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
        lnet_msg_t    *lnetmsg0 = NULL;
        lnet_msg_t    *lnetmsg1 = NULL;
        int            status0  = 0;
        int            status1  = 0;
        kqswnal_rx_t  *krx;

        LASSERT (!in_interrupt());

        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);

        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:        /* optimized PUT/REPLY handled */
                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
#if KQSW_CHECKSUM
                if (status0 == 0) {             /* RDMA succeeded */
                        kqswnal_msg_t *msg;
                        __u32          csum;

                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);

                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_niov,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                                                lnetmsg0->msg_niov,
                                                lnetmsg0->msg_iov);

                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                        }
                }
#endif
                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
                break;

        case KTX_RDMA_STORE:       /* optimized GET handled */
        case KTX_PUTTING:          /* optimized PUT sent */
        case KTX_SENDING:          /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
                break;

        case KTX_GETTING:          /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = 0;
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1  = ktx->ktx_status;
#if KQSW_CHECKSUM
                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        __u32         csum;

                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0,
                                                 md->md_length,
                                                 md->md_niov,
                                                 md->md_iov.kiov) :
                               kqswnal_csum_iov(~0, 0,
                                                md->md_length,
                                                md->md_niov,
                                                md->md_iov.iov);

                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
                                status1 = -EIO;
                        }
                }
#endif
                break;

        default:
                LBUG();
        }

        kqswnal_put_idle_tx (ktx);

        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
}

void
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
{
        unsigned long flags;

        ktx->ktx_status = status;

        if (!in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
                return;
        }

        /* Complete the send in thread context */
        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail(&ktx->ktx_schedlist,
                      &kqswnal_data.kqn_donetxds);
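
        /* NB lnet_finalize() can't be called from interrupt context, so
         * queueing on kqn_donetxds hands this tx to kqswnal_scheduler(),
         * which completes it via kqswnal_tx_done_in_thread_context(). */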
        wake_up(&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}

void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t        *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t *reply;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);

                status = -EHOSTDOWN;

        } else switch (ktx->ktx_state) {

        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC complete! */
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                        break;
                }

                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                                       reply->msg.magic);
                                status = -EPROTO;
                                break;
                        }

                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);

                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                        }
                }

                status = reply->msg.status;
                if (status != 0)
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);

                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
#if KQSW_CHECKSUM
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
#endif
                }
                break;

        case KTX_SENDING:
                status = 0;
                break;

        default:
                LBUG();
        }

        kqswnal_tx_done(ktx, status);
}

int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int   attr = in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int   dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int   rc;

        ktx->ktx_launchtime = jiffies;

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                if (the_lnet.ln_testprotocompat != 0 &&
                    the_lnet.ln_ptlcompat == 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        LNET_LOCK();
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                msg->kqm_version++;
                                the_lnet.ln_testprotocompat &= ~1;
                        }
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
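                                /* NB each flag fires once and clears itself,
                                 * so one test run exercises each peer error
                                 * path exactly once: bit 0 corrupts the
                                 * version, bit 1 substitutes LNET_PROTO_MAGIC
                                 * for the qswlnd magic. */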
                        }
                        LNET_UNLOCK();
                }

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_SENDING:
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
        }

        switch (rc) {
        case EP_SUCCESS:        /* success */
                return (0);

        case EP_ENOMEM:         /* can't allocate ep txd => queue for later */
                spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

                list_add_tail (&ktx->ktx_schedlist,
                               &kqswnal_data.kqn_delayedtxds);
                wake_up (&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
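
                /* A scheduler thread will pull this tx back off
                 * kqn_delayedtxds and call kqswnal_launch() again when EKC
                 * txds may have been freed; see kqswnal_scheduler(). */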
                return (0);

        default:                /* fatal error */
                CDEBUG (D_NETERROR, "Tx to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}

static char *
hdr_type_string (lnet_hdr_t *hdr)
{
        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_ACK:
                return ("ACK");
        case LNET_MSG_PUT:
                return ("PUT");
        case LNET_MSG_GET:
                return ("GET");
        case LNET_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}

void
kqswnal_cerror_hdr(lnet_hdr_t *hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case LNET_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.get.match_bits));
                CERROR("    Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case LNET_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case LNET_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
                break;
        }
}                               /* end of kqswnal_cerror_hdr() */

int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)
{
        int  i;

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       nlfrag, nrfrag);
                return (-EINVAL);
        }

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
}

kqswnal_remotemd_t *
kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));

        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
}

void
kqswnal_rdma_store_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx    = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}

void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
        /* Completed fetching the PUT/REPLY data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx    = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }

        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
}

int
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
{
        kqswnal_tx_t *ktx;
        int           eprc;
        int           rc = 0;

        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (len == 0) {
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
                return (0);
        }

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
           actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
                return (-ENOMEM);
        }

        ktx->ktx_state   = type;
        ktx->ktx_nid     = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;

        LASSERT (atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        atomic_inc(&krx->krx_refcount);

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }

        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }

        switch (type) {
        default:
                LBUG();

        case KTX_RDMA_STORE:
                krx->krx_rpc_reply.msg.status    = 0;
                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;
#if KQSW_CHECKSUM
                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                        kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                        kqswnal_csum_iov(~0, offset, len, niov, iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;

        case KTX_RDMA_FETCH:
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                kqswnal_rx_decref(krx);         /* drop callback's ref */
                kqswnal_put_idle_tx (ktx);
        }

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
}

int
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        int               nob;
        kqswnal_tx_t     *ktx;
        int               rc;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                return -EIO;
        }

        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
                return (-ENOMEM);
        }

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;        /* set when a GET commits to REPLY */

        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >=
             *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||           /* optimize PUT? */
              type == LNET_MSG_REPLY) &&        /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
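                /* i.e. take the optimized (RDMA) path when this node
                 * originates a GET whose sink MD is at least
                 * kqn_optimized_gets bytes, or when it sends a PUT/REPLY
                 * payload of at least kqn_optimized_puts bytes; anything
                 * smaller falls through to the copy/immediate paths below. */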
                lnet_libmd_t       *md = lntmsg->msg_md;
                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                lnet_hdr_t         *mhdr;
                kqswnal_remotemd_t *rmd;

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */

                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET
                         * header + rdma descriptor */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        rmd  = (kqswnal_remotemd_t *)(mhdr + 1);
                } else {
                        /* Send an RDMA message */
                        msg->kqm_magic   = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type    = QSWLND_MSG_RDMA;

                        mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                        rmd  = &msg->kqm_u.rdma.kqrm_rmd;
                }

                *mhdr = *hdr;
                nob = (((char *)rmd) - ktx->ktx_buffer);

                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                        else
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                } else {
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                        else
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                }

                if (rc != 0)
                        goto out;

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

#if KQSW_CHECKSUM
                LASSERT (the_lnet.ln_ptlcompat != 2);
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
#endif
                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL)
                                goto out;
                }
                /* NB finalizing the REPLY message is my
                 * responsibility now, whatever happens. */
#if KQSW_CHECKSUM
                if (*kqswnal_tunables.kqn_inject_csum_error == 3) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;

                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab.  Sadly
                         * this causes 2 rounds of kmap */
                        msg->kqm_cksum =
                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
                                                  payload_niov, payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                } else {
                        msg->kqm_cksum =
                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
                                                 payload_niov, payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                }
#endif

        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                lnet_hdr_t    *mhdr;
                char          *payload;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* small message: single frag copied into the pre-mapped buffer */
                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET header
                         * + payload */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        payload = (char *)(mhdr + 1);
                } else {
                        /* Send an IMMEDIATE message */
                        msg->kqm_magic   = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type    = QSWLND_MSG_IMMEDIATE;

                        mhdr = &msg->kqm_u.immediate.kqim_hdr;
                        payload = msg->kqm_u.immediate.kqim_payload;
                }

                *mhdr = *hdr;
                nob = (payload - ktx->ktx_buffer) + payload_nob;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                else
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);
#if KQSW_CHECKSUM
                LASSERT (the_lnet.ln_ptlcompat != 2);
                msg->kqm_nob   = nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif

        } else {
                lnet_hdr_t    *mhdr;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* large message: multiple frags: first is hdr in pre-mapped buffer */
                if (the_lnet.ln_ptlcompat == 2) {
                        /* Strong portals compatibility: send "raw" LNET header
                         * + rdma payload */
                        mhdr = (lnet_hdr_t *)ktx->ktx_buffer;
                        nob = sizeof(lnet_hdr_t);
                } else {
                        /* Send an IMMEDIATE message */
                        msg->kqm_magic   = LNET_PROTO_QSW_MAGIC;
                        msg->kqm_version = QSWLND_PROTO_VERSION;
                        msg->kqm_type    = QSWLND_MSG_IMMEDIATE;

                        mhdr = &msg->kqm_u.immediate.kqim_hdr;
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.immediate.kqim_payload);
                }

                *mhdr = *hdr;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0)
                        goto out;
#if KQSW_CHECKSUM
                LASSERT (the_lnet.ln_ptlcompat != 2);
                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                msg->kqm_cksum = (payload_kiov != NULL) ?
                                 kqswnal_csum_kiov(msg->kqm_cksum,
                                                   payload_offset, payload_nob,
                                                   payload_niov, payload_kiov) :
                                 kqswnal_csum_iov(msg->kqm_cksum,
                                                  payload_offset, payload_nob,
                                                  payload_niov, payload_iov);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }
#endif
        }

        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
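
        /* NB the EP service port selects the peer's receive buffer pool:
         * anything up to KQSW_SMALLMSG fits the small-buffer receivers,
         * bigger messages must go to the large-buffer receivers. */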

        rc = kqswnal_launch (ktx);

 out:
        CDEBUG(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
               routing ? (rc == 0 ? "Routed" : "Failed to route") :
                         (rc == 0 ? "Sent" : "Failed to send"),
               nob, libcfs_nid2str(target.nid),
               target_is_router ? "(router)" : "", rc);

        if (rc != 0) {
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int         state = ktx->ktx_state;

                kqswnal_put_idle_tx (ktx);

                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET.  We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
                        rc = 0;
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                }
        }

        atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
}

void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        LASSERT (atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
}

void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}

void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
        int  rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;

                LASSERT (!in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                                     NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }

        kqswnal_requeue_rx(krx);
}

void
kqswnal_parse (kqswnal_rx_t *krx)
{
        lnet_ni_t      *ni = kqswnal_data.kqn_ni;
        kqswnal_msg_t  *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
        lnet_nid_t      fromnid = kqswnal_rx_nid(krx);
        int             swab;
        int             n;
        int             i;
        int             nob;
        int             rc;

        LASSERT (atomic_read(&krx->krx_refcount) == 1);

        /* If ln_ptlcompat is set, peers may send me an "old" unencapsulated
         * lnet header */
        LASSERT (offsetof(kqswnal_msg_t, kqm_u) <= sizeof(lnet_hdr_t));

        if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
                CERROR("Short message %d received from %s\n",
                       krx->krx_nob, libcfs_nid2str(fromnid));
                goto done;
        }

        swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
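
        /* NB a peer of opposite endian-ness sends the magic byte-reversed;
         * matching __swab32(LNET_PROTO_QSW_MAGIC) therefore both identifies
         * the protocol and tells me every multi-byte field needs swabbing. */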

        if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
#if KQSW_CHECKSUM
                __u32 csum0;
                __u32 csum1;

                /* csum byte array before swab */
                csum1 = msg->kqm_cksum;
                msg->kqm_cksum = 0;
                csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
                                          krx->krx_npages, krx->krx_kiov);
                msg->kqm_cksum = csum1;
#endif

                if (swab) {
                        __swab16s(&msg->kqm_version);
                        __swab16s(&msg->kqm_type);
#if KQSW_CHECKSUM
                        __swab32s(&msg->kqm_cksum);
                        __swab32s(&msg->kqm_nob);
#endif
                }

                if (msg->kqm_version != QSWLND_PROTO_VERSION) {
                        /* Future protocol version compatibility support!
                         * The next qswlnd-specific protocol rev will first
                         * send an RPC to check version.
                         * 1.4.6 and 1.4.7.early reply with a status
                         * block containing its current version.
                         * Later versions send a failure (-ve) status +
                         * their version */

                        if (!krx->krx_rpc_reply_needed) {
                                CERROR("Unexpected version %d from %s\n",
                                       msg->kqm_version, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
                        goto done;
                }

                switch (msg->kqm_type) {
                default:
                        CERROR("Bad request type %x from %s\n",
                               msg->kqm_type, libcfs_nid2str(fromnid));
                        goto done;

                case QSWLND_MSG_IMMEDIATE:
                        if (krx->krx_rpc_reply_needed) {
                                /* Should have been a simple message */
                                CERROR("IMMEDIATE sent as RPC from %s\n",
                                       libcfs_nid2str(fromnid));
                                goto done;
                        }

                        nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
                        if (krx->krx_nob < nob) {
                                CERROR("Short IMMEDIATE %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

#if KQSW_CHECKSUM
                        if (csum0 != msg->kqm_cksum) {
                                CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
                                       csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
                                CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
                                goto done;
                        }
#endif
                        rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
                                        fromnid, krx, 0);
                        if (rc < 0)
                                goto done;
                        return;

                case QSWLND_MSG_RDMA:
                        if (!krx->krx_rpc_reply_needed) {
                                /* Should have been a simple message */
                                CERROR("RDMA sent as simple message from %s\n",
                                       libcfs_nid2str(fromnid));
                                goto done;
                        }

                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
                        if (krx->krx_nob < nob) {
                                CERROR("Short RDMA message %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        if (swab)
                                __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);

                        n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);

                        if (krx->krx_nob < nob) {
                                CERROR("short RDMA message %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        if (swab) {
                                for (i = 0; i < n; i++) {
                                        EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];

                                        __swab32s(&nmd->nmd_addr);
                                        __swab32s(&nmd->nmd_len);
                                        __swab32s(&nmd->nmd_attr);
                                }
                        }

#if KQSW_CHECKSUM
                        krx->krx_cksum = csum0;         /* stash checksum so far */
#endif
                        rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
                                        fromnid, krx, 1);
                        if (rc < 0)
                                goto done;
                        return;
                }
                /* Not Reached */
        }

        if (msg->kqm_magic == LNET_PROTO_MAGIC ||
            msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
                /* Future protocol version compatibility support!
                 * When LNET unifies protocols over all LNDs, the first thing a
                 * peer will send will be a version query RPC.
                 * 1.4.6 and 1.4.7.early reply with a status block containing
                 * LNET_PROTO_QSW_MAGIC.
                 * Later versions send a failure (-ve) status +
                 * their version */

                if (!krx->krx_rpc_reply_needed) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->kqm_magic, libcfs_nid2str(fromnid));
                        goto done;
                }

                LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
                goto done;
        }

        if (the_lnet.ln_ptlcompat != 0) {
                /* Portals compatibility (strong or weak)
                 * This could be an unencapsulated LNET header.  If it's big
                 * enough, let LNET's parser sort it out */

                if (krx->krx_nob < sizeof(lnet_hdr_t)) {
                        CERROR("Short portals-compatible message from %s\n",
                               libcfs_nid2str(fromnid));
                        goto done;
                }

                krx->krx_raw_lnet_hdr = 1;
                rc = lnet_parse(ni, (lnet_hdr_t *)msg,
                                fromnid, krx, krx->krx_rpc_reply_needed);
                if (rc < 0)
                        goto done;
                return;
        }

        CERROR("Unrecognised magic %08x from %s\n",
               msg->kqm_magic, libcfs_nid2str(fromnid));
 done:
        kqswnal_rx_decref(krx);
}

/* Receive Interrupt Handler: posts to schedulers */
void
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);
        LASSERT (krx->krx_state == KRX_POSTED);

        krx->krx_state = KRX_PARSE;
        krx->krx_nob = nob;
        krx->krx_raw_lnet_hdr = 0;

        /* RPC reply iff rpc request received without error */
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
                                    (status == EP_SUCCESS ||
                                     status == EP_MSG_TOO_BIG);
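
        /* NB an rpc request truncated with EP_MSG_TOO_BIG still needs its
         * RPC completed: the peer is waiting on the status block, so only
         * genuine transport failures may leave the RPC unanswered. */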
        /* Default to failure if an RPC reply is requested but not handled */
        krx->krx_rpc_reply.msg.status = -EPROTO;
        atomic_set (&krx->krx_refcount, 1);

        if (status != EP_SUCCESS) {
                /* receives complete with failure when receiver is removed */
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
                kqswnal_rx_decref(krx);
                return;
        }

        if (!in_interrupt()) {
                kqswnal_parse(krx);
                return;
        }

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        wake_up (&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
}

int
kqswnal_recv (lnet_ni_t *ni,
              void *private,
              lnet_msg_t *lntmsg,
              int delayed,
              unsigned int niov,
              struct iovec *iov,
              lnet_kiov_t *kiov,
              unsigned int offset,
              unsigned int mlen,
              unsigned int rlen)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
        lnet_nid_t          fromnid;
        kqswnal_msg_t      *msg;
        lnet_hdr_t         *hdr;
        kqswnal_remotemd_t *rmd;
        int                 msg_offset;
        int                 rc;

        LASSERT (!in_interrupt ());             /* OK to map */
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
        msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);

        if (krx->krx_rpc_reply_needed) {
                /* optimized (rdma) request sent as RPC */

                if (krx->krx_raw_lnet_hdr) {
                        LASSERT (the_lnet.ln_ptlcompat != 0);
                        hdr = (lnet_hdr_t *)msg;
                        rmd = kqswnal_get_portalscompat_rmd(krx);
                        if (rmd == NULL)
                                return (-EPROTO);
                } else {
                        LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
                        hdr = &msg->kqm_u.rdma.kqrm_hdr;
                        rmd = &msg->kqm_u.rdma.kqrm_rmd;
                }

                /* NB header is still in wire byte order */

                switch (le32_to_cpu(hdr->type)) {
                case LNET_MSG_PUT:
                case LNET_MSG_REPLY:
                        /* This is an optimized PUT/REPLY */
                        rc = kqswnal_rdma(krx, lntmsg,
                                          KTX_RDMA_FETCH, rmd,
                                          niov, iov, kiov, offset, mlen);
                        break;

                case LNET_MSG_GET:
#if KQSW_CHECKSUM
                        if (krx->krx_cksum != msg->kqm_cksum) {
                                CERROR("Bad GET checksum %08x(%08x) from %s\n",
                                       krx->krx_cksum, msg->kqm_cksum,
                                       libcfs_nid2str(fromnid));
                                rc = -EIO;
                                break;
                        }
#endif
                        if (lntmsg == NULL) {
                                /* No buffer match: my decref will
                                 * complete the RPC with failure */
                                rc = 0;
                        } else {
                                /* Matched something! */
                                rc = kqswnal_rdma(krx, lntmsg,
                                                  KTX_RDMA_STORE, rmd,
                                                  lntmsg->msg_niov,
                                                  lntmsg->msg_iov,
                                                  lntmsg->msg_kiov,
                                                  lntmsg->msg_offset,
                                                  lntmsg->msg_len);
                        }
                        break;

                default:
                        CERROR("Bad RPC type %d\n",
                               le32_to_cpu(hdr->type));
                        rc = -EPROTO;
                        break;
                }

                kqswnal_rx_decref(krx);
                return (rc);
        }

        if (krx->krx_raw_lnet_hdr) {
                LASSERT (the_lnet.ln_ptlcompat != 0);
                msg_offset = sizeof(lnet_hdr_t);
        } else {
                LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
                msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
        }

        if (krx->krx_nob < msg_offset + rlen) {
                CERROR("Bad message size from %s: have %d, need %d + %d\n",
                       libcfs_nid2str(fromnid), krx->krx_nob,
                       msg_offset, rlen);
                kqswnal_rx_decref(krx);
                return (-EPROTO);
        }

        if (kiov != NULL)
                lnet_copy_kiov2kiov(niov, kiov, offset,
                                    krx->krx_npages, krx->krx_kiov,
                                    msg_offset, mlen);
        else
                lnet_copy_kiov2iov(niov, iov, offset,
                                   krx->krx_npages, krx->krx_kiov,
                                   msg_offset, mlen);

        lnet_finalize(ni, lntmsg, 0);
        kqswnal_rx_decref(krx);
        return (0);
}

int
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid = kernel_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        atomic_inc (&kqswnal_data.kqn_nthreads);
        return (0);
}

void
kqswnal_thread_fini (void)
{
        atomic_dec (&kqswnal_data.kqn_nthreads);
}

int
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t    *krx;
        kqswnal_tx_t    *ktx;
        unsigned long    flags;
        int              rc;
        int              counter = 0;
        int              did_something;

        cfs_daemonize ("kqswnal_sched");
        cfs_block_allsigs ();

        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);

        for (;;)
        {
                did_something = 0;

                if (!list_empty (&kqswnal_data.kqn_readyrxds))
                {
                        krx = list_entry(kqswnal_data.kqn_readyrxds.next,
                                         kqswnal_rx_t, krx_list);
                        list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        LASSERT (krx->krx_state == KRX_PARSE);
                        kqswnal_parse (krx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!list_empty (&kqswnal_data.kqn_donetxds))
                {
                        ktx = list_entry(kqswnal_data.kqn_donetxds.next,
                                         kqswnal_tx_t, ktx_schedlist);
                        list_del_init (&ktx->ktx_schedlist);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        kqswnal_tx_done_in_thread_context(ktx);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                if (!list_empty (&kqswnal_data.kqn_delayedtxds))
                {
                        ktx = list_entry(kqswnal_data.kqn_delayedtxds.next,
                                         kqswnal_tx_t, ktx_schedlist);
                        list_del_init (&ktx->ktx_schedlist);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0) {
                                CERROR("Failed delayed transmit to %s: %d\n",
                                       libcfs_nid2str(ktx->ktx_nid), rc);
                                kqswnal_tx_done (ktx, rc);
                        }
                        atomic_dec (&kqswnal_data.kqn_pending_txs);

                        did_something = 1;
                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        counter = 0;

                        if (!did_something) {
                                if (kqswnal_data.kqn_shuttingdown == 2) {
                                        /* We only exit in stage 2 of shutdown
                                         * when there's nothing left to do */
                                        break;
                                }
                                rc = wait_event_interruptible_exclusive (
                                        kqswnal_data.kqn_sched_waitq,
                                        kqswnal_data.kqn_shuttingdown == 2 ||
                                        !list_empty(&kqswnal_data.kqn_readyrxds) ||
                                        !list_empty(&kqswnal_data.kqn_donetxds) ||
                                        !list_empty(&kqswnal_data.kqn_delayedtxds));
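
                                /* NB exclusive wait: each wakeup rouses just
                                 * one scheduler thread, so queueing a single
                                 * work item doesn't stampede every scheduler
                                 * off the waitqueue. */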
                        } else if (need_resched())
                                schedule ();

                        spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                }
        }

        kqswnal_thread_fini ();
        return (0);
}