1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* NAL "distance" callback: reports how far 'nid' is from this node.
 * Visible logic: a nid equal to our own interface nid is the local case.
 * NOTE(review): source view has elided lines; body shown is partial. */
27 kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
29 /* I would guess that if kranal_get_peer (nid) == NULL,
30 and we're not routing, then 'nid' is very distant :) */
31 if ( nal->libnal_ni.ni_pid.nid == nid ) {
/* Device event callback (invoked with a RAP device id): locate the matching
 * kra_device, and if its scheduler is not already marked ready, wake it via
 * rad_waitq under rad_lock.  Falls through to a warning when no device
 * matches 'devid'.  NOTE(review): partial view — elided lines. */
41 kranal_device_callback(RAP_INT32 devid)
47 for (i = 0; i < kranal_data.kra_ndevs; i++) {
49 dev = &kranal_data.kra_devices[i];
       /* skip devices other than the one that fired */
50 if (dev->rad_id != devid)
53 spin_lock_irqsave(&dev->rad_lock, flags);
55 if (!dev->rad_ready) {
57 wake_up(&dev->rad_waitq);
60 spin_unlock_irqrestore(&dev->rad_lock, flags);
64 CWARN("callback for unknown device %d\n", devid);
/* Queue 'conn' for its device's scheduler thread.  Takes +1 conn ref on
 * behalf of the scheduler, links the conn onto dev->rad_connq and wakes
 * the scheduler.  Idempotent: a conn already scheduled is left alone. */
68 kranal_schedule_conn(kra_conn_t *conn)
70 kra_device_t *dev = conn->rac_device;
73 spin_lock_irqsave(&dev->rad_lock, flags);
75 if (!conn->rac_scheduled) {
76 kranal_conn_addref(conn); /* +1 ref for scheduler */
77 conn->rac_scheduled = 1;
78 list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
79 wake_up(&dev->rad_waitq);
82 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Schedule the connection registered under completion-queue id 'cqid'.
 * Looks the conn up under the global read lock; warns if the cqid is not
 * (or no longer) registered.  NOTE(review): partial view — elided lines. */
86 kranal_schedule_cqid (__u32 cqid)
89 struct list_head *conns;
90 struct list_head *tmp;
92 conns = kranal_cqid2connlist(cqid);
94 read_lock(&kranal_data.kra_global_lock);
96 conn = kranal_cqid2conn_locked(cqid);
99 CWARN("no cqid %x\n", cqid);
101 kranal_schedule_conn(conn);
103 read_unlock(&kranal_data.kra_global_lock);
/* Schedule every connection on 'dev': walk the whole conn hash table and
 * queue each conn whose rac_device matches.  The global lock is taken and
 * dropped per hash bucket so other CPUs are not blocked for long. */
107 kranal_schedule_dev(kra_device_t *dev)
110 struct list_head *conns;
111 struct list_head *tmp;
114 /* Don't do this in IRQ context (servers may have 1000s of clients) */
115 LASSERT (!in_interrupt());
117 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
119 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
121 /* Drop the lock on each hash bucket to ensure we don't
122 * block anyone for too long at IRQ priority on another CPU */
124 read_lock(&kranal_data.kra_global_lock);
126 conns = &kranal_data.kra_conns[i];
128 list_for_each (tmp, conns) {
129 conn = list_entry(tmp, kra_conn_t, rac_hashlist);
131 if (conn->rac_device == dev)
132 kranal_schedule_conn(conn);
134 read_unlock(&kranal_data.kra_global_lock);
/* Complete a tx descriptor: 'completion' == 0 maps to PTL_OK, anything
 * else to PTL_FAIL.  Deregisters RDMA memory if the buffer was mapped,
 * finalises up to two attached lib messages, resets the descriptor and
 * returns it to the appropriate idle pool (normal vs reserved/nblk).
 * NOTE(review): partial view — which pool is chosen depends on elided
 * conditional code around lines 189-194. */
139 kranal_tx_done (kra_tx_t *tx, int completion)
141 ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
147 LASSERT (!in_interrupt());
149 switch (tx->tx_buftype) {
        /* unmapped buffer types need no deregistration */
154 case RANAL_BUF_IMMEDIATE:
155 case RANAL_BUF_PHYS_UNMAPPED:
156 case RANAL_BUF_VIRT_UNMAPPED:
159 case RANAL_BUF_PHYS_MAPPED:
160 LASSERT (tx->tx_conn != NULL);
161 dev = tx->tx_conn->rac_device;
162 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
163 dev->rad_ptag, &tx->tx_map_key);
164 LASSERT (rrc == RAP_SUCCESS);
167 case RANAL_BUF_VIRT_MAPPED:
168 LASSERT (tx->tx_conn != NULL);
169 dev = tx->tx_conn->rac_device;
170 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
171 dev->rad_ptag, &tx->tx_map_key);
172 LASSERT (rrc == RAP_SUCCESS);
176 for (i = 0; i < 2; i++) {
177 /* tx may have up to 2 libmsgs to finalise */
178 if (tx->tx_libmsg[i] == NULL)
181 lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
182 tx->tx_libmsg[i] = NULL;
        /* reset so the idle-pool invariants asserted in get_idle_tx hold */
185 tx->tx_buftype = RANAL_BUF_NONE;
186 tx->tx_msg.ram_type = RANAL_MSG_NONE;
189 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
192 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
194 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
195 wake_up(&kranal_data.kra_idle_tx_waitq);
198 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Allocate an idle tx descriptor.  Prefers the normal pool; a caller that
 * may not block ('may_block' false, presumably) can dip into the reserved
 * non-blocking pool, while a blocking caller waits on kra_idle_tx_waitq
 * for a normal descriptor.  A fresh completion cookie is assigned while
 * the tx lock is held.  NOTE(review): partial view — the branch structure
 * between the three acquisition paths is elided. */
202 kranal_get_idle_tx (int may_block)
208 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
210 /* "normal" descriptor is free */
211 if (!list_empty(&kranal_data.kra_idle_txs)) {
212 tx = list_entry(kranal_data.kra_idle_txs.next,
218 /* may dip into reserve pool */
219 if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
220 CERROR("reserved tx desc pool exhausted\n");
224 tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
229 /* block for idle tx */
230 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
232 wait_event(kranal_data.kra_idle_tx_waitq,
233 !list_empty(&kranal_data.kra_idle_txs));
237 list_del(&tx->tx_list);
239 /* Allocate a new completion cookie. It might not be
240 * needed, but we've got a lock right now... */
241 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
        /* idle descriptors must come back fully reset (see kranal_tx_done) */
243 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
244 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
245 LASSERT (tx->tx_conn == NULL);
246 LASSERT (tx->tx_libmsg[0] == NULL);
247 LASSERT (tx->tx_libmsg[1] == NULL);
250 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Initialise the common header fields of a wire message: magic, protocol
 * version, message type and our source nid. */
256 kranal_init_msg(kra_msg_t *msg, int type)
258 msg->ram_magic = RANAL_MSG_MAGIC;
259 msg->ram_version = RANAL_MSG_VERSION;
260 msg->ram_type = type;
261 msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
262 /* ram_incarnation gets set when FMA is sent */
/* Convenience wrapper: grab an idle tx (blocking if 'may_block') and
 * initialise its embedded message header with 'type'. */
266 kranal_new_tx_msg (int may_block, int type)
268 kra_tx_t *tx = kranal_get_idle_tx(may_block);
273 kranal_init_msg(&tx->tx_msg, type);
/* Point tx at a single contiguous vaddr fragment of an iovec for an
 * IMMEDIATE send: skip whole iovec entries covered by 'offset', then fail
 * (with CERROR) if the payload would span more than one fragment. */
278 kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
284 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
286 while (offset >= iov->iov_len) {
287 offset -= iov->iov_len;
293 if (nob > iov->iov_len - offset) {
294 CERROR("Can't handle multiple vaddr fragments\n");
298 tx->tx_buftype = RANAL_BUF_IMMEDIATE;
300 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Same single-fragment setup as the immediate case, but marks the buffer
 * RANAL_BUF_VIRT_UNMAPPED so it gets registered for RDMA later
 * (see kranal_map_buffer). */
305 kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
311 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
313 while (offset >= iov->iov_len) {
314 offset -= iov->iov_len;
320 if (nob > iov->iov_len - offset) {
321 CERROR("Can't handle multiple vaddr fragments\n");
325 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
327 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Build tx->tx_phys, an array of physical page regions describing a kiov
 * (paged) payload, for later registration with RapkRegisterPhys.  The
 * payload must be physically contiguous in the sense checked below: after
 * the first page, fragments may not start at a non-zero page offset or be
 * shorter than a page while data remains.  Caps the region count at
 * PTL_MD_MAX_IOV.  NOTE(review): partial view — loop brackets elided. */
332 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
335 RAP_PHYS_REGION *phys = tx->tx_phys;
338 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
342 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        /* skip whole kiov entries consumed by 'offset' */
344 while (offset >= kiov->kiov_len) {
345 offset -= kiov->kiov_len;
351 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
353 tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
355 phys->Address = kranal_page2phys(kiov->kiov_page);
356 phys->Length = PAGE_SIZE;
359 resid = nob - (kiov->kiov_len - offset);
365 if (kiov->kiov_offset != 0 ||
366 ((resid > PAGE_SIZE) &&
367 kiov->kiov_len < PAGE_SIZE)) {
369 /* Can't have gaps */
370 CERROR("Can't make payload contiguous in I/O VM:"
371 "page %d, offset %d, len %d \n",
373 kiov->kiov_offset, kiov->kiov_len);
377 if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
378 CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
382 phys->Address = kranal_page2phys(kiov->kiov_page);
383 phys->Length = PAGE_SIZE;
389 tx->tx_phys_npages = phys - tx->tx_phys;
/* Dispatch buffer setup: exactly one of iov/kiov must be non-NULL; paged
 * payloads go to the phys setup, vaddr payloads to the virt setup. */
394 kranal_setup_buffer (kra_tx_t *tx, int niov,
395 struct iovec *iov, ptl_kiov_t *kiov,
398 LASSERT ((iov == NULL) != (kiov == NULL));
401 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
403 return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/* Register tx's buffer with the RAP device for RDMA, promoting its
 * buftype from *_UNMAPPED to *_MAPPED.  Phys buffers use the page-region
 * array built by kranal_setup_phys_buffer; virt buffers register the raw
 * address range.  Registration failure is treated as fatal (LASSERT). */
407 kranal_map_buffer (kra_tx_t *tx)
409 kra_conn_t *conn = tx->tx_conn;
410 kra_device_t *dev = conn->rac_device;
413 switch (tx->tx_buftype) {
416 case RANAL_BUF_PHYS_UNMAPPED:
417 rrc = RapkRegisterPhys(conn->rac_device->rad_handle,
418 tx->tx_phys, tx->tx_phys_npages,
419 conn->rac_device->rad_ptag,
421 LASSERT (rrc == RAP_SUCCESS);
422 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
425 case RANAL_BUF_VIRT_UNMAPPED:
426 rrc = RapkRegisterMemory(conn->rac_device->rad_handle,
427 tx->tx_buffer, tx->tx_nob,
428 conn->rac_device->rad_ptag,
430 LASSERT (rrc == RAP_SUCCESS);
431 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/* Return any established connection for 'peer' (the first on its conn
 * list), or fall through when the peer has none.  Caller must hold the
 * global lock ("_locked" convention). */
437 kranal_find_conn_locked (kra_peer_t *peer)
439 struct list_head *tmp;
441 /* just return the first connection */
442 list_for_each (tmp, &peer->rap_conns) {
443 return list_entry(tmp, kra_conn_t, rac_list);
/* Queue 'tx' on the conn's FMA send queue (timestamping it for the
 * reaper's stall checks) and kick the scheduler to transmit it. */
450 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
456 spin_lock_irqsave(&conn->rac_lock, flags);
457 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
458 tx->tx_qtime = jiffies;
459 spin_unlock_irqrestore(&conn->rac_lock, flags);
461 kranal_schedule_conn(conn);
/* Commit 'tx' for delivery to 'nid'.  Fast path: under the read lock,
 * find the peer and an existing conn and post the tx.  Slow path: retake
 * the peer lookup under the write lock (it may have changed), and either
 * post on a conn created meanwhile, or queue the tx on the peer and hand
 * the peer to the connd thread to establish a connection.  Any failure
 * completes the tx with -EHOSTUNREACH via kranal_tx_done.
 * NOTE(review): partial view — fast-path locking lines are elided. */
465 kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
471 rwlock_t *g_lock = &kranal_data.kra_global_lock;
473 /* If I get here, I've committed to send, so I complete the tx with
474 * failure on any problems */
476 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
480 peer = kranal_find_peer_locked(nid);
483 kranal_tx_done(tx, -EHOSTUNREACH);
487 conn = kranal_find_conn_locked(peer);
489 kranal_post_fma(conn, tx);
494 /* Making one or more connections; I'll need a write lock... */
496 write_lock_irqsave(g_lock, flags);
        /* re-lookup: peer table may have changed while unlocked */
498 peer = kranal_find_peer_locked(nid);
500 write_unlock_irqrestore(g_lock, flags);
501 kranal_tx_done(tx, -EHOSTUNREACH);
505 conn = kranal_find_conn_locked(peer);
507 /* Connection exists; queue message on it */
508 kranal_post_fma(conn, tx);
509 write_unlock_irqrestore(g_lock, flags);
513 LASSERT (peer->rap_persistence > 0);
515 if (!peer->rap_connecting) {
        /* too soon to retry a failed connect: give up on this tx */
517 if (now < peer->rap_reconnect_time) {
518 write_unlock_irqrestore(g_lock, flags);
519 kranal_tx_done(tx, -EHOSTUNREACH);
523 peer->rap_connecting = 1;
524 kranal_peer_addref(peer); /* extra ref for connd */
526 spin_lock(&kranal_data.kra_connd_lock);
528 list_add_tail(&peer->rap_connd_list,
529 &kranal_data.kra_connd_peers);
530 wake_up(&kranal_data.kra_connd_waitq);
532 spin_unlock(&kranal_data.kra_connd_lock);
535 /* A connection is being established; queue the message... */
536 list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
538 write_unlock_irqrestore(g_lock, flags);
/* Post an RDMA described by 'rard' for 'tx', pre-arming tx's message as
 * the completion ('type', carrying 'cookie') to send afterwards.  A
 * zero-length transfer completes immediately via the FMA queue; otherwise
 * the descriptor is posted and the tx parked on rac_rdmaq until
 * kranal_process_rdmaq reaps it. */
542 kranal_rdma(kra_tx_t *tx, int type,
543 kra_rdma_desc_t *rard, int nob, __u64 cookie)
545 kra_conn_t *conn = tx->tx_conn;
549 /* prep final completion message */
550 kranal_init_msg(&tx->tx_msg, type);
551 tx->tx_msg.ram_u.completion.racm_cookie = cookie;
553 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
554 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
555 LASSERT (nob <= rard->rard_nob);
557 memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
558 tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
559 tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
560 tx->tx_rdma_desc.DstPtr = rard->rard_addr;
561 tx->tx_rdma_desc.DstKey = rard->rard_key;
562 tx->tx_rdma_desc.Length = nob;
563 tx->tx_rdma_desc.AppPtr = tx;
565 if (nob == 0) { /* Immediate completion */
566 kranal_post_fma(conn, tx);
570 rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
571 LASSERT (rrc == RAP_SUCCESS);
573 spin_lock_irqsave(&conn->rac_lock, flags);
574 list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
575 tx->tx_qtime = jiffies;
576 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Copy the pending immediate payload of conn's stashed rx message into
 * 'buffer' (up to 'nob' bytes) and clear rac_rxmsg to mark it consumed.
 * Warns when the byte count delivered differs from what was expected. */
580 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
582 __u32 nob_received = nob;
585 LASSERT (conn->rac_rxmsg != NULL);
587 rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
588 &nob_received, sizeof(kra_msg_t));
589 LASSERT (rrc == RAP_SUCCESS);
591 conn->rac_rxmsg = NULL;
593 if (nob_received != nob) {
594 CWARN("Expected %d immediate bytes but got %d\n",
/* Common send path behind kranal_send/kranal_send_pages.  Dispatches on
 * the portals message type: a REPLY to a GET_REQ is RDMAed straight back
 * (GET_DONE); large GETs/PUTs allocate a tx, set up the payload buffer
 * and launch a GET_REQ/PUT_REQ handshake; everything small enough falls
 * through to an IMMEDIATE message carrying the payload inline.
 * NOTE(review): partial view — switch labels and several cleanup/return
 * lines are elided. */
603 kranal_do_send (lib_nal_t *nal,
620 /* NB 'private' is different depending on what we're sending.... */
622 CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
623 " pid %d\n", nob, niov, nid , pid);
625 LASSERT (nob == 0 || niov > 0);
626 LASSERT (niov <= PTL_MD_MAX_IOV);
628 LASSERT (!in_interrupt());
629 /* payload is either all vaddrs or all pages */
630 LASSERT (!(kiov != NULL && iov != NULL));
636 case PTL_MSG_REPLY: {
637 /* reply's 'private' is the conn that received the GET_REQ */
639 LASSERT (conn->rac_rxmsg != NULL);
641 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
642 if (nob > RANAL_MAX_IMMEDIATE) {
643 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
647 break; /* RDMA not expected */
650 /* Incoming message consistent with immediate reply? */
651 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
652 CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
653 nid, conn->rac_rxmsg->ram_type);
        /* may_block==0: we're on the receive path here */
657 tx = kranal_get_idle_tx(0);
661 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
663 kranal_tx_done(tx, rc);
668 tx->tx_libmsg[0] = libmsg;
670 kranal_map_buffer(tx);
        /* RDMA the reply data to the GET sender's described buffer */
671 kranal_rdma(tx, RANAL_MSG_GET_DONE,
672 &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
673 conn->rac_rxmsg->ram_u.get.ragm_cookie);
678 if (kiov == NULL && /* not paged */
679 nob <= RANAL_MAX_IMMEDIATE && /* small enough */
680 nob <= kranal_tunables.kra_max_immediate)
681 break; /* send IMMEDIATE */
683 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_REQ);
687 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
689 kranal_tx_done(tx, rc);
        /* tx_libmsg[1]: reply msg finalised when GET_DONE arrives */
693 tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
694 if (tx->tx_libmsg[1] == NULL) {
695 CERROR("Can't create reply for GET to "LPX64"\n", nid);
696 kranal_tx_done(tx, rc);
700 tx->tx_libmsg[0] = libmsg;
701 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
702 /* rest of tx_msg is setup just before it is sent */
703 kranal_launch_tx(tx, nid);
711 if (kiov == NULL && /* not paged */
712 nob <= RANAL_MAX_IMMEDIATE && /* small enough */
713 nob <= kranal_tunables.kra_max_immediate)
714 break; /* send IMMEDIATE */
716 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
720 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
722 kranal_tx_done(tx, rc);
726 tx->tx_libmsg[0] = libmsg;
727 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
728 /* rest of tx_msg is setup just before it is sent */
729 kranal_launch_tx(tx, nid);
        /* IMMEDIATE path: payload must be vaddr-based and small */
733 LASSERT (kiov == NULL);
734 LASSERT (nob <= RANAL_MAX_IMMEDIATE);
736 tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
737 type == PTL_MSG_REPLY ||
739 RANAL_MSG_IMMEDIATE);
743 rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
745 kranal_tx_done(tx, rc);
749 tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
750 tx->tx_libmsg[0] = libmsg;
751 kranal_launch_tx(tx, nid);
/* lib_nal send entry point for iovec (vaddr) payloads: thin wrapper
 * around kranal_do_send. */
756 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
757 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
758 unsigned int niov, struct iovec *iov,
759 size_t offset, size_t len)
761 return kranal_do_send(nal, private, cookie,
/* lib_nal send entry point for kiov (paged) payloads: thin wrapper
 * around kranal_do_send. */
768 kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
769 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
770 unsigned int niov, ptl_kiov_t *kiov,
771 size_t offset, size_t len)
773 return kranal_do_send(nal, private, cookie,
/* Common receive path behind kranal_recv/kranal_recv_pages; 'private' is
 * the conn whose stashed rx message we are delivering.  IMMEDIATE data is
 * copied out inline (vaddr buffers only) and finalised; a matched GET_REQ
 * was already handled on the send path so only completes here; a matched
 * PUT_REQ maps a sink buffer and replies with PUT_ACK describing it for
 * the peer's RDMA.  NOTE(review): partial view — several branch/return
 * lines are elided. */
780 kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
781 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
782 size_t offset, size_t mlen, size_t rlen)
784 kra_conn_t *conn = private;
785 kra_msg_t *rxmsg = conn->rac_rxmsg;
790 LASSERT (mlen <= rlen);
791 LASSERT (!in_interrupt());
792 /* Either all pages or all vaddrs */
793 LASSERT (!(kiov != NULL && iov != NULL));
795 switch(rxmsg->ram_type) {
800 case RANAL_MSG_IMMEDIATE:
803 } else if (kiov != NULL) {
804 CERROR("Can't recv immediate into paged buffer\n");
        /* locate the single vaddr fragment covering [offset, offset+mlen) */
808 while (offset >= iov->iov_len) {
809 offset -= iov->iov_len;
814 if (mlen > iov->iov_len - offset) {
815 CERROR("Can't handle immediate frags\n");
818 buffer = ((char *)iov->iov_base) + offset;
820 rc = kranal_consume_rxmsg(conn, buffer, mlen);
821 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
824 case RANAL_MSG_GET_REQ:
825 /* If the GET matched, we've already handled it in
826 * kranal_do_send which is called to send the REPLY. We're
827 * only called here to complete the GET receive (if we needed
828 * it which we don't, but I digress...) */
829 LASSERT (libmsg == NULL);
830 lib_finalize(nal, NULL, libmsg, PTL_OK);
833 case RANAL_MSG_PUT_REQ:
834 if (libmsg == NULL) { /* PUT didn't match... */
835 lib_finalize(nal, NULL, libmsg, PTL_OK);
839 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
843 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
845 kranal_tx_done(tx, rc);
849 kranal_map_buffer(tx);
        /* PUT_ACK tells the sender where to RDMA the payload */
851 tx->tx_msg.ram_u.putack.rapam_src_cookie =
852 conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
853 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
854 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
855 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
856 (__u64)((unsigned long)tx->tx_buffer);
857 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
859 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
861 kranal_post_fma(conn, tx);
863 /* flag matched by consuming rx message */
864 kranal_consume_rxmsg(conn, NULL, 0);
/* lib_nal receive entry point for iovec (vaddr) buffers: thin wrapper
 * around kranal_recvmsg. */
870 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
871 unsigned int niov, struct iovec *iov,
872 size_t offset, size_t mlen, size_t rlen)
874 return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
/* lib_nal receive entry point for kiov (paged) buffers: thin wrapper
 * around kranal_recvmsg. */
879 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
880 unsigned int niov, ptl_kiov_t *kiov,
881 size_t offset, size_t mlen, size_t rlen)
883 return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
/* Spawn a kernel thread running 'fn(arg)' and bump the global thread
 * count on success (pid checked in elided code). */
888 kranal_thread_start (int(*fn)(void *arg), void *arg)
890 long pid = kernel_thread(fn, arg, 0);
895 atomic_inc(&kranal_data.kra_nthreads);
/* Per-thread exit hook: drop the global thread count (paired with
 * kranal_thread_start's increment; shutdown waits for it to reach 0,
 * presumably — confirm against module teardown). */
900 kranal_thread_fini (void)
902 atomic_dec(&kranal_data.kra_nthreads);
/* Health-check one connection for the reaper.  Schedules a keepalive if
 * nothing was sent within rac_keepalive seconds; declares the conn dead
 * (CERROR) when nothing has been received within the timeout (doubled
 * unless closing, to be sure the peer is gone); and belt-and-braces
 * checks that no tx has been stuck on the fmaq/rdmaq/replyq longer than
 * the timeout.  NOTE(review): partial view — return statements elided;
 * callers (kranal_check_conns) treat a non-zero result as failure. */
906 kranal_check_conn (kra_conn_t *conn)
909 struct list_head *ttmp;
912 unsigned long now = jiffies;
914 if (!conn->rac_closing &&
915 time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
916 /* not sent in a while; schedule conn so scheduler sends a keepalive */
917 kranal_schedule_conn(conn);
920 /* wait twice as long for CLOSE to be sure peer is dead */
921 timeout = (conn->rac_closing ? 1 : 2) * conn->rac_timeout * HZ;
923 if (!conn->rac_close_recvd &&
924 time_after_eq(now, conn->rac_last_rx + timeout)) {
925 CERROR("Nothing received from "LPX64" within %lu seconds\n",
926 conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
930 if (conn->rac_closing)
933 /* Check the conn's queues are moving. These are "belt+braces" checks,
934 * in case of hardware/software errors that make this conn seem
935 * responsive even though it isn't progressing its message queues. */
937 spin_lock_irqsave(&conn->rac_lock, flags);
939 list_for_each (ttmp, &conn->rac_fmaq) {
940 tx = list_entry(ttmp, kra_tx_t, tx_list);
942 if (time_after_eq(now, tx->tx_qtime + timeout)) {
943 spin_unlock_irqrestore(&conn->rac_lock, flags);
944 CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
945 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
950 list_for_each (ttmp, &conn->rac_rdmaq) {
951 tx = list_entry(ttmp, kra_tx_t, tx_list);
953 if (time_after_eq(now, tx->tx_qtime + timeout)) {
954 spin_unlock_irqrestore(&conn->rac_lock, flags);
955 CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
956 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
961 list_for_each (ttmp, &conn->rac_replyq) {
962 tx = list_entry(ttmp, kra_tx_t, tx_list);
964 if (time_after_eq(now, tx->tx_qtime + timeout)) {
965 spin_unlock_irqrestore(&conn->rac_lock, flags);
966 CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
967 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
972 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Check every connection in hash bucket 'idx', tracking the smallest
 * timeout/keepalive seen in *min_timeoutp for the reaper's scan pacing.
 * On a failed check: take a conn ref, drop the read lock, close (then
 * terminate) the conn under the write lock, and restart the bucket scan
 * since the list may have changed while unlocked. */
977 kranal_check_conns (int idx, unsigned long *min_timeoutp)
979 struct list_head *conns = &kranal_data.kra_conns[idx];
980 struct list_head *ctmp;
986 /* NB. We expect to check all the conns and not find any problems, so
987 * we just use a shared lock while we take a look... */
988 read_lock(&kranal_data.kra_global_lock);
990 list_for_each (ctmp, conns) {
991 conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
993 if (conn->rac_timeout < *min_timeoutp )
994 *min_timeoutp = conn->rac_timeout;
995 if (conn->rac_keepalive < *min_timeoutp )
996 *min_timeoutp = conn->rac_keepalive;
998 rc = kranal_check_conn(conn);
        /* conn failed its check: ref it so it survives the lock dance */
1002 kranal_conn_addref(conn);
1003 read_unlock(&kranal_data.kra_global_lock);
1005 CERROR("Check on conn to "LPX64"failed: %d\n",
1006 conn->rac_peer->rap_nid, rc);
1008 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1010 if (!conn->rac_closing)
1011 kranal_close_conn_locked(conn, -ETIMEDOUT);
1013 kranal_terminate_conn_locked(conn);
1015 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1017 kranal_conn_decref(conn);
1019 /* start again now I've dropped the lock */
1023 read_unlock(&kranal_data.kra_global_lock);
/* Connection daemon thread: loop until shutdown, pulling peers off
 * kra_connd_peers and calling kranal_connect on each (dropping the connd
 * lock across the connect, and the connd's peer ref afterwards).  When
 * the queue is empty it sleeps interruptibly on kra_connd_waitq. */
1027 kranal_connd (void *arg)
1031 unsigned long flags;
1035 snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
1036 kportal_daemonize(name);
1037 kportal_blockallsigs();
1039 init_waitqueue_entry(&wait, current);
1041 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1043 while (!kranal_data.kra_shutdown) {
1044 /* Safe: kra_shutdown only set when quiescent */
1046 if (!list_empty(&kranal_data.kra_connd_peers)) {
1047 peer = list_entry(kranal_data.kra_connd_peers.next,
1048 kra_peer_t, rap_connd_list);
1050 list_del_init(&peer->rap_connd_list);
1051 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1053 kranal_connect(peer);
1054 kranal_peer_decref(peer);
1056 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        /* nothing queued: sleep until woken by kranal_launch_tx */
1060 set_current_state(TASK_INTERRUPTIBLE);
1061 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1063 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1067 set_current_state(TASK_RUNNING);
1068 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1070 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1073 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1075 kranal_thread_fini();
/* Lower the reaper's pending minimum timeout to 'timeout' (seconds > 0)
 * if it is smaller than the current pending value; the reaper picks it
 * up under kra_reaper_lock on its next pass. */
1080 kranal_update_reaper_timeout(long timeout)
1082 unsigned long flags;
1084 LASSERT (timeout > 0);
1086 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1088 if (timeout < kranal_data.kra_new_min_timeout)
1089 kranal_data.kra_new_min_timeout = timeout;
1091 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/* Reaper thread: periodically sweeps the conn hash table with
 * kranal_check_conns.  Each wakeup checks a 'chunk' of buckets sized so
 * the whole table is covered n times within the global minimum of all
 * conn timeouts/keepalives; kra_new_min_timeout (set by
 * kranal_update_reaper_timeout) restarts the min-timeout scan.  Sleeps
 * interruptibly on kra_reaper_waitq between passes.
 * NOTE(review): partial view — several loop/branch boundary lines are
 * elided, so the exact control nesting cannot be confirmed from here. */
1095 kranal_reaper (void *arg)
1098 unsigned long flags;
1103 int conn_entries = kranal_data.kra_conn_hash_size;
1105 int base_index = conn_entries - 1;
1106 unsigned long next_check_time = jiffies;
1107 long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1108 long current_min_timeout = 1;
1110 kportal_daemonize("kranal_reaper");
1111 kportal_blockallsigs();
1113 init_waitqueue_entry(&wait, current);
1115 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1116 kranal_data.kra_new_min_timeout = 1;
1118 while (!kranal_data.kra_shutdown) {
1120 /* careful with the jiffy wrap... */
1121 timeout = (long)(next_check_time - jiffies);
1124 /* I wake up every 'p' seconds to check for
1125 * timeouts on some more peers. I try to check
1126 * every connection 'n' times within the global
1127 * minimum of all keepalive and timeout intervals,
1128 * to ensure I attend to every connection within
1129 * (n+1)/n times its timeout intervals. */
1133 unsigned long min_timeout;
1136 if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
1137 /* new min timeout set: restart min timeout scan */
1138 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1139 base_index = conn_index - 1;
1141 base_index = conn_entries - 1;
1143 if (kranal_data.kra_new_min_timeout < current_min_timeout) {
1144 current_min_timeout = kranal_data.kra_new_min_timeout;
1145 CWARN("Set new min timeout %ld\n",
1146 current_min_timeout);
1149 kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1151 min_timeout = current_min_timeout;
1153 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
1156 LASSERT (min_timeout > 0);
1158 /* Compute how many table entries to check now so I
1159 * get round the whole table fast enough (NB I do
1160 * this at fixed intervals of 'p' seconds) */
1161 chunk = conn_entries;
1162 if (min_timeout > n * p)
1163 chunk = (chunk * n * p) / min_timeout;
1167 for (i = 0; i < chunk; i++) {
1168 kranal_check_conns(conn_index,
1170 conn_index = (conn_index + 1) % conn_entries;
1173 next_check_time += p * HZ;
1175 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
        /* wrap-safe test: did this pass complete a full table scan? */
1177 if (((conn_index - chunk <= base_index &&
1178 base_index < conn_index) ||
1179 (conn_index - conn_entries - chunk <= base_index &&
1180 base_index < conn_index - conn_entries))) {
1182 /* Scanned all conns: set current_min_timeout... */
1183 if (current_min_timeout != next_min_timeout) {
1184 current_min_timeout = next_min_timeout;
1185 CWARN("Set new min timeout %ld\n",
1186 current_min_timeout);
1189 /* ...and restart min timeout scan */
1190 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1191 base_index = conn_index - 1;
1193 base_index = conn_entries - 1;
1197 set_current_state(TASK_INTERRUPTIBLE);
1198 add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1200 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1202 schedule_timeout(timeout);
1204 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1206 set_current_state(TASK_RUNNING);
1207 remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1210 kranal_thread_fini();
/* RDMA-completion handler for the conn registered under 'cqid': reap the
 * finished descriptor via RapkRdmaDone, pop the oldest tx from
 * rac_rdmaq (completions are in posting order), verify it matches the
 * descriptor, and move it to the fmaq so the scheduler sends its
 * PUT_DONE/GET_DONE completion message. */
1215 kranal_process_rdmaq (__u32 cqid)
1220 unsigned long flags;
1221 RAP_RDMA_DESCRIPTOR *desc;
1223 read_lock(&kranal_data.kra_global_lock);
1225 conn = kranal_cqid2conn_locked(cqid);
1226 LASSERT (conn != NULL);
1228 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1229 LASSERT (rrc == RAP_SUCCESS);
1231 spin_lock_irqsave(&conn->rac_lock, flags);
1233 LASSERT (!list_empty(&conn->rac_rdmaq));
1234 tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1235 list_del(&tx->tx_list);
1237 LASSERT(desc->AppPtr == (void *)tx);
1238 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1239 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1241 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1242 tx->tx_qtime = jiffies;
1244 spin_unlock_irqrestore(&conn->rac_lock, flags);
1246 /* Get conn's fmaq processed, now I've just put something there */
1247 kranal_schedule_conn(conn);
1249 read_unlock(&kranal_data.kra_global_lock);
/* Transmit 'msg' (optionally with an immediate payload) on 'conn' via
 * FMA.  Fills in the conn's incarnation and tx sequence number, then uses
 * the synchronous send for FENCE-flagged messages (those signalling RDMA
 * completion) and the plain send otherwise; rac_last_tx is stamped for
 * keepalive tracking.  NOTE(review): partial view — the return-code
 * handling between the two send calls is elided. */
1253 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1254 void *immediate, int immediatenob)
1256 int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1259 LASSERT (sizeof(*msg) <= RANAL_FMA_PREFIX_LEN);
1260 LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1261 immediatenob <= RANAL_FMA_MAX_DATA_LEN :
1264 msg->ram_incarnation = conn->rac_my_incarnation;
1265 msg->ram_seq = conn->rac_tx_seq;
1268 rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
1269 immediate, immediatenob,
1272 rrc = RapkFmaSend(conn->rac_device->rad_handle,
1273 immediate, immediatenob,
1281 conn->rac_last_tx = jiffies;
/* Scheduler's per-conn FMA work: on a closing conn, send NOOPs while
 * RDMAs drain and then a single CLOSE; otherwise pop the next tx off
 * rac_fmaq (sending a keepalive NOOP if the queue is idle too long) and
 * transmit it according to its message type — completions go out with
 * their immediate payload, PUT_REQ/PUT_ACK/GET_REQ fill in cookies and
 * (un)mapped buffer descriptors first.  -EAGAIN (out of FMA credits)
 * requeues the tx at the head; otherwise the tx is either completed now
 * or parked on rac_replyq to await the peer's reply.
 * NOTE(review): partial view — 'expect_reply' assignments and several
 * branch boundaries are elided. */
1291 kranal_process_fmaq (kra_conn_t *conn)
1293 unsigned long flags;
1299 /* NB I will be rescheduled some via a rad_fma_cq event if my FMA is
1300 * out of credits when I try to send right now... */
1302 if (conn->rac_closing) {
1304 if (!list_empty(&conn->rac_rdmaq)) {
1305 /* Can't send CLOSE yet; I'm still waiting for RDMAs I
1306 * posted to finish */
1307 LASSERT (!conn->rac_close_sent);
1308 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1309 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1313 if (conn->rac_close_sent)
1316 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1317 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1318 conn->rac_close_sent = (rc == 0);
1322 spin_lock_irqsave(&conn->rac_lock, flags);
1324 if (list_empty(&conn->rac_fmaq)) {
1326 spin_unlock_irqrestore(&conn->rac_lock, flags);
        /* idle conn: keepalive NOOP if we haven't sent recently */
1328 if (time_after_eq(jiffies,
1329 conn->rac_last_tx + conn->rac_keepalive)) {
1330 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1331 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1336 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1337 list_del(&tx->tx_list);
1338 more_to_do = !list_empty(&conn->rac_fmaq);
1340 spin_unlock_irqrestore(&conn->rac_lock, flags);
1343 switch (tx->tx_msg.ram_type) {
1347 case RANAL_MSG_IMMEDIATE:
1348 case RANAL_MSG_PUT_NAK:
1349 case RANAL_MSG_PUT_DONE:
1350 case RANAL_MSG_GET_NAK:
1351 case RANAL_MSG_GET_DONE:
1352 rc = kranal_sendmsg(conn, &tx->tx_msg,
1353 tx->tx_buffer, tx->tx_nob);
1357 case RANAL_MSG_PUT_REQ:
1358 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1359 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1360 kranal_map_buffer(tx);
1364 case RANAL_MSG_PUT_ACK:
1365 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1369 case RANAL_MSG_GET_REQ:
        /* map sink buffer and advertise it in the GET_REQ descriptor */
1370 kranal_map_buffer(tx);
1371 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1372 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1373 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1374 (__u64)((unsigned long)tx->tx_buffer);
1375 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1376 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1381 if (rc == -EAGAIN) {
1382 /* replace at the head of the list for later */
1383 spin_lock_irqsave(&conn->rac_lock, flags);
1384 list_add(&tx->tx_list, &conn->rac_fmaq);
1385 spin_unlock_irqrestore(&conn->rac_lock, flags);
1392 if (!expect_reply) {
1393 kranal_tx_done(tx, 0);
1395 spin_lock_irqsave(&conn->rac_lock, flags);
1396 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1397 tx->tx_qtime = jiffies;
1398 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Byte-swap an RDMA descriptor received from a peer of opposite
 * endianness (called from the magic-mismatch path in
 * kranal_process_receives). */
1405 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1407 __swab64s(&d->rard_key.Key);
1408 __swab16s(&d->rard_key.Cookie);
1409 __swab16s(&d->rard_key.MdHandle);
1410 __swab32s(&d->rard_key.Flags);
1411 __swab64s(&d->rard_addr.AddressBits);
1412 __swab32s(&d->rard_nob);
/* Find the tx on conn's replyq whose cookie matches an incoming reply.
 * Warns (and presumably rejects — the branch end is elided) when the
 * cookie matches but the queued tx's message type differs from 'type';
 * warns when no cookie matches at all. */
1416 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1418 unsigned long flags;
1419 struct list_head *ttmp;
1422 list_for_each(ttmp, &conn->rac_replyq) {
1423 tx = list_entry(ttmp, kra_tx_t, tx_list);
1425 if (tx->tx_cookie != cookie)
1428 if (tx->tx_msg.ram_type != type) {
1429 CWARN("Unexpected type %x (%x expected) "
1430 "matched reply from "LPX64"\n",
1431 tx->tx_msg.ram_type, type,
1432 conn->rac_peer->rap_nid);
1437 CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
/* Drain one incoming FMA message on 'conn': validate its header, then
 * dispatch on message type.  NOTE(review): this view of the file is elided
 * (several control-flow lines — returns/breaks/braces — are not shown);
 * comments below describe only what the visible code establishes. */
1442 kranal_process_receives(kra_conn_t *conn)
1444         unsigned long flags;
             /* Peek at the next FMA message prefix; RAP_NOT_DONE means
              * nothing is waiting (early exit presumably follows — the
              * statement under the 'if' is elided here). */
1450         RAP_RETURN  rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1451         kra_peer_t *peer = conn->rac_peer;
1453         if (rrc == RAP_NOT_DONE)
1456         LASSERT (rrc == RAP_SUCCESS);
             /* Got a message: stamp the connection for timeout accounting
              * and take this message's expected sequence number. */
1457         conn->rac_last_rx = jiffies;
1458         seq = conn->rac_rx_seq++;
1459         msg = (kra_msg_t *)prefix;
             /* Byte-order fixup: if the magic doesn't match as-is, it must
              * match after swabbing (else the sender is bogus); then swab
              * every header field in place. */
1461         if (msg->ram_magic != RANAL_MSG_MAGIC) {
1462                 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1463                         CERROR("Unexpected magic %08x from "LPX64"\n",
1464                                msg->ram_magic, peer->rap_nid);
1468                 __swab32s(&msg->ram_magic);
1469                 __swab16s(&msg->ram_version);
1470                 __swab16s(&msg->ram_type);
1471                 __swab64s(&msg->ram_srcnid);
1472                 __swab64s(&msg->ram_incarnation);
1473                 __swab32s(&msg->ram_seq);
1475                 /* NB message type checked below; NOT here... */
                     /* Only the payloads that carry an RDMA descriptor need
                      * additional swabbing. */
1476                 switch (msg->ram_type) {
1477                 case RANAL_MSG_PUT_ACK:
1478                         kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1481                 case RANAL_MSG_GET_REQ:
1482                         kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
             /* Header sanity checks: protocol version, claimed source NID,
              * peer incarnation and per-connection sequence number must all
              * match what this connection expects. */
1490         if (msg->ram_version != RANAL_MSG_VERSION) {
1491                 CERROR("Unexpected protocol version %d from "LPX64"\n",
1492                        msg->ram_version, peer->rap_nid);
1496         if (msg->ram_srcnid != peer->rap_nid) {
1497                 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1498                        msg->ram_srcnid, peer->rap_nid);
1502         if (msg->ram_incarnation != conn->rac_peer_incarnation) {
1503                 CERROR("Unexpected incarnation "LPX64"("LPX64
1504                        " expected) from "LPX64"\n",
1505                        msg->ram_incarnation, conn->rac_peer_incarnation,
1510         if (msg->ram_seq != seq) {
1511                 CERROR("Unexpected sequence number %d(%d expected) from "
1512                        LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
1516         if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1517                 /* This message signals RDMA completion: wait now... */
1518                 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1519                 LASSERT (rrc == RAP_SUCCESS);
             /* CLOSE handshake: record receipt, then under the global lock
              * either start closing (with -ETIMEDOUT) or, if we've already
              * sent our own CLOSE, finish tearing the connection down. */
1522         if (msg->ram_type == RANAL_MSG_CLOSE) {
1523                 conn->rac_close_recvd = 1;
1524                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1526                 if (!conn->rac_closing)
1527                         kranal_close_conn_locked(conn, -ETIMEDOUT);
1528                 else if (conn->rac_close_sent)
1529                         kranal_terminate_conn_locked(conn);
1531                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
             /* A connection already closing ignores further traffic
              * (consequent statement elided in this view). */
1535         if (conn->rac_closing)
1538         conn->rac_rxmsg = msg; /* stash message for portals callbacks */
1539                                /* they'll NULL rac_rxmsg if they consume it */
             /* Dispatch on (now-validated) message type. */
1540         switch (msg->ram_type) {
1541         case RANAL_MSG_NOOP:
1542                 /* Nothing to do; just a keepalive */
1544         case RANAL_MSG_IMMEDIATE:
                     /* Hand the embedded portals header up to the lib layer. */
1546                 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1549         case RANAL_MSG_PUT_REQ:
1550                 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1552                 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
                     /* No local match: NAK the PUT back to the sender,
                      * echoing its cookie. */
1555                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1559                 tx->tx_msg.ram_u.completion.racm_cookie =
1560                         msg->ram_u.putreq.raprm_cookie;
1561                 kranal_post_fma(conn, tx);
1564         case RANAL_MSG_PUT_NAK:
                     /* Peer rejected our PUT_REQ: find the matching tx by
                      * cookie and complete it with -ENOENT. */
1565                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1566                                         msg->ram_u.completion.racm_cookie);
1570                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1571                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1572                 kranal_tx_done(tx, -ENOENT);    /* no match */
1575         case RANAL_MSG_PUT_ACK:
                     /* Peer accepted our PUT_REQ: start the RDMA transfer
                      * described by the ACK's descriptor. */
1576                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1577                                         msg->ram_u.putack.rapam_src_cookie);
1581                 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1582                             &msg->ram_u.putack.rapam_desc,
1583                             msg->ram_u.putack.rapam_desc.rard_nob,
1584                             msg->ram_u.putack.rapam_dst_cookie);
1587         case RANAL_MSG_PUT_DONE:
                     /* Peer's RDMA for our PUT_ACK completed: finish the tx
                      * successfully. */
1588                 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1589                                         msg->ram_u.completion.racm_cookie);
1593                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1594                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1595                 kranal_tx_done(tx, 0);
1598         case RANAL_MSG_GET_REQ:
1599                 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1601                 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
                     /* No local match: NAK the GET, echoing its cookie. */
1604                 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1608                 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1609                 kranal_post_fma(conn, tx);
1612         case RANAL_MSG_GET_NAK:
                     /* Peer rejected our GET_REQ: complete it with -ENOENT. */
1613                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1614                                         msg->ram_u.completion.racm_cookie);
1618                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1619                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1620                 kranal_tx_done(tx, -ENOENT);    /* no match */
1623         case RANAL_MSG_GET_DONE:
                     /* Peer's RDMA for our GET_REQ completed: finish the tx
                      * successfully. */
1624                 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1625                                         msg->ram_u.completion.racm_cookie);
1629                 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1630                          tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1631                 kranal_tx_done(tx, 0);
             /* If no callback consumed the stashed message, discard it so the
              * FMA slot is released. */
1636         if (conn->rac_rxmsg != NULL)
1637                 kranal_consume_rxmsg(conn, NULL, 0);
/* Per-device scheduler thread: polls the device's RDMA and FMA completion
 * queues and services scheduled connections until shutdown.
 * NOTE(review): this view of the file is elided (some statements between the
 * numbered lines are not shown); comments describe only the visible code. */
1643 kranal_scheduler (void *arg)
1645         kra_device_t   *dev = (kra_device_t *)arg;
1649         unsigned long   flags;
             /* Become a kernel daemon named after the device index and
              * ignore all signals. */
1659         snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1660         kportal_daemonize(name);
1661         kportal_blockallsigs();
1663         init_waitqueue_entry(&wait, current);
             /* rad_lock is held at the top of every loop iteration. */
1665         spin_lock_irqsave(&dev->rad_lock, flags);
1667         while (!kranal_data.kra_shutdown) {
1668                 /* Safe: kra_shutdown only set when quiescent */
                     /* Periodically drop the lock to yield the CPU after
                      * RANAL_RESCHED consecutive busy iterations
                      * (the actual resched call is elided in this view). */
1670                 if (busy_loops++ >= RANAL_RESCHED) {
1671                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1676                         spin_lock_irqsave(&dev->rad_lock, flags);
                     /* Device callback flagged completions: poll both CQs
                      * outside the lock. */
1681                 if (dev->rad_ready) {
1683                         spin_unlock_irqrestore(&dev->rad_lock, flags);
                             /* RDMA CQ: overrun must never happen here. */
1685                         rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
1687                         LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
1688                         LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1690                         if (rrc == RAP_SUCCESS) {
1691                                 kranal_process_rdmaq(cqid);
                             /* FMA CQ: on overrun, reschedule every conn on
                              * this device; otherwise just the one cqid. */
1695                         rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
1696                         LASSERT (rrc == RAP_SUCCESS || rrc == RAP_NOT_DONE);
1698                         if (rrc == RAP_SUCCESS) {
1699                                 if ((event_type & RAPK_CQ_EVENT_OVERRUN) != 0)
1700                                         kranal_schedule_dev(dev);
1702                                         kranal_schedule_cqid(cqid);
1706                         spin_lock_irqsave(&dev->rad_lock, flags);
1708                         /* If there were no completions to handle, I leave
1709                          * rad_ready clear.  NB I cleared it BEFORE I checked
1710                          * the completion queues since I'm racing with the
1711                          * device callback. */
                     /* Service the next scheduled connection, if any: pop it
                      * off rad_connq, drop the lock, and run its FMA-send and
                      * receive paths.  'resched' accumulates whether it needs
                      * another pass. */
1717                 if (!list_empty(&dev->rad_connq)) {
1718                         conn = list_entry(dev->rad_connq.next,
1719                                           kra_conn_t, rac_schedlist);
1720                         list_del(&conn->rac_schedlist);
1721                         spin_unlock_irqrestore(&dev->rad_lock, flags);
1723                         LASSERT (conn->rac_scheduled);
1725                         resched  = kranal_process_fmaq(conn);
1726                         resched |= kranal_process_receives(conn);
1729                         spin_lock_irqsave(&dev->rad_lock, flags);
                             /* presumably re-queued only when resched is set —
                              * the guarding condition is elided in this view. */
1731                                 list_add_tail(&conn->rac_schedlist,
                     /* Nothing to do: sleep on the device waitqueue until the
                      * callback or a scheduler wakes us (the schedule() call
                      * itself is elided in this view). */
1738                 add_wait_queue(&dev->rad_waitq, &wait);
1739                 set_current_state(TASK_INTERRUPTIBLE);
1741                 spin_unlock_irqrestore(&dev->rad_lock, flags);
1746                 set_current_state(TASK_RUNNING);
1747                 remove_wait_queue(&dev->rad_waitq, &wait);
1749                 spin_lock_irqsave(&dev->rad_lock, flags);
             /* Shutdown: drop the lock and check out with the thread
              * accounting. */
1752         spin_unlock_irqrestore(&dev->rad_lock, flags);
1754         kranal_thread_fini();
1759 lib_nal_t kranal_lib = {
1760 libnal_data: &kranal_data, /* NAL private data */
1761 libnal_send: kranal_send,
1762 libnal_send_pages: kranal_send_pages,
1763 libnal_recv: kranal_recv,
1764 libnal_recv_pages: kranal_recv_pages,
1765 libnal_dist: kranal_dist