1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/klnds/ptllnd/ptllnd_cb.c
38 * Author: PJ Kirner <pjkirner@clusterfs.com>
43 #ifndef _USING_LUSTRE_PORTALS_
/* Fill 'dst' (up to 'dst_niov' entries) with the subset of the 'src'
 * iovec array covering bytes ['offset', 'offset'+'len'), returning the
 * number of fragments written.  'src' is read-only here.
 * NOTE(review): interior lines of this function (declarations, loop
 * closes, returns) appear to be missing from this excerpt; only
 * comments have been added. */
45 kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
46 int src_niov, struct iovec *src,
47 unsigned int offset, unsigned int len)
49 /* Initialise 'dst' to the subset of 'src' starting at 'offset',
50 * for exactly 'len' bytes, and return the number of entries.
51 * NB not destructive to 'src' */
52 unsigned int frag_len;
/* Zero-length request => zero fragments, by contract. */
55 if (len == 0) /* no data => */
56 return (0); /* no frags */
58 LASSERT (src_niov > 0);
/* Skip whole leading fragments wholly consumed by 'offset'. */
59 while (offset >= src->iov_len) { /* skip initial frags */
60 offset -= src->iov_len;
63 LASSERT (src_niov > 0);
68 LASSERT (src_niov > 0);
69 LASSERT (niov <= dst_niov);
/* Remaining bytes of the current source fragment past 'offset'. */
71 frag_len = src->iov_len - offset;
72 dst->iov_base = ((char *)src->iov_base) + offset;
/* If the request ends inside this fragment we are done; otherwise
 * (presumably, in the elided else-branch) take the whole fragment and
 * advance — TODO confirm against the full source. */
74 if (len <= frag_len) {
79 dst->iov_len = frag_len;
/* Like kptllnd_extract_iov() above, but translates kiov page fragments
 * into physical-address iovec entries for a PTL_MD_PHYS descriptor.
 * NOTE(review): interior lines appear to be missing from this excerpt;
 * only comments have been added. */
91 kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
92 int src_niov, lnet_kiov_t *src,
93 unsigned int offset, unsigned int len)
95 /* Initialise 'dst' to the physical addresses of the subset of 'src'
96 * starting at 'offset', for exactly 'len' bytes, and return the number
97 * of entries. NB not destructive to 'src' */
98 unsigned int frag_len;
/* Zero-length request => zero fragments, by contract. */
103 if (len == 0) /* no data => */
104 return (0); /* no frags */
106 LASSERT (src_niov > 0);
/* Skip whole leading fragments wholly consumed by 'offset'. */
107 while (offset >= src->kiov_len) { /* skip initial frags */
108 offset -= src->kiov_len;
111 LASSERT (src_niov > 0);
116 LASSERT (src_niov > 0);
117 LASSERT (niov <= dst_niov);
/* Take whichever is smaller: the rest of this fragment or the rest of
 * the requested range. */
119 frag_len = min(src->kiov_len - offset, len);
120 phys_page = lnet_page2phys(src->kiov_page);
121 phys = phys_page + src->kiov_offset + offset;
/* On 32-bit pointers the physical address is squeezed into a void*,
 * so assert the whole fragment fits below 4GB. */
123 LASSERT (sizeof(void *) > 4 ||
124 (phys <= 0xffffffffULL &&
125 phys + (frag_len - 1) <= 0xffffffffULL));
127 dst->iov_base = (void *)((unsigned long)phys);
128 dst->iov_len = frag_len;
/* Initialise tx->tx_rdma_md, the Portals memory descriptor used for the
 * RDMA phase of 'tx'.  Exactly one of 'iov'/'kiov' describes the buffer.
 * The MD threshold/options depend on tx_type: passive MDs enable the
 * peer's GET or PUT; active MDs expect SEND (+REPLY or optional ACK).
 * NOTE(review): case terminators, #else/#endif lines and closing braces
 * appear to be missing from this excerpt; only comments have been added. */
144 kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
145 struct iovec *iov, lnet_kiov_t *kiov,
146 unsigned int offset, unsigned int nob)
/* Caller supplies a virtual iovec OR a kiov, never both. */
148 LASSERT (iov == NULL || kiov == NULL);
150 memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));
152 tx->tx_rdma_md.start = tx->tx_frags;
153 tx->tx_rdma_md.user_ptr = &tx->tx_rdma_eventarg;
154 tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
155 tx->tx_rdma_md.options = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
156 PTL_MD_EVENT_START_DISABLE;
157 switch (tx->tx_type) {
161 case TX_TYPE_PUT_REQUEST: /* passive: peer gets */
162 tx->tx_rdma_md.threshold = 1; /* GET event */
163 tx->tx_rdma_md.options |= PTL_MD_OP_GET;
166 case TX_TYPE_GET_REQUEST: /* passive: peer puts */
167 tx->tx_rdma_md.threshold = 1; /* PUT event */
168 tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
171 case TX_TYPE_PUT_RESPONSE: /* active: I get */
172 tx->tx_rdma_md.threshold = 2; /* SEND + REPLY */
175 case TX_TYPE_GET_RESPONSE: /* active: I put */
176 tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1; /* SEND + ACK? */
/* Zero-length RDMA: no fragments to describe. */
181 tx->tx_rdma_md.length = 0;
/* Lustre-portals build: lnet helpers fill tx_frags directly. */
185 #ifdef _USING_LUSTRE_PORTALS_
187 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
188 tx->tx_rdma_md.length =
189 lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
190 niov, iov, offset, nob);
194 /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
/* Compile-time layout checks backing the "cheat" above. */
195 CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
196 CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
197 offsetof(lnet_kiov_t, kiov_offset));
198 CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
199 offsetof(lnet_kiov_t, kiov_page));
200 CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
201 offsetof(lnet_kiov_t, kiov_len));
203 tx->tx_rdma_md.options |= PTL_MD_KIOV;
204 tx->tx_rdma_md.length =
205 lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
206 niov, kiov, offset, nob);
/* Non-lustre portals: use the local extract helpers defined above. */
209 tx->tx_rdma_md.options |= PTL_MD_IOVEC;
210 tx->tx_rdma_md.length =
211 kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
212 niov, iov, offset, nob);
/* kiov path on non-lustre portals: physical addressing. */
216 tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
217 tx->tx_rdma_md.length =
218 kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
219 niov, kiov, offset, nob);
/* Perform the active side of an RDMA for a received request 'rx':
 * allocate a tx of 'type' (PUT_RESPONSE => I GET the peer's data,
 * GET_RESPONSE => I PUT my data), bind an MD over the local buffer,
 * queue the tx on the peer and fire PtlGet/PtlPut at the matchbits the
 * peer advertised.  Once tx_lnet_msg is attached, completion (success
 * or failure) is reported via lnet_finalize() at tx teardown, so this
 * path must "return success from here on".
 * NOTE(review): local declarations, return statements and several
 * PtlPut/PtlGet argument lines are missing from this excerpt; only
 * comments have been added. */
