/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)

        /* I would guess that if kranal_get_peer (nid) == NULL,
         * and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
kranal_device_callback(RAP_INT32 devid)

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        wake_up(&dev->rad_waitq);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

        CWARN("callback for unknown device %d\n", devid);
kranal_schedule_conn(kra_conn_t *conn)

        kra_device_t *dev = conn->rac_device;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
                wake_up(&dev->rad_waitq);

        spin_unlock_irqrestore(&dev->rad_lock, flags);
kranal_schedule_cqid (__u32 cqid)

        struct list_head *conns;
        struct list_head *tmp;

        conns = kranal_cqid2connlist(cqid);

        read_lock(&kranal_data.kra_global_lock);

        conn = kranal_cqid2conn_locked(cqid);

                CWARN("no cqid %x\n", cqid);

        kranal_schedule_conn(conn);

        read_unlock(&kranal_data.kra_global_lock);
kranal_schedule_dev(kra_device_t *dev)

        struct list_head *conns;
        struct list_head *tmp;

        /* Don't do this in IRQ context (servers may have 1000s of clients) */
        LASSERT (!in_interrupt());

        CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

        for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                /* Drop the lock on each hash bucket to ensure we don't
                 * block anyone for too long at IRQ priority on another CPU */
                read_lock(&kranal_data.kra_global_lock);

                conns = &kranal_data.kra_conns[i];

                list_for_each (tmp, conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_hashlist);

                        if (conn->rac_device == dev)
                                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
kranal_tx_done (kra_tx_t *tx, int completion)

        ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;

        LASSERT (!in_interrupt());

        switch (tx->tx_buftype) {

        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           dev->rad_ptag, &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);

                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
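
/* Grab a tx descriptor.  Three outcomes, depending on pool state and
 * 'may_block': take a free descriptor from the normal pool; if that pool
 * is empty and the caller may not block, dip into the reserved
 * (non-blocking) pool; otherwise sleep on kra_idle_tx_waitq until a
 * normal descriptor is freed by kranal_tx_done() above. */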
kranal_get_idle_tx (int may_block)

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,

                /* may dip into reserve pool */
                if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                        CERROR("reserved tx desc pool exhausted\n");

                tx = list_entry(kranal_data.kra_idle_nblk_txs.next,

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));

        list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be
         * needed, but we've got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_libmsg[0] == NULL);
        LASSERT (tx->tx_libmsg[1] == NULL);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
kranal_init_msg(kra_msg_t *msg, int type)

        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_incarnation gets set when FMA is sent */
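
/* Typical usage of the allocator below (illustrative sketch only; the
 * PTL_NO_SPACE return is an assumption about the caller's error
 * convention, not taken from this file):
 *
 *      tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_NOOP);
 *      if (tx == NULL)
 *              return PTL_NO_SPACE;
 */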
kranal_new_tx_msg (int may_block, int type)

        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        kranal_init_msg(&tx->tx_msg, type);
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;

        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;

        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,

        RAP_PHYS_REGION *phys = tx->tx_phys;

        CDEBUG(D_NET, "nkiov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;

        tx->tx_buffer = NULL;
        tx->tx_phys_offset = kiov->kiov_offset + offset;

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys->Length = PAGE_SIZE;

        resid = nob - (kiov->kiov_len - offset);
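
        /* For the payload to be registered as one contiguous region, only
         * the first fragment may start mid-page and only the last may end
         * early; every middle fragment must be exactly one whole page.
         * e.g. (illustrative): 2.5 pages starting 1K into page 0 is fine
         * (partial start, whole middle page, partial end), but a 2K
         * fragment in the middle would leave a gap and is rejected by the
         * check below. */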
        if (kiov->kiov_offset != 0 ||
            ((resid > PAGE_SIZE) &&
             kiov->kiov_len < PAGE_SIZE)) {

                /* Can't have gaps */
                CERROR("Can't make payload contiguous in I/O VM: "
                       "page %d, offset %d, len %d\n", nphys,
                       kiov->kiov_offset, kiov->kiov_len);

                for (i = -nphys; i < nkiov; i++) {
                        CERROR("kiov[%d] %p +%d for %d\n",
                               i, kiov[i].kiov_page,
                               kiov[i].kiov_offset, kiov[i].kiov_len);

        if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys->Length = PAGE_SIZE;

        tx->tx_phys_npages = phys - tx->tx_phys;
kranal_setup_buffer (kra_tx_t *tx, int niov,
                     struct iovec *iov, ptl_kiov_t *kiov,

        LASSERT ((iov == NULL) != (kiov == NULL));

                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
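
/* Register the tx's buffer with the RapidArray device so it can be the
 * local end of an RDMA.  This is deferred until the tx has been assigned
 * a connection (and hence a device); note it marks the buffer _MAPPED so
 * that kranal_tx_done() above knows to deregister it. */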
kranal_map_buffer (kra_tx_t *tx)

        kra_conn_t   *conn = tx->tx_conn;
        kra_device_t *dev = conn->rac_device;

        switch (tx->tx_buftype) {

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       conn->rac_device->rad_ptag,

                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         conn->rac_device->rad_ptag,

                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
kranal_find_conn_locked (kra_peer_t *peer)

        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)

        rwlock_t *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);  /* only set when assigned a conn */

        peer = kranal_find_peer_locked(nid);

                kranal_tx_done(tx, -EHOSTUNREACH);

        conn = kranal_find_conn_locked(peer);

                kranal_post_fma(conn, tx);

        /* Making one or more connections; I'll need a write lock... */

        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);

                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);

        conn = kranal_find_conn_locked(peer);

                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {

                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);

                peer->rap_connecting = 1;
                kranal_peer_addref(peer);       /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
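
/* Start an RDMA to/from the peer buffer described by 'rard', then queue
 * the prepared completion message (PUT_DONE/GET_DONE carrying 'cookie');
 * the scheduler sends it from rac_fmaq once RapkRdmaDone() reports the
 * transfer complete (see kranal_process_rdmaq() below). */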
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *rard, int nob, __u64 cookie)

        kra_conn_t *conn = tx->tx_conn;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
        LASSERT (nob <= rard->rard_nob);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr = tx->tx_buffer;
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = rard->rard_addr;
        tx->tx_rdma_desc.DstKey = rard->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        if (nob == 0) {                 /* Immediate completion */
                kranal_post_fma(conn, tx);

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)

        __u32 nob_received = nob;

        LASSERT (conn->rac_rxmsg != NULL);

        rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
                                &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received != nob) {
                CWARN("Expected %d immediate bytes but got %d\n",
kranal_do_send (lib_nal_t *nal,

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
               " pid %d\n", nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */

                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_MAX_IMMEDIATE) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",

                        break;                  /* RDMA not expected */

                /* Incoming message consistent with immediate reply? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);

                tx = kranal_get_idle_tx(0);

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                        kranal_tx_done(tx, rc);

                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.getreq.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.getreq.ragm_cookie);

                if (kiov == NULL &&                     /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE &&       /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                          /* send IMMEDIATE */

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                        kranal_tx_done(tx, rc);

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, -ENOMEM);

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);

                if (kiov == NULL &&                     /* not paged */
                    nob <= RANAL_MAX_IMMEDIATE &&       /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                          /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
                        kranal_tx_done(tx, rc);

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);

                LASSERT (kiov == NULL);
                LASSERT (nob <= RANAL_MAX_IMMEDIATE);

                tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                         type == PTL_MSG_REPLY ||
                                       RANAL_MSG_IMMEDIATE);

                rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
                        kranal_tx_done(tx, rc);

                tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
                tx->tx_libmsg[0] = libmsg;
                kranal_launch_tx(tx, nid);
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)

        return kranal_do_send(nal, private, cookie,
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)

        return kranal_do_send(nal, private, cookie,
kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                size_t offset, size_t mlen, size_t rlen)

        kra_conn_t *conn = private;
        kra_msg_t  *rxmsg = conn->rac_rxmsg;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(rxmsg->ram_type) {

        case RANAL_MSG_IMMEDIATE:

                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;

                if (mlen > iov->iov_len - offset) {
                        CERROR("Can't handle immediate frags\n");

                buffer = ((char *)iov->iov_base) + offset;

                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);

        case RANAL_MSG_GET_REQ:
                /* If the GET matched, we've already handled it in
                 * kranal_do_send which is called to send the REPLY.  We're
                 * only called here to complete the GET receive (if we needed
                 * it which we don't, but I digress...) */
                LASSERT (libmsg == NULL);
                lib_finalize(nal, NULL, libmsg, PTL_OK);

        case RANAL_MSG_PUT_REQ:
                if (libmsg == NULL) {           /* PUT didn't match... */
                        lib_finalize(nal, NULL, libmsg, PTL_OK);

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);

                rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
                        kranal_tx_done(tx, rc);

                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_addr = tx->tx_buffer;
                tx->tx_msg.ram_u.putack.rapam_dst.desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg;      /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)

        return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)

        return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
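
/* Thread bookkeeping: kranal_thread_start() accounts for the new thread
 * in kra_nthreads, and each thread calls kranal_thread_fini() on exit,
 * presumably so shutdown can wait for kra_nthreads to reach zero. */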
kranal_thread_start (int(*fn)(void *arg), void *arg)

        long pid = kernel_thread(fn, arg, 0);

        atomic_inc(&kranal_data.kra_nthreads);

kranal_thread_fini (void)

        atomic_dec(&kranal_data.kra_nthreads);
kranal_check_conn (kra_conn_t *conn)

        struct list_head *ttmp;

        unsigned long now = jiffies;

        if (!conn->rac_closing &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                kranal_schedule_conn(conn);

        /* wait twice as long for CLOSE to be sure peer is dead */
        timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("Nothing received from "LPX64" within %lu seconds\n",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);

        if (conn->rac_closing)

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);

        spin_unlock_irqrestore(&conn->rac_lock, flags);
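
/* Check all conns in hash bucket 'idx', tracking the smallest timeout and
 * keepalive seen in *min_timeoutp (the reaper uses this to pace its scan).
 * Any conn that fails kranal_check_conn() is closed with -ETIMEDOUT under
 * the write lock, and the bucket scan restarts since the lock was dropped. */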
kranal_check_conns (int idx, unsigned long *min_timeoutp)

        struct list_head *conns = &kranal_data.kra_conns[idx];
        struct list_head *ctmp;

        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn(conn);

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Check on conn to "LPX64" failed: %d\n",
                       conn->rac_peer->rap_nid, rc);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!conn->rac_closing)
                        kranal_close_conn_locked(conn, -ETIMEDOUT);

                        kranal_terminate_conn_locked(conn);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */

        read_unlock(&kranal_data.kra_global_lock);
kranal_connd (void *arg)

        unsigned long flags;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_put_peer(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
kranal_update_reaper_timeout(long timeout)

        unsigned long flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
kranal_reaper (void *arg)

        unsigned long flags;

        int conn_entries = kranal_data.kra_conn_hash_size;

        int base_index = conn_entries - 1;
        unsigned long next_check_time = jiffies;
        long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
        kranal_data.kra_new_min_timeout = 1;

        while (!kranal_data.kra_shutdown) {

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);

                        /* I wake up every 'p' seconds to check for
                         * timeouts on some more peers.  I try to check
                         * every connection 'n' times within the global
                         * minimum of all keepalive and timeout intervals,
                         * to ensure I attend to every connection within
                         * (n+1)/n times its timeout intervals. */
                        unsigned long min_timeout;

                        if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                                /* new min timeout set: restart min timeout scan */
                                next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                                base_index = conn_index - 1;
                                        base_index = conn_entries - 1;

                                if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                        current_min_timeout = kranal_data.kra_new_min_timeout;
                                        CWARN("Set new min timeout %ld\n",
                                              current_min_timeout);

                                kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;

                        min_timeout = current_min_timeout;

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,

                        LASSERT (min_timeout > 0);

                        /* Compute how many table entries to check now so I
                         * get round the whole table fast enough (NB I do
                         * this at fixed intervals of 'p' seconds) */
                        chunk = conn_entries;
                        if (min_timeout > n * p)
                                chunk = (chunk * n * p) / min_timeout;
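
                        /* e.g. (illustrative numbers): with conn_entries =
                         * 1024 buckets, n = 3, p = 1s and min_timeout = 64s,
                         * chunk = (1024 * 3 * 1) / 64 = 48 buckets per
                         * wakeup, so the whole table is covered in ~21s,
                         * i.e. within min_timeout / n as the comment above
                         * requires. */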
                        for (i = 0; i < chunk; i++) {
                                kranal_check_conns(conn_index,

                                conn_index = (conn_index + 1) % conn_entries;

                        next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CWARN("Set new min timeout %ld\n",
                                      current_min_timeout);

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                                base_index = conn_entries - 1;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                schedule_timeout(timeout);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

        kranal_thread_fini();
kranal_process_rdmaq (__u32 cqid)

        unsigned long flags;
        RAP_RDMA_DESCRIPTOR *desc;

        read_lock(&kranal_data.kra_global_lock);

        conn = kranal_cqid2conn_locked(cqid);
        LASSERT (conn != NULL);

        rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);

        LASSERT (!list_empty(&conn->rac_rdmaq));
        tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);

        LASSERT (desc->AppPtr == (void *)tx);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        /* Get conn's fmaq processed, now I've just put something there */
        kranal_schedule_conn(conn);

        read_unlock(&kranal_data.kra_global_lock);
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)

        int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;

        LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA_LEN :

        msg->ram_incarnation = conn->rac_incarnation;
        msg->ram_seq = conn->rac_tx_seq;

                rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
                                      immediate, immediatenob,

                rrc = RapkFmaSend(conn->rac_device->rad_handle,
                                  immediate, immediatenob,

        conn->rac_last_tx = jiffies;
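
/* NB kranal_sendmsg()'s return value matters: callers treat -EAGAIN as
 * "out of FMA credits, retry later" (see kranal_process_fmaq() below,
 * which requeues the tx at the head of rac_fmaq in that case). */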
kranal_process_fmaq (kra_conn_t *conn)

        unsigned long flags;

        /* NB I will be rescheduled soon via a rad_fma_cq event if my FMA is
         * out of credits when I try to send right now... */

        if (conn->rac_closing) {

                if (!list_empty(&conn->rac_rdmaq)) {
                        /* Can't send CLOSE yet; I'm still waiting for RDMAs I
                         * posted to finish */
                        LASSERT (!conn->rac_close_sent);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);

                if (conn->rac_close_sent)

                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                conn->rac_close_sent = (rc == 0);

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        switch (tx->tx_msg.ram_type) {

        case RANAL_MSG_IMMEDIATE:
        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);

        case RANAL_MSG_PUT_REQ:
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                kranal_map_buffer(tx);

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);

        case RANAL_MSG_GET_REQ:
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr = tx->tx_buffer;
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
        if (rc == -EAGAIN) {
                /* replace at the head of the list for later */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);

        if (!expect_reply) {
                kranal_tx_done(tx, 0);

                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
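
        /* Whether a tx waits on rac_replyq depends on its type: PUT_REQ
         * and GET_REQ expect a matching ACK/NAK/DONE, and PUT_ACK expects
         * PUT_DONE (see the kranal_match_reply() calls in
         * kranal_process_receives() below); one-shot messages are done as
         * soon as they're sent. */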
kranal_swab_rdma_desc (kra_rdma_desc_t *d)

        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr);
        __swab32s(&d->rard_nob);
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)

        unsigned long flags;
        struct list_head *ttmp;

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (tx->tx_cookie != cookie)

                if (tx->tx_msg.ram_type != type) {
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);

        CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
kranal_process_receives(kra_conn_t *conn)

        unsigned long flags;

        RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &msg);
        kra_peer_t *peer = conn->rac_peer;

        if (rrc == RAP_NOT_DONE)

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_seq++;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_incarnation);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);

        if (msg->ram_incarnation != conn->rac_incarnation) {
                CERROR("Unexpected incarnation "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_incarnation, conn->rac_incarnation,

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion: wait now... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (!conn->rac_closing)
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                else if (conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

        if (conn->rac_closing)

        conn->rac_rxmsg = msg;          /* stash message for portals callbacks */
                                        /* they'll NULL rac_rxmsg if they consume it */
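
        /* Message flow summary (as implemented by the cases below):
         *   PUT: PUT_REQ -> PUT_ACK (or PUT_NAK) -> RDMA -> PUT_DONE
         *   GET: GET_REQ -> RDMA -> GET_DONE (or GET_NAK if unmatched)
         * IMMEDIATE carries its payload inline and needs no follow-up. */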
        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */

        case RANAL_MSG_IMMEDIATE:
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);

        case RANAL_MSG_PUT_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);

        case RANAL_MSG_PUT_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */

        case RANAL_MSG_PUT_ACK:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);

        case RANAL_MSG_PUT_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);

        case RANAL_MSG_GET_REQ:
                lib_parse(&kranal_lib, &msg->ram_u.getreq.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.getreq.ragm_cookie;
                kranal_post_fma(conn, tx);

        case RANAL_MSG_GET_NAK:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */

        case RANAL_MSG_GET_DONE:
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);

        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);
kranal_scheduler (void *arg)

        kra_device_t *dev = (kra_device_t *)arg;

        unsigned long flags;

        snprintf(name, sizeof(name), "kranal_sd_%02ld", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        spin_lock_irqsave(&dev->rad_lock, flags);

                if (dev->rad_ready) {

                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);

                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
                        LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                        if (rrc == RAP_SUCCESS) {
                                kranal_process_rdmaq(cqid);

                        rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
                        LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);

                        if (rrc == RAP_SUCCESS) {
                                if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
                                        kranal_schedule_dev(dev);

                                        kranal_schedule_cqid(cqid);

                        spin_lock_irqsave(&dev->rad_lock, flags);

                        /* If there were no completions to handle, I leave
                         * rad_ready clear.  NB I cleared it BEFORE I checked
                         * the completion queues since I'm racing with the
                         * device callback. */

                if (!list_empty(&dev->rad_connq)) {
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del(&conn->rac_schedlist);
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        LASSERT (conn->rac_scheduled);

                        resched  = kranal_process_fmaq(conn);
                        resched |= kranal_process_receives(conn);

                        spin_lock_irqsave(&dev->rad_lock, flags);

                                list_add_tail(&conn->rac_schedlist,

                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);

                spin_lock_irqsave(&dev->rad_lock, flags);

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        kranal_thread_fini();
lib_nal_t kranal_lib = {
        libnal_data:       &kranal_data,        /* NAL private data */
        libnal_send:       kranal_send,
        libnal_send_pages: kranal_send_pages,
        libnal_recv:       kranal_recv,
        libnal_recv_pages: kranal_recv_pages,
        libnal_dist:       kranal_dist
};