1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* lib_nal 'dist' method: report how "far away" 'nid' is.  The visible
 * branch handles the local case (nid == our own NID).
 * NOTE(review): fragment — the remainder of the body is not shown here. */
27 kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
29         /* I would guess that if kranal_get_peer (nid) == NULL,
30         and we're not routing, then 'nid' is very distant :) */
31         if ( nal->libnal_ni.ni_pid.nid == nid ) {
/* RapidArray device callback: locate the kra_device with matching 'devid'
 * and, if its scheduler is not already flagged ready, wake it so it will
 * service the device.  Warns if no device matches. */
41 kranal_device_callback(RAP_INT32 devid)
47         for (i = 0; i < kranal_data.kra_ndevs; i++) {
49                 dev = &kranal_data.kra_devices[i];
/* skip devices other than the one that raised the callback */
50                 if (dev->rad_id != devid)
53                 spin_lock_irqsave(&dev->rad_lock, flags);
55                 if (!dev->rad_ready) {
/* wake the device's scheduler thread sleeping on rad_waitq */
57                         wake_up(&dev->rad_waitq);
60                 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* fell off the loop: no device with this id */
64         CWARN("callback for unknown device %d\n", devid);
/* Queue 'conn' for attention by its device's scheduler thread.
 * Idempotent: only queues (and takes a +1 conn ref for the scheduler)
 * if the conn is not already on the device's connq. */
68 kranal_schedule_conn(kra_conn_t *conn)
70         kra_device_t *dev = conn->rac_device;
73         spin_lock_irqsave(&dev->rad_lock, flags);
75         if (!conn->rac_scheduled) {
76                 kranal_conn_addref(conn); /* +1 ref for scheduler */
77                 conn->rac_scheduled = 1;
78                 list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
/* wake the scheduler to process the newly queued conn */
79                 wake_up(&dev->rad_waitq);
82         spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Allocate an idle tx descriptor.  Strategy, as visible here:
 *  - take the head of the normal idle list if non-empty;
 *  - otherwise, for non-blocking callers, dip into the reserved
 *    ("nblk") pool, complaining loudly when that too is exhausted;
 *  - otherwise drop the lock and block until a normal tx frees up.
 * A fresh completion cookie is stamped while kra_tx_lock is still held
 * (it may not be needed, but the lock makes it cheap to do now). */
86 kranal_get_idle_tx (int may_block)
92         spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
94                         /* "normal" descriptor is free */
95                         if (!list_empty(&kranal_data.kra_idle_txs)) {
96                                 tx = list_entry(kranal_data.kra_idle_txs.next,
102                                 /* may dip into reserve pool */
103                                 if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
104                                         CERROR("reserved tx desc pool exhausted\n");
108                                 tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
113                         /* block for idle tx */
114                         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
116                         wait_event(kranal_data.kra_idle_tx_waitq,
117                                    !list_empty(&kranal_data.kra_idle_txs));
/* remove the chosen tx from whichever idle list it was on */
121                 list_del(&tx->tx_list);
123                 /* Allocate a new completion cookie.  It might not be
124                  * needed, but we've got a lock right now... */
125                 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
/* sanity: a freshly allocated tx must be completely clean */
127                 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
128                 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
129                 LASSERT (tx->tx_conn == NULL);
130                 LASSERT (tx->tx_libmsg[0] == NULL);
131                 LASSERT (tx->tx_libmsg[1] == NULL);
134         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Initialise the common header fields of an FMA message: magic, protocol
 * version, message type and our source NID.  The connstamp is filled in
 * later, when the message is actually sent. */
140 kranal_init_msg(kra_msg_t *msg, int type)
142         msg->ram_magic = RANAL_MSG_MAGIC;
143         msg->ram_version = RANAL_MSG_VERSION;
144         msg->ram_type = type;
145         msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
146         /* ram_connstamp gets set when FMA is sent */
/* Convenience wrapper: grab an idle tx (blocking only if 'may_block')
 * and initialise its embedded message header with 'type'. */
150 kranal_new_tx_msg (int may_block, int type)
152         kra_tx_t *tx = kranal_get_idle_tx(may_block);
157         kranal_init_msg(&tx->tx_msg, type);
/* Point tx at a single contiguous vaddr fragment of an iovec for
 * immediate (FMA-inline) sending.  Skips whole iovs to reach 'offset',
 * then requires the remaining 'nob' bytes to fit in one fragment. */
162 kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
166         /* For now this is almost identical to kranal_setup_virt_buffer, but we
167          * could "flatten" the payload into a single contiguous buffer ready
168          * for sending direct over an FMA if we ever needed to. */
172         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* walk past whole iovec entries until 'offset' lands inside one */
174         while (offset >= iov->iov_len) {
175                 offset -= iov->iov_len;
/* payload must not span iovec fragments for an immediate send */
181         if (nob > iov->iov_len - offset) {
182                 CERROR("Can't handle multiple vaddr fragments\n");
186         tx->tx_buftype = RANAL_BUF_IMMEDIATE;
188         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Point tx at a single contiguous virtual-memory fragment of an iovec,
 * leaving it RANAL_BUF_VIRT_UNMAPPED (registration happens later in
 * kranal_map_buffer).  Same single-fragment restriction as the
 * immediate variant. */
193 kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
199         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* skip whole iovec entries until 'offset' falls within one */
201         while (offset >= iov->iov_len) {
202                 offset -= iov->iov_len;
/* RDMA source/sink must be one contiguous vaddr range */
208         if (nob > iov->iov_len - offset) {
209                 CERROR("Can't handle multiple vaddr fragments\n");
213         tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
215         tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Build a physical-page region table (tx->tx_phys) describing a kiov
 * payload, for later registration with RapkRegisterPhys.  The pages
 * after the first must be contiguous in the "I/O VM": no leading page
 * offset and (except possibly the last) full-length pages, otherwise
 * the payload cannot be presented as one contiguous region. */
220 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
223         RAP_PHYS_REGION *phys = tx->tx_phys;
226         CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
230         LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* skip whole kiov entries until 'offset' lands inside one */
232         while (offset >= kiov->kiov_len) {
233                 offset -= kiov->kiov_len;
239         tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
/* tx_buffer records the in-page offset of the payload start */
241         tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
/* first region: the page containing the payload start */
243         phys->Address = kranal_page2phys(kiov->kiov_page);
244         phys->Length = PAGE_SIZE;
247         resid = nob - (kiov->kiov_len - offset);
/* subsequent fragments may not leave gaps in the region */
253                 if (kiov->kiov_offset != 0 ||
254                     ((resid > PAGE_SIZE) &&
255                      kiov->kiov_len < PAGE_SIZE)) {
256                         /* Can't have gaps */
257                         CERROR("Can't make payload contiguous in I/O VM:"
258                                "page %d, offset %d, len %d \n",
260                                kiov->kiov_offset, kiov->kiov_len);
/* bounded by the Portals maximum fragment count */
264                 if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
265                         CERROR ("payload too big (%d)\n", phys - tx->tx_phys);
269                 phys->Address = kranal_page2phys(kiov->kiov_page);
270                 phys->Length = PAGE_SIZE;
276         tx->tx_phys_npages = phys - tx->tx_phys;
/* Dispatch buffer setup: exactly one of iov (virtual) or kiov (paged)
 * must be given; route to the phys or virt setup routine accordingly. */
281 kranal_setup_buffer (kra_tx_t *tx, int niov,
282                      struct iovec *iov, ptl_kiov_t *kiov,
285         LASSERT ((iov == NULL) != (kiov == NULL));
288                 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
290         return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/* Register tx's buffer with the RapidArray device so it can be used for
 * RDMA.  Must run on the conn's device scheduler thread.  Already-mapped
 * (or immediate) buffers need no work; UNMAPPED buffers are registered
 * and their buftype promoted to the corresponding MAPPED state. */
294 kranal_map_buffer (kra_tx_t *tx)
296         kra_conn_t *conn = tx->tx_conn;
297         kra_device_t *dev = conn->rac_device;
/* registration is only legal on the device's scheduler thread */
300         LASSERT (current == dev->rad_scheduler);
302         switch (tx->tx_buftype) {
/* nothing to do for these buffer states */
307         case RANAL_BUF_IMMEDIATE:
308         case RANAL_BUF_PHYS_MAPPED:
309         case RANAL_BUF_VIRT_MAPPED:
312         case RANAL_BUF_PHYS_UNMAPPED:
/* register the physical page table built by kranal_setup_phys_buffer */
313                 rrc = RapkRegisterPhys(dev->rad_handle,
314                                        tx->tx_phys, tx->tx_phys_npages,
315                                        dev->rad_ptag, &tx->tx_map_key);
316                 LASSERT (rrc == RAP_SUCCESS);
317                 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
320         case RANAL_BUF_VIRT_UNMAPPED:
/* register the contiguous virtual range */
321                 rrc = RapkRegisterMemory(dev->rad_handle,
322                                          tx->tx_buffer, tx->tx_nob,
323                                          dev->rad_ptag, &tx->tx_map_key);
324                 LASSERT (rrc == RAP_SUCCESS);
325                 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/* Undo kranal_map_buffer: deregister a MAPPED buffer with the device
 * (on the device scheduler thread) and demote its buftype back to the
 * UNMAPPED state.  Immediate/unmapped buffers need no work. */
331 kranal_unmap_buffer (kra_tx_t *tx)
336         switch (tx->tx_buftype) {
/* never registered: nothing to deregister */
341         case RANAL_BUF_IMMEDIATE:
342         case RANAL_BUF_PHYS_UNMAPPED:
343         case RANAL_BUF_VIRT_UNMAPPED:
346         case RANAL_BUF_PHYS_MAPPED:
347                 LASSERT (tx->tx_conn != NULL);
348                 dev = tx->tx_conn->rac_device;
349                 LASSERT (current == dev->rad_scheduler);
/* phys registrations pass NULL for the address */
350                 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
351                                            dev->rad_ptag, &tx->tx_map_key);
352                 LASSERT (rrc == RAP_SUCCESS);
353                 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
356         case RANAL_BUF_VIRT_MAPPED:
357                 LASSERT (tx->tx_conn != NULL);
358                 dev = tx->tx_conn->rac_device;
359                 LASSERT (current == dev->rad_scheduler);
360                 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
361                                            dev->rad_ptag, &tx->tx_map_key);
362                 LASSERT (rrc == RAP_SUCCESS);
363                 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
/* Complete a tx: unmap its buffer, finalise up to two attached lib
 * messages (with PTL_OK on success, PTL_FAIL otherwise), scrub the
 * descriptor and return it to the appropriate idle pool, waking any
 * thread blocked in kranal_get_idle_tx.  Must not run in interrupt
 * context (lib_finalize may not be interrupt-safe). */
369 kranal_tx_done (kra_tx_t *tx, int completion)
371         ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
375         LASSERT (!in_interrupt());
377         kranal_unmap_buffer(tx);
379         for (i = 0; i < 2; i++) {
380                 /* tx may have up to 2 libmsgs to finalise */
381                 if (tx->tx_libmsg[i] == NULL)
384                 lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
385                 tx->tx_libmsg[i] = NULL;
/* reset so the idle-pool sanity LASSERTs hold on reallocation */
388         tx->tx_buftype = RANAL_BUF_NONE;
389         tx->tx_msg.ram_type = RANAL_MSG_NONE;
392         spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
/* return to reserve or normal pool; only normal-pool waiters exist */
395                 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
397                 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
398                 wake_up(&kranal_data.kra_idle_tx_waitq);
401         spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Return a connection for 'peer' (caller holds the appropriate lock).
 * Simply returns the first conn on the peer's list, if any. */
405 kranal_find_conn_locked (kra_peer_t *peer)
407         struct list_head *tmp;
409         /* just return the first connection */
410         list_for_each (tmp, &peer->rap_conns) {
411                 return list_entry(tmp, kra_conn_t, rac_list);
/* Queue 'tx' on conn's FMA send queue, timestamp it for the timeout
 * checks, and schedule the conn so its device scheduler sends it. */
418 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
424         spin_lock_irqsave(&conn->rac_lock, flags);
425         list_add_tail(&tx->tx_list, &conn->rac_fmaq);
426         tx->tx_qtime = jiffies;
427         spin_unlock_irqrestore(&conn->rac_lock, flags);
429         kranal_schedule_conn(conn);
/* Launch 'tx' towards 'nid'.  Once called, the tx is committed: every
 * failure path completes it via kranal_tx_done.  Fast path: find the
 * peer and an existing conn under the read lock and post the tx.
 * Otherwise retry under the write lock; if still no conn, either hand
 * the peer to the connd to initiate a connection (rate-limited by
 * rap_reconnect_time) or queue the tx on the peer awaiting the
 * connection already in progress. */
433 kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
439         rwlock_t *g_lock = &kranal_data.kra_global_lock;
441         /* If I get here, I've committed to send, so I complete the tx with
442          * failure on any problems */
444         LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */
448         peer = kranal_find_peer_locked(nid);
/* no such peer: destination unreachable */
451                 kranal_tx_done(tx, -EHOSTUNREACH);
455         conn = kranal_find_conn_locked(peer);
/* conn exists: queue on it and we're done */
457                 kranal_post_fma(conn, tx);
462         /* Making one or more connections; I'll need a write lock... */
464         write_lock_irqsave(g_lock, flags);
/* re-find the peer: it may have vanished while we dropped the lock */
466         peer = kranal_find_peer_locked(nid);
468                 write_unlock_irqrestore(g_lock, flags);
469                 kranal_tx_done(tx, -EHOSTUNREACH);
473         conn = kranal_find_conn_locked(peer);
475                 /* Connection exists; queue message on it */
476                 kranal_post_fma(conn, tx);
477                 write_unlock_irqrestore(g_lock, flags);
481         LASSERT (peer->rap_persistence > 0);
483         if (!peer->rap_connecting) {
484                 LASSERT (list_empty(&peer->rap_tx_queue));
/* too soon to reconnect: fail the tx rather than hammer the peer */
487                 if (now < peer->rap_reconnect_time) {
488                         write_unlock_irqrestore(g_lock, flags);
489                         kranal_tx_done(tx, -EHOSTUNREACH);
493                 peer->rap_connecting = 1;
494                 kranal_peer_addref(peer); /* extra ref for connd */
496                 spin_lock(&kranal_data.kra_connd_lock);
498                 list_add_tail(&peer->rap_connd_list,
499                               &kranal_data.kra_connd_peers);
500                 wake_up(&kranal_data.kra_connd_waitq);
502                 spin_unlock(&kranal_data.kra_connd_lock);
505         /* A connection is being established; queue the message... */
506         list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
508         write_unlock_irqrestore(g_lock, flags);
/* Initiate an RDMA of 'nob' bytes from tx's (mapped) buffer to the
 * peer's sink descriptor, prime the completion message (of 'type',
 * carrying 'cookie') that will be FMA'd when the RDMA finishes, and
 * queue the tx on the conn's rdmaq.  A zero-length transfer skips the
 * RDMA and posts the completion message immediately.  Runs on the
 * conn's device scheduler thread. */
512 kranal_rdma(kra_tx_t *tx, int type,
513             kra_rdma_desc_t *sink, int nob, __u64 cookie)
515         kra_conn_t *conn = tx->tx_conn;
/* buffer must already be registered, and fit both ends */
519         LASSERT (kranal_tx_mapped(tx));
520         LASSERT (nob <= sink->rard_nob);
521         LASSERT (nob <= tx->tx_nob);
523         /* No actual race with scheduler sending CLOSE (I'm she!) */
524         LASSERT (current == conn->rac_device->rad_scheduler);
526         memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
527         tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
528         tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
529         tx->tx_rdma_desc.DstPtr = sink->rard_addr;
530         tx->tx_rdma_desc.DstKey = sink->rard_key;
531         tx->tx_rdma_desc.Length = nob;
/* AppPtr lets kranal_check_rdma_cq match the completion back to tx */
532         tx->tx_rdma_desc.AppPtr = tx;
534         /* prep final completion message */
535         kranal_init_msg(&tx->tx_msg, type);
536         tx->tx_msg.ram_u.completion.racm_cookie = cookie;
538         if (nob == 0) { /* Immediate completion */
539                 kranal_post_fma(conn, tx);
543         LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
545         rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
546         LASSERT (rrc == RAP_SUCCESS);
548         spin_lock_irqsave(&conn->rac_lock, flags);
549         list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
550         tx->tx_qtime = jiffies;
551         spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Consume the immediate payload of the message currently held in
 * conn->rac_rxmsg, copying up to 'nob' bytes into 'buffer' via
 * RapkFmaCopyToUser, then mark the rx message consumed.  Warns if the
 * peer supplied a different number of bytes than expected. */
555 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
557         __u32     nob_received = nob;
560         LASSERT (conn->rac_rxmsg != NULL);
562         rrc = RapkFmaCopyToUser(conn->rac_rihandle, buffer,
563                                 &nob_received, sizeof(kra_msg_t));
564         LASSERT (rrc == RAP_SUCCESS);
/* rx message now consumed; scheduler may fetch the next one */
566         conn->rac_rxmsg = NULL;
568         if (nob_received != nob) {
569                 CWARN("Expected %d immediate bytes but got %d\n",
/* Common send path for kranal_send/kranal_send_pages.  Dispatches on
 * the Portals message type: REPLY to a GET uses RDMA back through the
 * conn that received the GET_REQ (or immediate if the request was
 * immediate); large GETs/PUTs post GET_REQ/PUT_REQ messages and launch
 * them; everything small goes as an IMMEDIATE message.  The payload is
 * either all iov (vaddrs) or all kiov (pages), never both. */
578 kranal_do_send (lib_nal_t    *nal,
595         /* NB 'private' is different depending on what we're sending.... */
597         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
598                " pid %d\n", nob, niov, nid , pid);
600         LASSERT (nob == 0 || niov > 0);
601         LASSERT (niov <= PTL_MD_MAX_IOV);
603         LASSERT (!in_interrupt());
604         /* payload is either all vaddrs or all pages */
605         LASSERT (!(kiov != NULL && iov != NULL));
611         case PTL_MSG_REPLY: {
612                 /* reply's 'private' is the conn that received the GET_REQ */
614                 LASSERT (conn->rac_rxmsg != NULL);
/* GET was sent IMMEDIATE: reply must also fit in an immediate */
616                 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
617                         if (nob > RANAL_FMA_MAX_DATA) {
618                                 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
622                         break;                  /* RDMA not expected */
625                 /* Incoming message consistent with immediate reply? */
626                 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
627                         CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
628                                nid, conn->rac_rxmsg->ram_type);
/* non-blocking: we're on the receive path here */
632                 tx = kranal_get_idle_tx(0);
636                 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
638                         kranal_tx_done(tx, rc);
643                 tx->tx_libmsg[0] = libmsg;
/* RDMA the reply data straight into the GET sink described in the request */
645                 kranal_map_buffer(tx);
646                 kranal_rdma(tx, RANAL_MSG_GET_DONE,
647                             &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
648                             conn->rac_rxmsg->ram_u.get.ragm_cookie);
/* small unpaged payloads fall through to the IMMEDIATE path */
653                 if (kiov == NULL &&             /* not paged */
654                     nob <= RANAL_FMA_MAX_DATA && /* small enough */
655                     nob <= kranal_tunables.kra_max_immediate)
656                         break;                  /* send IMMEDIATE */
658                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
662                 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
664                         kranal_tx_done(tx, rc);
/* libmsg[1] finalised when the REPLY data arrives */
668                 tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
669                 if (tx->tx_libmsg[1] == NULL) {
670                         CERROR("Can't create reply for GET to "LPX64"\n", nid);
671                         kranal_tx_done(tx, rc);
675                 tx->tx_libmsg[0] = libmsg;
676                 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
677                 /* rest of tx_msg is setup just before it is sent */
678                 kranal_launch_tx(tx, nid);
686                 if (kiov == NULL &&             /* not paged */
687                     nob <= RANAL_MAX_IMMEDIATE && /* small enough */
688                     nob <= kranal_tunables.kra_max_immediate)
689                         break;                  /* send IMMEDIATE */
691                 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
695                 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, nob);
697                         kranal_tx_done(tx, rc);
701                 tx->tx_libmsg[0] = libmsg;
702                 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
703                 /* rest of tx_msg is setup just before it is sent */
704                 kranal_launch_tx(tx, nid);
/* IMMEDIATE path: payload must be vaddrs and small */
708         LASSERT (kiov == NULL);
709         LASSERT (nob <= RANAL_MAX_IMMEDIATE);
711         tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
712                                  type == PTL_MSG_REPLY ||
714                               RANAL_MSG_IMMEDIATE);
718         rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
720                 kranal_tx_done(tx, rc);
724         tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
725         tx->tx_libmsg[0] = libmsg;
726         kranal_launch_tx(tx, nid);
/* lib_nal 'send' method for iovec (vaddr) payloads: thin wrapper over
 * kranal_do_send with kiov == NULL. */
731 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
732              ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
733              unsigned int niov, struct iovec *iov,
734              size_t offset, size_t len)
736         return kranal_do_send(nal, private, cookie,
/* lib_nal 'send_pages' method for kiov (paged) payloads: thin wrapper
 * over kranal_do_send with iov == NULL. */
743 kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
744                    ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
745                    unsigned int niov, ptl_kiov_t *kiov,
746                    size_t offset, size_t len)
748         return kranal_do_send(nal, private, cookie,
/* Common receive path.  'private' is the conn whose rac_rxmsg is the
 * message being received.  IMMEDIATE data is copied straight into the
 * caller's iovec; a matched PUT_REQ triggers a PUT_ACK carrying our
 * RDMA sink descriptor so the peer can RDMA the payload; GET_REQ
 * receives are already fully handled on the send side and just get
 * finalised here. */
755 kranal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
756                 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
757                 size_t offset, size_t mlen, size_t rlen)
759         kra_conn_t  *conn = private;
760         kra_msg_t   *rxmsg = conn->rac_rxmsg;
765         LASSERT (mlen <= rlen);
766         LASSERT (!in_interrupt());
767         /* Either all pages or all vaddrs */
768         LASSERT (!(kiov != NULL && iov != NULL));
770         switch(rxmsg->ram_type) {
775         case RANAL_MSG_IMMEDIATE:
/* immediate data can only land in a vaddr buffer */
778                 } else if (kiov != NULL) {
779                         CERROR("Can't recv immediate into paged buffer\n");
/* skip whole iovec entries until 'offset' lands inside one */
783                         while (offset >= iov->iov_len) {
784                                 offset -= iov->iov_len;
/* single contiguous fragment only, as on the send side */
789                         if (mlen > iov->iov_len - offset) {
790                                 CERROR("Can't handle immediate frags\n");
793                         buffer = ((char *)iov->iov_base) + offset;
795                 rc = kranal_consume_rxmsg(conn, buffer, mlen);
796                 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
799         case RANAL_MSG_GET_REQ:
800                 /* If the GET matched, we've already handled it in
801                  * kranal_do_send which is called to send the REPLY.  We're
802                  * only called here to complete the GET receive (if we needed
803                  * it which we don't, but I digress...) */
804                 LASSERT (libmsg == NULL);
805                 lib_finalize(nal, NULL, libmsg, PTL_OK);
808         case RANAL_MSG_PUT_REQ:
809                 if (libmsg == NULL) {           /* PUT didn't match... */
810                         lib_finalize(nal, NULL, libmsg, PTL_OK);
/* non-blocking tx: we are on the receive path */
814                 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
818                 rc = kranal_setup_buffer(tx, niov, iov, kiov, offset, mlen);
820                         kranal_tx_done(tx, rc);
/* register the sink buffer and advertise it in the PUT_ACK */
825                 kranal_map_buffer(tx);
827                 tx->tx_msg.ram_u.putack.rapam_src_cookie =
828                         conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
829                 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
830                 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
831                 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
832                         (__u64)((unsigned long)tx->tx_buffer);
833                 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
835                 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
837                 kranal_post_fma(conn, tx);
839                 /* flag matched by consuming rx message */
840                 kranal_consume_rxmsg(conn, NULL, 0);
/* lib_nal 'recv' method for iovec (vaddr) buffers: thin wrapper over
 * kranal_recvmsg with kiov == NULL. */
846 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
847              unsigned int niov, struct iovec *iov,
848              size_t offset, size_t mlen, size_t rlen)
850         return kranal_recvmsg(nal, private, msg, niov, iov, NULL,
/* lib_nal 'recv_pages' method for kiov (paged) buffers: thin wrapper
 * over kranal_recvmsg with iov == NULL. */
855 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
856                    unsigned int niov, ptl_kiov_t *kiov,
857                    size_t offset, size_t mlen, size_t rlen)
859         return kranal_recvmsg(nal, private, msg, niov, NULL, kiov,
/* Spawn a kernel thread running fn(arg) and bump the global thread
 * count used by shutdown to wait for all threads to exit. */
864 kranal_thread_start (int(*fn)(void *arg), void *arg)
866         long    pid = kernel_thread(fn, arg, 0);
871         atomic_inc(&kranal_data.kra_nthreads);
/* Called by each kranal thread on exit: drop the global thread count
 * (pairs with kranal_thread_start). */
876 kranal_thread_fini (void)
878         atomic_dec(&kranal_data.kra_nthreads);
/* Check one conn for timeouts.  Schedules a keepalive if nothing has
 * been sent recently, errors if nothing has been received within the
 * conn timeout, and (belt+braces) errors if any tx has been stuck on
 * the fmaq/rdmaq/replyq for longer than the timeout. */
882 kranal_check_conn_timeouts (kra_conn_t *conn)
885         struct list_head  *ttmp;
888         unsigned long      now = jiffies;
890         LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
891                  conn->rac_state == RANAL_CONN_CLOSING);
893         if (!conn->rac_close_sent &&
894             time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
895                 /* not sent in a while; schedule conn so scheduler sends a keepalive */
896                 kranal_schedule_conn(conn);
899         timeout = conn->rac_timeout * HZ;
/* peer gone quiet for a whole timeout interval: report it */
901         if (!conn->rac_close_recvd &&
902             time_after_eq(now, conn->rac_last_rx + timeout)) {
903                 CERROR("Nothing received from "LPX64" within %lu seconds\n",
904                        conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
/* queue checks below only apply to established conns */
908         if (conn->rac_state != RANAL_CONN_ESTABLISHED)
911         /* Check the conn's queues are moving.  These are "belt+braces" checks,
912          * in case of hardware/software errors that make this conn seem
913          * responsive even though it isn't progressing its message queues. */
915         spin_lock_irqsave(&conn->rac_lock, flags);
917         list_for_each (ttmp, &conn->rac_fmaq) {
918                 tx = list_entry(ttmp, kra_tx_t, tx_list);
920                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
921                         spin_unlock_irqrestore(&conn->rac_lock, flags);
922                         CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
923                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
928         list_for_each (ttmp, &conn->rac_rdmaq) {
929                 tx = list_entry(ttmp, kra_tx_t, tx_list);
931                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
932                         spin_unlock_irqrestore(&conn->rac_lock, flags);
933                         CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
934                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
939         list_for_each (ttmp, &conn->rac_replyq) {
940                 tx = list_entry(ttmp, kra_tx_t, tx_list);
942                 if (time_after_eq(now, tx->tx_qtime + timeout)) {
943                         spin_unlock_irqrestore(&conn->rac_lock, flags);
944                         CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
945                                conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
950         spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Reaper helper: scan conn hash chain 'idx' under the shared global
 * lock, tracking the smallest timeout/keepalive seen in *min_timeoutp.
 * On a timed-out conn, take a ref, upgrade to the write lock, close or
 * terminate it according to its state, then restart the scan of this
 * chain (the list may have changed while the lock was dropped). */
955 kranal_reaper_check (int idx, unsigned long *min_timeoutp)
957         struct list_head  *conns = &kranal_data.kra_conns[idx];
958         struct list_head  *ctmp;
964         /* NB. We expect to check all the conns and not find any problems, so
965          * we just use a shared lock while we take a look... */
966         read_lock(&kranal_data.kra_global_lock);
968         list_for_each (ctmp, conns) {
969                 conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
/* keep the running minimum for the reaper's scan-rate computation */
971                 if (conn->rac_timeout < *min_timeoutp )
972                         *min_timeoutp = conn->rac_timeout;
973                 if (conn->rac_keepalive < *min_timeoutp )
974                         *min_timeoutp = conn->rac_keepalive;
976                 rc = kranal_check_conn_timeouts(conn);
/* hold the conn across the lock upgrade */
980                 kranal_conn_addref(conn);
981                 read_unlock(&kranal_data.kra_global_lock);
983                 CERROR("Conn to "LPX64", cqid %d timed out\n",
984                        conn->rac_peer->rap_nid, conn->rac_cqid);
986                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
988                 switch (conn->rac_state) {
992                 case RANAL_CONN_ESTABLISHED:
993                         kranal_close_conn_locked(conn, -ETIMEDOUT);
996                 case RANAL_CONN_CLOSING:
997                         kranal_terminate_conn_locked(conn);
1001                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1003                 kranal_conn_decref(conn);
1005                 /* start again now I've dropped the lock */
1009         read_unlock(&kranal_data.kra_global_lock);
/* Connection daemon thread: loops until shutdown, popping peers off
 * kra_connd_peers and attempting to connect to them (dropping the connd
 * lock, and the connd's peer ref, around the blocking connect).  Sleeps
 * interruptibly on kra_connd_waitq when there is no work. */
1013 kranal_connd (void *arg)
1017         unsigned long      flags;
1020         snprintf(name, sizeof(name), "kranal_connd_%02ld", (long)arg);
1021         kportal_daemonize(name);
1022         kportal_blockallsigs();
1024         init_waitqueue_entry(&wait, current);
1026         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1028         while (!kranal_data.kra_shutdown) {
1029                 /* Safe: kra_shutdown only set when quiescent */
1031                 if (!list_empty(&kranal_data.kra_connd_peers)) {
1032                         peer = list_entry(kranal_data.kra_connd_peers.next,
1033                                           kra_peer_t, rap_connd_list);
1035                         list_del_init(&peer->rap_connd_list);
1036                         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
/* blocking connect attempt, done without the connd lock */
1038                         kranal_connect(peer);
1039                         kranal_peer_decref(peer);
1041                         spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
/* nothing queued: sleep until woken by kranal_launch_tx */
1045                 set_current_state(TASK_INTERRUPTIBLE);
1046                 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1048                 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1052                 set_current_state(TASK_RUNNING);
1053                 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1055                 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1058         spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1060         kranal_thread_fini();
/* Tell the reaper a (possibly) smaller timeout now exists, so it can
 * restart its min-timeout scan.  Only ever lowers kra_new_min_timeout. */
1065 kranal_update_reaper_timeout(long timeout)
1067         unsigned long   flags;
1069         LASSERT (timeout > 0);
1071         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1073         if (timeout < kranal_data.kra_new_min_timeout)
1074                 kranal_data.kra_new_min_timeout = timeout;
1076         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/* Reaper thread: wakes every 'p' seconds and checks a chunk of the conn
 * hash table for timeouts via kranal_reaper_check, sizing the chunk so
 * the whole table is covered 'n' times within the global minimum of all
 * conn keepalive/timeout intervals.  It also maintains that minimum:
 * kra_new_min_timeout (set by kranal_update_reaper_timeout) restarts
 * the scan; completing a full sweep commits the newly observed minimum. */
1080 kranal_reaper (void *arg)
1083         unsigned long      flags;
1086         int                conn_entries = kranal_data.kra_conn_hash_size;
1088         int                base_index = conn_entries - 1;
1089         unsigned long      next_check_time = jiffies;
1090         long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1091         long               current_min_timeout = 1;
1093         kportal_daemonize("kranal_reaper");
1094         kportal_blockallsigs();
1096         init_waitqueue_entry(&wait, current);
1098         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1100         while (!kranal_data.kra_shutdown) {
1102                 /* careful with the jiffy wrap... */
1103                 timeout = (long)(next_check_time - jiffies);
1106                         /* I wake up every 'p' seconds to check for
1107                          * timeouts on some more peers.  I try to check
1108                          * every connection 'n' times within the global
1109                          * minimum of all keepalive and timeout intervals,
1110                          * to ensure I attend to every connection within
1111                          * (n+1)/n times its timeout intervals. */
1115                         unsigned long min_timeout;
/* a new (smaller) min timeout was posted: restart the min scan */
1118                         if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
1119                                 /* new min timeout set: restart min timeout scan */
1120                                 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1121                                 base_index = conn_index - 1;
1123                                         base_index = conn_entries - 1;
1125                                 if (kranal_data.kra_new_min_timeout < current_min_timeout) {
1126                                         current_min_timeout = kranal_data.kra_new_min_timeout;
1127                                         CWARN("Set new min timeout %ld\n",
1128                                               current_min_timeout);
1131                                 kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1133                         min_timeout = current_min_timeout;
1135                         spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
1138                         LASSERT (min_timeout > 0);
1140                         /* Compute how many table entries to check now so I
1141                          * get round the whole table fast enough (NB I do
1142                          * this at fixed intervals of 'p' seconds) */
1143                         chunk = conn_entries;
1144                         if (min_timeout > n * p)
1145                                 chunk = (chunk * n * p) / min_timeout;
1149                         for (i = 0; i < chunk; i++) {
1150                                 kranal_reaper_check(conn_index,
1152                                 conn_index = (conn_index + 1) % conn_entries;
1155                         next_check_time += p * HZ;
1157                         spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
/* wrap-aware test: did this chunk pass base_index (full sweep done)? */
1159                         if (((conn_index - chunk <= base_index &&
1160                               base_index < conn_index) ||
1161                              (conn_index - conn_entries - chunk <= base_index &&
1162                               base_index < conn_index - conn_entries))) {
1164                                 /* Scanned all conns: set current_min_timeout... */
1165                                 if (current_min_timeout != next_min_timeout) {
1166                                         current_min_timeout = next_min_timeout;
1167                                         CWARN("Set new min timeout %ld\n",
1168                                               current_min_timeout);
1171                                 /* ...and restart min timeout scan */
1172                                 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1173                                 base_index = conn_index - 1;
1175                                         base_index = conn_entries - 1;
/* sleep until the next scheduled check */
1179                 set_current_state(TASK_INTERRUPTIBLE);
1180                 add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1182                 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1184                 schedule_timeout(timeout);
1186                 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1188                 set_current_state(TASK_RUNNING);
1189                 remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1192         kranal_thread_fini();
/* Drain the device's RDMA completion queue: for each completed RDMA,
 * look up the conn by CQID, pop the head of its rdmaq (completions
 * arrive in posting order), verify it matches the hardware descriptor,
 * and move the tx onto the fmaq so its PUT_DONE/GET_DONE completion
 * message gets sent. */
1197 kranal_check_rdma_cq (kra_device_t *dev)
1202         unsigned long       flags;
1203         RAP_RDMA_DESCRIPTOR *desc;
1208                 rrc = RapkCQDone(dev->rad_rdma_cq, &cqid, &event_type);
/* CQ empty: nothing more to reap */
1209                 if (rrc == RAP_NOT_DONE)
1212                 LASSERT (rrc == RAP_SUCCESS);
1213                 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1215                 read_lock(&kranal_data.kra_global_lock);
1217                 conn = kranal_cqid2conn_locked(cqid);
1219                         /* Conn was destroyed? */
1220                         CWARN("RDMA CQID lookup %d failed\n", cqid);
1221                         read_unlock(&kranal_data.kra_global_lock);
1225                 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1226                 LASSERT (rrc == RAP_SUCCESS);
1228                 spin_lock_irqsave(&conn->rac_lock, flags);
/* completions come back in the order RDMAs were queued */
1230                 LASSERT (!list_empty(&conn->rac_rdmaq));
1231                 tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1232                 list_del(&tx->tx_list);
/* AppPtr was stamped with the tx in kranal_rdma */
1234                 LASSERT(desc->AppPtr == (void *)tx);
1235                 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1236                         tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1238                 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1239                 tx->tx_qtime = jiffies;
1241                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1243                 /* Get conn's fmaq processed, now I've just put something
1245                 kranal_schedule_conn(conn);
1247                 read_unlock(&kranal_data.kra_global_lock);
/* Drain the device's FMA completion queue: normally each event names a
 * CQID whose conn is simply (re)scheduled.  On CQ overrun, events have
 * been lost, so every conn on this device is scheduled instead, walking
 * the whole conn hash table a chain at a time (dropping the read lock
 * between chains so writers aren't starved). */
1252 kranal_check_fma_cq (kra_device_t *dev)
1258         struct list_head *conns;
1259         struct list_head *tmp;
1263                 rrc = RapkCQDone(dev->rad_fma_cq, &cqid, &event_type);
1264                 if (rrc != RAP_NOT_DONE)
1267                 LASSERT (rrc == RAP_SUCCESS);
/* normal (non-overrun) event: schedule just the one conn */
1269                 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1271                         read_lock(&kranal_data.kra_global_lock);
1273                         conn = kranal_cqid2conn_locked(cqid);
1275                                 CWARN("FMA CQID lookup %d failed\n", cqid);
1277                                 kranal_schedule_conn(conn);
1279                         read_unlock(&kranal_data.kra_global_lock);
1283                 /* FMA CQ has overflowed: check ALL conns */
1284                 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
1286                 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1288                         read_lock(&kranal_data.kra_global_lock);
1290                         conns = &kranal_data.kra_conns[i];
1292                         list_for_each (tmp, conns) {
1293                                 conn = list_entry(tmp, kra_conn_t,
1296                                 if (conn->rac_device == dev)
1297                                         kranal_schedule_conn(conn);
1300                         /* don't block write lockers for too long... */
1301                         read_unlock(&kranal_data.kra_global_lock);
/* Transmit one FMA message (plus optional immediate payload) on 'conn',
 * stamping the connstamp and tx sequence number first.  Messages whose
 * type carries the FENCE flag use the synchronous send variant (they
 * signal RDMA completion).  Updates rac_last_tx for keepalive tracking. */
1307 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1308                void *immediate, int immediatenob)
1310         int   sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1313         LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
/* only IMMEDIATE messages may carry payload, bounded by FMA_MAX_DATA */
1314         LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1315                  immediatenob <= RANAL_FMA_MAX_DATA :
1318         msg->ram_connstamp = conn->rac_my_connstamp;
1319         msg->ram_seq = conn->rac_tx_seq;
1322                 rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
1323                                       immediate, immediatenob,
1326                 rrc = RapkFmaSend(conn->rac_device->rad_handle,
1327                                   immediate, immediatenob,
1335                 conn->rac_last_tx = jiffies;
/* Scheduler-side FMA queue processing for one conn.  A closing conn
 * waits for outstanding RDMAs (keeping the peer alive with NOOPs), then
 * sends CLOSE and, once CLOSE has also been received, terminates the
 * conn.  An established conn with an empty fmaq may send a keepalive
 * NOOP; otherwise the head tx is dequeued and sent according to its
 * message type.  -EAGAIN (out of FMA credits) puts the tx back at the
 * head of the fmaq to retry when credits return; otherwise the tx is
 * either completed or parked on the replyq to await the peer's reply. */
1345 kranal_process_fmaq (kra_conn_t *conn)
1347         unsigned long flags;
1353         /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1354          *       However I will be rescheduled some by a rad_fma_cq event when
1355          *       I eventually get some.
1356          * NB 2. Sampling rac_state here, races with setting it elsewhere
1357          *       kranal_close_conn_locked.  But it doesn't matter if I try to
1358          *       send a "real" message just as I start closing because I'll get
1359          *       scheduled to send the close anyway. */
1361         if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1362                 if (!list_empty(&conn->rac_rdmaq)) {
1363                         /* RDMAs in progress */
1364                         LASSERT (!conn->rac_close_sent);
/* keep the conn alive with NOOPs while waiting for RDMAs to drain */
1366                         if (time_after_eq(jiffies,
1368                                           conn->rac_keepalive)) {
1369                                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1370                                 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1375                 if (conn->rac_close_sent)
/* RDMAs idle: send CLOSE now */
1378                 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1379                 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1383                 conn->rac_close_sent = 1;
1384                 if (!conn->rac_close_recvd)
/* CLOSE sent and received: terminate under the write lock */
1387                 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1389                 if (conn->rac_state == RANAL_CONN_CLOSING)
1390                         kranal_terminate_conn_locked(conn);
1392                 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1396         spin_lock_irqsave(&conn->rac_lock, flags);
1398         if (list_empty(&conn->rac_fmaq)) {
1400                 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* nothing queued: maybe time for a keepalive NOOP */
1402                 if (time_after_eq(jiffies,
1403                                   conn->rac_last_tx + conn->rac_keepalive)) {
1404                         kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1405                         kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1410         tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1411         list_del(&tx->tx_list);
1412         more_to_do = !list_empty(&conn->rac_fmaq);
1414         spin_unlock_irqrestore(&conn->rac_lock, flags);
1417         switch (tx->tx_msg.ram_type) {
/* completions and immediates carry their payload inline */
1421         case RANAL_MSG_IMMEDIATE:
1422         case RANAL_MSG_PUT_NAK:
1423         case RANAL_MSG_PUT_DONE:
1424         case RANAL_MSG_GET_NAK:
1425         case RANAL_MSG_GET_DONE:
1426                 rc = kranal_sendmsg(conn, &tx->tx_msg,
1427                                     tx->tx_buffer, tx->tx_nob);
1431         case RANAL_MSG_PUT_REQ:
1432                 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1433                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
/* map now so the sink is ready when the PUT_ACK comes back */
1434                 kranal_map_buffer(tx);
1438         case RANAL_MSG_PUT_ACK:
1439                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1443         case RANAL_MSG_GET_REQ:
/* map and advertise our source buffer in the GET request */
1444                 kranal_map_buffer(tx);
1445                 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1446                 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1447                 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1448                         (__u64)((unsigned long)tx->tx_buffer);
1449                 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1450                 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1455         if (rc == -EAGAIN) {
1456                 /* I need credits to send this.  Replace tx at the head of the
1457                  * fmaq and I'll get rescheduled when credits appear */
1458                 spin_lock_irqsave(&conn->rac_lock, flags);
1459                 list_add(&tx->tx_list, &conn->rac_fmaq);
1460                 spin_unlock_irqrestore(&conn->rac_lock, flags);
1466         if (!expect_reply) {
1467                 kranal_tx_done(tx, 0);
/* awaiting the peer's reply: park on the replyq with a fresh timestamp */
1469                 spin_lock_irqsave(&conn->rac_lock, flags);
1470                 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1471                 tx->tx_qtime = jiffies;
1472                 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* more queued work: reschedule myself */
1476                 kranal_schedule_conn(conn);
/* Byte-swap an RDMA descriptor received from a peer with opposite
 * endianness: the map key fields, sink address and length. */
1480 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1482         __swab64s(&d->rard_key.Key);
1483         __swab16s(&d->rard_key.Cookie);
1484         __swab16s(&d->rard_key.MdHandle);
1485         __swab32s(&d->rard_key.Flags);
1486         __swab64s(&d->rard_addr.AddressBits);
1487         __swab32s(&d->rard_nob);
/* Find the tx on conn's replyq whose cookie matches an incoming reply.
 * A cookie match with the wrong message type is reported (peer
 * confusion); no match at all is also reported.  Caller context: the
 * visible code walks rac_replyq without taking rac_lock itself —
 * presumably the caller holds it; verify at the call sites. */
1491 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1493         struct list_head *ttmp;
1496         list_for_each(ttmp, &conn->rac_replyq) {
1497                 tx = list_entry(ttmp, kra_tx_t, tx_list);
/* cookie is the primary key; type must then agree */
1499                 if (tx->tx_cookie != cookie)
1502                 if (tx->tx_msg.ram_type != type) {
1503                         CWARN("Unexpected type %x (%x expected) "
1504                               "matched reply from "LPX64"\n",
1505                               tx->tx_msg.ram_type, type,
1506                               conn->rac_peer->rap_nid);
1511         CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
/* Pull the next incoming FMA message on 'conn' (if any) and dispatch it.
 * Steps visible here: fetch the FMA prefix; byte-swap the header (and any
 * embedded RDMA descriptor) if the sender's magic is reversed; validate
 * version, source NID, connection stamp and sequence number; honour the
 * FENCE flag; handle CLOSE handshaking; then dispatch by message type —
 * NOOP/IMMEDIATE/PUT_REQ/GET_REQ via lib_parse(), and the
 * ACK/NAK/DONE completions by matching against the reply queue.
 * NOTE(review): many control-flow lines (braces, 'break', 'goto out'
 * style error exits) are elided from this excerpt; comments only. */
1516 kranal_check_fma_rx (kra_conn_t *conn)
1518 unsigned long flags;
1523 RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1524 kra_peer_t *peer = conn->rac_peer;
/* nothing waiting on the FMA prefix: done for now */
1526 if (rrc == RAP_NOT_DONE)
1529 LASSERT (rrc == RAP_SUCCESS);
/* record receive time — presumably consulted by keepalive/timeout
 * checks elsewhere; confirm */
1530 conn->rac_last_rx = jiffies;
1531 seq = conn->rac_rx_seq++;
1532 msg = (kra_msg_t *)prefix;
/* Peer may be opposite-endian: a byte-swapped magic means swab the
 * whole header; any other value is garbage */
1534 if (msg->ram_magic != RANAL_MSG_MAGIC) {
1535 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1536 CERROR("Unexpected magic %08x from "LPX64"\n",
1537 msg->ram_magic, peer->rap_nid);
1541 __swab32s(&msg->ram_magic);
1542 __swab16s(&msg->ram_version);
1543 __swab16s(&msg->ram_type);
1544 __swab64s(&msg->ram_srcnid);
1545 __swab64s(&msg->ram_connstamp);
1546 __swab32s(&msg->ram_seq);
1548 /* NB message type checked below; NOT here... */
/* only these two payload types carry an RDMA descriptor to swab */
1549 switch (msg->ram_type) {
1550 case RANAL_MSG_PUT_ACK:
1551 kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1554 case RANAL_MSG_GET_REQ:
1555 kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
/* ---- header sanity checks (error exits elided from excerpt) ---- */
1563 if (msg->ram_version != RANAL_MSG_VERSION) {
1564 CERROR("Unexpected protocol version %d from "LPX64"\n",
1565 msg->ram_version, peer->rap_nid);
1569 if (msg->ram_srcnid != peer->rap_nid) {
1570 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1571 msg->ram_srcnid, peer->rap_nid);
/* stale message from a previous incarnation of this connection */
1575 if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1576 CERROR("Unexpected connstamp "LPX64"("LPX64
1577 " expected) from "LPX64"\n",
1578 msg->ram_connstamp, conn->rac_peer_connstamp,
1583 if (msg->ram_seq != seq) {
1584 CERROR("Unexpected sequence number %d(%d expected) from "
1585 LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
/* FENCE-flagged messages arrive after an RDMA: sync before touching
 * the data it covered */
1589 if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1590 /* This message signals RDMA completion... */
1591 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1592 LASSERT (rrc == RAP_SUCCESS);
/* CLOSE should be the last thing the peer sends on this conn */
1595 if (conn->rac_close_recvd) {
1596 CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
1597 msg->ram_type, conn->rac_peer->rap_nid);
1601 if (msg->ram_type == RANAL_MSG_CLOSE) {
1602 conn->rac_close_recvd = 1;
1603 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
/* start local close if still up; finish teardown if our own CLOSE
 * already went out */
1605 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1606 kranal_close_conn_locked(conn, 0);
1607 else if (conn->rac_close_sent)
1608 kranal_terminate_conn_locked(conn);
1610 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
/* no normal traffic accepted once the conn is no longer established */
1614 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1617 conn->rac_rxmsg = msg; /* stash message for portals callbacks */
1618 /* they'll NULL rac_rxmsg if they consume it */
1619 switch (msg->ram_type) {
1620 case RANAL_MSG_NOOP:
1621 /* Nothing to do; just a keepalive */
1624 case RANAL_MSG_IMMEDIATE:
1625 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1628 case RANAL_MSG_PUT_REQ:
1629 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1631 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
/* no local match for the PUT: NAK it back to the sender */
1634 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1638 tx->tx_msg.ram_u.completion.racm_cookie =
1639 msg->ram_u.putreq.raprm_cookie;
1640 kranal_post_fma(conn, tx);
1643 case RANAL_MSG_PUT_NAK:
1644 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1645 msg->ram_u.completion.racm_cookie);
1649 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1650 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1651 kranal_tx_done(tx, -ENOENT); /* no match */
/* peer accepted our PUT_REQ: push the data via RDMA, then PUT_DONE */
1654 case RANAL_MSG_PUT_ACK:
1655 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1656 msg->ram_u.putack.rapam_src_cookie);
1660 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1661 &msg->ram_u.putack.rapam_desc,
1662 msg->ram_u.putack.rapam_desc.rard_nob,
1663 msg->ram_u.putack.rapam_dst_cookie);
1666 case RANAL_MSG_PUT_DONE:
1667 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1668 msg->ram_u.completion.racm_cookie);
1672 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1673 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1674 kranal_tx_done(tx, 0);
1677 case RANAL_MSG_GET_REQ:
1678 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1680 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
/* no local match for the GET: NAK it back to the sender */
1683 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1687 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1688 kranal_post_fma(conn, tx);
1691 case RANAL_MSG_GET_NAK:
1692 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1693 msg->ram_u.completion.racm_cookie);
1697 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1698 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1699 kranal_tx_done(tx, -ENOENT); /* no match */
1702 case RANAL_MSG_GET_DONE:
1703 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1704 msg->ram_u.completion.racm_cookie);
1708 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1709 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1710 kranal_tx_done(tx, 0);
/* anything the portals callbacks didn't claim gets discarded */
1715 if (conn->rac_rxmsg != NULL)
1716 kranal_consume_rxmsg(conn, NULL, 0);
1718 /* check again later */
1719 kranal_schedule_conn(conn);
/* Final teardown of a connection that has reached RANAL_CONN_CLOSED:
 * complete every tx still queued on the FMA queue and on the reply
 * queue with -ECONNABORTED.  The RDMA queue must already have drained
 * by this point (asserted).
 * NOTE(review): braces/blank lines are elided from this excerpt;
 * comments only. */
1723 kranal_complete_closed_conn (kra_conn_t *conn)
1727 LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
/* abort everything still waiting to be sent */
1729 while (!list_empty(&conn->rac_fmaq)) {
1730 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1732 list_del(&tx->tx_list);
1733 kranal_tx_done(tx, -ECONNABORTED);
1736 LASSERT (list_empty(&conn->rac_rdmaq));
/* abort everything still waiting for a peer reply */
1738 while (!list_empty(&conn->rac_replyq)) {
1739 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1741 list_del(&tx->tx_list);
1742 kranal_tx_done(tx, -ECONNABORTED);
/* Per-device scheduler thread.  Daemonizes as "kranal_sd_NN", registers
 * itself in dev->rad_scheduler, then loops until kra_shutdown: services
 * the RDMA and FMA completion queues when the device callback has set
 * rad_ready, and drains the device's scheduled-connection queue
 * (rad_connq), running RX/FMA processing and closed-conn cleanup for
 * each.  Sleeps interruptibly on rad_waitq when idle.  rad_lock is held
 * around all queue/flag inspection and dropped for the actual work.
 * NOTE(review): lines are elided from this excerpt (e.g. the rad_ready
 * clear, the yield after RANAL_RESCHED iterations — presumably
 * schedule() — and the idle-sleep schedule() call); comments only. */
1747 kranal_scheduler (void *arg)
1749 kra_device_t *dev = (kra_device_t *)arg;
1753 unsigned long flags;
1756 snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1757 kportal_daemonize(name);
1758 kportal_blockallsigs();
1760 dev->rad_scheduler = current;
1761 init_waitqueue_entry(&wait, current);
1763 spin_lock_irqsave(&dev->rad_lock, flags);
1765 while (!kranal_data.kra_shutdown) {
1766 /* Safe: kra_shutdown only set when quiescent */
/* yield periodically so we don't hog the CPU (yield call elided) */
1768 if (busy_loops++ >= RANAL_RESCHED) {
1769 spin_unlock_irqrestore(&dev->rad_lock, flags);
1774 spin_lock_irqsave(&dev->rad_lock, flags);
1777 if (dev->rad_ready) {
1778 /* Device callback fired since I last checked it */
/* drop the lock while polling the completion queues */
1780 spin_unlock_irqrestore(&dev->rad_lock, flags);
1782 kranal_check_rdma_cq(dev);
1783 kranal_check_fma_cq(dev);
1785 spin_lock_irqsave(&dev->rad_lock, flags);
1788 if (!list_empty(&dev->rad_connq)) {
1789 /* Connection needs attention */
1790 conn = list_entry(dev->rad_connq.next,
1791 kra_conn_t, rac_schedlist);
1792 list_del_init(&conn->rac_schedlist);
1793 LASSERT (conn->rac_scheduled);
1794 conn->rac_scheduled = 0;
1795 spin_unlock_irqrestore(&dev->rad_lock, flags);
1797 kranal_check_fma_rx(conn);
1798 kranal_process_fmaq(conn);
1800 if (conn->rac_state == RANAL_CONN_CLOSED)
1801 kranal_complete_closed_conn(conn);
/* drop the scheduler's ref taken in kranal_schedule_conn() */
1803 kranal_conn_decref(conn);
1805 spin_lock_irqsave(&dev->rad_lock, flags);
/* nothing to do: sleep until the device callback or a conn
 * scheduling wakes us (the schedule() call itself is elided) */
1809 add_wait_queue(&dev->rad_waitq, &wait);
1810 set_current_state(TASK_INTERRUPTIBLE);
1812 spin_unlock_irqrestore(&dev->rad_lock, flags);
1817 set_current_state(TASK_RUNNING);
1818 remove_wait_queue(&dev->rad_waitq, &wait);
1820 spin_lock_irqsave(&dev->rad_lock, flags);
1823 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* shutting down: deregister and notify thread accounting */
1825 dev->rad_scheduler = NULL;
1826 kranal_thread_fini();
1831 lib_nal_t kranal_lib = {
1832 libnal_data: &kranal_data, /* NAL private data */
1833 libnal_send: kranal_send,
1834 libnal_send_pages: kranal_send_pages,
1835 libnal_recv: kranal_recv,
1836 libnal_recv_pages: kranal_recv_pages,
1837 libnal_dist: kranal_dist