224 kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
225 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
226 unsigned int offset, int nob)
230 kptl_msg_t *rxmsg = rx->rx_msg;
231 kptl_peer_t *peer = rx->rx_peer;
235 LASSERT (type == TX_TYPE_PUT_RESPONSE ||
236 type == TX_TYPE_GET_RESPONSE);
238 tx = kptllnd_get_idle_tx(type);
/* No descriptor available: fail the RDMA (elided return presumably
 * follows — TODO confirm). */
240 CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
241 type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
242 libcfs_id2str(peer->peer_id));
246 kptllnd_set_tx_peer(tx, peer);
247 kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);
249 ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md,
251 if (ptlrc != PTL_OK) {
252 CERROR("PtlMDBind(%s) failed: %s(%d)\n",
253 libcfs_id2str(peer->peer_id),
254 kptllnd_errtype2str(ptlrc), ptlrc);
255 tx->tx_status = -EIO;
/* Drop the tx ref so the descriptor is cleaned up. */
256 kptllnd_tx_decref(tx);
260 spin_lock_irqsave(&peer->peer_lock, flags);
262 tx->tx_lnet_msg = lntmsg;
263 /* lnet_finalize() will be called when tx is torn down, so I must
264 * return success from here on... */
266 tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * HZ);
267 tx->tx_rdma_mdh = mdh;
269 list_add_tail(&tx->tx_list, &peer->peer_activeq);
271 /* peer has now got my ref on 'tx' */
273 spin_unlock_irqrestore(&peer->peer_lock, flags);
275 tx->tx_tposted = jiffies;
/* GET_RESPONSE => PtlPut my data; otherwise PtlGet the peer's.
 * Most call arguments are elided from this excerpt. */
