/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ptllnd/ptllnd_cb.c
 *
 * Author: PJ Kirner <pjkirner@clusterfs.com>
 */

#include "ptllnd.h"
#ifndef _USING_LUSTRE_PORTALS_
int
kptllnd_extract_iov (int dst_niov, ptl_md_iovec_t *dst,
                     int src_niov, struct iovec *src,
                     unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the subset of 'src' starting at 'offset',
         * for exactly 'len' bytes, and return the number of entries.
         * NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT (src_niov > 0);
        while (offset >= src->iov_len) {        /* skip initial frags */
                offset -= src->iov_len;
                src_niov--;
                src++;
                LASSERT (src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT (src_niov > 0);
                LASSERT (niov <= dst_niov);

                frag_len = src->iov_len - offset;
                dst->iov_base = ((char *)src->iov_base) + offset;

                if (len <= frag_len) {
                        dst->iov_len = len;
                        return (niov);
                }

                dst->iov_len = frag_len;

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
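/*
 * Worked example for kptllnd_extract_iov() above (hypothetical values, not
 * taken from the original source): with src = { {base0, 4096}, {base1, 4096} },
 * offset = 6000 and len = 1000, the skip loop consumes the first fragment
 * (offset becomes 1904), the copy loop emits a single entry
 * dst[0] = { base1 + 1904, 1000 } and the function returns 1.
 */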
int
kptllnd_extract_phys (int dst_niov, ptl_md_iovec_t *dst,
                      int src_niov, lnet_kiov_t *src,
                      unsigned int offset, unsigned int len)
{
        /* Initialise 'dst' to the physical addresses of the subset of 'src'
         * starting at 'offset', for exactly 'len' bytes, and return the number
         * of entries.  NB not destructive to 'src' */
        unsigned int    frag_len;
        unsigned int    niov;
        __u64           phys_page;
        __u64           phys;

        if (len == 0)                           /* no data => */
                return (0);                     /* no frags */

        LASSERT (src_niov > 0);
        while (offset >= src->kiov_len) {       /* skip initial frags */
                offset -= src->kiov_len;
                src_niov--;
                src++;
                LASSERT (src_niov > 0);
        }

        niov = 1;
        for (;;) {
                LASSERT (src_niov > 0);
                LASSERT (niov <= dst_niov);

                frag_len = min(src->kiov_len - offset, len);
                phys_page = lnet_page2phys(src->kiov_page);
                phys = phys_page + src->kiov_offset + offset;

                LASSERT (sizeof(void *) > 4 ||
                         (phys <= 0xffffffffULL &&
                          phys + (frag_len - 1) <= 0xffffffffULL));

                dst->iov_base = (void *)((unsigned long)phys);
                dst->iov_len = frag_len;

                if (frag_len == len)
                        return (niov);

                len -= frag_len;
                dst++;
                src++;
                niov++;
                src_niov--;
                offset = 0;
        }
}
#endif
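/*
 * Note on kptllnd_extract_phys() above: each output entry carries a physical
 * address squeezed into iov_base, computed as
 * lnet_page2phys(kiov_page) + kiov_offset + offset.  The LASSERT guards the
 * 32-bit case: the cast goes through 'unsigned long', so on a 32-bit kernel
 * both the first and last byte of the fragment must lie below 4GB or the
 * address would be truncated.
 */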
static void
kptllnd_init_rdma_md(kptl_tx_t *tx, unsigned int niov,
                     struct iovec *iov, lnet_kiov_t *kiov,
                     unsigned int offset, unsigned int nob)
{
        LASSERT (iov == NULL || kiov == NULL);

        memset(&tx->tx_rdma_md, 0, sizeof(tx->tx_rdma_md));

        tx->tx_rdma_md.start = tx->tx_frags;
        tx->tx_rdma_md.user_ptr = &tx->tx_rdma_eventarg;
        tx->tx_rdma_md.eq_handle = kptllnd_data.kptl_eqh;
        tx->tx_rdma_md.options = PTL_MD_LUSTRE_COMPLETION_SEMANTICS |
                                 PTL_MD_EVENT_START_DISABLE;
        switch (tx->tx_type) {
        default:
                LBUG();

        case TX_TYPE_PUT_REQUEST:               /* passive: peer gets */
                tx->tx_rdma_md.threshold = 1;   /* GET event */
                tx->tx_rdma_md.options |= PTL_MD_OP_GET;
                break;

        case TX_TYPE_GET_REQUEST:               /* passive: peer puts */
                tx->tx_rdma_md.threshold = 1;   /* PUT event */
                tx->tx_rdma_md.options |= PTL_MD_OP_PUT;
                break;

        case TX_TYPE_PUT_RESPONSE:              /* active: I get */
                tx->tx_rdma_md.threshold = 2;   /* SEND + REPLY */
                break;

        case TX_TYPE_GET_RESPONSE:              /* active: I put */
                tx->tx_rdma_md.threshold = tx->tx_acked ? 2 : 1; /* SEND + ACK? */
                break;
        }

        if (nob == 0) {
                tx->tx_rdma_md.length = 0;
                return;
        }

#ifdef _USING_LUSTRE_PORTALS_
        if (iov != NULL) {
                tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                tx->tx_rdma_md.length =
                        lnet_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                         niov, iov, offset, nob);
                return;
        }

        /* Cheating OK since ptl_kiov_t == lnet_kiov_t */
        CLASSERT(sizeof(ptl_kiov_t) == sizeof(lnet_kiov_t));
        CLASSERT(offsetof(ptl_kiov_t, kiov_offset) ==
                 offsetof(lnet_kiov_t, kiov_offset));
        CLASSERT(offsetof(ptl_kiov_t, kiov_page) ==
                 offsetof(lnet_kiov_t, kiov_page));
        CLASSERT(offsetof(ptl_kiov_t, kiov_len) ==
                 offsetof(lnet_kiov_t, kiov_len));

        tx->tx_rdma_md.options |= PTL_MD_KIOV;
        tx->tx_rdma_md.length =
                lnet_extract_kiov(PTL_MD_MAX_IOV, tx->tx_frags->kiov,
                                  niov, kiov, offset, nob);
#else
        if (iov != NULL) {
                tx->tx_rdma_md.options |= PTL_MD_IOVEC;
                tx->tx_rdma_md.length =
                        kptllnd_extract_iov(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                            niov, iov, offset, nob);
                return;
        }

        tx->tx_rdma_md.options |= PTL_MD_IOVEC | PTL_MD_PHYS;
        tx->tx_rdma_md.length =
                kptllnd_extract_phys(PTL_MD_MAX_IOV, tx->tx_frags->iov,
                                     niov, kiov, offset, nob);
#endif
}
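/*
 * Note on the MD setup above: 'threshold' counts the network events expected
 * on the RDMA MD before it is finished with (see the per-case comments in the
 * switch), and PTL_MD_PHYS marks the iovec built by kptllnd_extract_phys() as
 * holding physical rather than virtual addresses.
 */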
int
kptllnd_active_rdma(kptl_rx_t *rx, lnet_msg_t *lntmsg, int type,
                    unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
                    unsigned int offset, int nob)
{
        kptl_tx_t       *tx;
        ptl_err_t        ptlrc;
        kptl_msg_t      *rxmsg = rx->rx_msg;
        kptl_peer_t     *peer = rx->rx_peer;
        unsigned long    flags;
        ptl_handle_md_t  mdh;

        LASSERT (type == TX_TYPE_PUT_RESPONSE ||
                 type == TX_TYPE_GET_RESPONSE);

        tx = kptllnd_get_idle_tx(type);
        if (tx == NULL) {
                CERROR ("Can't do %s rdma to %s: can't allocate descriptor\n",
                        type == TX_TYPE_PUT_RESPONSE ? "GET" : "PUT",
                        libcfs_id2str(peer->peer_id));
                return -ENOMEM;
        }

        kptllnd_set_tx_peer(tx, peer);
        kptllnd_init_rdma_md(tx, niov, iov, kiov, offset, nob);

        ptlrc = PtlMDBind(kptllnd_data.kptl_nih, tx->tx_rdma_md,
                          PTL_UNLINK, &mdh);
        if (ptlrc != PTL_OK) {
                CERROR("PtlMDBind(%s) failed: %s(%d)\n",
                       libcfs_id2str(peer->peer_id),
                       kptllnd_errtype2str(ptlrc), ptlrc);
                tx->tx_status = -EIO;
                kptllnd_tx_decref(tx);
                return -EIO;
        }

        cfs_spin_lock_irqsave(&peer->peer_lock, flags);

        tx->tx_lnet_msg = lntmsg;
        /* lnet_finalize() will be called when tx is torn down, so I must
         * return success from here on... */

        tx->tx_deadline = jiffies + (*kptllnd_tunables.kptl_timeout * CFS_HZ);
        tx->tx_rdma_mdh = mdh;
        tx->tx_active = 1;
        cfs_list_add_tail(&tx->tx_list, &peer->peer_activeq);

        /* peer has now got my ref on 'tx' */

        cfs_spin_unlock_irqrestore(&peer->peer_lock, flags);

        tx->tx_tposted = jiffies;

        if (type == TX_TYPE_GET_RESPONSE)
                ptlrc = PtlPut(mdh,
                               tx->tx_acked ? PTL_ACK_REQ : PTL_NOACK_REQ,
                               rx->rx_initiator,
                               *kptllnd_tunables.kptl_portal,
                               0,                     /* acl cookie */
                               rxmsg->ptlm_u.rdma.kptlrm_matchbits,
                               0,                     /* offset */
                               (lntmsg != NULL) ?     /* header data */
                               PTLLND_RDMA_OK :
                               PTLLND_RDMA_FAIL);
        else
                ptlrc = PtlGet(mdh,
                               rx->rx_initiator,
                               *kptllnd_tunables.kptl_portal,
                               0,                     /* acl cookie */
                               rxmsg->ptlm_u.rdma.kptlrm_matchbits,
                               0);                    /* offset */

        if (ptlrc != PTL_OK) {
                CERROR("Ptl%s failed: %s(%d)\n",
                       (type == TX_TYPE_GET_RESPONSE) ? "Put" : "Get",
                       kptllnd_errtype2str(ptlrc), ptlrc);

                kptllnd_peer_close(peer, -EIO);
                /* Everything (including this RDMA) queued on the peer will
                 * be completed with failure */
                kptllnd_schedule_ptltrace_dump();
        }

        return 0;
}
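/*
 * Protocol note for kptllnd_active_rdma() above: for TX_TYPE_GET_RESPONSE the
 * local node PtlPut()s the reply data to the matchbits the peer advertised and
 * reports LNet-level success or failure in the Portals header data; for
 * TX_TYPE_PUT_RESPONSE it PtlGet()s the data the peer advertised.  Either way
 * the peer sees an event on its buffer and can complete its side of the
 * transfer.
 */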
int
kptllnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      payload_niov = lntmsg->msg_niov;
        struct iovec     *payload_iov = lntmsg->msg_iov;
        lnet_kiov_t      *payload_kiov = lntmsg->msg_kiov;
        unsigned int      payload_offset = lntmsg->msg_offset;
        unsigned int      payload_nob = lntmsg->msg_len;
        kptl_net_t       *net = ni->ni_data;
        kptl_peer_t      *peer = NULL;
        int               mpflag = 0;
        kptl_tx_t        *tx;
        int               nob;
        int               nfrag;
        int               rc;

        LASSERT (net->net_ni == ni);
        LASSERT (!net->net_shutdown);
        LASSERT (payload_nob == 0 || payload_niov > 0);
        LASSERT (payload_niov <= LNET_MAX_IOV);
        LASSERT (payload_niov <= PTL_MD_MAX_IOV); /* !!! */
        LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
        LASSERT (!cfs_in_interrupt());

        if (lntmsg->msg_vmflush)
                mpflag = cfs_memory_pressure_get_and_set();

        rc = kptllnd_find_target(net, target, &peer);
        if (rc != 0)
                goto out;

        /* NB peer->peer_id does NOT always equal target, be careful with
         * which one to use */
        switch (type) {
        default:
                LBUG();
                return -EINVAL;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                /* Should the payload avoid RDMA? */
                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[payload_nob]);
                if (payload_kiov == NULL &&
                    nob <= peer->peer_max_msg_size)
                        break;

                tx = kptllnd_get_idle_tx(TX_TYPE_PUT_REQUEST);
                if (tx == NULL) {
                        CERROR("Can't send %s to %s: can't allocate descriptor\n",
                               lnet_msgtyp2str(type),
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto out;
                }

                kptllnd_init_rdma_md(tx, payload_niov,
                                     payload_iov, payload_kiov,
                                     payload_offset, payload_nob);

                tx->tx_lnet_msg = lntmsg;
                tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_PUT,
                                  target, sizeof(kptl_rdma_msg_t));

                CDEBUG(D_NETTRACE, "%s: passive PUT p %d %p\n",
                       libcfs_id2str(target),
                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);

                kptllnd_tx_launch(peer, tx, 0);
                goto out;

        case LNET_MSG_GET:
                /* routed gets don't RDMA */
                if (target_is_router || routing)
                        break;

                /* Is the payload small enough not to need RDMA? */
                nob = lntmsg->msg_md->md_length;
                nob = offsetof(kptl_msg_t,
                               ptlm_u.immediate.kptlim_payload[nob]);
                if (nob <= peer->peer_max_msg_size)
                        break;

                tx = kptllnd_get_idle_tx(TX_TYPE_GET_REQUEST);
                if (tx == NULL) {
                        CERROR("Can't send GET to %s: can't allocate descriptor\n",
                               libcfs_id2str(target));
                        rc = -ENOMEM;
                        goto out;
                }

                tx->tx_lnet_replymsg = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lnet_replymsg == NULL) {
                        CERROR("Failed to allocate LNET reply for %s\n",
                               libcfs_id2str(target));
                        kptllnd_tx_decref(tx);
                        rc = -ENOMEM;
                        goto out;
                }

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
                                             lntmsg->msg_md->md_iov.iov, NULL,
                                             0, lntmsg->msg_md->md_length);
                else
                        kptllnd_init_rdma_md(tx, lntmsg->msg_md->md_niov,
                                             NULL, lntmsg->msg_md->md_iov.kiov,
                                             0, lntmsg->msg_md->md_length);

                tx->tx_lnet_msg = lntmsg;
                tx->tx_msg->ptlm_u.rdma.kptlrm_hdr = *hdr;
                kptllnd_init_msg (tx->tx_msg, PTLLND_MSG_TYPE_GET,
                                  target, sizeof(kptl_rdma_msg_t));

                CDEBUG(D_NETTRACE, "%s: passive GET p %d %p\n",
                       libcfs_id2str(target),
                       le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index), tx);

                kptllnd_tx_launch(peer, tx, 0);
                goto out;

        case LNET_MSG_ACK:
                CDEBUG(D_NET, "LNET_MSG_ACK\n");
                LASSERT (payload_nob == 0);
                break;
        }

        /* I don't have to handle kiovs */
        LASSERT (payload_nob == 0 || payload_iov != NULL);

        tx = kptllnd_get_idle_tx(TX_TYPE_SMALL_MESSAGE);
        if (tx == NULL) {
                CERROR("Can't send %s to %s: can't allocate descriptor\n",
                       lnet_msgtyp2str(type), libcfs_id2str(target));
                rc = -ENOMEM;
                goto out;
        }

        tx->tx_lnet_msg = lntmsg;
        tx->tx_msg->ptlm_u.immediate.kptlim_hdr = *hdr;

        if (payload_nob == 0) {
                nfrag = 0;
        } else {
                tx->tx_frags->iov[0].iov_base = tx->tx_msg;
                tx->tx_frags->iov[0].iov_len = offsetof(kptl_msg_t,
                                                        ptlm_u.immediate.kptlim_payload);

                /* NB relying on lustre not asking for PTL_MD_MAX_IOV
                 * fragments!! */
#ifdef _USING_LUSTRE_PORTALS_
                nfrag = 1 + lnet_extract_iov(PTL_MD_MAX_IOV - 1,
                                             &tx->tx_frags->iov[1],
                                             payload_niov, payload_iov,
                                             payload_offset, payload_nob);
#else
                nfrag = 1 + kptllnd_extract_iov(PTL_MD_MAX_IOV - 1,
                                                &tx->tx_frags->iov[1],
                                                payload_niov, payload_iov,
                                                payload_offset, payload_nob);
#endif
        }

        nob = offsetof(kptl_immediate_msg_t, kptlim_payload[payload_nob]);
        kptllnd_init_msg(tx->tx_msg, PTLLND_MSG_TYPE_IMMEDIATE, target, nob);

        CDEBUG(D_NETTRACE, "%s: immediate %s p %d %p\n",
               libcfs_id2str(target),
               lnet_msgtyp2str(lntmsg->msg_type),
               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_PUT) ?
               le32_to_cpu(lntmsg->msg_hdr.msg.put.ptl_index) :
               (le32_to_cpu(lntmsg->msg_type) == LNET_MSG_GET) ?
               le32_to_cpu(lntmsg->msg_hdr.msg.get.ptl_index) : -1,
               tx);

        kptllnd_tx_launch(peer, tx, nfrag);

 out:
        if (lntmsg->msg_vmflush)
                cfs_memory_pressure_restore(mpflag);
        if (peer != NULL)
                kptllnd_peer_decref(peer);
        return rc;
}
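/*
 * Example of the immediate-vs-RDMA decision in kptllnd_send() above
 * (hypothetical sizes, not from the original source): if peer_max_msg_size
 * were 512 bytes, a PUT of a few hundred bytes held in an iovec fits in a
 * single PTLLND_MSG_TYPE_IMMEDIATE message and is sent inline, while a 1MB
 * payload (or any kiov payload) is advertised with a PTLLND_MSG_TYPE_PUT
 * request and the peer fetches it with a Portals GET.
 */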
int
kptllnd_eager_recv(struct lnet_ni *ni, void *private,
                   lnet_msg_t *msg, void **new_privatep)
{
        kptl_rx_t        *rx = private;

        CDEBUG(D_NET, "Eager RX=%p RXB=%p\n", rx, rx->rx_rxb);

        /* I have to release my ref on rxb (if I have one) to ensure I'm an
         * eager receiver, so I copy the incoming request from the buffer it
         * landed in, into space reserved in the descriptor... */

#if (PTL_MD_LOCAL_ALIGN8 == 0)
        if (rx->rx_rxb == NULL)                 /* already copied */
                return 0;                       /* to fix alignment */
#else
        LASSERT(rx->rx_rxb != NULL);
#endif
        LASSERT(rx->rx_nob <= *kptllnd_tunables.kptl_max_msg_size);

        memcpy(rx->rx_space, rx->rx_msg, rx->rx_nob);
        rx->rx_msg = (kptl_msg_t *)rx->rx_space;

        kptllnd_rx_buffer_decref(rx->rx_rxb);
        rx->rx_rxb = NULL;

        return 0;
}
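/*
 * Design note for kptllnd_eager_recv() above: copying the request into
 * rx_space lets the rx buffer reference be dropped immediately, so a message
 * that LNet wants to handle lazily does not pin a receive buffer.  When
 * PTL_MD_LOCAL_ALIGN8 is 0 the copy was already made at receive time to fix
 * alignment, so there is nothing left to do here.
 */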
int
kptllnd_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg, int delayed,
              unsigned int niov, struct iovec *iov, lnet_kiov_t *kiov,
              unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kptl_rx_t  *rx = private;
        kptl_msg_t *rxmsg = rx->rx_msg;
        int         nob;
        int         rc;

        CDEBUG(D_NET, "%s niov=%d offset=%d mlen=%d rlen=%d\n",
               kptllnd_msgtype2str(rxmsg->ptlm_type),
               niov, offset, mlen, rlen);

        LASSERT (mlen <= rlen);

        LASSERT (!cfs_in_interrupt());
        LASSERT (!(kiov != NULL && iov != NULL)); /* never both */
        LASSERT (niov <= PTL_MD_MAX_IOV);       /* !!! */

#ifdef CRAY_XT3
        if (lntmsg != NULL &&
            rx->rx_uid != 0) {
                /* Set the UID if the sender's uid isn't 0; i.e. non-root
                 * running in userspace (e.g. a catamount node; linux kernel
                 * senders, including routers have uid 0).  If this is a lustre
                 * RPC request, this tells lustre not to trust the creds in the
                 * RPC message body. */
                lnet_set_msg_uid(ni, lntmsg, rx->rx_uid);
        }
#endif
        switch(rxmsg->ptlm_type)
        {
        default:
                LBUG();
                rc = -EINVAL;
                break;

        case PTLLND_MSG_TYPE_IMMEDIATE:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_IMMEDIATE %d,%d\n", mlen, rlen);

                nob = offsetof(kptl_msg_t, ptlm_u.immediate.kptlim_payload[rlen]);
                if (nob > rx->rx_nob) {
                        CERROR ("Immediate message from %s too big: %d(%d)\n",
                                libcfs_id2str(rx->rx_peer->peer_id), nob,
                                rx->rx_nob);
                        rc = -EINVAL;
                        break;
                }

                if (kiov != NULL)
                        lnet_copy_flat2kiov(
                                niov, kiov, offset,
                                *kptllnd_tunables.kptl_max_msg_size,
                                rxmsg->ptlm_u.immediate.kptlim_payload,
                                0,
                                mlen);
                else
                        lnet_copy_flat2iov(
                                niov, iov, offset,
                                *kptllnd_tunables.kptl_max_msg_size,
                                rxmsg->ptlm_u.immediate.kptlim_payload,
                                0,
                                mlen);

                lnet_finalize (ni, lntmsg, 0);
                rc = 0;
                break;

        case PTLLND_MSG_TYPE_GET:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_GET %d,%d\n", mlen, rlen);

                /* NB always send RDMA so the peer can complete.  I send
                 * success/failure in the portals 'hdr_data' */

                if (lntmsg == NULL)
                        rc = kptllnd_active_rdma(rx, NULL,
                                                 TX_TYPE_GET_RESPONSE,
                                                 0, NULL, NULL, 0, 0);
                else
                        rc = kptllnd_active_rdma(rx, lntmsg,
                                                 TX_TYPE_GET_RESPONSE,
                                                 lntmsg->msg_niov,
                                                 lntmsg->msg_iov,
                                                 lntmsg->msg_kiov,
                                                 lntmsg->msg_offset,
                                                 lntmsg->msg_len);
                break;

        case PTLLND_MSG_TYPE_PUT:
                CDEBUG(D_NET, "PTLLND_MSG_TYPE_PUT %d,%d\n", mlen, rlen);

                /* NB always send RDMA so the peer can complete; it'll be 0
                 * bytes if there was no match (lntmsg == NULL). I have no way
                 * to let my peer know this, but she's only interested in when
                 * the net has stopped accessing her buffer in any case. */

                rc = kptllnd_active_rdma(rx, lntmsg, TX_TYPE_PUT_RESPONSE,
                                         niov, iov, kiov, offset, mlen);
                break;
        }

        /*
         * We're done with the RX
         */
        kptllnd_rx_done(rx, PTLLND_POSTRX_PEER_CREDIT);
        return rc;
}
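/*
 * Summary of the receive paths above: IMMEDIATE payloads are copied straight
 * out of the message into the caller's iov/kiov; GET and PUT requests are
 * answered with an active RDMA via kptllnd_active_rdma() so the peer always
 * sees completion on the buffer it advertised.  In every case the rx is
 * finished with kptllnd_rx_done(rx, PTLLND_POSTRX_PEER_CREDIT).
 */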
void
kptllnd_eq_callback(ptl_event_t *ev)
{
        kptl_eventarg_t *eva = ev->md.user_ptr;

        switch (eva->eva_type) {
        default:
                LBUG();

        case PTLLND_EVENTARG_TYPE_MSG:
        case PTLLND_EVENTARG_TYPE_RDMA:
                kptllnd_tx_callback(ev);
                break;

        case PTLLND_EVENTARG_TYPE_BUF:
                kptllnd_rx_buffer_callback(ev);
                break;
        }
}
void
kptllnd_thread_fini (void)
{
        cfs_atomic_dec(&kptllnd_data.kptl_nthreads);
}

int
kptllnd_thread_start (int (*fn)(void *arg), void *arg)
{
        long    pid;

        cfs_atomic_inc(&kptllnd_data.kptl_nthreads);

        pid = cfs_create_thread (fn, arg, 0);
        if (pid >= 0)
                return 0;

        CERROR("Failed to start thread: error %d\n", (int)pid);
        kptllnd_thread_fini();
        return (int)pid;
}
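/*
 * Note: kptl_nthreads is incremented before the thread is created and
 * decremented by kptllnd_thread_fini() when a thread exits (including the
 * failure path above), presumably so shutdown can wait for the count to
 * drain to zero.
 */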
int
kptllnd_watchdog(void *arg)
{
        int                 id = (long)arg;
        char                name[16];
        cfs_waitlink_t      waitlink;
        int                 stamp = 0;
        int                 peer_index = 0;
        unsigned long       deadline = jiffies;
        int                 timeout;
        int                 i;

        snprintf(name, sizeof(name), "kptllnd_wd_%02d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        cfs_waitlink_init(&waitlink);

        /* threads shut down in phase 2 after all peers have been destroyed */
        while (kptllnd_data.kptl_shutdown < 2) {

                /* add a check for needs ptltrace
                 * yes, this is blatant hijacking of this thread
                 * we can't dump directly from tx or rx _callbacks as it
                 * deadlocks portals and takes out the node
                */

                if (cfs_atomic_read(&kptllnd_data.kptl_needs_ptltrace)) {
#ifdef CRAY_XT3
                        kptllnd_dump_ptltrace();
                        /* we only dump once, no matter how many pending */
                        cfs_atomic_set(&kptllnd_data.kptl_needs_ptltrace, 0);
#else
                        LBUG();
#endif
                }

                timeout = (int)(deadline - jiffies);

                if (timeout <= 0) {
                        const int n = 4;
                        const int p = 1;
                        int       chunk = kptllnd_data.kptl_peer_hash_size;

                        /* Time to check for RDMA timeouts on a few more
                         * peers: I do checks every 'p' seconds on a
                         * proportion of the peer table and I need to check
                         * every connection 'n' times within a timeout
                         * interval, to ensure I detect a timeout on any
                         * connection within (n+1)/n times the timeout
                         * interval. */

                        if ((*kptllnd_tunables.kptl_timeout) > n * p)
                                chunk = (chunk * n * p) /
                                        (*kptllnd_tunables.kptl_timeout);
                        if (chunk == 0)
                                chunk = 1;

                        for (i = 0; i < chunk; i++) {
                                kptllnd_peer_check_bucket(peer_index, stamp);
                                peer_index = (peer_index + 1) %
                                             kptllnd_data.kptl_peer_hash_size;
                        }

                        deadline += p * CFS_HZ;
                        stamp++;
                        continue;
                }

                kptllnd_handle_closing_peers();

                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                cfs_waitq_add_exclusive(&kptllnd_data.kptl_watchdog_waitq,
                                        &waitlink);

                cfs_waitq_timedwait(&waitlink, CFS_TASK_INTERRUPTIBLE, timeout);

                cfs_set_current_state (CFS_TASK_RUNNING);
                cfs_waitq_del(&kptllnd_data.kptl_watchdog_waitq, &waitlink);
        }

        kptllnd_thread_fini();
        CDEBUG(D_NET, "<<<\n");
        return (0);
}
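/*
 * Worked example of the chunk calculation above (hypothetical numbers, not
 * from the original source): with n = 4, p = 1, a peer hash size of 101 and
 * *kptl_timeout = 50 seconds, chunk = (101 * 4 * 1) / 50 = 8, so eight
 * buckets are scanned each second; the whole table is covered roughly every
 * 13 seconds, i.e. about four times per timeout interval as the comment
 * requires.
 */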
int
kptllnd_scheduler (void *arg)
{
        int                 id = (long)arg;
        char                name[16];
        cfs_waitlink_t      waitlink;
        unsigned long       flags;
        int                 did_something;
        int                 counter = 0;
        kptl_rx_t          *rx;
        kptl_rx_buffer_t   *rxb;
        kptl_tx_t          *tx;

        snprintf(name, sizeof(name), "kptllnd_sd_%02d", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        cfs_waitlink_init(&waitlink);

        cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

        /* threads shut down in phase 2 after all peers have been destroyed */
        while (kptllnd_data.kptl_shutdown < 2) {

                did_something = 0;

                if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxq)) {
                        rx = cfs_list_entry (kptllnd_data.kptl_sched_rxq.next,
                                             kptl_rx_t, rx_list);
                        cfs_list_del(&rx->rx_list);

                        cfs_spin_unlock_irqrestore(&kptllnd_data. \
                                                   kptl_sched_lock, flags);

                        kptllnd_rx_parse(rx);
                        did_something = 1;

                        cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                              flags);
                }

                if (!cfs_list_empty(&kptllnd_data.kptl_sched_rxbq)) {
                        rxb = cfs_list_entry (kptllnd_data.kptl_sched_rxbq.next,
                                              kptl_rx_buffer_t,
                                              rxb_repost_list);
                        cfs_list_del(&rxb->rxb_repost_list);

                        cfs_spin_unlock_irqrestore(&kptllnd_data. \
                                                   kptl_sched_lock, flags);

                        kptllnd_rx_buffer_post(rxb);
                        did_something = 1;

                        cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                              flags);
                }

                if (!cfs_list_empty(&kptllnd_data.kptl_sched_txq)) {
                        tx = cfs_list_entry (kptllnd_data.kptl_sched_txq.next,
                                             kptl_tx_t, tx_list);
                        cfs_list_del_init(&tx->tx_list);

                        cfs_spin_unlock_irqrestore(&kptllnd_data. \
                                                   kptl_sched_lock, flags);

                        kptllnd_tx_fini(tx);
                        did_something = 1;

                        cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock,
                                              flags);
                }

                if (did_something) {
                        if (++counter != *kptllnd_tunables.kptl_reschedule_loops)
                                continue;
                }

                cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
                cfs_waitq_add_exclusive(&kptllnd_data.kptl_sched_waitq,
                                        &waitlink);
                cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock,
                                           flags);

                if (!did_something)
                        cfs_waitq_wait(&waitlink, CFS_TASK_INTERRUPTIBLE);
                else
                        cfs_cond_resched();

                cfs_set_current_state(CFS_TASK_RUNNING);
                cfs_waitq_del(&kptllnd_data.kptl_sched_waitq, &waitlink);

                cfs_spin_lock_irqsave(&kptllnd_data.kptl_sched_lock, flags);

                counter = 0;
        }

        cfs_spin_unlock_irqrestore(&kptllnd_data.kptl_sched_lock, flags);

        kptllnd_thread_fini();
        return 0;
}
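/*
 * Design note for the scheduler loop above: each pass drains at most one item
 * from each of the rx, rx-buffer and tx queues, and after
 * *kptl_reschedule_loops consecutive busy passes the thread yields briefly
 * instead of sleeping, so a steady stream of work cannot monopolise the CPU.
 */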