From 2ffb3e2e4a37c1edf408cf38959eb7083cbf2908 Mon Sep 17 00:00:00 2001 From: green Date: Mon, 26 Apr 2004 17:04:58 +0000 Subject: [PATCH] Update to HEAD. --- lnet/klnds/qswlnd/qswlnd_cb.c | 256 +++++++++++++++++--------------- lustre/include/linux/lustre_compat25.h | 70 ++++++++- lustre/llite/llite_internal.h | 38 +++-- lustre/portals/knals/qswnal/qswnal_cb.c | 256 +++++++++++++++++--------------- 4 files changed, 356 insertions(+), 264 deletions(-) diff --git a/lnet/klnds/qswlnd/qswlnd_cb.c b/lnet/klnds/qswlnd/qswlnd_cb.c index 478c25f..f92f974 100644 --- a/lnet/klnds/qswlnd/qswlnd_cb.c +++ b/lnet/klnds/qswlnd/qswlnd_cb.c @@ -85,6 +85,9 @@ kqswnal_printf (nal_cb_t * nal, const char *fmt, ...) CDEBUG (D_NET, "%s", msg); } +#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64)) +# error "Can't save/restore irq contexts in different procedures" +#endif static void kqswnal_cli(nal_cb_t *nal, unsigned long *flags) @@ -103,6 +106,17 @@ kqswnal_sti(nal_cb_t *nal, unsigned long *flags) spin_unlock_irqrestore(&data->kqn_statelock, *flags); } +static void +kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev) +{ + /* holding kqn_statelock */ + + if (eq->event_callback != NULL) + eq->event_callback(ev); + + if (waitqueue_active(&kqswnal_data.kqn_yield_waitq)) + wake_up_all(&kqswnal_data.kqn_yield_waitq); +} static int kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist) @@ -412,7 +426,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx) list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds); /* anything blocking for a tx descriptor? */ - if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ + if (!kqswnal_data.kqn_shuttingdown && + !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ { CDEBUG(D_NET,"wakeup fwd\n"); @@ -446,6 +461,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) for (;;) { spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags); + if (kqswnal_data.kqn_shuttingdown) + break; + /* "normal" descriptor is free */ if (!list_empty (&kqswnal_data.kqn_idletxds)) { ktx = list_entry (kqswnal_data.kqn_idletxds.next, @@ -453,14 +471,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) break; } - /* "normal" descriptor pool is empty */ - - if (fwd != NULL) { /* forwarded packet => queue for idle txd */ - CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); - list_add_tail (&fwd->kprfd_list, - &kqswnal_data.kqn_idletxd_fwdq); + if (fwd != NULL) /* forwarded packet? */ break; - } /* doing a local transmit */ if (!may_block) { @@ -480,13 +492,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) CDEBUG (D_NET, "blocking for tx desc\n"); wait_event (kqswnal_data.kqn_idletxd_waitq, - !list_empty (&kqswnal_data.kqn_idletxds)); + !list_empty (&kqswnal_data.kqn_idletxds) || + kqswnal_data.kqn_shuttingdown); } if (ktx != NULL) { list_del (&ktx->ktx_list); list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds); ktx->ktx_launcher = current->pid; + atomic_inc(&kqswnal_data.kqn_pending_txs); + } else if (fwd != NULL) { + /* queue forwarded packet until idle txd available */ + CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); + list_add_tail (&fwd->kprfd_list, + &kqswnal_data.kqn_idletxd_fwdq); } spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); @@ -513,15 +532,15 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) lib_finalize (&kqswnal_lib, ktx->ktx_args[0], (lib_msg_t *)ktx->ktx_args[1], (error == 0) ? PTL_OK : - (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); + (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL); break; case KTX_GETTING: /* Peer has DMA-ed direct? */ msg = (lib_msg_t *)ktx->ktx_args[1]; if (error == 0) { - repmsg = lib_fake_reply_msg (&kqswnal_lib, - ktx->ktx_nid, msg->md); + repmsg = lib_create_reply_msg (&kqswnal_lib, + ktx->ktx_nid, msg); if (repmsg == NULL) error = -ENOMEM; } @@ -532,7 +551,7 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK); } else { lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg, - (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); + (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL); } break; @@ -587,6 +606,9 @@ kqswnal_launch (kqswnal_tx_t *ktx) ktx->ktx_launchtime = jiffies; + if (kqswnal_data.kqn_shuttingdown) + return (-ESHUTDOWN); + LASSERT (dest >= 0); /* must be a peer */ if (ktx->ktx_state == KTX_GETTING) { /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The @@ -621,8 +643,6 @@ kqswnal_launch (kqswnal_tx_t *ktx) return (0); case EP_ENOMEM: /* can't allocate ep txd => queue for later */ - LASSERT (in_interrupt()); - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds); @@ -775,7 +795,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, int offset, int nob) { kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; - char *buffer = (char *)page_address(krx->krx_pages[0]); + char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page); kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); int rc; #if MULTIRAIL_EKC @@ -907,7 +927,7 @@ kqswnal_sendmsg (nal_cb_t *nal, LASSERT (payload_kiov == NULL || !in_interrupt ()); /* payload is either all vaddrs or all pages */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); - + if (payload_nob > KQSW_MAXPAYLOAD) { CERROR ("request exceeds MTU size "LPSZ" (max %u).\n", payload_nob, KQSW_MAXPAYLOAD); @@ -937,7 +957,7 @@ kqswnal_sendmsg (nal_cb_t *nal, in_interrupt())); if (ktx == NULL) { kqswnal_cerror_hdr (hdr); - return (PTL_NOSPACE); + return (PTL_NO_SPACE); } ktx->ktx_nid = targetnid; @@ -953,19 +973,17 @@ kqswnal_sendmsg (nal_cb_t *nal, "nid "LPX64" via "LPX64" elanID %d\n", nid, targetnid, ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)); - return (PTL_FAIL); + rc = -EINVAL; + goto out; } /* peer expects RPC completion with GET data */ rc = kqswnal_dma_reply (ktx, payload_niov, payload_iov, payload_kiov, payload_offset, payload_nob); - if (rc == 0) - return (PTL_OK); - - CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); + if (rc != 0) + CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); + goto out; } memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ @@ -1008,8 +1026,8 @@ kqswnal_sendmsg (nal_cb_t *nal, } memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum)); #endif - - if (kqswnal_data.kqn_optimized_gets && + + if (kqswnal_tunables.kqn_optimized_gets && type == PTL_MSG_GET && /* doing a GET */ nid == targetnid) { /* not forwarding */ lib_md_t *md = libmsg->md; @@ -1038,11 +1056,8 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, 0, md->length, md->md_niov, md->md_iov.iov); - - if (rc < 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1; @@ -1105,25 +1120,26 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, payload_niov, payload_iov); - if (rc != 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; } ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; rc = kqswnal_launch (ktx); - if (rc != 0) { /* failed? */ - CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc); + + out: + CDEBUG(rc == 0 ? D_NET : D_ERROR, + "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", + rc == 0 ? "Sent" : "Failed to send", + payload_nob, nid, targetnid, rc); + + if (rc != 0) kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } - CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", - payload_nob, nid, targetnid); - return (PTL_OK); + atomic_dec(&kqswnal_data.kqn_pending_txs); + return (rc == 0 ? PTL_OK : PTL_FAIL); } static ptl_err_t @@ -1167,7 +1183,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { int rc; kqswnal_tx_t *ktx; - struct iovec *iov = fwd->kprfd_iov; + ptl_kiov_t *kiov = fwd->kprfd_kiov; int niov = fwd->kprfd_niov; int nob = fwd->kprfd_nob; ptl_nid_t nid = fwd->kprfd_gateway_nid; @@ -1177,11 +1193,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) LBUG (); #endif /* The router wants this NAL to forward a packet */ - CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n", + CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n", fwd, nid, niov, nob); - LASSERT (niov > 0); - ktx = kqswnal_get_idle_tx (fwd, 0); if (ktx == NULL) /* can't get txd right now */ return; /* fwd will be scheduled when tx desc freed */ @@ -1192,60 +1206,60 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) if (kqswnal_nid2elanid (nid) < 0) { CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid); rc = -EHOSTUNREACH; - goto failed; + goto out; } - if (nob > KQSW_NRXMSGBYTES_LARGE) { - CERROR ("Can't forward [%p] to "LPX64 - ": size %d bigger than max packet size %ld\n", - fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE); - rc = -EMSGSIZE; - goto failed; - } + /* copy hdr into pre-mapped buffer */ + memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t)); + ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; - ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ? + ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; ktx->ktx_nid = nid; ktx->ktx_state = KTX_FORWARDING; ktx->ktx_args[0] = fwd; + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) && - nob <= KQSW_TX_BUFFER_SIZE) + if (nob <= KQSW_TX_MAXCONTIG) { - /* send from ktx's pre-mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob); + /* send payload from ktx's pre-mapped contiguous buffer */ #if MULTIRAIL_EKC ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, - 0, nob); + 0, KQSW_HDR_SIZE + nob); #else ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; - ktx->ktx_frags[0].Len = nob; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob; #endif - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; + if (nob > 0) + lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE, + niov, kiov, 0, nob); } else { - /* zero copy */ - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; - rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov); + /* zero copy payload */ +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; +#endif + rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov); if (rc != 0) - goto failed; - - ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base; + goto out; } rc = kqswnal_launch (ktx); - if (rc == 0) - return; + out: + if (rc != 0) { + CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); - failed: - LASSERT (rc != 0); - CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); + kqswnal_put_idle_tx (ktx); + /* complete now (with failure) */ + kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + } - kqswnal_put_idle_tx (ktx); - /* complete now (with failure) */ - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + atomic_dec(&kqswnal_data.kqn_pending_txs); } void @@ -1257,7 +1271,7 @@ kqswnal_fwd_callback (void *arg, int error) if (error != 0) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n", NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error); @@ -1371,8 +1385,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx) void kqswnal_rx (kqswnal_rx_t *krx) { - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page); ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid); + int payload_nob; int nob; int niov; @@ -1398,16 +1413,26 @@ kqswnal_rx (kqswnal_rx_t *krx) return; } - /* NB forwarding may destroy iov; rebuild every time */ - for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++) - { - LASSERT (niov < krx->krx_npages); - krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]); - krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob); + nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE; + niov = 0; + if (nob > 0) { + krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE; + krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob); + niov = 1; + nob -= PAGE_SIZE - KQSW_HDR_SIZE; + + while (nob > 0) { + LASSERT (niov < krx->krx_npages); + + krx->krx_kiov[niov].kiov_offset = 0; + krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob); + niov++; + nob -= PAGE_SIZE; + } } - kpr_fwd_init (&krx->krx_fwd, dest_nid, - krx->krx_nob, niov, krx->krx_iov, + kpr_fwd_init (&krx->krx_fwd, dest_nid, + hdr, payload_nob, niov, krx->krx_kiov, kqswnal_fwd_callback, krx); kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd); @@ -1471,7 +1496,7 @@ kqswnal_rxhandler(EP_RXD *rxd) void kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64 ", dpid %d, spid %d, type %d\n", @@ -1526,6 +1551,7 @@ kqswnal_recvmsg (nal_cb_t *nal, size_t rlen) { kqswnal_rx_t *krx = (kqswnal_rx_t *)private; + char *buffer = page_address(krx->krx_kiov[0].kiov_page); int page; char *page_ptr; int page_nob; @@ -1535,8 +1561,7 @@ kqswnal_recvmsg (nal_cb_t *nal, #if KQSW_CHECKSUM kqsw_csum_t senders_csum; kqsw_csum_t payload_csum = 0; - kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]), - sizeof(ptl_hdr_t)); + kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t)); size_t csum_len = mlen; int csum_frags = 0; int csum_nob = 0; @@ -1545,8 +1570,7 @@ kqswnal_recvmsg (nal_cb_t *nal, atomic_inc (&csum_counter); - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); if (senders_csum != hdr_csum) kqswnal_csum_error (krx, 1); #endif @@ -1567,8 +1591,7 @@ kqswnal_recvmsg (nal_cb_t *nal, if (mlen != 0) { page = 0; - page_ptr = ((char *) page_address(krx->krx_pages[0])) + - KQSW_HDR_SIZE; + page_ptr = buffer + KQSW_HDR_SIZE; page_nob = PAGE_SIZE - KQSW_HDR_SIZE; LASSERT (niov > 0); @@ -1621,7 +1644,7 @@ kqswnal_recvmsg (nal_cb_t *nal, { page++; LASSERT (page < krx->krx_npages); - page_ptr = page_address(krx->krx_pages[page]); + page_ptr = page_address(krx->krx_kiov[page].kiov_page); page_nob = PAGE_SIZE; } @@ -1649,8 +1672,8 @@ kqswnal_recvmsg (nal_cb_t *nal, } #if KQSW_CHECKSUM - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), + sizeof(kqsw_csum_t)); if (csum_len != rlen) CERROR("Unable to checksum data in user's buffer\n"); @@ -1706,7 +1729,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg) return ((int)pid); atomic_inc (&kqswnal_data.kqn_nthreads); - atomic_inc (&kqswnal_data.kqn_nthreads_running); return (0); } @@ -1725,7 +1747,6 @@ kqswnal_scheduler (void *arg) long flags; int rc; int counter = 0; - int shuttingdown = 0; int did_something; kportal_daemonize ("kqswnal_sched"); @@ -1735,18 +1756,6 @@ kqswnal_scheduler (void *arg) for (;;) { - if (kqswnal_data.kqn_shuttingdown != shuttingdown) { - - if (kqswnal_data.kqn_shuttingdown == 2) - break; - - /* During stage 1 of shutdown we are still responsive - * to receives */ - - atomic_dec (&kqswnal_data.kqn_nthreads_running); - shuttingdown = kqswnal_data.kqn_shuttingdown; - } - did_something = 0; if (!list_empty (&kqswnal_data.kqn_readyrxds)) @@ -1763,8 +1772,7 @@ kqswnal_scheduler (void *arg) spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown && - !list_empty (&kqswnal_data.kqn_delayedtxds)) + if (!list_empty (&kqswnal_data.kqn_delayedtxds)) { ktx = list_entry(kqswnal_data.kqn_delayedtxds.next, kqswnal_tx_t, ktx_list); @@ -1773,31 +1781,31 @@ kqswnal_scheduler (void *arg) flags); rc = kqswnal_launch (ktx); - if (rc != 0) /* failed: ktx_nid down? */ - { + if (rc != 0) { CERROR("Failed delayed transmit to "LPX64 ": %d\n", ktx->ktx_nid, rc); kqswnal_tx_done (ktx, rc); } + atomic_dec (&kqswnal_data.kqn_pending_txs); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown & - !list_empty (&kqswnal_data.kqn_delayedfwds)) + if (!list_empty (&kqswnal_data.kqn_delayedfwds)) { fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list); list_del (&fwd->kprfd_list); spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */ kqswnal_fwd_packet (NULL, fwd); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - /* nothing to do or hogging CPU */ + /* nothing to do or hogging CPU */ if (!did_something || counter++ == KQSW_RESCHED) { spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags); @@ -1805,8 +1813,13 @@ kqswnal_scheduler (void *arg) counter = 0; if (!did_something) { + if (kqswnal_data.kqn_shuttingdown == 2) { + /* We only exit in stage 2 of shutdown when + * there's nothing left to do */ + break; + } rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq, - kqswnal_data.kqn_shuttingdown != shuttingdown || + kqswnal_data.kqn_shuttingdown == 2 || !list_empty(&kqswnal_data.kqn_readyrxds) || !list_empty(&kqswnal_data.kqn_delayedtxds) || !list_empty(&kqswnal_data.kqn_delayedfwds)); @@ -1818,8 +1831,6 @@ kqswnal_scheduler (void *arg) } } - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); - kqswnal_thread_fini (); return (0); } @@ -1838,5 +1849,6 @@ nal_cb_t kqswnal_lib = cb_printf: kqswnal_printf, cb_cli: kqswnal_cli, cb_sti: kqswnal_sti, + cb_callback: kqswnal_callback, cb_dist: kqswnal_dist }; diff --git a/lustre/include/linux/lustre_compat25.h b/lustre/include/linux/lustre_compat25.h index df59db4..120e996 100644 --- a/lustre/include/linux/lustre_compat25.h +++ b/lustre/include/linux/lustre_compat25.h @@ -52,7 +52,7 @@ #define LTIME_S(time) (time.tv_sec) #define ll_path_lookup path_lookup -#define ll_permission permission +#define ll_permission(inode,mask,nd) permission(inode,mask,nd) #define ll_pgcache_lock(mapping) spin_lock(&mapping->page_lock) #define ll_pgcache_unlock(mapping) spin_unlock(&mapping->page_lock) @@ -85,6 +85,21 @@ static inline void lustre_daemonize_helper(void) current->tty = NULL; } +static inline int cleanup_group_info(void) +{ + struct group_info *ginfo; + + ginfo = groups_alloc(2); + if (!ginfo) + return -ENOMEM; + + ginfo->ngroups = 0; + set_current_groups(ginfo); + put_group_info(ginfo); + + return 0; +} + #define smp_num_cpus NR_CPUS #ifndef conditional_schedule @@ -96,7 +111,7 @@ static inline void lustre_daemonize_helper(void) #else /* 2.4.. */ #define ll_vfs_create(a,b,c,d) vfs_create(a,b,c) -#define ll_permission(a,b,c) permission(a,b) +#define ll_permission(inode,mask,nd) permission(inode,mask) #define ILOOKUP(sb, ino, test, data) ilookup4(sb, ino, test, data); #define DCACHE_DISCONNECTED DCACHE_NFSD_DISCONNECTED #define ll_dev_t int @@ -129,15 +144,15 @@ static inline void clear_page_dirty(struct page *page) #define cpu_online(cpu) (cpu_online_map & (1<tty = NULL; } +static inline int cleanup_group_info(void) +{ + /* Get rid of unneeded supplementary groups */ + current->ngroups = 0; + memset(current->groups, 0, sizeof(current->groups)); + return 0; +} + #ifndef conditional_schedule #define conditional_schedule() if (unlikely(need_resched())) schedule() #endif @@ -171,5 +194,42 @@ static inline void lustre_daemonize_helper(void) #endif /* end of 2.4 compat macros */ +#ifdef HAVE_PAGE_LIST +static inline int mapping_has_pages(struct address_space *mapping) +{ + int rc = 1; + + ll_pgcache_lock(mapping); + if (list_empty(&mapping->dirty_pages) && + list_empty(&mapping->clean_pages) && + list_empty(&mapping->locked_pages)) { + rc = 0; + } + ll_pgcache_unlock(mapping); + + return rc; +} + +static inline int clear_page_dirty_for_io(struct page *page) +{ + struct address_space *mapping = page->mapping; + + if (page->mapping && PageDirty(page)) { + ClearPageDirty(page); + ll_pgcache_lock(mapping); + list_del(&page->list); + list_add(&page->list, &mapping->locked_pages); + ll_pgcache_unlock(mapping); + return 1; + } + return 0; +} +#else +static inline int mapping_has_pages(struct address_space *mapping) +{ + return mapping->nrpages > 0; +} +#endif + #endif /* __KERNEL__ */ #endif /* _COMPAT25_H */ diff --git a/lustre/llite/llite_internal.h b/lustre/llite/llite_internal.h index 4316c03..9100756 100644 --- a/lustre/llite/llite_internal.h +++ b/lustre/llite/llite_internal.h @@ -63,13 +63,15 @@ struct ll_readahead_state { }; extern kmem_cache_t *ll_file_data_slab; +struct lustre_handle; struct ll_file_data { struct obd_client_handle fd_mds_och; struct ll_readahead_state fd_ras; __u32 fd_flags; + struct lustre_handle fd_cwlockh; + unsigned long fd_gid; }; -struct lustre_handle; struct lov_stripe_md; extern spinlock_t inode_lock; @@ -124,14 +126,14 @@ struct ll_async_page { struct page *llap_page; struct list_head llap_pending_write; /* only trust these if the page lock is providing exclusion */ - int llap_write_queued:1, + int llap_write_queued:1, llap_defer_uptodate:1; struct list_head llap_proc_item; }; -#define LL_CDEBUG_PAGE(page, STR) \ - CDEBUG(D_PAGE, "page %p map %p ind %lu priv %0lx: " STR, \ - page, page->mapping, page->index, page->private) +#define LL_CDEBUG_PAGE(mask, page, fmt, arg...) \ + CDEBUG(mask, "page %p map %p ind %lu priv %0lx: " fmt, \ + page, page->mapping, page->index, page->private, ## arg) /* llite/lproc_llite.c */ int lprocfs_register_mountpoint(struct proc_dir_entry *parent, @@ -159,13 +161,7 @@ void ll_prepare_mdc_op_data(struct mdc_op_data *, int ll_prepare_write(struct file *, struct page *, unsigned from, unsigned to); int ll_commit_write(struct file *, struct page *, unsigned from, unsigned to); void ll_inode_fill_obdo(struct inode *inode, int cmd, struct obdo *oa); -#if (LINUX_VERSION_CODE < KERNEL_VERSION(2,5,0)) -#define ll_ap_completion ll_ap_completion_24 -void ll_ap_completion_24(void *data, int cmd, int rc); -#else -#define ll_ap_completion ll_ap_completion_26 -void ll_ap_completion_26(void *data, int cmd, int rc); -#endif +void ll_ap_completion(void *data, int cmd, struct obdo *oa, int rc); void ll_removepage(struct page *page); int ll_sync_page(struct page *page); int ll_readpage(struct file *file, struct page *page); @@ -178,7 +174,6 @@ void ll_truncate(struct inode *inode); /* llite/file.c */ extern struct file_operations ll_file_operations; extern struct inode_operations ll_file_inode_operations; -extern struct inode_operations ll_special_inode_operations; extern int ll_inode_revalidate_it(struct dentry *, struct lookup_intent *); int ll_extent_lock(struct ll_file_data *, struct inode *, struct lov_stripe_md *, int mode, ldlm_policy_data_t *, @@ -189,6 +184,9 @@ int ll_file_open(struct inode *inode, struct file *file); int ll_file_release(struct inode *inode, struct file *file); int ll_lsm_getattr(struct obd_export *, struct lov_stripe_md *, struct obdo *); int ll_glimpse_size(struct inode *inode, struct ost_lvb *lvb); +int ll_local_open(struct file *file, struct lookup_intent *it); +int ll_mdc_close(struct obd_export *mdc_exp, struct inode *inode, + struct file *file); #if (LINUX_VERSION_CODE > KERNEL_VERSION(2,5,0)) int ll_getattr(struct vfsmount *mnt, struct dentry *de, struct lookup_intent *it, struct kstat *stat); @@ -214,7 +212,6 @@ int ll_gns_start_thread(void); void ll_gns_stop_thread(void); /* llite/llite_lib.c */ -extern struct super_operations ll_super_operations; extern struct super_operations lustre_super_operations; char *ll_read_opt(const char *opt, char *data); @@ -224,7 +221,6 @@ void ll_lli_init(struct ll_inode_info *lli); int ll_fill_super(struct super_block *sb, void *data, int silent); int lustre_fill_super(struct super_block *sb, void *data, int silent); void lustre_put_super(struct super_block *sb); -void ll_put_super(struct super_block *sb); struct inode *ll_inode_from_lock(struct ldlm_lock *lock); void ll_clear_inode(struct inode *inode); int ll_attr2inode(struct inode *inode, struct iattr *attr, int trunc); @@ -247,6 +243,16 @@ __u32 get_uuid2int(const char *name, int len); struct dentry *ll_fh_to_dentry(struct super_block *sb, __u32 *data, int len, int fhtype, int parent); int ll_dentry_to_fh(struct dentry *, __u32 *datap, int *lenp, int need_parent); + +/* llite/special.c */ +extern struct inode_operations ll_special_inode_operations; +extern struct file_operations ll_special_chr_inode_fops; +extern struct file_operations ll_special_chr_file_fops; +extern struct file_operations ll_special_blk_inode_fops; +extern struct file_operations ll_special_fifo_inode_fops; +extern struct file_operations ll_special_fifo_file_fops; +extern struct file_operations ll_special_sock_inode_fops; + /* llite/symlink.c */ extern struct inode_operations ll_fast_symlink_inode_operations; @@ -273,6 +279,8 @@ int ll_close_thread_start(struct ll_close_queue **lcq_ret); #define LL_SBI_NOLCK 0x1 #define LL_SBI_READAHEAD 0x2 +#define LL_MAX_BLKSIZE (4UL * 1024 * 1024) + #if (LINUX_VERSION_CODE >= KERNEL_VERSION(2,5,0)) #define ll_s2sbi(sb) ((struct ll_sb_info *)((sb)->s_fs_info)) void __d_rehash(struct dentry * entry, int lock); diff --git a/lustre/portals/knals/qswnal/qswnal_cb.c b/lustre/portals/knals/qswnal/qswnal_cb.c index 478c25f..f92f974 100644 --- a/lustre/portals/knals/qswnal/qswnal_cb.c +++ b/lustre/portals/knals/qswnal/qswnal_cb.c @@ -85,6 +85,9 @@ kqswnal_printf (nal_cb_t * nal, const char *fmt, ...) CDEBUG (D_NET, "%s", msg); } +#if (defined(CONFIG_SPARC32) || defined(CONFIG_SPARC64)) +# error "Can't save/restore irq contexts in different procedures" +#endif static void kqswnal_cli(nal_cb_t *nal, unsigned long *flags) @@ -103,6 +106,17 @@ kqswnal_sti(nal_cb_t *nal, unsigned long *flags) spin_unlock_irqrestore(&data->kqn_statelock, *flags); } +static void +kqswnal_callback(nal_cb_t *nal, void *private, lib_eq_t *eq, ptl_event_t *ev) +{ + /* holding kqn_statelock */ + + if (eq->event_callback != NULL) + eq->event_callback(ev); + + if (waitqueue_active(&kqswnal_data.kqn_yield_waitq)) + wake_up_all(&kqswnal_data.kqn_yield_waitq); +} static int kqswnal_dist(nal_cb_t *nal, ptl_nid_t nid, unsigned long *dist) @@ -412,7 +426,8 @@ kqswnal_put_idle_tx (kqswnal_tx_t *ktx) list_add (&ktx->ktx_list, &kqswnal_data.kqn_idletxds); /* anything blocking for a tx descriptor? */ - if (!list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ + if (!kqswnal_data.kqn_shuttingdown && + !list_empty(&kqswnal_data.kqn_idletxd_fwdq)) /* forwarded packet? */ { CDEBUG(D_NET,"wakeup fwd\n"); @@ -446,6 +461,9 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) for (;;) { spin_lock_irqsave (&kqswnal_data.kqn_idletxd_lock, flags); + if (kqswnal_data.kqn_shuttingdown) + break; + /* "normal" descriptor is free */ if (!list_empty (&kqswnal_data.kqn_idletxds)) { ktx = list_entry (kqswnal_data.kqn_idletxds.next, @@ -453,14 +471,8 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) break; } - /* "normal" descriptor pool is empty */ - - if (fwd != NULL) { /* forwarded packet => queue for idle txd */ - CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); - list_add_tail (&fwd->kprfd_list, - &kqswnal_data.kqn_idletxd_fwdq); + if (fwd != NULL) /* forwarded packet? */ break; - } /* doing a local transmit */ if (!may_block) { @@ -480,13 +492,20 @@ kqswnal_get_idle_tx (kpr_fwd_desc_t *fwd, int may_block) CDEBUG (D_NET, "blocking for tx desc\n"); wait_event (kqswnal_data.kqn_idletxd_waitq, - !list_empty (&kqswnal_data.kqn_idletxds)); + !list_empty (&kqswnal_data.kqn_idletxds) || + kqswnal_data.kqn_shuttingdown); } if (ktx != NULL) { list_del (&ktx->ktx_list); list_add (&ktx->ktx_list, &kqswnal_data.kqn_activetxds); ktx->ktx_launcher = current->pid; + atomic_inc(&kqswnal_data.kqn_pending_txs); + } else if (fwd != NULL) { + /* queue forwarded packet until idle txd available */ + CDEBUG (D_NET, "blocked fwd [%p]\n", fwd); + list_add_tail (&fwd->kprfd_list, + &kqswnal_data.kqn_idletxd_fwdq); } spin_unlock_irqrestore (&kqswnal_data.kqn_idletxd_lock, flags); @@ -513,15 +532,15 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) lib_finalize (&kqswnal_lib, ktx->ktx_args[0], (lib_msg_t *)ktx->ktx_args[1], (error == 0) ? PTL_OK : - (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); + (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL); break; case KTX_GETTING: /* Peer has DMA-ed direct? */ msg = (lib_msg_t *)ktx->ktx_args[1]; if (error == 0) { - repmsg = lib_fake_reply_msg (&kqswnal_lib, - ktx->ktx_nid, msg->md); + repmsg = lib_create_reply_msg (&kqswnal_lib, + ktx->ktx_nid, msg); if (repmsg == NULL) error = -ENOMEM; } @@ -532,7 +551,7 @@ kqswnal_tx_done (kqswnal_tx_t *ktx, int error) lib_finalize (&kqswnal_lib, NULL, repmsg, PTL_OK); } else { lib_finalize (&kqswnal_lib, ktx->ktx_args[0], msg, - (error == -ENOMEM) ? PTL_NOSPACE : PTL_FAIL); + (error == -ENOMEM) ? PTL_NO_SPACE : PTL_FAIL); } break; @@ -587,6 +606,9 @@ kqswnal_launch (kqswnal_tx_t *ktx) ktx->ktx_launchtime = jiffies; + if (kqswnal_data.kqn_shuttingdown) + return (-ESHUTDOWN); + LASSERT (dest >= 0); /* must be a peer */ if (ktx->ktx_state == KTX_GETTING) { /* NB ktx_frag[0] is the GET hdr + kqswnal_remotemd_t. The @@ -621,8 +643,6 @@ kqswnal_launch (kqswnal_tx_t *ktx) return (0); case EP_ENOMEM: /* can't allocate ep txd => queue for later */ - LASSERT (in_interrupt()); - spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); list_add_tail (&ktx->ktx_delayed_list, &kqswnal_data.kqn_delayedtxds); @@ -775,7 +795,7 @@ kqswnal_dma_reply (kqswnal_tx_t *ktx, int nfrag, int offset, int nob) { kqswnal_rx_t *krx = (kqswnal_rx_t *)ktx->ktx_args[0]; - char *buffer = (char *)page_address(krx->krx_pages[0]); + char *buffer = (char *)page_address(krx->krx_kiov[0].kiov_page); kqswnal_remotemd_t *rmd = (kqswnal_remotemd_t *)(buffer + KQSW_HDR_SIZE); int rc; #if MULTIRAIL_EKC @@ -907,7 +927,7 @@ kqswnal_sendmsg (nal_cb_t *nal, LASSERT (payload_kiov == NULL || !in_interrupt ()); /* payload is either all vaddrs or all pages */ LASSERT (!(payload_kiov != NULL && payload_iov != NULL)); - + if (payload_nob > KQSW_MAXPAYLOAD) { CERROR ("request exceeds MTU size "LPSZ" (max %u).\n", payload_nob, KQSW_MAXPAYLOAD); @@ -937,7 +957,7 @@ kqswnal_sendmsg (nal_cb_t *nal, in_interrupt())); if (ktx == NULL) { kqswnal_cerror_hdr (hdr); - return (PTL_NOSPACE); + return (PTL_NO_SPACE); } ktx->ktx_nid = targetnid; @@ -953,19 +973,17 @@ kqswnal_sendmsg (nal_cb_t *nal, "nid "LPX64" via "LPX64" elanID %d\n", nid, targetnid, ep_rxd_node(((kqswnal_rx_t *)private)->krx_rxd)); - return (PTL_FAIL); + rc = -EINVAL; + goto out; } /* peer expects RPC completion with GET data */ rc = kqswnal_dma_reply (ktx, payload_niov, payload_iov, payload_kiov, payload_offset, payload_nob); - if (rc == 0) - return (PTL_OK); - - CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); + if (rc != 0) + CERROR ("Can't DMA reply to "LPX64": %d\n", nid, rc); + goto out; } memcpy (ktx->ktx_buffer, hdr, sizeof (*hdr)); /* copy hdr from caller's stack */ @@ -1008,8 +1026,8 @@ kqswnal_sendmsg (nal_cb_t *nal, } memcpy(ktx->ktx_buffer + sizeof(*hdr) + sizeof(csum), &csum, sizeof(csum)); #endif - - if (kqswnal_data.kqn_optimized_gets && + + if (kqswnal_tunables.kqn_optimized_gets && type == PTL_MSG_GET && /* doing a GET */ nid == targetnid) { /* not forwarding */ lib_md_t *md = libmsg->md; @@ -1038,11 +1056,8 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, 0, md->length, md->md_niov, md->md_iov.iov); - - if (rc < 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; rmd->kqrmd_nfrag = ktx->ktx_nfrag - 1; @@ -1105,25 +1120,26 @@ kqswnal_sendmsg (nal_cb_t *nal, else rc = kqswnal_map_tx_iov (ktx, payload_offset, payload_nob, payload_niov, payload_iov); - if (rc != 0) { - kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } + if (rc != 0) + goto out; } ktx->ktx_port = (payload_nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; rc = kqswnal_launch (ktx); - if (rc != 0) { /* failed? */ - CERROR ("Failed to send packet to "LPX64": %d\n", targetnid, rc); + + out: + CDEBUG(rc == 0 ? D_NET : D_ERROR, + "%s "LPSZ" bytes to "LPX64" via "LPX64": rc %d\n", + rc == 0 ? "Sent" : "Failed to send", + payload_nob, nid, targetnid, rc); + + if (rc != 0) kqswnal_put_idle_tx (ktx); - return (PTL_FAIL); - } - CDEBUG(D_NET, "sent "LPSZ" bytes to "LPX64" via "LPX64"\n", - payload_nob, nid, targetnid); - return (PTL_OK); + atomic_dec(&kqswnal_data.kqn_pending_txs); + return (rc == 0 ? PTL_OK : PTL_FAIL); } static ptl_err_t @@ -1167,7 +1183,7 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) { int rc; kqswnal_tx_t *ktx; - struct iovec *iov = fwd->kprfd_iov; + ptl_kiov_t *kiov = fwd->kprfd_kiov; int niov = fwd->kprfd_niov; int nob = fwd->kprfd_nob; ptl_nid_t nid = fwd->kprfd_gateway_nid; @@ -1177,11 +1193,9 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) LBUG (); #endif /* The router wants this NAL to forward a packet */ - CDEBUG (D_NET, "forwarding [%p] to "LPX64", %d frags %d bytes\n", + CDEBUG (D_NET, "forwarding [%p] to "LPX64", payload: %d frags %d bytes\n", fwd, nid, niov, nob); - LASSERT (niov > 0); - ktx = kqswnal_get_idle_tx (fwd, 0); if (ktx == NULL) /* can't get txd right now */ return; /* fwd will be scheduled when tx desc freed */ @@ -1192,60 +1206,60 @@ kqswnal_fwd_packet (void *arg, kpr_fwd_desc_t *fwd) if (kqswnal_nid2elanid (nid) < 0) { CERROR("Can't forward [%p] to "LPX64": not a peer\n", fwd, nid); rc = -EHOSTUNREACH; - goto failed; + goto out; } - if (nob > KQSW_NRXMSGBYTES_LARGE) { - CERROR ("Can't forward [%p] to "LPX64 - ": size %d bigger than max packet size %ld\n", - fwd, nid, nob, (long)KQSW_NRXMSGBYTES_LARGE); - rc = -EMSGSIZE; - goto failed; - } + /* copy hdr into pre-mapped buffer */ + memcpy(ktx->ktx_buffer, fwd->kprfd_hdr, sizeof(ptl_hdr_t)); + ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; - ktx->ktx_port = (nob <= (KQSW_HDR_SIZE + KQSW_SMALLPAYLOAD)) ? + ktx->ktx_port = (nob <= KQSW_SMALLPAYLOAD) ? EP_MSG_SVC_PORTALS_SMALL : EP_MSG_SVC_PORTALS_LARGE; ktx->ktx_nid = nid; ktx->ktx_state = KTX_FORWARDING; ktx->ktx_args[0] = fwd; + ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - if ((kqswnal_data.kqn_copy_small_fwd || niov > 1) && - nob <= KQSW_TX_BUFFER_SIZE) + if (nob <= KQSW_TX_MAXCONTIG) { - /* send from ktx's pre-mapped contiguous buffer? */ - lib_copy_iov2buf (ktx->ktx_buffer, niov, iov, 0, nob); + /* send payload from ktx's pre-mapped contiguous buffer */ #if MULTIRAIL_EKC ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, - 0, nob); + 0, KQSW_HDR_SIZE + nob); #else ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; - ktx->ktx_frags[0].Len = nob; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE + nob; #endif - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 1; - ktx->ktx_wire_hdr = (ptl_hdr_t *)ktx->ktx_buffer; + if (nob > 0) + lib_copy_kiov2buf(ktx->ktx_buffer + KQSW_HDR_SIZE, + niov, kiov, 0, nob); } else { - /* zero copy */ - ktx->ktx_nfrag = ktx->ktx_firsttmpfrag = 0; - rc = kqswnal_map_tx_iov (ktx, 0, nob, niov, iov); + /* zero copy payload */ +#if MULTIRAIL_EKC + ep_nmd_subset(&ktx->ktx_frags[0], &ktx->ktx_ebuffer, + 0, KQSW_HDR_SIZE); +#else + ktx->ktx_frags[0].Base = ktx->ktx_ebuffer; + ktx->ktx_frags[0].Len = KQSW_HDR_SIZE; +#endif + rc = kqswnal_map_tx_kiov (ktx, 0, nob, niov, kiov); if (rc != 0) - goto failed; - - ktx->ktx_wire_hdr = (ptl_hdr_t *)iov[0].iov_base; + goto out; } rc = kqswnal_launch (ktx); - if (rc == 0) - return; + out: + if (rc != 0) { + CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); - failed: - LASSERT (rc != 0); - CERROR ("Failed to forward [%p] to "LPX64": %d\n", fwd, nid, rc); + kqswnal_put_idle_tx (ktx); + /* complete now (with failure) */ + kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + } - kqswnal_put_idle_tx (ktx); - /* complete now (with failure) */ - kpr_fwd_done (&kqswnal_data.kqn_router, fwd, rc); + atomic_dec(&kqswnal_data.kqn_pending_txs); } void @@ -1257,7 +1271,7 @@ kqswnal_fwd_callback (void *arg, int error) if (error != 0) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR("Failed to route packet from "LPX64" to "LPX64": %d\n", NTOH__u64(hdr->src_nid), NTOH__u64(hdr->dest_nid),error); @@ -1371,8 +1385,9 @@ kqswnal_requeue_rx (kqswnal_rx_t *krx) void kqswnal_rx (kqswnal_rx_t *krx) { - ptl_hdr_t *hdr = (ptl_hdr_t *) page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *) page_address(krx->krx_kiov[0].kiov_page); ptl_nid_t dest_nid = NTOH__u64 (hdr->dest_nid); + int payload_nob; int nob; int niov; @@ -1398,16 +1413,26 @@ kqswnal_rx (kqswnal_rx_t *krx) return; } - /* NB forwarding may destroy iov; rebuild every time */ - for (nob = krx->krx_nob, niov = 0; nob > 0; nob -= PAGE_SIZE, niov++) - { - LASSERT (niov < krx->krx_npages); - krx->krx_iov[niov].iov_base= page_address(krx->krx_pages[niov]); - krx->krx_iov[niov].iov_len = MIN(PAGE_SIZE, nob); + nob = payload_nob = krx->krx_nob - KQSW_HDR_SIZE; + niov = 0; + if (nob > 0) { + krx->krx_kiov[0].kiov_offset = KQSW_HDR_SIZE; + krx->krx_kiov[0].kiov_len = MIN(PAGE_SIZE - KQSW_HDR_SIZE, nob); + niov = 1; + nob -= PAGE_SIZE - KQSW_HDR_SIZE; + + while (nob > 0) { + LASSERT (niov < krx->krx_npages); + + krx->krx_kiov[niov].kiov_offset = 0; + krx->krx_kiov[niov].kiov_len = MIN(PAGE_SIZE, nob); + niov++; + nob -= PAGE_SIZE; + } } - kpr_fwd_init (&krx->krx_fwd, dest_nid, - krx->krx_nob, niov, krx->krx_iov, + kpr_fwd_init (&krx->krx_fwd, dest_nid, + hdr, payload_nob, niov, krx->krx_kiov, kqswnal_fwd_callback, krx); kpr_fwd_start (&kqswnal_data.kqn_router, &krx->krx_fwd); @@ -1471,7 +1496,7 @@ kqswnal_rxhandler(EP_RXD *rxd) void kqswnal_csum_error (kqswnal_rx_t *krx, int ishdr) { - ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_pages[0]); + ptl_hdr_t *hdr = (ptl_hdr_t *)page_address (krx->krx_kiov[0].kiov_page); CERROR ("%s checksum mismatch %p: dnid "LPX64", snid "LPX64 ", dpid %d, spid %d, type %d\n", @@ -1526,6 +1551,7 @@ kqswnal_recvmsg (nal_cb_t *nal, size_t rlen) { kqswnal_rx_t *krx = (kqswnal_rx_t *)private; + char *buffer = page_address(krx->krx_kiov[0].kiov_page); int page; char *page_ptr; int page_nob; @@ -1535,8 +1561,7 @@ kqswnal_recvmsg (nal_cb_t *nal, #if KQSW_CHECKSUM kqsw_csum_t senders_csum; kqsw_csum_t payload_csum = 0; - kqsw_csum_t hdr_csum = kqsw_csum(0, page_address(krx->krx_pages[0]), - sizeof(ptl_hdr_t)); + kqsw_csum_t hdr_csum = kqsw_csum(0, buffer, sizeof(ptl_hdr_t)); size_t csum_len = mlen; int csum_frags = 0; int csum_nob = 0; @@ -1545,8 +1570,7 @@ kqswnal_recvmsg (nal_cb_t *nal, atomic_inc (&csum_counter); - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof (ptl_hdr_t), sizeof (kqsw_csum_t)); if (senders_csum != hdr_csum) kqswnal_csum_error (krx, 1); #endif @@ -1567,8 +1591,7 @@ kqswnal_recvmsg (nal_cb_t *nal, if (mlen != 0) { page = 0; - page_ptr = ((char *) page_address(krx->krx_pages[0])) + - KQSW_HDR_SIZE; + page_ptr = buffer + KQSW_HDR_SIZE; page_nob = PAGE_SIZE - KQSW_HDR_SIZE; LASSERT (niov > 0); @@ -1621,7 +1644,7 @@ kqswnal_recvmsg (nal_cb_t *nal, { page++; LASSERT (page < krx->krx_npages); - page_ptr = page_address(krx->krx_pages[page]); + page_ptr = page_address(krx->krx_kiov[page].kiov_page); page_nob = PAGE_SIZE; } @@ -1649,8 +1672,8 @@ kqswnal_recvmsg (nal_cb_t *nal, } #if KQSW_CHECKSUM - memcpy (&senders_csum, ((char *)page_address (krx->krx_pages[0])) + - sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), sizeof(kqsw_csum_t)); + memcpy (&senders_csum, buffer + sizeof(ptl_hdr_t) + sizeof(kqsw_csum_t), + sizeof(kqsw_csum_t)); if (csum_len != rlen) CERROR("Unable to checksum data in user's buffer\n"); @@ -1706,7 +1729,6 @@ kqswnal_thread_start (int (*fn)(void *arg), void *arg) return ((int)pid); atomic_inc (&kqswnal_data.kqn_nthreads); - atomic_inc (&kqswnal_data.kqn_nthreads_running); return (0); } @@ -1725,7 +1747,6 @@ kqswnal_scheduler (void *arg) long flags; int rc; int counter = 0; - int shuttingdown = 0; int did_something; kportal_daemonize ("kqswnal_sched"); @@ -1735,18 +1756,6 @@ kqswnal_scheduler (void *arg) for (;;) { - if (kqswnal_data.kqn_shuttingdown != shuttingdown) { - - if (kqswnal_data.kqn_shuttingdown == 2) - break; - - /* During stage 1 of shutdown we are still responsive - * to receives */ - - atomic_dec (&kqswnal_data.kqn_nthreads_running); - shuttingdown = kqswnal_data.kqn_shuttingdown; - } - did_something = 0; if (!list_empty (&kqswnal_data.kqn_readyrxds)) @@ -1763,8 +1772,7 @@ kqswnal_scheduler (void *arg) spin_lock_irqsave(&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown && - !list_empty (&kqswnal_data.kqn_delayedtxds)) + if (!list_empty (&kqswnal_data.kqn_delayedtxds)) { ktx = list_entry(kqswnal_data.kqn_delayedtxds.next, kqswnal_tx_t, ktx_list); @@ -1773,31 +1781,31 @@ kqswnal_scheduler (void *arg) flags); rc = kqswnal_launch (ktx); - if (rc != 0) /* failed: ktx_nid down? */ - { + if (rc != 0) { CERROR("Failed delayed transmit to "LPX64 ": %d\n", ktx->ktx_nid, rc); kqswnal_tx_done (ktx, rc); } + atomic_dec (&kqswnal_data.kqn_pending_txs); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - if (!shuttingdown & - !list_empty (&kqswnal_data.kqn_delayedfwds)) + if (!list_empty (&kqswnal_data.kqn_delayedfwds)) { fwd = list_entry (kqswnal_data.kqn_delayedfwds.next, kpr_fwd_desc_t, kprfd_list); list_del (&fwd->kprfd_list); spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); + /* If we're shutting down, this will just requeue fwd on kqn_idletxd_fwdq */ kqswnal_fwd_packet (NULL, fwd); did_something = 1; spin_lock_irqsave (&kqswnal_data.kqn_sched_lock, flags); } - /* nothing to do or hogging CPU */ + /* nothing to do or hogging CPU */ if (!did_something || counter++ == KQSW_RESCHED) { spin_unlock_irqrestore(&kqswnal_data.kqn_sched_lock, flags); @@ -1805,8 +1813,13 @@ kqswnal_scheduler (void *arg) counter = 0; if (!did_something) { + if (kqswnal_data.kqn_shuttingdown == 2) { + /* We only exit in stage 2 of shutdown when + * there's nothing left to do */ + break; + } rc = wait_event_interruptible (kqswnal_data.kqn_sched_waitq, - kqswnal_data.kqn_shuttingdown != shuttingdown || + kqswnal_data.kqn_shuttingdown == 2 || !list_empty(&kqswnal_data.kqn_readyrxds) || !list_empty(&kqswnal_data.kqn_delayedtxds) || !list_empty(&kqswnal_data.kqn_delayedfwds)); @@ -1818,8 +1831,6 @@ kqswnal_scheduler (void *arg) } } - spin_unlock_irqrestore (&kqswnal_data.kqn_sched_lock, flags); - kqswnal_thread_fini (); return (0); } @@ -1838,5 +1849,6 @@ nal_cb_t kqswnal_lib = cb_printf: kqswnal_printf, cb_cli: kqswnal_cli, cb_sti: kqswnal_sti, + cb_callback: kqswnal_callback, cb_dist: kqswnal_dist }; -- 1.8.3.1