277 if (type == TX_TYPE_GET_RESPONSE)
279 tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
281 *kptllnd_tunables.kptl_portal,
283 rxmsg->ptlm_u.rdma.kptlrm_matchbits,
285 (lntmsg != NULL) ? /* header data */
291 *kptllnd_tunables.kptl_portal,
293 rxmsg->ptlm_u.rdma.kptlrm_matchbits,
296 if (ptlrc != PTL_OK) {
297 CERROR("Ptl%s failed: %s(%d)\n",
298 (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
299 kptllnd_errtype2str(ptlrc), ptlrc);
/* Fatal portals error: close the peer; everything queued on it
 * (including this RDMA) completes with failure. */
301 kptllnd_peer_close(peer, -EIO);
302 /* Everything (including this RDMA) queued on the peer will
303 * be completed with failure */
304 kptllnd_schedule_ptltrace_dump();
/* LND send entry point: transmit 'lntmsg' to its target.
 * Strategy visible in this excerpt:
 *   - PUT payloads small enough for an immediate message go inline;
 *     otherwise a passive PUT request advertises an RDMA buffer.
 *   - GETs (unless routed) whose reply would be too big for an
 *     immediate message become passive GET requests with a reply MD.
 *   - ACKs and small messages are sent as immediate messages built
 *     from tx_frags.
 * NOTE(review): many lines (declarations, returns, case labels, braces)
 * are missing from this excerpt; only comments have been added. */
311 kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
313 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
314 int type = lntmsg->msg_type;
315 lnet_process_id_t target = lntmsg->msg_target;
316 int target_is_router = lntmsg->msg_target_is_router;
317 int routing = lntmsg->msg_routing;
318 unsigned int payload_niov = lntmsg->msg_niov;
319 struct iovec *payload_iov = lntmsg->msg_iov;
320 lnet_kiov_t *payload_kiov = lntmsg->msg_kiov;
321 unsigned int payload_offset = lntmsg->msg_offset;
322 unsigned int payload_nob = lntmsg->msg_len;
323 kptl_net_t *net = ni->ni_data;
330 LASSERT (net->net_ni == ni);
331 LASSERT (!net->net_shutdown);
332 LASSERT (payload_nob == 0 || payload_niov > 0);
333 LASSERT (payload_niov <= LNET_MAX_IOV);
334 LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
335 LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
336 LASSERT (!in_interrupt());
338 rc = kptllnd_find_target(net, target, &peer);
342 /* NB peer->peer_id does NOT always equal target, be careful with
343 * which one to use */
351 /* Should the payload avoid RDMA? */
352 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
/* Small iovec payload fits an immediate message — skip RDMA
 * (branch target elided from this excerpt). */
353 if (payload_kiov == NULL &&
354 nob <= peer->peer_max_msg_size)
/* Passive PUT: advertise an RDMA buffer the peer will GET from. */
357 tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
359 CERROR("Can't send %s to %s: can't allocate descriptor\n",
360 lnet_msgtyp2str(type),
361 libcfs_id2str(target));
366 kptllnd_init_rdma_md(tx, payload_niov,
367 payload_iov, payload_kiov,
368 payload_offset, payload_nob);
370 tx->tx_lnet_msg = lntmsg;
371 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
372 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
373 target, sizeof(kptl_rdma_msg_t));
375 CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
376 libcfs_id2str(target),
377 le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
379 kptllnd_tx_launch(peer, tx, 0);
383 /* routed gets don't RDMA */
384 if (target_is_router || routing)
387 /* Is the payload small enough not to need RDMA? */
388 nob = lntmsg->msg_md->md_length;
389 nob = offsetof(kptl_msg_t,
390 ptlm_u.immediate.kptlim_payload[nob]);
391 if (nob <= peer->peer_max_msg_size)
/* Passive GET: advertise the reply buffer the peer will PUT into. */
394 tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
396 CERROR("Can't send GET to %s: can't allocate descriptor\n",
397 libcfs_id2str(target));
/* The GET reply is delivered as a separate LNET message; create
 * it up front so failure can be reported cleanly. */
402 tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg);
403 if (tx->tx_lnet_replymsg == NULL) {
404 CERROR("Failed to allocate LNET reply for %s\n",
405 libcfs_id2str(target));
406 kptllnd_tx_decref(tx);
/* The reply lands in the MD lustre attached to the GET: build the
 * RDMA descriptor over its iov or kiov as appropriate. */
411 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
412 kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
413 lntmsg->msg_md->md_iov.iov, NULL,
414 0, lntmsg->msg_md->md_length);
416 kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
417 NULL, lntmsg->msg_md->md_iov.kiov,
418 0, lntmsg->msg_md->md_length);
420 tx->tx_lnet_msg = lntmsg;
421 tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
422 kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
423 target, sizeof(kptl_rdma_msg_t));
425 CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
426 libcfs_id2str(target),
427 le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);
429 kptllnd_tx_launch(peer, tx, 0);
/* ACKs carry no payload. */
433 CDEBUG(D_NET, "LNET_MSG_ACK\n");
434 LASSERT (payload_nob == 0);
/* Immediate path: message header + inline payload in tx_frags. */
438 /* I don't have to handle kiovs */
439 LASSERT (payload_nob == 0 || payload_iov != NULL);
441 tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
443 CERROR("Can't send %s to %s: can't allocate descriptor\n",
444 lnet_msgtyp2str(type), libcfs_id2str(target));
449 tx->tx_lnet_msg = lntmsg;
450 tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;
452 if (payload_nob == 0) {
/* Fragment 0 is always the message header itself. */
455 tx->tx_frags->iov[0].iov_base = tx->tx_msg;
456 tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
457 ptlm_u.immediate.kptlim_payload);
459 /* NB relying on lustre not asking for PTL_MD_MAX_IOV
461 #ifdef _USING_LUSTRE_PORTALS_
462 nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1,
463 &tx->tx_frags->iov[1],
464 payload_niov, payload_iov,
465 payload_offset, payload_nob);
467 nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
468 &tx->tx_frags->iov[1],
469 payload_niov, payload_iov,
470 payload_offset, payload_nob);
474 nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
475 kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);
477 CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
478 libcfs_id2str(target),
479 lnet_msgtyp2str(lntmsg->msg_type),
480 (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ?
481 le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
482 (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ?
483 le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
486 kptllnd_tx_launch(peer, tx, nfrag);
/* Drop the ref taken by kptllnd_find_target(). */
489 kptllnd_peer_decref(peer);
/* LND eager-receive hook: detach 'rx' from its receive buffer so the
 * buffer can be reposted while the message awaits delivery.  The
 * incoming request is copied into rx_space reserved in the descriptor.
 * NOTE(review): the #else/#endif structure and final return are elided
 * from this excerpt; only comments have been added. */
