/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
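
/* NB the original include block was elided here; the declarations this file
 * relies on (kqswnal_tx_t, kqswnal_rx_t, kqswnal_data, the EKC and LNet APIs)
 * are presumed to come from the LND's own header. */
#include "qswlnd.h"

/* Tell LNet the peer this tx was addressed to is down; 'then' converts the
 * tx launch time into wall-clock seconds for lnet_notify(). */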
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
        then = cfs_time_current_sec() -
               cfs_duration_sec(cfs_time_current() -
        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
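
/* Release the temporary Elan DVMA mappings (frags ktx_firsttmpfrag up to
 * ktx_nfrag) set up for this tx's payload and forget the preferred rail;
 * the pre-mapped header buffer is untouched. */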
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
        ktx->ktx_rail = -1;             /* unset rail */
        if (ktx->ktx_nmappedpages == 0)
        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);
        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
        ktx->ktx_nmappedpages = 0;
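
/* Map a page-based (kiov) payload into Elan DVMA space on the tx's preferred
 * rail, starting 'basepage' pages into the tx's reserved DVMA window. Each
 * page is kmap()ed and loaded individually; where possible the new NMD is
 * merged with the previous one (ep_nmd_merge()) so ktx_frags[] stays within
 * EP_MAXFRAG. */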
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
                     unsigned int niov, lnet_kiov_t *kiov)
        int nfrags = ktx->ktx_nfrag;
        int nmapped = ktx->ktx_nmappedpages;
        int maxmapped = ktx->ktx_npages;
        __u32 basepage = ktx->ktx_basepage + nmapped;
        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                int fraglen = kiov->kiov_len - offset;
                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */
                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                       ktx, nfrags, ptr, fraglen, basepage, nmapped);
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);
                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                kunmap (kiov->kiov_page);
                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;
                /* kiov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
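
/* Checksum 'nob' bytes of a page-based (kiov) payload starting at 'offset',
 * folding each fragment into 'csum' with kqswnal_csum(); each page is
 * kmap()ed around the call and unmapped again. */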
kqswnal_csum_kiov (__u32 csum, int offset, int nob,
                   unsigned int niov, lnet_kiov_t *kiov)
        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                int fraglen = kiov->kiov_len - offset;
                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);
                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;
                csum = kqswnal_csum(csum, ptr, fraglen);
                kunmap (kiov->kiov_page);
                /* kiov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
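
/* As kqswnal_map_tx_kiov(), but for a virtually-addressed (iovec) payload:
 * each fragment may span several pages (kqswnal_pages_spanned()) and no
 * kmap() is needed since the addresses are already mapped. */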
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    unsigned int niov, struct iovec *iov)
        int nfrags = ktx->ktx_nfrag;
        int nmapped = ktx->ktx_nmappedpages;
        int maxmapped = ktx->ktx_npages;
        __u32 basepage = ktx->ktx_basepage + nmapped;
        EP_RAILMASK railmask;
        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
        railmask = 1 << rail;
        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                int fraglen = iov->iov_len - offset;
                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);
                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);
                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;
                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);
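
/* Checksum 'nob' bytes of a virtually-addressed (iovec) payload starting at
 * 'offset', folding each fragment into 'csum' with kqswnal_csum(). */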
kqswnal_csum_iov (__u32 csum, int offset, int nob,
                  unsigned int niov, struct iovec *iov)
        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                int fraglen = iov->iov_len - offset;
                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);
                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);
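
/* Return a tx descriptor to the idle list: drop its temporary DVMA mappings,
 * mark it KTX_IDLE and move it from the active to the idle list under
 * kqn_idletxd_lock. */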
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
        kqswnal_unmap_tx (ktx);                 /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;
        cfs_spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
        cfs_list_del (&ktx->ktx_list);          /* take off active list */
        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds);
        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
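
/* Grab a tx descriptor from the idle list (bailing out if shutting down or
 * none are free), move it to the active list, record the launching pid and
 * bump the pending-tx count. */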
kqswnal_get_idle_tx (void)
        cfs_spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags);
        if (kqswnal_data.kqn_shuttingdown ||
            cfs_list_empty (&kqswnal_data.kqn_idletxds)) {
                cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock,
        ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t,
        cfs_list_del (&ktx->ktx_list);
        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        cfs_atomic_inc(&kqswnal_data.kqn_pending_txs);
        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags);
        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
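
/* Finish a completed tx in thread context: report the peer down if it failed
 * with -EHOSTDOWN, verify RDMA checksums where applicable, drop the rx ref
 * taken for optimized PUT/REPLY fetches, then recycle the tx and
 * lnet_finalize() the message(s) it carried. */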
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
        lnet_msg_t *lnetmsg0 = NULL;
        lnet_msg_t *lnetmsg1 = NULL;
        LASSERT (!cfs_in_interrupt());
        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);
        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:            /* optimized PUT/REPLY handled */
                krx = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0 = ktx->ktx_status;
                if (status0 == 0) {             /* RDMA succeeded */
                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);
                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
        case KTX_RDMA_STORE:            /* optimized GET handled */
        case KTX_PUTTING:               /* optimized PUT sent */
        case KTX_SENDING:               /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0 = ktx->ktx_status;
        case KTX_GETTING:               /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1 = ktx->ktx_status;
                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0,
                               kqswnal_csum_iov(~0, 0,
                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
        kqswnal_put_idle_tx (ktx);
        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
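
/* Complete a tx: do it inline when in thread context, otherwise queue it on
 * kqn_donetxds and wake the scheduler so completion (which may need to kmap
 * and checksum) happens in a thread. */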
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
        ktx->ktx_status = status;
        if (!cfs_in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
        /* Complete the send in thread context */
        cfs_spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);
        cfs_list_add_tail(&ktx->ktx_schedlist,
                          &kqswnal_data.kqn_donetxds);
        cfs_waitq_signal(&kqswnal_data.kqn_sched_waitq);
        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
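
/* EKC transmit completion callback: for RPC-based sends (optimized PUT/GET)
 * decode the peer's status block, byte-swapping it if the peer has opposite
 * endianness, stash the reply length and checksum for GETs, then hand the tx
 * to kqswnal_tx_done(). */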
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t *reply;
        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);
        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);
        if (status != EP_SUCCESS) {
                CDEBUG (D_NETERROR, "Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);
        } else switch (ktx->ktx_state) {
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);
                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                status = reply->msg.status;
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);
                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
        kqswnal_tx_done(ktx, status);
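
/* Hand a tx to EKC: optimized GET/PUT sends go out as RPCs
 * (ep_transmit_rpc()), everything else as plain messages
 * (ep_transmit_message()). EP_ENOMEM queues the tx on kqn_delayedtxds for
 * the scheduler to retry; any other failure marks the peer down and returns
 * -EHOSTUNREACH. */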
kqswnal_launch (kqswnal_tx_t *ktx)
        /* Don't block for transmit descriptor if we're in interrupt context */
        int attr = cfs_in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int dest = kqswnal_nid2elanid (ktx->ktx_nid);
        ktx->ktx_launchtime = cfs_time_current();
        if (kqswnal_data.kqn_shuttingdown)
        LASSERT (dest >= 0);                    /* must be a peer */
        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);
        switch (ktx->ktx_state) {
                if (the_lnet.ln_testprotocompat != 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                the_lnet.ln_testprotocompat &= ~1;
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                rc = -EINVAL;                   /* no compiler warning please */
        case EP_SUCCESS:        /* success */
        case EP_ENOMEM:         /* can't allocate ep txd => queue for later */
                cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                cfs_list_add_tail (&ktx->ktx_schedlist,
                                   &kqswnal_data.kqn_delayedtxds);
                cfs_waitq_signal (&kqswnal_data.kqn_sched_waitq);
                cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock,
        default:                /* fatal error */
                CDEBUG (D_NETERROR, "Tx to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);

hdr_type_string (lnet_hdr_t *hdr)
        return ("<UNKNOWN>");

kqswnal_cerror_hdr(lnet_hdr_t *hdr)
        char *type_str = hdr_type_string (hdr);
        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR(" From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR(" To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));
        switch (le32_to_cpu(hdr->type)) {
                CERROR(" Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR(" offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                CERROR(" Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.get.match_bits));
                CERROR(" Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                CERROR(" dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                CERROR(" dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
} /* end of kqswnal_cerror_hdr() */
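
/* Check that the local and remote RDMA descriptors are compatible: the same
 * number of fragments with identical lengths, fragment by fragment. */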
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)
        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);

kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));
        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */
        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
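
/* EKC completion callbacks for the two RDMA directions: _store_complete runs
 * when the GET data has been pushed into the peer's buffers, _fetch_complete
 * when PUT/REPLY data has been pulled into ours. Both settle the rx's RPC
 * reply state and finish via kqswnal_tx_done(). */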
kqswnal_rdma_store_complete (EP_RXD *rxd)
        int status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);
        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);
        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);

kqswnal_rdma_fetch_complete (EP_RXD *rxd)
        /* Completed fetching the PUT/REPLY data */
        int status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];
        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);
        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);
        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;
        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
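
/* Set up and start the local half of an optimized transfer: KTX_RDMA_FETCH
 * pulls PUT/REPLY data in, KTX_RDMA_STORE pushes GET data out. An idle tx is
 * borrowed purely to map the local buffers on the rail the incoming RPC
 * prefers; the fragment list is checked against the peer's RMD and the
 * transfer started with ep_complete_rpc() (store) or ep_rpc_get() (fetch). */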
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
         * actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
        ktx->ktx_state = type;
        ktx->ktx_nid = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;
        LASSERT (cfs_atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        cfs_atomic_inc(&krx->krx_refcount);
        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));
        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);
                CERROR ("Can't map local RDMA data: %d\n", rc);
        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
                CERROR ("Incompatible RDMA descriptors\n");
                krx->krx_rpc_reply.msg.status = 0;
                krx->krx_rpc_reply.msg.magic = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;
                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                        kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                        kqswnal_csum_iov(~0, offset, len, niov, iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref(krx);                 /* drop callback's ref */
        kqswnal_put_idle_tx (ktx);
        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
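
/* LNet send entry point. Three paths: (1) a sufficiently large GET/PUT/REPLY
 * to a peer in this cluster goes out as a QSWLND_MSG_RDMA request carrying
 * the Elan NMDs of the local buffers so the peer can DMA directly; (2) a
 * small payload is copied into the pre-mapped tx buffer and sent as a
 * single-frag IMMEDIATE; (3) anything else goes as a multi-frag IMMEDIATE
 * with the payload mapped in place. Checksums (when built in) cover header
 * and payload, with tunables to inject deliberate checksum errors for
 * testing. */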
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
        lnet_hdr_t *hdr = &lntmsg->msg_hdr;
        int type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int target_is_router = lntmsg->msg_target_is_router;
        int routing = lntmsg->msg_routing;
        unsigned int payload_niov = lntmsg->msg_niov;
        struct iovec *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
        unsigned int payload_offset = lntmsg->msg_offset;
        unsigned int payload_nob = lntmsg->msg_len;
        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */
        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));
        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !cfs_in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
        ktx->ktx_state = KTX_SENDING;
        ktx->ktx_nid = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;        /* set when a GET commits to REPLY */
        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;
        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >=
             *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||           /* optimize PUT? */
              type == LNET_MSG_REPLY) &&        /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
                lnet_libmd_t *md = lntmsg->msg_md;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                kqswnal_remotemd_t *rmd;
                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway). This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */
                /* Send an RDMA message */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_RDMA;
                mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd = &msg->kqm_u.rdma.kqrm_rmd;
                nob = (((char *)rmd) - ktx->ktx_buffer);
                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);
                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
                msg->kqm_nob = nob + payload_nob;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL)
                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */
                        if (*kqswnal_tunables.kqn_inject_csum_error == 3) {
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab. Sadly
                         * this causes 2 rounds of kmap */
                        kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
                                          payload_niov, payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
                                         payload_niov, payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                /* single frag copied into the pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;
                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                payload = msg->kqm_u.immediate.kqim_payload;
                nob = (payload - ktx->ktx_buffer) + payload_nob;
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                /* multiple frags: first is hdr in pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;
                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);
                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                msg->kqm_nob = nob + payload_nob;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                msg->kqm_cksum = (payload_kiov != NULL) ?
                        kqswnal_csum_kiov(msg->kqm_cksum,
                                          payload_offset, payload_nob,
                                          payload_niov, payload_kiov) :
                        kqswnal_csum_iov(msg->kqm_cksum,
                                         payload_offset, payload_nob,
                                         payload_niov, payload_iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;
        rc = kqswnal_launch (ktx);
        CDEBUG(rc == 0 ? D_NET : D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
               routing ? (rc == 0 ? "Routed" : "Failed to route") :
                         (rc == 0 ? "Sent" : "Failed to send"),
               nob, libcfs_nid2str(target.nid),
               target_is_router ? "(router)" : "", rc);
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int state = ktx->ktx_state;
                kqswnal_put_idle_tx (ktx);
                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET. We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY failed. */
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
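
/* Hand a receive descriptor back to EKC: free it if the module is shutting
 * down, otherwise repost it with kqswnal_rxhandler() as the callback. */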
kqswnal_requeue_rx (kqswnal_rx_t *krx)
        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);
        krx->krx_state = KRX_POSTED;
        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);

kqswnal_rpc_complete (EP_RXD *rxd)
        int status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg(rxd);
        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);
        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);

kqswnal_rx_done (kqswnal_rx_t *krx)
        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;
                LASSERT (!cfs_in_interrupt());
                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                if (rc == EP_SUCCESS)
                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx(krx);
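
/* Parse a newly received message in thread context: verify length, magic,
 * version and (when computed) checksum, byte-swap the header fields and any
 * RMD frags if the peer has opposite endianness, then hand the LNet header to
 * lnet_parse(). Unexpected magics or versions on an RPC are answered with a
 * -EPROTO status block for protocol-compatibility probing. */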
kqswnal_parse (kqswnal_rx_t *krx)
        lnet_ni_t *ni = kqswnal_data.kqn_ni;
        kqswnal_msg_t *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
        lnet_nid_t fromnid = kqswnal_rx_nid(krx);
        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 1);
        if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
                CERROR("Short message %d received from %s\n",
                       krx->krx_nob, libcfs_nid2str(fromnid));
        swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);
        if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
                /* csum byte array before swab */
                csum1 = msg->kqm_cksum;
                csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
                                          krx->krx_npages, krx->krx_kiov);
                msg->kqm_cksum = csum1;
                        __swab16s(&msg->kqm_version);
                        __swab16s(&msg->kqm_type);
                        __swab32s(&msg->kqm_cksum);
                        __swab32s(&msg->kqm_nob);
                if (msg->kqm_version != QSWLND_PROTO_VERSION) {
                        /* Future protocol version compatibility support!
                         * The next qswlnd-specific protocol rev will first
                         * send an RPC to check version.
                         * 1.4.6 and 1.4.7.early reply with a status
                         * block containing its current version.
                         * Later versions send a failure (-ve) status +
                        if (!krx->krx_rpc_reply_needed) {
                                CERROR("Unexpected version %d from %s\n",
                                       msg->kqm_version, libcfs_nid2str(fromnid));
                        LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
                switch (msg->kqm_type) {
                        CERROR("Bad request type %x from %s\n",
                               msg->kqm_type, libcfs_nid2str(fromnid));
                case QSWLND_MSG_IMMEDIATE:
                        if (krx->krx_rpc_reply_needed) {
                                /* Should have been a simple message */
                                CERROR("IMMEDIATE sent as RPC from %s\n",
                                       libcfs_nid2str(fromnid));
                        nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
                        if (krx->krx_nob < nob) {
                                CERROR("Short IMMEDIATE %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                        if (csum0 != msg->kqm_cksum) {
                                CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
                                       csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
                                CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
                        rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
                case QSWLND_MSG_RDMA:
                        if (!krx->krx_rpc_reply_needed) {
                                /* Should have been an RPC */
                                CERROR("RDMA sent as simple message from %s\n",
                                       libcfs_nid2str(fromnid));
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
                        if (krx->krx_nob < nob) {
                                CERROR("Short RDMA message %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);
                        n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);
                        if (krx->krx_nob < nob) {
                                CERROR("short RDMA message %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                for (i = 0; i < n; i++) {
                                        EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];
                                        __swab32s(&nmd->nmd_addr);
                                        __swab32s(&nmd->nmd_len);
                                        __swab32s(&nmd->nmd_attr);
                        krx->krx_cksum = csum0;         /* stash checksum so far */
                        rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
        if (msg->kqm_magic == LNET_PROTO_MAGIC ||
            msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
                /* Future protocol version compatibility support!
                 * When LNET unifies protocols over all LNDs, the first thing a
                 * peer will send will be a version query RPC.
                 * 1.4.6 and 1.4.7.early reply with a status block containing
                 * LNET_PROTO_QSW_MAGIC.
                 * Later versions send a failure (-ve) status +
                if (!krx->krx_rpc_reply_needed) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->kqm_magic, libcfs_nid2str(fromnid));
                LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
        CERROR("Unrecognised magic %08x from %s\n",
               msg->kqm_magic, libcfs_nid2str(fromnid));
        kqswnal_rx_decref(krx);

/* Receive Interrupt Handler: posts to schedulers */
kqswnal_rxhandler(EP_RXD *rxd)
        unsigned long flags;
        int nob = ep_rxd_len (rxd);
        int status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ep_rxd_arg (rxd);
        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);
        LASSERT (krx != NULL);
        LASSERT (krx->krx_state == KRX_POSTED);
        krx->krx_state = KRX_PARSE;
        /* RPC reply iff RPC request received without error */
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
                                    (status == EP_SUCCESS ||
                                     status == EP_MSG_TOO_BIG);
        /* Default to failure if an RPC reply is requested but not handled */
        krx->krx_rpc_reply.msg.status = -EPROTO;
        cfs_atomic_set (&krx->krx_refcount, 1);
        if (status != EP_SUCCESS) {
                /* receives complete with failure when receiver is removed */
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                        CERROR("receive failed with status %d, nob %d\n",
                               ep_rxd_status(rxd), nob);
                kqswnal_rx_decref(krx);
        if (!cfs_in_interrupt()) {
        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
        cfs_list_add_tail (&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        cfs_waitq_signal (&kqswnal_data.kqn_sched_waitq);
        cfs_spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags);
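
/* LNet receive entry point, called from lnet_parse(). RDMA requests (which
 * arrived as RPCs) are completed by fetching or storing the payload with
 * kqswnal_rdma(); IMMEDIATE messages are simply copied out of the receive
 * pages into the caller's iov/kiov and finalized here. */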
kqswnal_recv (lnet_ni_t *ni,
              unsigned int offset,
        kqswnal_rx_t *krx = (kqswnal_rx_t *)private;
        kqswnal_remotemd_t *rmd;
        LASSERT (!cfs_in_interrupt ());         /* OK to map */
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));
        fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
        msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
        if (krx->krx_rpc_reply_needed) {
                /* optimized (rdma) request sent as RPC */
                LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
                hdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd = &msg->kqm_u.rdma.kqrm_rmd;
                /* NB header is still in wire byte order */
                switch (le32_to_cpu(hdr->type)) {
                case LNET_MSG_REPLY:
                        /* This is an optimized PUT/REPLY */
                        rc = kqswnal_rdma(krx, lntmsg,
                                          KTX_RDMA_FETCH, rmd,
                                          niov, iov, kiov, offset, mlen);
                        if (krx->krx_cksum != msg->kqm_cksum) {
                                CERROR("Bad GET checksum %08x(%08x) from %s\n",
                                       krx->krx_cksum, msg->kqm_cksum,
                                       libcfs_nid2str(fromnid));
                        if (lntmsg == NULL) {
                                /* No buffer match: my decref will
                                 * complete the RPC with failure */
                                /* Matched something! */
                                rc = kqswnal_rdma(krx, lntmsg,
                                                  KTX_RDMA_STORE, rmd,
                        CERROR("Bad RPC type %d\n",
                               le32_to_cpu(hdr->type));
                kqswnal_rx_decref(krx);
        LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
        msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
        if (krx->krx_nob < msg_offset + rlen) {
                CERROR("Bad message size from %s: have %d, need %d + %d\n",
                       libcfs_nid2str(fromnid), krx->krx_nob,
                kqswnal_rx_decref(krx);
        lnet_copy_kiov2kiov(niov, kiov, offset,
                            krx->krx_npages, krx->krx_kiov,
        lnet_copy_kiov2iov(niov, iov, offset,
                           krx->krx_npages, krx->krx_kiov,
        lnet_finalize(ni, lntmsg, 0);
        kqswnal_rx_decref(krx);

kqswnal_thread_start (int (*fn)(void *arg), void *arg)
        long pid = cfs_kernel_thread (fn, arg, 0);
        cfs_atomic_inc (&kqswnal_data.kqn_nthreads);

kqswnal_thread_fini (void)
        cfs_atomic_dec (&kqswnal_data.kqn_nthreads);
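
/* Scheduler thread: under kqn_sched_lock it drains, one item per pass, the
 * ready-rx queue (parse), the done-tx queue (completion) and the delayed-tx
 * queue (retry launch), dropping the lock around each action. When idle it
 * sleeps on kqn_sched_waitq; it yields after KQSW_RESCHED consecutive items
 * and exits only in stage 2 of shutdown when nothing is left to do. */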
kqswnal_scheduler (void *arg)
        unsigned long flags;
        cfs_daemonize ("kqswnal_sched");
        cfs_block_allsigs ();
        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags);
                if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds))
                        krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next,
                                             kqswnal_rx_t, krx_list);
                        cfs_list_del (&krx->krx_list);
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                        LASSERT (krx->krx_state == KRX_PARSE);
                        kqswnal_parse (krx);
                        cfs_spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
                if (!cfs_list_empty (&kqswnal_data.kqn_donetxds))
                        ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next,
                                             kqswnal_tx_t, ktx_schedlist);
                        cfs_list_del_init (&ktx->ktx_schedlist);
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                        kqswnal_tx_done_in_thread_context(ktx);
                        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
                if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds))
                        ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next,
                                             kqswnal_tx_t, ktx_schedlist);
                        cfs_list_del_init (&ktx->ktx_schedlist);
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                        rc = kqswnal_launch (ktx);
                                CERROR("Failed delayed transmit to %s: %d\n",
                                       libcfs_nid2str(ktx->ktx_nid), rc);
                                kqswnal_tx_done (ktx, rc);
                        cfs_atomic_dec (&kqswnal_data.kqn_pending_txs);
                        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        cfs_spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                        if (!did_something) {
                                if (kqswnal_data.kqn_shuttingdown == 2) {
                                        /* We only exit in stage 2 of shutdown
                                         * when there's nothing left to do */
                                cfs_wait_event_interruptible_exclusive (
                                        kqswnal_data.kqn_sched_waitq,
                                        kqswnal_data.kqn_shuttingdown == 2 ||
                                        !cfs_list_empty(&kqswnal_data. \
                                                        kqn_readyrxds) ||
                                        !cfs_list_empty(&kqswnal_data. \
                                                        kqn_donetxds) ||
                                        !cfs_list_empty(&kqswnal_data. \
                                                        kqn_delayedtxds, rc));
                        } else if (need_resched())
                        cfs_spin_lock_irqsave (&kqswnal_data.kqn_sched_lock,
        kqswnal_thread_fini ();