/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 *
 * Copyright (c) 2012, Intel Corporation.
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Portals, http://www.lustre.org
 *
 * Portals is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Portals is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Portals; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "qswlnd.h"
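/*
 * Transmit/receive path for the Quadrics Elan (qswlnd) LND.  Outgoing
 * messages are staged in pre-mapped transmit descriptors (kqswnal_tx_t);
 * large payloads are mapped into Elan DVMA space and moved by RDMA, while
 * small ones are copied inline.  Receives arrive via EKC callbacks and are
 * handed up to LNet through lnet_parse()/kqswnal_recv().
 */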
void
kqswnal_notify_peer_down(kqswnal_tx_t *ktx)
{
        time_t then;

        then = cfs_time_current_sec() -
               cfs_duration_sec(cfs_time_current() -
                                ktx->ktx_launchtime);

        lnet_notify(kqswnal_data.kqn_ni, ktx->ktx_nid, 0, then);
}
void
kqswnal_unmap_tx (kqswnal_tx_t *ktx)
{
        int i;

        ktx->ktx_rail = -1;                     /* unset rail */

        if (ktx->ktx_nmappedpages == 0)
                return;

        CDEBUG(D_NET, "%p unloading %d frags starting at %d\n",
               ktx, ktx->ktx_nfrag, ktx->ktx_firsttmpfrag);

        for (i = ktx->ktx_firsttmpfrag; i < ktx->ktx_nfrag; i++)
                ep_dvma_unload(kqswnal_data.kqn_ep,
                               kqswnal_data.kqn_ep_tx_nmh,
                               &ktx->ktx_frags[i]);

        ktx->ktx_nmappedpages = 0;
}
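/*
 * Map a paged (kiov) payload into Elan DVMA space, appending to the
 * descriptor's frag list from ktx_frags[ktx_nfrag].  Each page is kmapped
 * and loaded individually; ep_nmd_merge() coalesces pages that turn out to
 * be contiguous in Elan space, so the frag count only grows when merging
 * fails.  Fails with -EMSGSIZE if the payload needs more pages than the
 * descriptor reserved (ktx_npages) or more than EP_MAXFRAG frags.
 */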
int
kqswnal_map_tx_kiov (kqswnal_tx_t *ktx, int offset, int nob,
                     unsigned int niov, lnet_kiov_t *kiov)
{
        int         nfrags    = ktx->ktx_nfrag;
        int         nmapped   = ktx->ktx_nmappedpages;
        int         maxmapped = ktx->ktx_npages;
        __u32       basepage  = ktx->ktx_basepage + nmapped;
        char       *ptr;
        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                nmapped++;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                /* XXX this is really crap, but we'll have to kmap until
                 * EKC has a page (rather than vaddr) mapping interface */
                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, page %d, %d total\n",
                       ktx, nfrags, ptr, fraglen, basepage, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             ptr, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                kunmap (kiov->kiov_page);

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage++;
                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
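/*
 * Checksum helpers: fold a paged (kiov) or vaddr (iov) payload into a
 * running checksum with kqswnal_csum().  They walk the fragments exactly
 * like the mapping routines above, kmapping each page in the kiov case.
 */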
__u32
kqswnal_csum_kiov (__u32 csum, int offset, int nob,
                   unsigned int niov, lnet_kiov_t *kiov)
{
        char *ptr;

        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before 'offset' */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                kiov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int fraglen = kiov->kiov_len - offset;

                /* each page frag is contained in one page */
                LASSERT (kiov->kiov_offset + kiov->kiov_len <= PAGE_SIZE);

                if (fraglen > nob)
                        fraglen = nob;

                ptr = ((char *)kmap (kiov->kiov_page)) + kiov->kiov_offset + offset;

                csum = kqswnal_csum(csum, ptr, fraglen);

                kunmap (kiov->kiov_page);

                kiov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
int
kqswnal_map_tx_iov (kqswnal_tx_t *ktx, int offset, int nob,
                    unsigned int niov, struct iovec *iov)
{
        int         nfrags    = ktx->ktx_nfrag;
        int         nmapped   = ktx->ktx_nmappedpages;
        int         maxmapped = ktx->ktx_npages;
        __u32       basepage  = ktx->ktx_basepage + nmapped;
        EP_RAILMASK railmask;
        int         rail;

        if (ktx->ktx_rail < 0)
                ktx->ktx_rail = ep_xmtr_prefrail(kqswnal_data.kqn_eptx,
                                                 EP_RAILMASK_ALL,
                                                 kqswnal_nid2elanid(ktx->ktx_nid));
        rail = ktx->ktx_rail;
        if (rail < 0) {
                CERROR("No rails available for %s\n", libcfs_nid2str(ktx->ktx_nid));
                return (-ENETDOWN);
        }
        railmask = 1 << rail;

        LASSERT (nmapped <= maxmapped);
        LASSERT (nfrags >= ktx->ktx_firsttmpfrag);
        LASSERT (nfrags <= EP_MAXFRAG);
        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int  fraglen = iov->iov_len - offset;
                long npages;

                if (fraglen > nob)
                        fraglen = nob;
                npages = kqswnal_pages_spanned (iov->iov_base, fraglen);

                nmapped += npages;
                if (nmapped > maxmapped) {
                        CERROR("Can't map message in %d pages (max %d)\n",
                               nmapped, maxmapped);
                        return (-EMSGSIZE);
                }

                if (nfrags == EP_MAXFRAG) {
                        CERROR("Message too fragmented in Elan VM (max %d frags)\n",
                               EP_MAXFRAG);
                        return (-EMSGSIZE);
                }

                CDEBUG(D_NET,
                       "%p[%d] loading %p for %d, pages %d for %ld, %d total\n",
                       ktx, nfrags, iov->iov_base + offset, fraglen,
                       basepage, npages, nmapped);

                ep_dvma_load(kqswnal_data.kqn_ep, NULL,
                             iov->iov_base + offset, fraglen,
                             kqswnal_data.kqn_ep_tx_nmh, basepage,
                             &railmask, &ktx->ktx_frags[nfrags]);

                if (nfrags == ktx->ktx_firsttmpfrag ||
                    !ep_nmd_merge(&ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags - 1],
                                  &ktx->ktx_frags[nfrags])) {
                        /* new frag if this is the first or can't merge */
                        nfrags++;
                }

                /* keep in loop for failure case */
                ktx->ktx_nmappedpages = nmapped;

                basepage += npages;
                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        ktx->ktx_nfrag = nfrags;
        CDEBUG (D_NET, "%p got %d frags over %d pages\n",
                ktx, ktx->ktx_nfrag, ktx->ktx_nmappedpages);

        return (0);
}
__u32
kqswnal_csum_iov (__u32 csum, int offset, int nob,
                  unsigned int niov, struct iovec *iov)
{
        if (nob == 0)
                return csum;

        LASSERT (niov > 0);
        LASSERT (nob > 0);

        /* skip complete frags before offset */
        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                iov++;
                niov--;
                LASSERT (niov > 0);
        }

        do {
                int fraglen = iov->iov_len - offset;

                if (fraglen > nob)
                        fraglen = nob;

                csum = kqswnal_csum(csum, iov->iov_base + offset, fraglen);

                iov++;
                niov--;
                nob -= fraglen;
                offset = 0;

                /* iov must not run out before end of data */
                LASSERT (nob == 0 || niov > 0);

        } while (nob > 0);

        return csum;
}
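/*
 * Transmit descriptor pool: descriptors cycle between kqn_idletxds and
 * kqn_activetxds under kqn_idletxd_lock.  kqswnal_get_idle_tx() returns
 * NULL rather than blocking, since callers may be unable to sleep; it
 * also fails once shutdown has started.
 */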
void
kqswnal_put_idle_tx (kqswnal_tx_t *ktx)
{
        unsigned long flags;

        kqswnal_unmap_tx(ktx);                  /* release temporary mappings */
        ktx->ktx_state = KTX_IDLE;

        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);

        cfs_list_del(&ktx->ktx_list);           /* take off active list */
        cfs_list_add(&ktx->ktx_list, &kqswnal_data.kqn_idletxds);

        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
}
kqswnal_tx_t *
kqswnal_get_idle_tx (void)
{
        unsigned long  flags;
        kqswnal_tx_t  *ktx;

        spin_lock_irqsave(&kqswnal_data.kqn_idletxd_lock, flags);

        if (kqswnal_data.kqn_shuttingdown ||
            cfs_list_empty(&kqswnal_data.kqn_idletxds)) {
                spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);
                return NULL;
        }

        ktx = cfs_list_entry (kqswnal_data.kqn_idletxds.next, kqswnal_tx_t,
                              ktx_list);
        cfs_list_del (&ktx->ktx_list);

        cfs_list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds);
        ktx->ktx_launcher = current->pid;
        cfs_atomic_inc(&kqswnal_data.kqn_pending_txs);

        spin_unlock_irqrestore(&kqswnal_data.kqn_idletxd_lock, flags);

        /* Idle descs can't have any mapped (as opposed to pre-mapped) pages */
        LASSERT (ktx->ktx_nmappedpages == 0);
        return (ktx);
}
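/*
 * Final transmit completion.  Must run in thread context: it may kmap
 * pages to verify checksums and calls lnet_finalize().  What needs
 * finalizing depends on ktx_state: RDMA_FETCH completions also drop the
 * rx ref taken for the RDMA, and GETs finalize both the original GET and
 * the REPLY message created when the GET was launched.
 */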
void
kqswnal_tx_done_in_thread_context (kqswnal_tx_t *ktx)
{
        lnet_msg_t   *lnetmsg0 = NULL;
        lnet_msg_t   *lnetmsg1 = NULL;
        int           status0  = 0;
        int           status1  = 0;
        kqswnal_rx_t *krx;

        LASSERT (!cfs_in_interrupt());

        if (ktx->ktx_status == -EHOSTDOWN)
                kqswnal_notify_peer_down(ktx);

        switch (ktx->ktx_state) {
        case KTX_RDMA_FETCH:        /* optimized PUT/REPLY handled */
                krx      = (kqswnal_rx_t *)ktx->ktx_args[0];
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;

                if (status0 == 0) {             /* RDMA succeeded */
                        kqswnal_msg_t *msg;
                        __u32          csum;

                        msg = (kqswnal_msg_t *)
                              page_address(krx->krx_kiov[0].kiov_page);

                        csum = (lnetmsg0->msg_kiov != NULL) ?
                               kqswnal_csum_kiov(krx->krx_cksum,
                                                 lnetmsg0->msg_offset,
                                                 lnetmsg0->msg_wanted,
                                                 lnetmsg0->msg_niov,
                                                 lnetmsg0->msg_kiov) :
                               kqswnal_csum_iov(krx->krx_cksum,
                                                lnetmsg0->msg_offset,
                                                lnetmsg0->msg_wanted,
                                                lnetmsg0->msg_niov,
                                                lnetmsg0->msg_iov);

                        /* Can only check csum if I got it all */
                        if (lnetmsg0->msg_wanted == lnetmsg0->msg_len &&
                            csum != msg->kqm_cksum) {
                                ktx->ktx_status = -EIO;
                                krx->krx_rpc_reply.msg.status = -EIO;
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, msg->kqm_cksum,
                                       libcfs_nid2str(kqswnal_rx_nid(krx)));
                        }
                }

                LASSERT (krx->krx_state == KRX_COMPLETING);
                kqswnal_rx_decref (krx);
                break;

        case KTX_RDMA_STORE:       /* optimized GET handled */
        case KTX_PUTTING:          /* optimized PUT sent */
        case KTX_SENDING:          /* normal send */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = ktx->ktx_status;
                break;

        case KTX_GETTING:          /* optimized GET sent & payload received */
                /* Complete the GET with success since we can't avoid
                 * delivering a REPLY event; we committed to it when we
                 * launched the GET */
                lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                status0  = 0;
                lnetmsg1 = (lnet_msg_t *)ktx->ktx_args[2];
                status1  = ktx->ktx_status;

                if (status1 == 0) {             /* RDMA succeeded */
                        lnet_msg_t   *lnetmsg0 = (lnet_msg_t *)ktx->ktx_args[1];
                        lnet_libmd_t *md = lnetmsg0->msg_md;
                        __u32         csum;

                        csum = ((md->md_options & LNET_MD_KIOV) != 0) ?
                               kqswnal_csum_kiov(~0, 0,
                                                 md->md_length,
                                                 md->md_niov,
                                                 md->md_iov.kiov) :
                               kqswnal_csum_iov(~0, 0,
                                                md->md_length,
                                                md->md_niov,
                                                md->md_iov.iov);

                        if (csum != ktx->ktx_cksum) {
                                CERROR("RDMA checksum failed %u(%u) from %s\n",
                                       csum, ktx->ktx_cksum,
                                       libcfs_nid2str(ktx->ktx_nid));
                                status1 = -EIO;
                        }
                }
                break;

        default:
                LBUG();
        }

        kqswnal_put_idle_tx (ktx);

        lnet_finalize (kqswnal_data.kqn_ni, lnetmsg0, status0);
        if (lnetmsg1 != NULL)
                lnet_finalize (kqswnal_data.kqn_ni, lnetmsg1, status1);
}
void
kqswnal_tx_done (kqswnal_tx_t *ktx, int status)
{
        unsigned long flags;

        ktx->ktx_status = status;

        if (!cfs_in_interrupt()) {
                kqswnal_tx_done_in_thread_context(ktx);
                return;
        }

        /* Complete the send in thread context */
        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        cfs_list_add_tail(&ktx->ktx_schedlist,
                          &kqswnal_data.kqn_donetxds);
        cfs_waitq_signal(&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}
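/*
 * EKC transmit completion callback.  For RPCs (optimized GET/PUT) the
 * peer's reply rides in the EP status block: it is byte-swapped if the
 * peer has opposite endian-ness, and for GETs it carries the REPLY length
 * (and checksum) to expect.  A transport failure is mapped to -EHOSTDOWN
 * so the peer is noted down when the tx is finalized.
 */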
void
kqswnal_txhandler(EP_TXD *txd, void *arg, int status)
{
        kqswnal_tx_t        *ktx = (kqswnal_tx_t *)arg;
        kqswnal_rpc_reply_t *reply;

        LASSERT (txd != NULL);
        LASSERT (ktx != NULL);

        CDEBUG(D_NET, "txd %p, arg %p status %d\n", txd, arg, status);

        if (status != EP_SUCCESS) {

                CNETERR("Tx completion to %s failed: %d\n",
                        libcfs_nid2str(ktx->ktx_nid), status);

                status = -EHOSTDOWN;

        } else switch (ktx->ktx_state) {

        case KTX_GETTING:
        case KTX_PUTTING:
                /* RPC complete! */
                reply = (kqswnal_rpc_reply_t *)ep_txd_statusblk(txd);
                if (reply->msg.magic == 0) {    /* "old" peer */
                        status = reply->msg.status;
                        break;
                }

                if (reply->msg.magic != LNET_PROTO_QSW_MAGIC) {
                        if (reply->msg.magic != swab32(LNET_PROTO_QSW_MAGIC)) {
                                CERROR("%s unexpected rpc reply magic %08x\n",
                                       libcfs_nid2str(ktx->ktx_nid),
                                       reply->msg.magic);
                                status = -EPROTO;
                                break;
                        }

                        __swab32s(&reply->msg.status);
                        __swab32s(&reply->msg.version);

                        if (ktx->ktx_state == KTX_GETTING) {
                                __swab32s(&reply->msg.u.get.len);
                                __swab32s(&reply->msg.u.get.cksum);
                        }
                }

                status = reply->msg.status;
                if (status != 0) {
                        CERROR("%s RPC status %08x\n",
                               libcfs_nid2str(ktx->ktx_nid), status);
                        break;
                }

                if (ktx->ktx_state == KTX_GETTING) {
                        lnet_set_reply_msg_len(kqswnal_data.kqn_ni,
                                               (lnet_msg_t *)ktx->ktx_args[2],
                                               reply->msg.u.get.len);
                        ktx->ktx_cksum = reply->msg.u.get.cksum;
                }
                break;

        case KTX_SENDING:
                status = 0;
                break;

        default:
                LBUG();
                break;
        }

        kqswnal_tx_done(ktx, status);
}
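/*
 * Hand a descriptor to the Elan comms layer: optimized GET/PUTs go out as
 * RPCs (ep_transmit_rpc), everything else as plain messages.  If EKC can't
 * allocate a transmit descriptor (EP_ENOMEM) the tx is queued on
 * kqn_delayedtxds for the scheduler thread to retry.
 */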
int
kqswnal_launch (kqswnal_tx_t *ktx)
{
        /* Don't block for transmit descriptor if we're in interrupt context */
        int           attr = cfs_in_interrupt() ? (EP_NO_SLEEP | EP_NO_ALLOC) : 0;
        int           dest = kqswnal_nid2elanid (ktx->ktx_nid);
        unsigned long flags;
        int           rc;

        ktx->ktx_launchtime = cfs_time_current();

        if (kqswnal_data.kqn_shuttingdown)
                return (-ESHUTDOWN);

        LASSERT (dest >= 0);                    /* must be a peer */

        if (ktx->ktx_nmappedpages != 0)
                attr = EP_SET_PREFRAIL(attr, ktx->ktx_rail);

        switch (ktx->ktx_state) {
        case KTX_GETTING:
        case KTX_PUTTING:
                if (the_lnet.ln_testprotocompat != 0) {
                        kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                        /* single-shot proto test:
                         * Future version queries will use an RPC, so I'll
                         * co-opt one of the existing ones */
                        if ((the_lnet.ln_testprotocompat & 1) != 0) {
                                msg->kqm_version++;
                                the_lnet.ln_testprotocompat &= ~1;
                        }
                        if ((the_lnet.ln_testprotocompat & 2) != 0) {
                                msg->kqm_magic = LNET_PROTO_MAGIC;
                                the_lnet.ln_testprotocompat &= ~2;
                        }
                }

                /* NB ktx_frag[0] is the GET/PUT hdr + kqswnal_remotemd_t.
                 * The other frags are the payload, awaiting RDMA */
                rc = ep_transmit_rpc(kqswnal_data.kqn_eptx, dest,
                                     ktx->ktx_port, attr,
                                     kqswnal_txhandler, ktx,
                                     NULL, ktx->ktx_frags, 1);
                break;

        case KTX_SENDING:
                rc = ep_transmit_message(kqswnal_data.kqn_eptx, dest,
                                         ktx->ktx_port, attr,
                                         kqswnal_txhandler, ktx,
                                         NULL, ktx->ktx_frags, ktx->ktx_nfrag);
                break;

        default:
                LBUG();
                rc = -EINVAL;                   /* no compiler warning please */
                break;
        }

        switch (rc) {
        case EP_SUCCESS: /* success */
                return (0);

        case EP_ENOMEM: /* can't allocate ep txd => queue for later */
                spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

                cfs_list_add_tail(&ktx->ktx_schedlist,
                                  &kqswnal_data.kqn_delayedtxds);
                cfs_waitq_signal(&kqswnal_data.kqn_sched_waitq);

                spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                       flags);
                return (0);

        default: /* fatal error */
                CNETERR ("Tx to %s failed: %d\n",
                         libcfs_nid2str(ktx->ktx_nid), rc);
                kqswnal_notify_peer_down(ktx);
                return (-EHOSTUNREACH);
        }
}
static char *
hdr_type_string (lnet_hdr_t *hdr)
{
        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_ACK:
                return ("ACK");
        case LNET_MSG_PUT:
                return ("PUT");
        case LNET_MSG_GET:
                return ("GET");
        case LNET_MSG_REPLY:
                return ("REPLY");
        default:
                return ("<UNKNOWN>");
        }
}
static void
kqswnal_cerror_hdr(lnet_hdr_t *hdr)
{
        char *type_str = hdr_type_string (hdr);

        CERROR("P3 Header at %p of type %s length %d\n", hdr, type_str,
               le32_to_cpu(hdr->payload_length));
        CERROR("    From nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->src_nid),
               le32_to_cpu(hdr->src_pid));
        CERROR("    To nid/pid "LPU64"/%u\n", le64_to_cpu(hdr->dest_nid),
               le32_to_cpu(hdr->dest_pid));

        switch (le32_to_cpu(hdr->type)) {
        case LNET_MSG_PUT:
                CERROR("    Ptl index %d, ack md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.ptl_index),
                       hdr->msg.put.ack_wmd.wh_interface_cookie,
                       hdr->msg.put.ack_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.put.match_bits));
                CERROR("    offset %d, hdr data "LPX64"\n",
                       le32_to_cpu(hdr->msg.put.offset),
                       hdr->msg.put.hdr_data);
                break;

        case LNET_MSG_GET:
                CERROR("    Ptl index %d, return md "LPX64"."LPX64", "
                       "match bits "LPX64"\n",
                       le32_to_cpu(hdr->msg.get.ptl_index),
                       hdr->msg.get.return_wmd.wh_interface_cookie,
                       hdr->msg.get.return_wmd.wh_object_cookie,
                       le64_to_cpu(hdr->msg.get.match_bits));
                CERROR("    Length %d, src offset %d\n",
                       le32_to_cpu(hdr->msg.get.sink_length),
                       le32_to_cpu(hdr->msg.get.src_offset));
                break;

        case LNET_MSG_ACK:
                CERROR("    dst md "LPX64"."LPX64", manipulated length %d\n",
                       hdr->msg.ack.dst_wmd.wh_interface_cookie,
                       hdr->msg.ack.dst_wmd.wh_object_cookie,
                       le32_to_cpu(hdr->msg.ack.mlength));
                break;

        case LNET_MSG_REPLY:
                CERROR("    dst md "LPX64"."LPX64"\n",
                       hdr->msg.reply.dst_wmd.wh_interface_cookie,
                       hdr->msg.reply.dst_wmd.wh_object_cookie);
        }

}                               /* end of print_hdr() */
int
kqswnal_check_rdma (int nlfrag, EP_NMD *lfrag,
                    int nrfrag, EP_NMD *rfrag)
{
        int i;

        if (nlfrag != nrfrag) {
                CERROR("Can't cope with unequal # frags: %d local %d remote\n",
                       nlfrag, nrfrag);
                return (-EINVAL);
        }

        for (i = 0; i < nlfrag; i++)
                if (lfrag[i].nmd_len != rfrag[i].nmd_len) {
                        CERROR("Can't cope with unequal frags %d(%d):"
                               " %d local %d remote\n",
                               i, nlfrag, lfrag[i].nmd_len, rfrag[i].nmd_len);
                        return (-EINVAL);
                }

        return (0);
}
kqswnal_remotemd_t *
kqswnal_get_portalscompat_rmd (kqswnal_rx_t *krx)
{
        /* Check that the RMD sent after the "raw" LNET header in a
         * portals-compatible QSWLND message is OK */
        char               *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page);
        kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + sizeof(lnet_hdr_t));

        /* Note RDMA addresses are sent in native endian-ness in the "old"
         * portals protocol so no swabbing... */

        if (buffer + krx->krx_nob < (char *)(rmd + 1)) {
                /* msg too small to discover rmd size */
                CERROR ("Incoming message [%d] too small for RMD (%d needed)\n",
                        krx->krx_nob, (int)(((char *)(rmd + 1)) - buffer));
                return (NULL);
        }

        if (buffer + krx->krx_nob < (char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) {
                /* rmd doesn't fit in the incoming message */
                CERROR ("Incoming message [%d] too small for RMD[%d] (%d needed)\n",
                        krx->krx_nob, rmd->kqrmd_nfrag,
                        (int)(((char *)&rmd->kqrmd_frag[rmd->kqrmd_nfrag]) - buffer));
                return (NULL);
        }

        return (rmd);
}
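/*
 * RDMA completion callbacks, run by EKC when the peer-side transfer
 * finishes.  "store" completes an optimized GET (data pushed to the peer);
 * "fetch" completes an optimized PUT/REPLY (data pulled from the peer).
 * Both free the ktx, which in turn finalizes the associated lnet_msg_t.
 */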
static void
kqswnal_rdma_store_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_STORE);
        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_rx_decref (krx);

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, (status == EP_SUCCESS) ? 0 : -ECONNABORTED);
}
static void
kqswnal_rdma_fetch_complete (EP_RXD *rxd)
{
        /* Completed fetching the PUT/REPLY data */
        int           status = ep_rxd_status(rxd);
        kqswnal_tx_t *ktx = (kqswnal_tx_t *)ep_rxd_arg(rxd);
        kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0];

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, ktx %p, status %d\n", rxd, ktx, status);

        LASSERT (ktx->ktx_state == KTX_RDMA_FETCH);
        LASSERT (krx->krx_rxd == rxd);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (status == EP_SUCCESS) {
                krx->krx_rpc_reply.msg.status = 0;
                status = 0;
        } else {
                /* Abandon RPC since get failed */
                krx->krx_rpc_reply_needed = 0;
                status = -ECONNABORTED;
        }

        /* krx gets decref'd in kqswnal_tx_done_in_thread_context() */
        LASSERT (krx->krx_state == KRX_PARSE);
        krx->krx_state = KRX_COMPLETING;

        /* free ktx & finalize() its lnet_msg_t */
        kqswnal_tx_done(ktx, status);
}
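/*
 * Core of the optimized (zero-copy) path: map the local payload with a
 * spare tx descriptor, check the frag list matches the remote descriptor
 * the peer sent, then either complete the peer's RPC with our frags
 * (GET => KTX_RDMA_STORE) or issue ep_rpc_get() to fetch the peer's data
 * (PUT/REPLY => KTX_RDMA_FETCH).
 */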
int
kqswnal_rdma (kqswnal_rx_t *krx, lnet_msg_t *lntmsg,
              int type, kqswnal_remotemd_t *rmd,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int len)
{
        kqswnal_tx_t *ktx;
        int           eprc;
        int           rc;

        /* Not both mapped and paged payload */
        LASSERT (iov == NULL || kiov == NULL);
        /* RPC completes with failure by default */
        LASSERT (krx->krx_rpc_reply_needed);
        LASSERT (krx->krx_rpc_reply.msg.status != 0);

        if (len == 0) {
                /* data got truncated to nothing. */
                lnet_finalize(kqswnal_data.kqn_ni, lntmsg, 0);
                /* Let kqswnal_rx_done() complete the RPC with success */
                krx->krx_rpc_reply.msg.status = 0;
                return (0);
        }

        /* NB I'm using 'ktx' just to map the local RDMA buffers; I'm not
           actually sending a portals message with it */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for RDMA with %s\n",
                        libcfs_nid2str(kqswnal_rx_nid(krx)));
                return (-ENOMEM);
        }

        ktx->ktx_state   = type;
        ktx->ktx_nid     = kqswnal_rx_nid(krx);
        ktx->ktx_args[0] = krx;
        ktx->ktx_args[1] = lntmsg;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) > 0);
        /* Take an extra ref for the completion callback */
        cfs_atomic_inc(&krx->krx_refcount);

        /* Map on the rail the RPC prefers */
        ktx->ktx_rail = ep_rcvr_prefrail(krx->krx_eprx,
                                         ep_rxd_railmask(krx->krx_rxd));

        /* Start mapping at offset 0 (we're not mapping any headers) */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0;

        if (kiov != NULL)
                rc = kqswnal_map_tx_kiov(ktx, offset, len, niov, kiov);
        else
                rc = kqswnal_map_tx_iov(ktx, offset, len, niov, iov);

        if (rc != 0) {
                CERROR ("Can't map local RDMA data: %d\n", rc);
                goto out;
        }

        rc = kqswnal_check_rdma (ktx->ktx_nfrag, ktx->ktx_frags,
                                 rmd->kqrmd_nfrag, rmd->kqrmd_frag);
        if (rc != 0) {
                CERROR ("Incompatible RDMA descriptors\n");
                goto out;
        }

        switch (type) {
        default:
                LBUG();

        case KTX_RDMA_STORE:
                krx->krx_rpc_reply.msg.status    = 0;
                krx->krx_rpc_reply.msg.magic     = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version   = QSWLND_PROTO_VERSION;
                krx->krx_rpc_reply.msg.u.get.len = len;

                krx->krx_rpc_reply.msg.u.get.cksum = (kiov != NULL) ?
                        kqswnal_csum_kiov(~0, offset, len, niov, kiov) :
                        kqswnal_csum_iov(~0, offset, len, niov, iov);
                if (*kqswnal_tunables.kqn_inject_csum_error == 4) {
                        krx->krx_rpc_reply.msg.u.get.cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }

                eprc = ep_complete_rpc(krx->krx_rxd,
                                       kqswnal_rdma_store_complete, ktx,
                                       &krx->krx_rpc_reply.ep_statusblk,
                                       ktx->ktx_frags, rmd->kqrmd_frag,
                                       rmd->kqrmd_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("can't complete RPC: %d\n", eprc);
                        /* don't re-attempt RPC completion */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;

        case KTX_RDMA_FETCH:
                eprc = ep_rpc_get (krx->krx_rxd,
                                   kqswnal_rdma_fetch_complete, ktx,
                                   rmd->kqrmd_frag, ktx->ktx_frags, ktx->ktx_nfrag);
                if (eprc != EP_SUCCESS) {
                        CERROR("ep_rpc_get failed: %d\n", eprc);
                        /* Don't attempt RPC completion:
                         * EKC nuked it when the get failed */
                        krx->krx_rpc_reply_needed = 0;
                        rc = -ECONNABORTED;
                }
                break;
        }

 out:
        if (rc != 0) {
                kqswnal_rx_decref(krx);         /* drop callback's ref */
                kqswnal_put_idle_tx (ktx);
        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc);
}
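/*
 * LND send entry point.  Three cases:
 *  1. Large GET/PUT/REPLY to a local peer: send a QSWLND_MSG_RDMA carrying
 *     the header plus our mapped frags (the remote MD), and let the peer
 *     DMA directly to/from our buffers.
 *  2. Payload fits kqn_tx_maxcontig: copy it into the pre-mapped buffer
 *     and send a single-frag QSWLND_MSG_IMMEDIATE.
 *  3. Otherwise: QSWLND_MSG_IMMEDIATE with the header in frag 0 and the
 *     payload mapped into the remaining frags.
 */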
int
kqswnal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        int               nob;
        kqswnal_tx_t     *ktx;
        int               rc;

        /* NB 1. hdr is in network byte order */
        /*    2. 'private' depends on the message type */

        CDEBUG(D_NET, "sending %u bytes in %d frags to %s\n",
               payload_nob, payload_niov, libcfs_id2str(target));

        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);

        /* It must be OK to kmap() if required */
        LASSERT (payload_kiov == NULL || !cfs_in_interrupt ());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));

        if (kqswnal_nid2elanid (target.nid) < 0) {
                CERROR("%s not in my cluster\n", libcfs_nid2str(target.nid));
                return -EIO;
        }

        /* I may not block for a transmit descriptor if I might block the
         * router, receiver, or an interrupt handler. */
        ktx = kqswnal_get_idle_tx();
        if (ktx == NULL) {
                CERROR ("Can't get txd for msg type %d for %s\n",
                        type, libcfs_nid2str(target.nid));
                return (-ENOMEM);
        }

        ktx->ktx_state   = KTX_SENDING;
        ktx->ktx_nid     = target.nid;
        ktx->ktx_args[0] = private;
        ktx->ktx_args[1] = lntmsg;
        ktx->ktx_args[2] = NULL;    /* set when a GET commits to REPLY */

        /* The first frag will be the pre-mapped buffer. */
        ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1;

        if ((!target_is_router &&               /* target.nid is final dest */
             !routing &&                        /* I'm the source */
             type == LNET_MSG_GET &&            /* optimize GET? */
             *kqswnal_tunables.kqn_optimized_gets != 0 &&
             lntmsg->msg_md->md_length >=
             *kqswnal_tunables.kqn_optimized_gets) ||
            ((type == LNET_MSG_PUT ||           /* optimize PUT? */
              type == LNET_MSG_REPLY) &&        /* optimize REPLY? */
             *kqswnal_tunables.kqn_optimized_puts != 0 &&
             payload_nob >= *kqswnal_tunables.kqn_optimized_puts)) {
                lnet_libmd_t       *md = lntmsg->msg_md;
                kqswnal_msg_t      *msg = (kqswnal_msg_t *)ktx->ktx_buffer;
                lnet_hdr_t         *mhdr;
                kqswnal_remotemd_t *rmd;

                /* Optimised path: I send over the Elan vaddrs of the local
                 * buffers, and my peer DMAs directly to/from them.
                 *
                 * First I set up ktx as if it was going to send this
                 * payload, (it needs to map it anyway).  This fills
                 * ktx_frags[1] and onward with the network addresses
                 * of the buffer frags. */

                /* Send an RDMA message */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_RDMA;

                mhdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd  = &msg->kqm_u.rdma.kqrm_rmd;

                *mhdr = *hdr;
                nob = (((char *)rmd) - ktx->ktx_buffer);

                if (type == LNET_MSG_GET) {
                        if ((md->md_options & LNET_MD_KIOV) != 0)
                                rc = kqswnal_map_tx_kiov (ktx, 0, md->md_length,
                                                          md->md_niov, md->md_iov.kiov);
                        else
                                rc = kqswnal_map_tx_iov (ktx, 0, md->md_length,
                                                         md->md_niov, md->md_iov.iov);
                        ktx->ktx_state = KTX_GETTING;
                } else {
                        if (payload_kiov != NULL)
                                rc = kqswnal_map_tx_kiov(ktx, 0, payload_nob,
                                                         payload_niov, payload_kiov);
                        else
                                rc = kqswnal_map_tx_iov(ktx, 0, payload_nob,
                                                        payload_niov, payload_iov);
                        ktx->ktx_state = KTX_PUTTING;
                }

                if (rc != 0)
                        goto out;

                rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1;
                nob += offsetof(kqswnal_remotemd_t,
                                kqrmd_frag[rmd->kqrmd_nfrag]);
                LASSERT (nob <= KQSW_TX_BUFFER_SIZE);

                memcpy(&rmd->kqrmd_frag[0], &ktx->ktx_frags[1],
                       rmd->kqrmd_nfrag * sizeof(EP_NMD));

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                if (type == LNET_MSG_GET) {
                        /* Allocate reply message now while I'm in thread context */
                        ktx->ktx_args[2] = lnet_create_reply_msg (
                                kqswnal_data.kqn_ni, lntmsg);
                        if (ktx->ktx_args[2] == NULL)
                                goto out;

                        /* NB finalizing the REPLY message is my
                         * responsibility now, whatever happens. */
                        if (*kqswnal_tunables.kqn_inject_csum_error == 3) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }

                } else if (payload_kiov != NULL) {
                        /* must checksum payload after header so receiver can
                         * compute partial header cksum before swab.  Sadly
                         * this causes 2 rounds of kmap */
                        msg->kqm_cksum =
                                kqswnal_csum_kiov(msg->kqm_cksum, 0, payload_nob,
                                                  payload_niov, payload_kiov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                } else {
                        msg->kqm_cksum =
                                kqswnal_csum_iov(msg->kqm_cksum, 0, payload_nob,
                                                 payload_niov, payload_iov);
                        if (*kqswnal_tunables.kqn_inject_csum_error == 2) {
                                msg->kqm_cksum++;
                                *kqswnal_tunables.kqn_inject_csum_error = 0;
                        }
                }

        } else if (payload_nob <= *kqswnal_tunables.kqn_tx_maxcontig) {
                lnet_hdr_t    *mhdr;
                char          *payload;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* single frag copied into the pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                payload = msg->kqm_u.immediate.kqim_payload;

                *mhdr = *hdr;
                nob = (payload - ktx->ktx_buffer) + payload_nob;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        lnet_copy_kiov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                            payload_niov, payload_kiov,
                                            payload_offset, payload_nob);
                else
                        lnet_copy_iov2flat(KQSW_TX_BUFFER_SIZE, payload, 0,
                                           payload_niov, payload_iov,
                                           payload_offset, payload_nob);

                msg->kqm_nob   = nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);
                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }

        } else {
                lnet_hdr_t    *mhdr;
                kqswnal_msg_t *msg = (kqswnal_msg_t *)ktx->ktx_buffer;

                /* multiple frags: first is hdr in pre-mapped buffer */
                msg->kqm_magic = LNET_PROTO_QSW_MAGIC;
                msg->kqm_version = QSWLND_PROTO_VERSION;
                msg->kqm_type = QSWLND_MSG_IMMEDIATE;

                mhdr = &msg->kqm_u.immediate.kqim_hdr;
                nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);

                *mhdr = *hdr;

                ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, 0, nob);

                if (payload_kiov != NULL)
                        rc = kqswnal_map_tx_kiov (ktx, payload_offset, payload_nob,
                                                  payload_niov, payload_kiov);
                else
                        rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob,
                                                 payload_niov, payload_iov);
                if (rc != 0)
                        goto out;

                msg->kqm_nob   = nob + payload_nob;
                msg->kqm_cksum = 0;
                msg->kqm_cksum = kqswnal_csum(~0, (char *)msg, nob);

                msg->kqm_cksum = (payload_kiov != NULL) ?
                                 kqswnal_csum_kiov(msg->kqm_cksum,
                                                   payload_offset, payload_nob,
                                                   payload_niov, payload_kiov) :
                                 kqswnal_csum_iov(msg->kqm_cksum,
                                                  payload_offset, payload_nob,
                                                  payload_niov, payload_iov);

                if (*kqswnal_tunables.kqn_inject_csum_error == 1) {
                        msg->kqm_cksum++;
                        *kqswnal_tunables.kqn_inject_csum_error = 0;
                }

                nob += payload_nob;
        }

        ktx->ktx_port = (nob <= KQSW_SMALLMSG) ?
                        EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE;

        rc = kqswnal_launch (ktx);

 out:
        CDEBUG_LIMIT(rc == 0? D_NET :D_NETERROR, "%s %d bytes to %s%s: rc %d\n",
                     routing ? (rc == 0 ? "Routed" : "Failed to route") :
                               (rc == 0 ? "Sent" : "Failed to send"),
                     nob, libcfs_nid2str(target.nid),
                     target_is_router ? "(router)" : "", rc);

        if (rc != 0) {
                lnet_msg_t *repmsg = (lnet_msg_t *)ktx->ktx_args[2];
                int         state = ktx->ktx_state;

                kqswnal_put_idle_tx (ktx);

                if (state == KTX_GETTING && repmsg != NULL) {
                        /* We committed to reply, but there was a problem
                         * launching the GET.  We can't avoid delivering a
                         * REPLY event since we committed above, so we
                         * pretend the GET succeeded but the REPLY
                         * failed. */
                        rc = 0;
                        lnet_finalize (kqswnal_data.kqn_ni, lntmsg, 0);
                        lnet_finalize (kqswnal_data.kqn_ni, repmsg, -EIO);
                }
        }

        cfs_atomic_dec(&kqswnal_data.kqn_pending_txs);
        return (rc == 0 ? 0 : -EIO);
}
void
kqswnal_requeue_rx (kqswnal_rx_t *krx)
{
        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);
        LASSERT (!krx->krx_rpc_reply_needed);

        krx->krx_state = KRX_POSTED;

        if (kqswnal_data.kqn_shuttingdown) {
                /* free EKC rxd on shutdown */
                ep_complete_receive(krx->krx_rxd);
        } else {
                /* repost receive */
                ep_requeue_receive(krx->krx_rxd,
                                   kqswnal_rxhandler, krx,
                                   &krx->krx_elanbuffer, 0);
        }
}
static void
kqswnal_rpc_complete (EP_RXD *rxd)
{
        int           status = ep_rxd_status(rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg(rxd);

        CDEBUG((status == EP_SUCCESS) ? D_NET : D_ERROR,
               "rxd %p, krx %p, status %d\n", rxd, krx, status);

        LASSERT (krx->krx_rxd == rxd);
        LASSERT (krx->krx_rpc_reply_needed);

        krx->krx_rpc_reply_needed = 0;
        kqswnal_requeue_rx (krx);
}
void
kqswnal_rx_done (kqswnal_rx_t *krx)
{
        int rc;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 0);

        if (krx->krx_rpc_reply_needed) {
                /* We've not completed the peer's RPC yet... */
                krx->krx_rpc_reply.msg.magic   = LNET_PROTO_QSW_MAGIC;
                krx->krx_rpc_reply.msg.version = QSWLND_PROTO_VERSION;

                LASSERT (!cfs_in_interrupt());

                rc = ep_complete_rpc(krx->krx_rxd,
                                     kqswnal_rpc_complete, krx,
                                     &krx->krx_rpc_reply.ep_statusblk,
                                     NULL, NULL, 0);
                if (rc == EP_SUCCESS)
                        return;

                CERROR("can't complete RPC: %d\n", rc);
                krx->krx_rpc_reply_needed = 0;
        }

        kqswnal_requeue_rx(krx);
}
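/*
 * First look at an incoming message (thread context): validate magic,
 * version and checksum, byte-swap the header and remote MD if the peer
 * has opposite endian-ness, then hand the LNet header to lnet_parse().
 * Unknown magics/versions on an RPC get the default -EPROTO reply when
 * the rx ref is dropped.
 */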
void
kqswnal_parse (kqswnal_rx_t *krx)
{
        lnet_ni_t     *ni = kqswnal_data.kqn_ni;
        kqswnal_msg_t *msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);
        lnet_nid_t     fromnid = kqswnal_rx_nid(krx);
        int            swab;
        int            n;
        int            i;
        int            nob;
        int            rc;

        LASSERT (cfs_atomic_read(&krx->krx_refcount) == 1);

        if (krx->krx_nob < offsetof(kqswnal_msg_t, kqm_u)) {
                CERROR("Short message %d received from %s\n",
                       krx->krx_nob, libcfs_nid2str(fromnid));
                goto done;
        }

        swab = msg->kqm_magic == __swab32(LNET_PROTO_QSW_MAGIC);

        if (swab || msg->kqm_magic == LNET_PROTO_QSW_MAGIC) {
                __u32 csum0;
                __u32 csum1;

                /* csum byte array before swab */
                csum1 = msg->kqm_cksum;
                msg->kqm_cksum = 0;
                csum0 = kqswnal_csum_kiov(~0, 0, krx->krx_nob,
                                          krx->krx_npages, krx->krx_kiov);
                msg->kqm_cksum = csum1;

                if (swab) {
                        __swab16s(&msg->kqm_version);
                        __swab16s(&msg->kqm_type);
                        __swab32s(&msg->kqm_cksum);
                        __swab32s(&msg->kqm_nob);
                }

                if (msg->kqm_version != QSWLND_PROTO_VERSION) {
                        /* Future protocol version compatibility support!
                         * The next qswlnd-specific protocol rev will first
                         * send an RPC to check version.
                         * 1.4.6 and 1.4.7.early reply with a status
                         * block containing its current version.
                         * Later versions send a failure (-ve) status +
                         * magic/version */

                        if (!krx->krx_rpc_reply_needed) {
                                CERROR("Unexpected version %d from %s\n",
                                       msg->kqm_version, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
                        goto done;
                }

                switch (msg->kqm_type) {
                default:
                        CERROR("Bad request type %x from %s\n",
                               msg->kqm_type, libcfs_nid2str(fromnid));
                        goto done;

                case QSWLND_MSG_IMMEDIATE:
                        if (krx->krx_rpc_reply_needed) {
                                /* Should have been a simple message */
                                CERROR("IMMEDIATE sent as RPC from %s\n",
                                       libcfs_nid2str(fromnid));
                                goto done;
                        }

                        nob = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);
                        if (krx->krx_nob < nob) {
                                CERROR("Short IMMEDIATE %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        if (csum0 != msg->kqm_cksum) {
                                CERROR("Bad IMMEDIATE checksum %08x(%08x) from %s\n",
                                       csum0, msg->kqm_cksum, libcfs_nid2str(fromnid));
                                CERROR("nob %d (%d)\n", krx->krx_nob, msg->kqm_nob);
                                goto done;
                        }

                        rc = lnet_parse(ni, &msg->kqm_u.immediate.kqim_hdr,
                                        fromnid, krx, 0);
                        if (rc < 0)
                                goto done;
                        return;

                case QSWLND_MSG_RDMA:
                        if (!krx->krx_rpc_reply_needed) {
                                /* Should have been a simple message */
                                CERROR("RDMA sent as simple message from %s\n",
                                       libcfs_nid2str(fromnid));
                                goto done;
                        }

                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[0]);
                        if (krx->krx_nob < nob) {
                                CERROR("Short RDMA message %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        if (swab)
                                __swab32s(&msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag);

                        n = msg->kqm_u.rdma.kqrm_rmd.kqrmd_nfrag;
                        nob = offsetof(kqswnal_msg_t,
                                       kqm_u.rdma.kqrm_rmd.kqrmd_frag[n]);

                        if (krx->krx_nob < nob) {
                                CERROR("short RDMA message %d(%d) from %s\n",
                                       krx->krx_nob, nob, libcfs_nid2str(fromnid));
                                goto done;
                        }

                        if (swab) {
                                for (i = 0; i < n; i++) {
                                        EP_NMD *nmd = &msg->kqm_u.rdma.kqrm_rmd.kqrmd_frag[i];

                                        __swab32s(&nmd->nmd_addr);
                                        __swab32s(&nmd->nmd_len);
                                        __swab32s(&nmd->nmd_attr);
                                }
                        }

                        krx->krx_cksum = csum0; /* stash checksum so far */

                        rc = lnet_parse(ni, &msg->kqm_u.rdma.kqrm_hdr,
                                        fromnid, krx, 1);
                        if (rc < 0)
                                goto done;
                        return;
                }
                /* Not Reached */
        }

        if (msg->kqm_magic == LNET_PROTO_MAGIC ||
            msg->kqm_magic == __swab32(LNET_PROTO_MAGIC)) {
                /* Future protocol version compatibility support!
                 * When LNET unifies protocols over all LNDs, the first thing a
                 * peer will send will be a version query RPC.
                 * 1.4.6 and 1.4.7.early reply with a status block containing
                 * LNET_PROTO_QSW_MAGIC..
                 * Later versions send a failure (-ve) status +
                 * magic/version */

                if (!krx->krx_rpc_reply_needed) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->kqm_magic, libcfs_nid2str(fromnid));
                        goto done;
                }

                LASSERT (krx->krx_rpc_reply.msg.status == -EPROTO);
                goto done;
        }

        CERROR("Unrecognised magic %08x from %s\n",
               msg->kqm_magic, libcfs_nid2str(fromnid));
 done:
        kqswnal_rx_decref(krx);
}
/* Receive Interrupt Handler: posts to schedulers */
void
kqswnal_rxhandler(EP_RXD *rxd)
{
        unsigned long flags;
        int           nob    = ep_rxd_len (rxd);
        int           status = ep_rxd_status (rxd);
        kqswnal_rx_t *krx    = (kqswnal_rx_t *)ep_rxd_arg (rxd);

        CDEBUG(D_NET, "kqswnal_rxhandler: rxd %p, krx %p, nob %d, status %d\n",
               rxd, krx, nob, status);

        LASSERT (krx != NULL);
        LASSERT (krx->krx_state == KRX_POSTED);

        krx->krx_state = KRX_PARSE;
        krx->krx_rxd = rxd;
        krx->krx_nob = nob;

        /* RPC reply iff rpc request received without error */
        krx->krx_rpc_reply_needed = ep_rxd_isrpc(rxd) &&
                                    (status == EP_SUCCESS ||
                                     status == EP_MSG_TOO_BIG);

        /* Default to failure if an RPC reply is requested but not handled */
        krx->krx_rpc_reply.msg.status = -EPROTO;
        cfs_atomic_set (&krx->krx_refcount, 1);

        if (status != EP_SUCCESS) {
                /* receives complete with failure when receiver is removed */
                if (status == EP_SHUTDOWN)
                        LASSERT (kqswnal_data.kqn_shuttingdown);
                else
                        CERROR("receive status failed with status %d nob %d\n",
                               ep_rxd_status(rxd), nob);
                kqswnal_rx_decref(krx);
                return;
        }

        if (!cfs_in_interrupt()) {
                kqswnal_parse(krx);
                return;
        }

        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        cfs_list_add_tail(&krx->krx_list, &kqswnal_data.kqn_readyrxds);
        cfs_waitq_signal(&kqswnal_data.kqn_sched_waitq);

        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags);
}
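/*
 * LND receive entry point, called by LNet once the header has been
 * matched.  RPC (RDMA) requests are completed via kqswnal_rdma();
 * immediate messages are simply copied out of the receive buffer pages.
 */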
int
kqswnal_recv (lnet_ni_t     *ni,
              void          *private,
              lnet_msg_t    *lntmsg,
              int            delayed,
              unsigned int   niov,
              struct iovec  *iov,
              lnet_kiov_t   *kiov,
              unsigned int   offset,
              unsigned int   mlen,
              unsigned int   rlen)
{
        kqswnal_rx_t       *krx = (kqswnal_rx_t *)private;
        lnet_nid_t          fromnid;
        kqswnal_msg_t      *msg;
        lnet_hdr_t         *hdr;
        kqswnal_remotemd_t *rmd;
        int                 msg_offset;
        int                 rc;

        LASSERT (!cfs_in_interrupt ());             /* OK to map */
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        fromnid = LNET_MKNID(LNET_NIDNET(ni->ni_nid), ep_rxd_node(krx->krx_rxd));
        msg = (kqswnal_msg_t *)page_address(krx->krx_kiov[0].kiov_page);

        if (krx->krx_rpc_reply_needed) {
                /* optimized (rdma) request sent as RPC */

                LASSERT (msg->kqm_type == QSWLND_MSG_RDMA);
                hdr = &msg->kqm_u.rdma.kqrm_hdr;
                rmd = &msg->kqm_u.rdma.kqrm_rmd;

                /* NB header is still in wire byte order */

                switch (le32_to_cpu(hdr->type)) {
                case LNET_MSG_PUT:
                case LNET_MSG_REPLY:
                        /* This is an optimized PUT/REPLY */
                        rc = kqswnal_rdma(krx, lntmsg,
                                          KTX_RDMA_FETCH, rmd,
                                          niov, iov, kiov, offset, mlen);
                        break;

                case LNET_MSG_GET:
                        if (krx->krx_cksum != msg->kqm_cksum) {
                                CERROR("Bad GET checksum %08x(%08x) from %s\n",
                                       krx->krx_cksum, msg->kqm_cksum,
                                       libcfs_nid2str(fromnid));
                                rc = -EIO;
                                break;
                        }

                        if (lntmsg == NULL) {
                                /* No buffer match: my decref will
                                 * complete the RPC with failure */
                                rc = 0;
                        } else {
                                /* Matched something! */
                                rc = kqswnal_rdma(krx, lntmsg,
                                                  KTX_RDMA_STORE, rmd,
                                                  lntmsg->msg_niov,
                                                  lntmsg->msg_iov,
                                                  lntmsg->msg_kiov,
                                                  lntmsg->msg_offset,
                                                  lntmsg->msg_len);
                        }
                        break;

                default:
                        CERROR("Bad RPC type %d\n",
                               le32_to_cpu(hdr->type));
                        rc = -EPROTO;
                        break;
                }

                kqswnal_rx_decref(krx);
                return rc;
        }

        LASSERT (msg->kqm_type == QSWLND_MSG_IMMEDIATE);
        msg_offset = offsetof(kqswnal_msg_t, kqm_u.immediate.kqim_payload);

        if (krx->krx_nob < msg_offset + rlen) {
                CERROR("Bad message size from %s: have %d, need %d + %d\n",
                       libcfs_nid2str(fromnid), krx->krx_nob,
                       msg_offset, rlen);
                kqswnal_rx_decref(krx);
                return -EPROTO;
        }

        if (kiov != NULL)
                lnet_copy_kiov2kiov(niov, kiov, offset,
                                    krx->krx_npages, krx->krx_kiov,
                                    msg_offset, mlen);
        else
                lnet_copy_kiov2iov(niov, iov, offset,
                                   krx->krx_npages, krx->krx_kiov,
                                   msg_offset, mlen);

        lnet_finalize(ni, lntmsg, 0);
        kqswnal_rx_decref(krx);
        return 0;
}
int
kqswnal_thread_start (int (*fn)(void *arg), void *arg)
{
        long pid = cfs_create_thread (fn, arg, 0);

        if (pid < 0)
                return ((int)pid);

        cfs_atomic_inc (&kqswnal_data.kqn_nthreads);
        return (0);
}

void
kqswnal_thread_fini (void)
{
        cfs_atomic_dec (&kqswnal_data.kqn_nthreads);
}
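/*
 * Scheduler thread: drains the ready-rx, done-tx and delayed-tx queues,
 * dropping kqn_sched_lock while it works on each item.  It yields after
 * KQSW_RESCHED consecutive items to avoid hogging the CPU, sleeps when
 * idle, and exits in stage 2 of shutdown once all queues are empty.
 */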
int
kqswnal_scheduler (void *arg)
{
        kqswnal_rx_t *krx;
        kqswnal_tx_t *ktx;
        unsigned long flags;
        int           rc;
        int           counter = 0;
        int           did_something;

        cfs_daemonize ("kqswnal_sched");
        cfs_block_allsigs ();

        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags);

        for (;;) {
                did_something = 0;

                if (!cfs_list_empty (&kqswnal_data.kqn_readyrxds)) {
                        krx = cfs_list_entry(kqswnal_data.kqn_readyrxds.next,
                                             kqswnal_rx_t, krx_list);
                        cfs_list_del (&krx->krx_list);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        LASSERT (krx->krx_state == KRX_PARSE);
                        kqswnal_parse (krx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
                                          flags);
                }

                if (!cfs_list_empty (&kqswnal_data.kqn_donetxds)) {
                        ktx = cfs_list_entry(kqswnal_data.kqn_donetxds.next,
                                             kqswnal_tx_t, ktx_schedlist);
                        cfs_list_del_init (&ktx->ktx_schedlist);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        kqswnal_tx_done_in_thread_context(ktx);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
                                          flags);
                }

                if (!cfs_list_empty (&kqswnal_data.kqn_delayedtxds)) {
                        ktx = cfs_list_entry(kqswnal_data.kqn_delayedtxds.next,
                                             kqswnal_tx_t, ktx_schedlist);
                        cfs_list_del_init (&ktx->ktx_schedlist);
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        rc = kqswnal_launch (ktx);
                        if (rc != 0) {
                                CERROR("Failed delayed transmit to %s: %d\n",
                                       libcfs_nid2str(ktx->ktx_nid), rc);
                                kqswnal_tx_done (ktx, rc);
                        }
                        cfs_atomic_dec (&kqswnal_data.kqn_pending_txs);

                        did_something = 1;
                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
                                          flags);
                }

                /* nothing to do or hogging CPU */
                if (!did_something || counter++ == KQSW_RESCHED) {
                        spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock,
                                               flags);

                        counter = 0;

                        if (!did_something) {
                                if (kqswnal_data.kqn_shuttingdown == 2) {
                                        /* We only exit in stage 2 of shutdown
                                         * when there's nothing left to do */
                                        break;
                                }
                                cfs_wait_event_interruptible_exclusive (
                                        kqswnal_data.kqn_sched_waitq,
                                        kqswnal_data.kqn_shuttingdown == 2 ||
                                        !cfs_list_empty(&kqswnal_data.kqn_readyrxds) ||
                                        !cfs_list_empty(&kqswnal_data.kqn_donetxds) ||
                                        !cfs_list_empty(&kqswnal_data.kqn_delayedtxds),
                                        rc);
                        } else if (need_resched())
                                cfs_schedule ();

                        spin_lock_irqsave(&kqswnal_data.kqn_sched_lock,
                                          flags);
                }
        }

        kqswnal_thread_fini ();
        return (0);
}