494 kptllnd_eager_recv(struct lnet_ni *ni, void *private,
495 lnet_msg_t *msg, void **new_privatep)
497 kptl_rx_t *rx = private;
499 CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);
501 /* I have to release my ref on rxb (if I have one) to ensure I'm an
502 * eager receiver, so I copy the incoming request from the buffer it
503 * landed in, into space reserved in the descriptor... */
/* Without 8-byte local alignment the message may already have been
 * copied out at parse time. */
505 #if (PTL_MD_LOCAL_ALIGN8 == 0)
506 if (rx->rx_rxb == NULL) /* already copied */
507 return 0; /* to fix alignment */
509 LASSERT(rx->rx_rxb != NULL);
511 LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);
513 memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
514 rx->rx_msg = (kptl_msg_t *)rx->rx_space;
/* Release the buffer ref: the rx no longer points into it. */
516 kptllnd_rx_buffer_decref(rx->rx_rxb);
/* LND receive entry point: deliver the message described by 'rx' into
 * the sink buffer (niov/iov/kiov/offset/mlen).  IMMEDIATE payloads are
 * copied inline; GET/PUT requests trigger the active RDMA path via
 * kptllnd_active_rdma().
 * NOTE(review): declarations, case terminators and the copy calls'
 * leading lines are missing from this excerpt; only comments added. */
524 kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
525 unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
526 unsigned int offset, unsigned int mlen, unsigned int rlen)
528 kptl_rx_t *rx = private;
529 kptl_msg_t *rxmsg = rx->rx_msg;
533 CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
534 kptllnd_msgtype2str(rxmsg->ptlm_type),
535 niov, offset, mlen, rlen);
537 LASSERT (mlen <= rlen);
539 LASSERT (!in_interrupt());
540 LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
541 LASSERT (niov <= PTL_MD_MAX_IOV); /* !!! */
/* The condition's second half is elided here — presumably a check
 * that the sender's uid is non-zero; TODO confirm. */
544 if (lntmsg != NULL &&
546 /* Set the UID if the sender's uid isn't 0; i.e. non-root
547 * running in userspace (e.g. a catamount node; linux kernel
548 * senders, including routers have uid 0). If this is a lustre
549 * RPC request, this tells lustre not to trust the creds in the
550 * RPC message body. */
551 lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
554 switch(rxmsg->ptlm_type)
561 case PTLLND_MSG_TYPE_IMMEDIATE:
562 CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);
/* Sanity: the payload must fit inside the bytes actually received. */
564 nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
565 if (nob > rx->rx_nob) {
566 CERROR ("Immediate message from %s too big: %d(%d)\n",
567 libcfs_id2str(rx->rx_peer->peer_id), nob,
/* Copy the inline payload to the sink (lnet_copy_* call lines are
 * elided; iov and kiov variants). */
576 *kptllnd_tunables.kptl_max_msg_size,
577 rxmsg->ptlm_u.immediate.kptlim_payload,
583 *kptllnd_tunables.kptl_max_msg_size,
584 rxmsg->ptlm_u.immediate.kptlim_payload,
/* Immediate delivery complete. */
588 lnet_finalize (ni, lntmsg, 0);
592 case PTLLND_MSG_TYPE_GET:
593 CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);
595 /* NB always send RDMA so the peer can complete. I send
596 * success/failure in the portals 'hdr_data' */
/* No matching lnet MD: respond with a zero-byte RDMA. */
599 rc = kptllnd_active_rdma(rx, NULL,
600 TX_TYPE_GET_RESPONSE,
601 0, NULL, NULL, 0, 0);
603 rc = kptllnd_active_rdma(rx, lntmsg,
604 TX_TYPE_GET_RESPONSE,
612 case PTLLND_MSG_TYPE_PUT:
613 CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);
615 /* NB always send RDMA so the peer can complete; it'll be 0
616 * bytes if there was no match (lntmsg == NULL). I have no way
617 * to let my peer know this, but she's only interested in when
618 * the net has stopped accessing her buffer in any case. */
620 rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
621 niov, iov, kiov, offset, mlen);
626 * We're done with the RX
628 kptllnd_rx_done(rx, PTLLND_POSTRX_PEER_CREDIT);
/* Portals event-queue callback: dispatch the event to the tx or
 * rx-buffer handler based on the eventarg type stashed in the MD's
 * user_ptr.  NOTE(review): the default case and closing braces are
 * elided from this excerpt; only comments have been added. */
633 kptllnd_eq_callback(ptl_event_t *ev)
635 kptl_eventarg_t *eva = ev->md.user_ptr;
637 switch (eva->eva_type) {
/* Both message and RDMA MDs complete via the tx callback. */
641 case PTLLND_EVENTARG_TYPE_MSG:
642 case PTLLND_EVENTARG_TYPE_RDMA:
643 kptllnd_tx_callback(ev);
646 case PTLLND_EVENTARG_TYPE_BUF:
647 kptllnd_rx_buffer_callback(ev);
/* Mark one LND thread as exited by decrementing the live-thread count
 * (shutdown presumably waits for this to reach zero — TODO confirm). */
653 kptllnd_thread_fini (void)
655 atomic_dec(&kptllnd_data.kptl_nthreads);
/* Spawn a kernel thread running fn(arg).  The thread count is bumped
 * optimistically before the spawn and rolled back via
 * kptllnd_thread_fini() on failure. */
659 kptllnd_thread_start (int (*fn)(void *arg), void *arg)
663 atomic_inc(&kptllnd_data.kptl_nthreads);
665 pid = kernel_thread (fn, arg, 0);
/* Negative pid => spawn failed; undo the count bump. */
669 CERROR("Failed to start kernel_thread: error %d\n", (int)pid);
670 kptllnd_thread_fini();
/* Watchdog thread: until shutdown phase 2, periodically (a) dumps a
 * ptltrace if one was requested by a callback, (b) scans a proportional
 * chunk of the peer hash table for RDMA timeouts, (c) handles closing
 * peers, then sleeps interruptibly on the watchdog waitqueue.
 * NOTE(review): declarations and several control-flow lines are elided
 * from this excerpt; only comments have been added. */
675 kptllnd_watchdog(void *arg)
679 wait_queue_t waitlink;
682 unsigned long deadline = jiffies;
686 snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
690 init_waitqueue_entry(&waitlink, current);
692 /* threads shut down in phase 2 after all peers have been destroyed */
693 while (kptllnd_data.kptl_shutdown < 2) {
695 /* add a check for needs ptltrace
696 * yes, this is blatant hijacking of this thread
697 * we can't dump directly from tx or rx _callbacks as it deadlocks portals
698 * and takes out the node
701 if (atomic_read(&kptllnd_data.kptl_needs_ptltrace)) {
703 kptllnd_dump_ptltrace();
704 /* we only dump once, no matter how many pending */
705 atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
/* Time remaining until the next peer-table scan. */
711 timeout = (int)(deadline - jiffies);
716 int chunk = kptllnd_data.kptl_peer_hash_size;
719 /* Time to check for RDMA timeouts on a few more
720 * peers: I do checks every 'p' seconds on a
721 * proportion of the peer table and I need to check
722 * every connection 'n' times within a timeout
723 * interval, to ensure I detect a timeout on any
724 * connection within (n+1)/n times the timeout
/* Shrink the chunk so the full table is still covered n times
 * per timeout interval. */
727 if ((*kptllnd_tunables.kptl_timeout) > n * p)
728 chunk = (chunk * n * p) /
729 (*kptllnd_tunables.kptl_timeout);
733 for (i = 0; i < chunk; i++) {
734 kptllnd_peer_check_bucket(peer_index, stamp);
735 peer_index = (peer_index + 1) %
736 kptllnd_data.kptl_peer_hash_size;
744 kptllnd_handle_closing_peers();
/* Interruptible sleep until signalled or the timeout elapses. */
746 set_current_state(TASK_INTERRUPTIBLE);
747 add_wait_queue_exclusive(&kptllnd_data.kptl_watchdog_waitq,
750 schedule_timeout(timeout);
752 set_current_state (TASK_RUNNING);
753 remove_wait_queue(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
756 kptllnd_thread_fini();
757 CDEBUG(D_NET, "<<<\n");
/* Scheduler thread: until shutdown phase 2, drains three work queues
 * under kptl_sched_lock — incoming rx messages (parse), rx buffers to
 * repost, and tx descriptors — dropping the lock around each work item.
 * Every kptl_reschedule_loops iterations it sleeps on the scheduler
 * waitqueue to yield the CPU.
 * NOTE(review): declarations, counter reset and loop-exit lines are
 * elided from this excerpt; only comments have been added. */
762 kptllnd_scheduler (void *arg)
766 wait_queue_t waitlink;
771 kptl_rx_buffer_t *rxb;
774 snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
778 init_waitqueue_entry(&waitlink, current);
780 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
782 /* threads shut down in phase 2 after all peers have been destroyed */
783 while (kptllnd_data.kptl_shutdown < 2) {
/* Queue 1: received messages awaiting parsing. */
787 if (!list_empty(&kptllnd_data.kptl_sched_rxq)) {
788 rx = list_entry (kptllnd_data.kptl_sched_rxq.next,
790 list_del(&rx->rx_list);
/* Work is done outside the lock. */
792 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
795 kptllnd_rx_parse(rx);
798 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
/* Queue 2: rx buffers to repost to portals. */
801 if (!list_empty(&kptllnd_data.kptl_sched_rxbq)) {
802 rxb = list_entry (kptllnd_data.kptl_sched_rxbq.next,
803 kptl_rx_buffer_t, rxb_repost_list);
804 list_del(&rxb->rxb_repost_list);
806 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
809 kptllnd_rx_buffer_post(rxb);
812 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
/* Queue 3: tx descriptors (handler call elided in this excerpt). */
815 if (!list_empty(&kptllnd_data.kptl_sched_txq)) {
816 tx = list_entry (kptllnd_data.kptl_sched_txq.next,
818 list_del_init(&tx->tx_list);
820 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
825 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
/* Yield periodically so one busy scheduler can't starve others. */
829 if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
833 set_current_state(TASK_INTERRUPTIBLE);
834 add_wait_queue_exclusive(&kptllnd_data.kptl_sched_waitq,
836 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
843 set_current_state(TASK_RUNNING);
844 remove_wait_queue(&kptllnd_data.kptl_sched_waitq, &waitlink);
846 spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);
851 spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);
853 kptllnd_thread_fini();