1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* lib_nal 'distance' callback: reports how far away 'nid' is via *dist.
 * The visible fragment only shows the comparison against the local NID;
 * presumably the local case stores distance 0 — TODO confirm against the
 * full source (interior lines are elided in this view). */
27 kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
29 /* I would guess that if kranal_get_peer (nid) == NULL,
30 and we're not routing, then 'nid' is very distant :) */
31 if ( nal->libnal_ni.ni_pid.nid == nid ) {
/* RapidArray device interrupt callback.  Looks up the device by 'devid'
 * and, if the scheduler isn't already flagged ready, wakes it so it will
 * service the device.  Warns if no registered device matches 'devid'. */
41 kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
47 for (i = 0; i < kranal_data.kra_ndevs; i++) {
49 dev = &kranal_data.kra_devices[i];
50 if (dev->rad_id != devid)
53 spin_lock_irqsave(&dev->rad_lock, flags);
55 if (!dev->rad_ready) {
57 wake_up(&dev->rad_waitq);
60 spin_unlock_irqrestore(&dev->rad_lock, flags);
64 CWARN("callback for unknown device %d\n", devid);
/* Queue 'conn' for attention by its device's scheduler thread.  No-op if
 * the conn is already scheduled; otherwise takes a conn ref on behalf of
 * the scheduler, puts it on the device connq and wakes the scheduler. */
68 kranal_schedule_conn(kra_conn_t *conn)
70 kra_device_t *dev = conn->rac_device;
73 spin_lock_irqsave(&dev->rad_lock, flags);
75 if (!conn->rac_scheduled) {
76 kranal_conn_addref(conn); /* +1 ref for scheduler */
77 conn->rac_scheduled = 1;
78 list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
79 wake_up(&dev->rad_waitq);
82 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Obtain an idle tx descriptor.
 * - Prefer the "normal" idle pool.
 * - If that is empty and the caller may NOT block, dip into the reserved
 *   (nblk) pool; error out if that too is exhausted.
 * - If the caller may block, wait until a normal descriptor is freed.
 * A fresh completion cookie is assigned while the tx lock is held.
 * The sanity LASSERTs confirm the descriptor comes back clean. */
86 kranal_get_idle_tx (int may_block)
92 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
94 /* "normal" descriptor is free */
95 if (!list_empty(&kranal_data.kra_idle_txs)) {
96 tx = list_entry(kranal_data.kra_idle_txs.next,
102 /* may dip into reserve pool */
103 if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
104 CERROR("reserved tx desc pool exhausted\n");
108 tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
113 /* block for idle tx */
114 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
116 wait_event(kranal_data.kra_idle_tx_waitq,
117 !list_empty(&kranal_data.kra_idle_txs));
121 list_del(&tx->tx_list);
123 /* Allocate a new completion cookie. It might not be
124 * needed, but we've got a lock right now... */
125 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
127 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
128 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
129 LASSERT (tx->tx_conn == NULL);
130 LASSERT (tx->tx_libmsg[0] == NULL);
131 LASSERT (tx->tx_libmsg[1] == NULL);
134 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Initialise the common header fields of an FMA message: magic, protocol
 * version, message type and source NID.  ram_connstamp is deliberately
 * left for send time (see comment below). */
140 kranal_init_msg(kra_msg_t *msg, int type)
142 msg->ram_magic = RANAL_MSG_MAGIC;
143 msg->ram_version = RANAL_MSG_VERSION;
144 msg->ram_type = type;
145 msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
146 /* ram_connstamp gets set when FMA is sent */
/* Convenience wrapper: grab an idle tx (blocking or not per 'may_block')
 * and initialise its embedded message header to 'type'. */
150 kranal_new_tx_msg (int may_block, int type)
152 kra_tx_t *tx = kranal_get_idle_tx(may_block);
157 kranal_init_msg(&tx->tx_msg, type);
/* Point tx->tx_buffer at the payload for an IMMEDIATE send.  Skips whole
 * iov fragments covered by 'offset', then requires the remaining 'nob'
 * bytes to fit inside a single fragment (multi-fragment payloads are
 * rejected with an error). */
162 kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
166 /* For now this is almost identical to kranal_setup_virt_buffer, but we
167 * could "flatten" the payload into a single contiguous buffer ready
168 * for sending direct over an FMA if we ever needed to. */
172 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
174 while (offset >= iov->iov_len) {
175 offset -= iov->iov_len;
181 if (nob > iov->iov_len - offset) {
182 CERROR("Can't handle multiple vaddr fragments\n");
186 tx->tx_buftype = RANAL_BUF_IMMEDIATE;
188 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Point tx->tx_buffer at a single-fragment virtual-address payload and
 * mark it VIRT_UNMAPPED (registration with the device happens later in
 * kranal_map_buffer).  Same single-fragment restriction as the
 * immediate-buffer variant. */
193 kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
199 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
201 while (offset >= iov->iov_len) {
202 offset -= iov->iov_len;
208 if (nob > iov->iov_len - offset) {
209 CERROR("Can't handle multiple vaddr fragments\n");
213 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
215 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Build the tx's physical-page region table (tx->tx_phys) from a kiov
 * payload and mark the tx PHYS_UNMAPPED.  Pages after the first must be
 * gap-free (full pages starting at offset 0) so the payload appears
 * contiguous to the RDMA engine; the table is bounded by PTL_MD_MAX_IOV. */
220 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
223 RAP_PHYS_REGION *phys = tx->tx_phys;
226 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
230 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
232 while (offset >= kiov->kiov_len) {
233 offset -= kiov->kiov_len;
239 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
241 tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
243 phys->Address = kranal_page2phys(kiov->kiov_page);
246 resid = nob - (kiov->kiov_len - offset);
252 if (kiov->kiov_offset != 0 ||
253 ((resid > PAGE_SIZE) &&
254 kiov->kiov_len < PAGE_SIZE)) {
255 /* Can't have gaps */
256 CERROR("Can't make payload contiguous in I/O VM:"
257 "page %d, offset %d, len %d \n",
258 (int)(phys - tx->tx_phys),
259 kiov->kiov_offset, kiov->kiov_len);
263 if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
264 CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
268 phys->Address = kranal_page2phys(kiov->kiov_page);
274 tx->tx_phys_npages = phys - tx->tx_phys;
/* Dispatch to the physical (kiov) or virtual (iov) buffer setup routine.
 * Exactly one of iov/kiov must be non-NULL (payload is all pages or all
 * vaddrs, never mixed). */
279 kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
280 struct iovec *iov, ptl_kiov_t *kiov,
283 LASSERT ((iov == NULL) != (kiov == NULL));
286 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
288 return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/* Register the tx's buffer with its conn's RapidArray device so it can
 * be used for RDMA: PHYS_UNMAPPED -> RapkRegisterPhys -> PHYS_MAPPED,
 * VIRT_UNMAPPED -> RapkRegisterMemory -> VIRT_MAPPED.  Already-mapped
 * and immediate buffers fall through untouched.  Must run on the device
 * scheduler thread (asserted). */
292 kranal_map_buffer (kra_tx_t *tx)
294 kra_conn_t *conn = tx->tx_conn;
295 kra_device_t *dev = conn->rac_device;
298 LASSERT (current == dev->rad_scheduler);
300 switch (tx->tx_buftype) {
305 case RANAL_BUF_IMMEDIATE:
306 case RANAL_BUF_PHYS_MAPPED:
307 case RANAL_BUF_VIRT_MAPPED:
310 case RANAL_BUF_PHYS_UNMAPPED:
311 rrc = RapkRegisterPhys(dev->rad_handle,
312 tx->tx_phys, tx->tx_phys_npages,
314 LASSERT (rrc == RAP_SUCCESS);
315 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
318 case RANAL_BUF_VIRT_UNMAPPED:
319 rrc = RapkRegisterMemory(dev->rad_handle,
320 tx->tx_buffer, tx->tx_nob,
322 LASSERT (rrc == RAP_SUCCESS);
323 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/* Inverse of kranal_map_buffer: deregister a mapped buffer from the
 * conn's device and return the buftype to its *_UNMAPPED state.
 * Unmapped/immediate buffers fall through.  Like mapping, this must run
 * on the device scheduler thread (asserted). */
329 kranal_unmap_buffer (kra_tx_t *tx)
334 switch (tx->tx_buftype) {
339 case RANAL_BUF_IMMEDIATE:
340 case RANAL_BUF_PHYS_UNMAPPED:
341 case RANAL_BUF_VIRT_UNMAPPED:
344 case RANAL_BUF_PHYS_MAPPED:
345 LASSERT (tx->tx_conn != NULL);
346 dev = tx->tx_conn->rac_device;
347 LASSERT (current == dev->rad_scheduler);
348 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
350 LASSERT (rrc == RAP_SUCCESS);
351 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
354 case RANAL_BUF_VIRT_MAPPED:
355 LASSERT (tx->tx_conn != NULL);
356 dev = tx->tx_conn->rac_device;
357 LASSERT (current == dev->rad_scheduler);
358 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
360 LASSERT (rrc == RAP_SUCCESS);
361 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
/* Complete a tx: unmap its buffer, finalise up to two attached lib
 * messages (PTL_OK on 'completion' == 0, PTL_FAIL otherwise), scrub the
 * descriptor and return it to the appropriate idle pool (normal or
 * reserved-nblk), waking anyone blocked in kranal_get_idle_tx.
 * Must not be called in interrupt context (asserted). */
367 kranal_tx_done (kra_tx_t *tx, int completion)
369 ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
373 LASSERT (!in_interrupt());
375 kranal_unmap_buffer(tx);
377 for (i = 0; i < 2; i++) {
378 /* tx may have up to 2 libmsgs to finalise */
379 if (tx->tx_libmsg[i] == NULL)
382 lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
383 tx->tx_libmsg[i] = NULL;
386 tx->tx_buftype = RANAL_BUF_NONE;
387 tx->tx_msg.ram_type = RANAL_MSG_NONE;
390 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
393 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
395 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
396 wake_up(&kranal_data.kra_idle_tx_waitq);
399 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Return any established conn for 'peer' (the first on its list), or
 * presumably NULL when the list is empty — the fall-through return is
 * elided in this view.  Caller must hold the global lock. */
403 kranal_find_conn_locked (kra_peer_t *peer)
405 struct list_head *tmp;
407 /* just return the first connection */
408 list_for_each (tmp, &peer->rap_conns) {
409 return list_entry(tmp, kra_conn_t, rac_list);
/* Queue 'tx' on the conn's FMA send queue (timestamping it for timeout
 * checks) and schedule the conn so its device scheduler sends it. */
416 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
422 spin_lock_irqsave(&conn->rac_lock, flags);
423 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
424 tx->tx_qtime = jiffies;
425 spin_unlock_irqrestore(&conn->rac_lock, flags);
427 kranal_schedule_conn(conn);
/* Commit 'tx' for delivery to 'nid'.  Fast path: under the read lock,
 * find the peer and an existing conn and post the tx on it.  Otherwise
 * retry under the write lock; if still no conn, either hand the peer to
 * the connd (first message while not yet connecting) or just queue the
 * tx on the peer for when the connection comes up.  Any unrecoverable
 * problem (unknown peer, in reconnect backoff) completes the tx with
 * -EHOSTUNREACH. */
431 kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
437 rwlock_t *g_lock = &kranal_data.kra_global_lock;
439 /* If I get here, I've committed to send, so I complete the tx with
440 * failure on any problems */
442 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
446 peer = kranal_find_peer_locked(nid);
449 kranal_tx_done(tx, -EHOSTUNREACH);
453 conn = kranal_find_conn_locked(peer);
455 kranal_post_fma(conn, tx);
460 /* Making one or more connections; I'll need a write lock... */
462 write_lock_irqsave(g_lock, flags);
464 peer = kranal_find_peer_locked(nid);
466 write_unlock_irqrestore(g_lock, flags);
467 kranal_tx_done(tx, -EHOSTUNREACH);
471 conn = kranal_find_conn_locked(peer);
473 /* Connection exists; queue message on it */
474 kranal_post_fma(conn, tx);
475 write_unlock_irqrestore(g_lock, flags);
479 LASSERT (peer->rap_persistence > 0);
481 if (!peer->rap_connecting) {
482 LASSERT (list_empty(&peer->rap_tx_queue));
484 now = CURRENT_SECONDS;
485 if (now < peer->rap_reconnect_time) {
486 write_unlock_irqrestore(g_lock, flags);
487 kranal_tx_done(tx, -EHOSTUNREACH);
491 peer->rap_connecting = 1;
492 kranal_peer_addref(peer); /* extra ref for connd */
494 spin_lock(&kranal_data.kra_connd_lock);
496 list_add_tail(&peer->rap_connd_list,
497 &kranal_data.kra_connd_peers);
498 wake_up(&kranal_data.kra_connd_waitq);
500 spin_unlock(&kranal_data.kra_connd_lock);
503 /* A connection is being established; queue the message... */
504 list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
506 write_unlock_irqrestore(g_lock, flags);
/* Start an RDMA of 'nob' bytes from the tx's (already mapped) buffer to
 * the peer's sink descriptor, and prepare the completion message of
 * 'type' (carrying 'cookie') to be sent when the RDMA finishes.  A
 * zero-length RDMA skips straight to posting the completion FMA.
 * Runs only on the device scheduler thread (asserted), which also
 * guarantees no race with the scheduler sending CLOSE. */
510 kranal_rdma(kra_tx_t *tx, int type,
511 kra_rdma_desc_t *sink, int nob, __u64 cookie)
513 kra_conn_t *conn = tx->tx_conn;
517 LASSERT (kranal_tx_mapped(tx));
518 LASSERT (nob <= sink->rard_nob);
519 LASSERT (nob <= tx->tx_nob);
521 /* No actual race with scheduler sending CLOSE (I'm she!) */
522 LASSERT (current == conn->rac_device->rad_scheduler);
524 memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
525 tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
526 tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
527 tx->tx_rdma_desc.DstPtr = sink->rard_addr;
528 tx->tx_rdma_desc.DstKey = sink->rard_key;
529 tx->tx_rdma_desc.Length = nob;
530 tx->tx_rdma_desc.AppPtr = tx;
532 /* prep final completion message */
533 kranal_init_msg(&tx->tx_msg, type);
534 tx->tx_msg.ram_u.completion.racm_cookie = cookie;
536 if (nob == 0) { /* Immediate completion */
537 kranal_post_fma(conn, tx);
541 LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
543 rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
544 LASSERT (rrc == RAP_SUCCESS);
546 spin_lock_irqsave(&conn->rac_lock, flags);
547 list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
548 tx->tx_qtime = jiffies;
549 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Copy the pending FMA receive into 'buffer' (up to 'nob' bytes) and
 * clear conn->rac_rxmsg to mark the rx consumed.  Warns if the byte
 * count copied out differs from what the caller expected. */
553 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
555 __u32 nob_received = nob;
558 LASSERT (conn->rac_rxmsg != NULL);
560 rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
561 &nob_received, sizeof(kra_msg_t));
562 LASSERT (rrc == RAP_SUCCESS);
564 conn->rac_rxmsg = NULL;
566 if (nob_received != nob) {
567 CWARN("Expected %d immediate bytes but got %d\n",
/* Common send path for both iovec and kiov payloads (called from
 * kranal_send / kranal_send_pages).  Behaviour by message type:
 * - PTL_MSG_REPLY to a GET: either answer IMMEDIATE (if the GET was
 *   immediate and small enough) or RDMA the reply straight into the
 *   requester's sink and queue a GET_DONE completion.
 * - PTL_MSG_GET: send an IMMEDIATE GET if the eventual sink buffer is
 *   small and vaddr-mapped, else map the sink and send a GET_REQ with
 *   its RDMA descriptor; a reply libmsg is created up front.
 * - PTL_MSG_PUT: small unpaged payloads go IMMEDIATE; otherwise set up
 *   an RDMA buffer and send PUT_REQ.
 * - Everything else falls through to a plain IMMEDIATE message.
 * 'private' means different things per type (e.g. the rx conn for
 * REPLY).  NB several case labels/returns are elided in this view. */
576 kranal_do_send (lib_nal_t *nal,
593 /* NB 'private' is different depending on what we're sending.... */
595 CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
596 nob, niov, nid, pid);
598 LASSERT (nob == 0 || niov > 0);
599 LASSERT (niov <= PTL_MD_MAX_IOV);
601 LASSERT (!in_interrupt());
602 /* payload is either all vaddrs or all pages */
603 LASSERT (!(kiov != NULL && iov != NULL));
609 case PTL_MSG_REPLY: {
610 /* reply's 'private' is the conn that received the GET_REQ */
612 LASSERT (conn->rac_rxmsg != NULL);
614 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
615 if (nob > RANAL_FMA_MAX_DATA) {
616 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
620 break; /* RDMA not expected */
623 /* Incoming message consistent with immediate reply? */
624 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
625 CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
626 nid, conn->rac_rxmsg->ram_type);
630 tx = kranal_get_idle_tx(0);
634 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
636 kranal_tx_done(tx, rc);
641 tx->tx_libmsg[0] = libmsg;
643 kranal_map_buffer(tx);
644 kranal_rdma(tx, RANAL_MSG_GET_DONE,
645 &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
646 conn->rac_rxmsg->ram_u.get.ragm_cookie);
653 /* We have to consider the eventual sink buffer rather than any
654 * payload passed here (there isn't any, and strictly, looking
655 * inside libmsg is a layering violation). We send a simple
656 * IMMEDIATE GET if the sink buffer is mapped already and small
659 if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
660 libmsg->md->length <= RANAL_FMA_MAX_DATA &&
661 libmsg->md->length <= kranal_tunables.kra_max_immediate)
664 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
668 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
669 rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
670 libmsg->md->md_iov.iov,
671 0, libmsg->md->length);
673 rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
674 libmsg->md->md_iov.kiov,
675 0, libmsg->md->length);
677 kranal_tx_done(tx, rc);
681 tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
682 if (tx->tx_libmsg[1] == NULL) {
683 CERROR("Can't create reply for GET to "LPX64"\n", nid);
684 kranal_tx_done(tx, rc);
688 tx->tx_libmsg[0] = libmsg;
689 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
690 /* rest of tx_msg is setup just before it is sent */
691 kranal_launch_tx(tx, nid);
699 if (kiov == NULL && /* not paged */
700 nob <= RANAL_MAX_IMMEDIATE && /* small enough */
701 nob <= kranal_tunables.kra_max_immediate)
702 break; /* send IMMEDIATE */
704 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
708 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
710 kranal_tx_done(tx, rc);
714 tx->tx_libmsg[0] = libmsg;
715 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
716 /* rest of tx_msg is setup just before it is sent */
717 kranal_launch_tx(tx, nid);
721 LASSERT (kiov == NULL);
722 LASSERT (nob <= RANAL_MAX_IMMEDIATE);
724 tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
725 type == PTL_MSG_REPLY ||
727 RANAL_MSG_IMMEDIATE);
731 rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
733 kranal_tx_done(tx, rc);
737 tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
738 tx->tx_libmsg[0] = libmsg;
739 kranal_launch_tx(tx, nid);
/* lib_nal send entry point for iovec (virtual address) payloads; thin
 * wrapper forwarding to kranal_do_send with kiov == NULL. */
744 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
745 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
746 unsigned int niov, struct iovec *iov,
747 size_t offset, size_t len)
749 return kranal_do_send(nal, private, cookie,
/* lib_nal send entry point for kiov (paged) payloads; thin wrapper
 * forwarding to kranal_do_send with iov == NULL. */
756 kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
757 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
758 unsigned int niov, ptl_kiov_t *kiov,
759 size_t offset, size_t len)
761 return kranal_do_send(nal, private, cookie,
/* Common receive path ('private' is the conn holding the pending rx).
 * - IMMEDIATE: copy the payload out of the FMA into the (single, vaddr)
 *   fragment and finalise the libmsg.
 * - GET_REQ: the matched GET was already answered in kranal_do_send;
 *   just finalise the (NULL) libmsg to complete the receive.
 * - PUT_REQ: if matched, map a sink buffer and send PUT_ACK carrying
 *   its RDMA descriptor and cookies; the libmsg is finalised later on
 *   RDMA_DONE.  Unmatched PUTs are finalised immediately.
 * Consuming the rx message flags the incoming request as matched. */
768 kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
769 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
770 int offset, int mlen, int rlen)
772 kra_conn_t *conn = private;
773 kra_msg_t *rxmsg = conn->rac_rxmsg;
778 LASSERT (mlen <= rlen);
779 LASSERT (!in_interrupt());
780 /* Either all pages or all vaddrs */
781 LASSERT (!(kiov != NULL && iov != NULL));
783 switch(rxmsg->ram_type) {
788 case RANAL_MSG_IMMEDIATE:
791 } else if (kiov != NULL) {
792 CERROR("Can't recv immediate into paged buffer\n");
796 while (offset >= iov->iov_len) {
797 offset -= iov->iov_len;
802 if (mlen > iov->iov_len - offset) {
803 CERROR("Can't handle immediate frags\n");
806 buffer = ((char *)iov->iov_base) + offset;
808 rc = kranal_consume_rxmsg(conn, buffer, mlen);
809 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
812 case RANAL_MSG_GET_REQ:
813 /* If the GET matched, we've already handled it in
814 * kranal_do_send which is called to send the REPLY. We're
815 * only called here to complete the GET receive (if we needed
816 * it which we don't, but I digress...) */
817 LASSERT (libmsg == NULL);
818 lib_finalize(nal, NULL, libmsg, PTL_OK);
821 case RANAL_MSG_PUT_REQ:
822 if (libmsg == NULL) { /* PUT didn't match... */
823 lib_finalize(nal, NULL, libmsg, PTL_OK);
827 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
831 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
833 kranal_tx_done(tx, rc);
838 kranal_map_buffer(tx);
840 tx->tx_msg.ram_u.putack.rapam_src_cookie =
841 conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
842 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
843 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
844 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
845 (__u64)((unsigned long)tx->tx_buffer);
846 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
848 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
850 kranal_post_fma(conn, tx);
852 /* flag matched by consuming rx message */
853 kranal_consume_rxmsg(conn, NULL, 0);
/* lib_nal receive entry point for iovec sinks; forwards to
 * kranal_do_recv with kiov == NULL. */
859 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
860 unsigned int niov, struct iovec *iov,
861 size_t offset, size_t mlen, size_t rlen)
863 return kranal_do_recv(nal, private, msg, niov, iov, NULL,
/* lib_nal receive entry point for paged (kiov) sinks; forwards to
 * kranal_do_recv with iov == NULL. */
868 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
869 unsigned int niov, ptl_kiov_t *kiov,
870 size_t offset, size_t mlen, size_t rlen)
872 return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
/* Spawn a kernel thread running fn(arg) and account for it in the
 * module's thread count (the error path for a failed kernel_thread is
 * elided in this view). */
877 kranal_thread_start (int(*fn)(void *arg), void *arg)
879 long pid = kernel_thread(fn, arg, 0);
884 atomic_inc(&kranal_data.kra_nthreads);
/* Called by each daemon on exit: drop the module's thread count so
 * shutdown can tell when all threads have gone. */
889 kranal_thread_fini (void)
891 atomic_dec(&kranal_data.kra_nthreads);
/* Reaper-side health check for one conn.  Schedules a keepalive if
 * nothing has been sent recently; treats prolonged rx silence as an
 * error; and ("belt+braces") flags any tx stuck on the fmaq, rdmaq or
 * replyq longer than the conn timeout.  Presumably returns non-zero on
 * a detected timeout — the return statements are elided in this view. */
895 kranal_check_conn_timeouts (kra_conn_t *conn)
898 struct list_head *ttmp;
901 unsigned long now = jiffies;
903 LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
904 conn->rac_state == RANAL_CONN_CLOSING);
906 if (!conn->rac_close_sent &&
907 time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
908 /* not sent in a while; schedule conn so scheduler sends a keepalive */
909 kranal_schedule_conn(conn);
912 timeout = conn->rac_timeout * HZ;
914 if (!conn->rac_close_recvd &&
915 time_after_eq(now, conn->rac_last_rx + timeout)) {
916 CERROR("Nothing received from "LPX64" within %lu seconds\n",
917 conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
921 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
924 /* Check the conn's queues are moving. These are "belt+braces" checks,
925 * in case of hardware/software errors that make this conn seem
926 * responsive even though it isn't progressing its message queues. */
928 spin_lock_irqsave(&conn->rac_lock, flags);
930 list_for_each (ttmp, &conn->rac_fmaq) {
931 tx = list_entry(ttmp, kra_tx_t, tx_list);
933 if (time_after_eq(now, tx->tx_qtime + timeout)) {
934 spin_unlock_irqrestore(&conn->rac_lock, flags);
935 CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
936 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
941 list_for_each (ttmp, &conn->rac_rdmaq) {
942 tx = list_entry(ttmp, kra_tx_t, tx_list);
944 if (time_after_eq(now, tx->tx_qtime + timeout)) {
945 spin_unlock_irqrestore(&conn->rac_lock, flags);
946 CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
947 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
952 list_for_each (ttmp, &conn->rac_replyq) {
953 tx = list_entry(ttmp, kra_tx_t, tx_list);
955 if (time_after_eq(now, tx->tx_qtime + timeout)) {
956 spin_unlock_irqrestore(&conn->rac_lock, flags);
957 CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
958 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
963 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Scan one conn hash chain (index 'idx') under the shared global lock,
 * tracking the smallest timeout/keepalive seen via *min_timeoutp.  On a
 * detected timeout: take a conn ref, drop the read lock, then under the
 * write lock close (ESTABLISHED) or terminate (CLOSING) the conn, drop
 * the ref, and restart the scan of this chain. */
970 struct list_head *conns = &kranal_data.kra_conns[idx];
971 struct list_head *ctmp;
977 /* NB. We expect to check all the conns and not find any problems, so
978 * we just use a shared lock while we take a look... */
979 read_lock(&kranal_data.kra_global_lock);
981 list_for_each (ctmp, conns) {
982 conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
984 if (conn->rac_timeout < *min_timeoutp )
985 *min_timeoutp = conn->rac_timeout;
986 if (conn->rac_keepalive < *min_timeoutp )
987 *min_timeoutp = conn->rac_keepalive;
989 rc = kranal_check_conn_timeouts(conn);
993 kranal_conn_addref(conn);
994 read_unlock(&kranal_data.kra_global_lock);
996 CERROR("Conn to "LPX64", cqid %d timed out\n",
997 conn->rac_peer->rap_nid, conn->rac_cqid);
999 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1001 switch (conn->rac_state) {
1005 case RANAL_CONN_ESTABLISHED:
1006 kranal_close_conn_locked(conn, -ETIMEDOUT);
1009 case RANAL_CONN_CLOSING:
1010 kranal_terminate_conn_locked(conn);
1014 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1016 kranal_conn_decref(conn);
1018 /* start again now I've dropped the lock */
1022 read_unlock(&kranal_data.kra_global_lock);
/* Connection daemon.  Loops until module shutdown, draining two work
 * queues under kra_connd_lock: accepted sockets (handshake then free)
 * and peers needing an active connection attempt (kranal_connect, then
 * drop the connd's peer ref).  Sleeps interruptibly on the connd waitq
 * when both queues are empty. */
1028 long id = (long)arg;
1031 unsigned long flags;
1033 kra_acceptsock_t *ras;
1036 snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
1037 kportal_daemonize(name);
1038 kportal_blockallsigs();
1040 init_waitqueue_entry(&wait, current);
1042 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1044 while (!kranal_data.kra_shutdown) {
1047 if (!list_empty(&kranal_data.kra_connd_acceptq)) {
1048 ras = list_entry(kranal_data.kra_connd_acceptq.next,
1049 kra_acceptsock_t, ras_list);
1050 list_del(&ras->ras_list);
1052 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1054 CDEBUG(D_WARNING,"About to handshake someone\n");
1056 kranal_conn_handshake(ras->ras_sock, NULL);
1057 kranal_free_acceptsock(ras);
1059 CDEBUG(D_WARNING,"Finished handshaking someone\n");
1061 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1065 if (!list_empty(&kranal_data.kra_connd_peers)) {
1066 peer = list_entry(kranal_data.kra_connd_peers.next,
1067 kra_peer_t, rap_connd_list);
1069 list_del_init(&peer->rap_connd_list);
1070 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1072 kranal_connect(peer);
1073 kranal_peer_decref(peer);
1075 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1082 set_current_state(TASK_INTERRUPTIBLE);
1083 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1085 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1089 set_current_state(TASK_RUNNING);
1090 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1092 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1095 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1097 kranal_thread_fini();
/* Tell the reaper a (strictly positive) timeout smaller than its
 * current minimum may now exist; it will pick kra_new_min_timeout up on
 * its next pass and rescale its scanning rate. */
1102 kranal_update_reaper_timeout(long timeout)
1104 unsigned long flags;
1106 LASSERT (timeout > 0);
1108 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1110 if (timeout < kranal_data.kra_new_min_timeout)
1111 kranal_data.kra_new_min_timeout = timeout;
1113 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/* Reaper daemon.  Wakes every 'p' seconds and checks a chunk of the
 * conn hash table sized so every conn is visited ~'n' times per global
 * minimum timeout interval (so each conn is attended to within
 * (n+1)/n of its timeout).  Tracks the minimum timeout/keepalive seen
 * across a full sweep (next_min_timeout -> current_min_timeout) and
 * restarts the sweep when kranal_update_reaper_timeout announces a new,
 * smaller minimum.  Sleeps on the reaper waitq between passes. */
1120 unsigned long flags;
1123 int conn_entries = kranal_data.kra_conn_hash_size;
1125 int base_index = conn_entries - 1;
1126 unsigned long next_check_time = jiffies;
1127 long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1128 long current_min_timeout = 1;
1130 kportal_daemonize("kranal_reaper");
1131 kportal_blockallsigs();
1133 init_waitqueue_entry(&wait, current);
1135 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1137 while (!kranal_data.kra_shutdown) {
1139 /* careful with the jiffy wrap... */
1140 timeout = (long)(next_check_time - jiffies);
1143 /* I wake up every 'p' seconds to check for
1144 * timeouts on some more peers. I try to check
1145 * every connection 'n' times within the global
1146 * minimum of all keepalive and timeout intervals,
1147 * to ensure I attend to every connection within
1148 * (n+1)/n times its timeout intervals. */
1152 unsigned long min_timeout;
1155 if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
1156 /* new min timeout set: restart min timeout scan */
1157 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1158 base_index = conn_index - 1;
1160 base_index = conn_entries - 1;
1162 if (kranal_data.kra_new_min_timeout < current_min_timeout) {
1163 current_min_timeout = kranal_data.kra_new_min_timeout;
1164 CWARN("Set new min timeout %ld\n",
1165 current_min_timeout);
1168 kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1170 min_timeout = current_min_timeout;
1172 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
1175 LASSERT (min_timeout > 0);
1177 /* Compute how many table entries to check now so I
1178 * get round the whole table fast enough (NB I do
1179 * this at fixed intervals of 'p' seconds) */
1180 chunk = conn_entries;
1181 if (min_timeout > n * p)
1182 chunk = (chunk * n * p) / min_timeout;
1186 for (i = 0; i < chunk; i++) {
1187 kranal_reaper_check(conn_index,
1189 conn_index = (conn_index + 1) % conn_entries;
1192 next_check_time += p * HZ;
1194 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1196 if (((conn_index - chunk <= base_index &&
1197 base_index < conn_index) ||
1198 (conn_index - conn_entries - chunk <= base_index &&
1199 base_index < conn_index - conn_entries))) {
1201 /* Scanned all conns: set current_min_timeout... */
1202 if (current_min_timeout != next_min_timeout) {
1203 current_min_timeout = next_min_timeout;
1204 CWARN("Set new min timeout %ld\n",
1205 current_min_timeout);
1208 /* ...and restart min timeout scan */
1209 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1210 base_index = conn_index - 1;
1212 base_index = conn_entries - 1;
1216 set_current_state(TASK_INTERRUPTIBLE);
1217 add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1219 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1221 schedule_timeout(timeout);
1223 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1225 set_current_state(TASK_RUNNING);
1226 remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1229 kranal_thread_fini();
/* Drain the device's RDMA completion queue.  For each completed event:
 * map the cqid back to its conn (it may have been destroyed meanwhile),
 * pop the head of the conn's rdmaq (completions arrive in posting
 * order), verify it matches the descriptor, then move the tx onto the
 * fmaq so the scheduler sends its PUT_DONE/GET_DONE message.  Stops
 * when RapkCQDone reports RAP_NOT_DONE. */
1234 kranal_check_rdma_cq (kra_device_t *dev)
1239 unsigned long flags;
1240 RAP_RDMA_DESCRIPTOR *desc;
1245 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
1246 if (rrc == RAP_NOT_DONE)
1249 LASSERT (rrc == RAP_SUCCESS);
1250 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1252 read_lock(&kranal_data.kra_global_lock);
1254 conn = kranal_cqid2conn_locked(cqid);
1256 /* Conn was destroyed? */
1257 CWARN("RDMA CQID lookup %d failed\n", cqid);
1258 read_unlock(&kranal_data.kra_global_lock);
1262 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1263 LASSERT (rrc == RAP_SUCCESS);
1265 spin_lock_irqsave(&conn->rac_lock, flags);
1267 LASSERT (!list_empty(&conn->rac_rdmaq));
1268 tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1269 list_del(&tx->tx_list);
1271 LASSERT(desc->AppPtr == (void *)tx);
1272 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1273 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1275 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1276 tx->tx_qtime = jiffies;
1278 spin_unlock_irqrestore(&conn->rac_lock, flags);
1280 /* Get conn's fmaq processed, now I've just put something
1282 kranal_schedule_conn(conn);
1284 read_unlock(&kranal_data.kra_global_lock);
1289 kranal_check_fma_cq (kra_device_t *dev)
1295 struct list_head *conns;
1296 struct list_head *tmp;
1300 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
1301 if (rrc != RAP_NOT_DONE)
1304 LASSERT (rrc == RAP_SUCCESS);
1306 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1308 read_lock(&kranal_data.kra_global_lock);
1310 conn = kranal_cqid2conn_locked(cqid);
1312 CWARN("FMA CQID lookup %d failed\n", cqid);
1314 kranal_schedule_conn(conn);
1316 read_unlock(&kranal_data.kra_global_lock);
1320 /* FMA CQ has overflowed: check ALL conns */
1321 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
1323 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1325 read_lock(&kranal_data.kra_global_lock);
1327 conns = &kranal_data.kra_conns[i];
1329 list_for_each (tmp, conns) {
1330 conn = list_entry(tmp, kra_conn_t,
1333 if (conn->rac_device == dev)
1334 kranal_schedule_conn(conn);
1337 /* don't block write lockers for too long... */
1338 read_unlock(&kranal_data.kra_global_lock);
/* Push one FMA message (optionally with an immediate payload) to the
 * conn's device.  Fenced message types go via the synchronous send;
 * others via the plain send.  Stamps the message with the conn stamp
 * and tx sequence, and records rac_last_tx for keepalive accounting.
 * Presumably returns 0 on success / -EAGAIN when out of FMA credits
 * (see callers) — the return paths are elided in this view. */
1344 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1345 void *immediate, int immediatenob)
1347 int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1350 LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1351 LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1352 immediatenob <= RANAL_FMA_MAX_DATA :
1355 msg->ram_connstamp = conn->rac_my_connstamp;
1356 msg->ram_seq = conn->rac_tx_seq;
1359 rrc = RapkFmaSyncSend(conn->rac_device->rad_handle,
1360 immediate, immediatenob,
1363 rrc = RapkFmaSend(conn->rac_device->rad_handle,
1364 immediate, immediatenob,
1372 conn->rac_last_tx = jiffies;
/* Scheduler-side FMA queue processing for one conn.
 * Closing conns: wait for RDMAs to drain (keepalive NOOPs meanwhile),
 * send CLOSE once, and terminate the conn when both CLOSE directions
 * are done.  Established conns: if the fmaq is empty, maybe send a
 * keepalive NOOP; otherwise pop the head tx, finish per-type setup
 * (cookies, RDMA descriptors, buffer mapping) and send it.  -EAGAIN
 * (out of FMA credits) pushes the tx back on the head of the fmaq to
 * retry when credits return; otherwise the tx is either completed now
 * or parked on the replyq to await the peer's matching reply. */
1384 unsigned long flags;
1390 /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1391 * However I will be rescheduled some by an FMA completion event
1392 * when I eventually get some.
1393 * NB 2. Sampling rac_state here, races with setting it elsewhere
1394 * kranal_close_conn_locked. But it doesn't matter if I try to
1395 * send a "real" message just as I start closing because I'll get
1396 * scheduled to send the close anyway. */
1398 if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1399 if (!list_empty(&conn->rac_rdmaq)) {
1400 /* RDMAs in progress */
1401 LASSERT (!conn->rac_close_sent);
1403 if (time_after_eq(jiffies,
1405 conn->rac_keepalive)) {
1406 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1407 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1412 if (conn->rac_close_sent)
1415 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1416 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1420 conn->rac_close_sent = 1;
1421 if (!conn->rac_close_recvd)
1424 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1426 if (conn->rac_state == RANAL_CONN_CLOSING)
1427 kranal_terminate_conn_locked(conn);
1429 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1433 spin_lock_irqsave(&conn->rac_lock, flags);
1435 if (list_empty(&conn->rac_fmaq)) {
1437 spin_unlock_irqrestore(&conn->rac_lock, flags);
1439 if (time_after_eq(jiffies,
1440 conn->rac_last_tx + conn->rac_keepalive)) {
1441 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1442 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1447 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1448 list_del(&tx->tx_list);
1449 more_to_do = !list_empty(&conn->rac_fmaq);
1451 spin_unlock_irqrestore(&conn->rac_lock, flags);
1454 switch (tx->tx_msg.ram_type) {
1458 case RANAL_MSG_IMMEDIATE:
1459 case RANAL_MSG_PUT_NAK:
1460 case RANAL_MSG_PUT_DONE:
1461 case RANAL_MSG_GET_NAK:
1462 case RANAL_MSG_GET_DONE:
1463 rc = kranal_sendmsg(conn, &tx->tx_msg,
1464 tx->tx_buffer, tx->tx_nob);
1468 case RANAL_MSG_PUT_REQ:
1469 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1470 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1471 kranal_map_buffer(tx);
1475 case RANAL_MSG_PUT_ACK:
1476 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1480 case RANAL_MSG_GET_REQ:
1481 kranal_map_buffer(tx);
1482 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1483 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1484 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1485 (__u64)((unsigned long)tx->tx_buffer);
1486 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1487 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1492 if (rc == -EAGAIN) {
1493 /* I need credits to send this. Replace tx at the head of the
1494 * fmaq and I'll get rescheduled when credits appear */
1495 spin_lock_irqsave(&conn->rac_lock, flags);
1496 list_add(&tx->tx_list, &conn->rac_fmaq);
1497 spin_unlock_irqrestore(&conn->rac_lock, flags);
1503 if (!expect_reply) {
1504 kranal_tx_done(tx, 0);
1506 spin_lock_irqsave(&conn->rac_lock, flags);
1507 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1508 tx->tx_qtime = jiffies;
1509 spin_unlock_irqrestore(&conn->rac_lock, flags);
1513 kranal_schedule_conn(conn);
1517 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1519 __swab64s(&d->rard_key.Key);
1520 __swab16s(&d->rard_key.Cookie);
1521 __swab16s(&d->rard_key.MdHandle);
1522 __swab32s(&d->rard_key.Flags);
1523 __swab64s(&d->rard_addr.AddressBits);
1524 __swab32s(&d->rard_nob);
1528 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1530 struct list_head *ttmp;
1533 list_for_each(ttmp, &conn->rac_replyq) {
1534 tx = list_entry(ttmp, kra_tx_t, tx_list);
1536 if (tx->tx_cookie != cookie)
1539 if (tx->tx_msg.ram_type != type) {
1540 CWARN("Unexpected type %x (%x expected) "
1541 "matched reply from "LPX64"\n",
1542 tx->tx_msg.ram_type, type,
1543 conn->rac_peer->rap_nid);
1548 CWARN("Unmatched reply from "LPX64"\n", conn->rac_peer->rap_nid);
/*
 * kranal_check_fma_rx: poll for one incoming FMA (fast messaging)
 * message on 'conn' and dispatch it.
 *
 * NOTE(review): this block has been mangled in extraction -- brace-only
 * lines, 'goto'/'break'/'return' statements and several local
 * declarations (apparently 'prefix', 'msg', 'tx', 'seq') are missing,
 * and the leading number on each line is a fused original line number.
 * Code is left byte-identical; the comments below describe the visible
 * flow and should be confirmed against the pristine source.
 */
1553 kranal_check_fma_rx (kra_conn_t *conn)
1555 unsigned long flags;
1560 RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1561 kra_peer_t *peer = conn->rac_peer;
/* RAP_NOT_DONE => no message pending; nothing to do this pass */
1563 if (rrc == RAP_NOT_DONE)
1566 LASSERT (rrc == RAP_SUCCESS);
/* Record arrival time and take the next expected rx sequence number */
1567 conn->rac_last_rx = jiffies;
1568 seq = conn->rac_rx_seq++;
1569 msg = (kra_msg_t *)prefix;
/* If the magic is byte-reversed the peer has opposite endianness:
 * swab the common header (and any RDMA descriptor payload below);
 * any other magic value is a protocol error */
1571 if (msg->ram_magic != RANAL_MSG_MAGIC) {
1572 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1573 CERROR("Unexpected magic %08x from "LPX64"\n",
1574 msg->ram_magic, peer->rap_nid);
1578 __swab32s(&msg->ram_magic);
1579 __swab16s(&msg->ram_version);
1580 __swab16s(&msg->ram_type);
1581 __swab64s(&msg->ram_srcnid);
1582 __swab64s(&msg->ram_connstamp);
1583 __swab32s(&msg->ram_seq);
1585 /* NB message type checked below; NOT here... */
1586 switch (msg->ram_type) {
1587 case RANAL_MSG_PUT_ACK:
1588 kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1591 case RANAL_MSG_GET_REQ:
1592 kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
/* Sanity checks: protocol version, source NID, connection stamp and
 * in-order sequence number must all match expectations; each CERROR
 * branch presumably aborts processing (dropped 'goto' lines) */
1600 if (msg->ram_version != RANAL_MSG_VERSION) {
1601 CERROR("Unexpected protocol version %d from "LPX64"\n",
1602 msg->ram_version, peer->rap_nid);
1606 if (msg->ram_srcnid != peer->rap_nid) {
1607 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1608 msg->ram_srcnid, peer->rap_nid);
1612 if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1613 CERROR("Unexpected connstamp "LPX64"("LPX64
1614 " expected) from "LPX64"\n",
1615 msg->ram_connstamp, conn->rac_peer_connstamp,
1620 if (msg->ram_seq != seq) {
1621 CERROR("Unexpected sequence number %d(%d expected) from "
1622 LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
/* FENCE-flagged messages piggyback on RDMA completion: wait for the
 * RDMA to be fully synced before acting on the message */
1626 if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1627 /* This message signals RDMA completion... */
1628 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1629 LASSERT (rrc == RAP_SUCCESS);
/* Nothing but CLOSE handling is legal once a CLOSE has been received */
1632 if (conn->rac_close_recvd) {
1633 CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
1634 msg->ram_type, conn->rac_peer->rap_nid);
/* CLOSE: start (or finish) tearing the connection down under the
 * global lock, depending on whether we already sent our own CLOSE */
1638 if (msg->ram_type == RANAL_MSG_CLOSE) {
1639 conn->rac_close_recvd = 1;
1640 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1642 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1643 kranal_close_conn_locked(conn, 0);
1644 else if (conn->rac_close_sent)
1645 kranal_terminate_conn_locked(conn);
1647 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
/* Ignore payload traffic on a connection that is no longer up */
1651 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1654 conn->rac_rxmsg = msg; /* stash message for portals callbacks */
1655 /* they'll NULL rac_rxmsg if they consume it */
/* Dispatch by message type */
1656 switch (msg->ram_type) {
1657 case RANAL_MSG_NOOP:
1658 /* Nothing to do; just a keepalive */
1661 case RANAL_MSG_IMMEDIATE:
/* hand the embedded portals header to the portals library */
1662 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1665 case RANAL_MSG_PUT_REQ:
1666 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1668 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
/* nobody wanted the PUT: NAK it back with the sender's cookie */
1671 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1675 tx->tx_msg.ram_u.completion.racm_cookie =
1676 msg->ram_u.putreq.raprm_cookie;
1677 kranal_post_fma(conn, tx);
1680 case RANAL_MSG_PUT_NAK:
/* peer refused our PUT_REQ: complete the matched tx with -ENOENT */
1681 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1682 msg->ram_u.completion.racm_cookie);
1686 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1687 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1688 kranal_tx_done(tx, -ENOENT); /* no match */
1691 case RANAL_MSG_PUT_ACK:
/* peer accepted our PUT_REQ: start the PUT_DONE RDMA transfer */
1692 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1693 msg->ram_u.putack.rapam_src_cookie);
1697 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1698 &msg->ram_u.putack.rapam_desc,
1699 msg->ram_u.putack.rapam_desc.rard_nob,
1700 msg->ram_u.putack.rapam_dst_cookie);
1703 case RANAL_MSG_PUT_DONE:
/* our PUT_ACK's RDMA completed: finish the tx successfully */
1704 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1705 msg->ram_u.completion.racm_cookie);
1709 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1710 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1711 kranal_tx_done(tx, 0);
1714 case RANAL_MSG_GET_REQ:
1715 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1717 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
/* nobody wanted the GET: NAK it back with the sender's cookie */
1720 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1724 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1725 kranal_post_fma(conn, tx);
1728 case RANAL_MSG_GET_NAK:
/* peer refused our GET_REQ: complete the matched tx with -ENOENT */
1729 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1730 msg->ram_u.completion.racm_cookie);
1734 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1735 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1736 kranal_tx_done(tx, -ENOENT); /* no match */
1739 case RANAL_MSG_GET_DONE:
/* peer completed our GET: finish the matched tx successfully */
1740 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1741 msg->ram_u.completion.racm_cookie);
1745 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1746 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1747 kranal_tx_done(tx, 0);
/* if no callback consumed the stashed rxmsg, discard it now */
1752 if (conn->rac_rxmsg != NULL)
1753 kranal_consume_rxmsg(conn, NULL, 0);
1755 /* check again later */
1756 kranal_schedule_conn(conn);
1760 kranal_complete_closed_conn (kra_conn_t *conn)
1764 LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1766 while (!list_empty(&conn->rac_fmaq)) {
1767 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1769 list_del(&tx->tx_list);
1770 kranal_tx_done(tx, -ECONNABORTED);
1773 LASSERT (list_empty(&conn->rac_rdmaq));
1775 while (!list_empty(&conn->rac_replyq)) {
1776 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1778 list_del(&tx->tx_list);
1779 kranal_tx_done(tx, -ECONNABORTED);
/*
 * kranal_scheduler: per-device scheduler thread.  Daemonizes itself,
 * then loops until shutdown: yields after RANAL_RESCHED busy
 * iterations, services the device's RDMA/FMA completion queues when
 * the device callback has marked it ready, and gives each connection
 * queued on rad_connq a turn (FMA rx, FMA tx queue, and final cleanup
 * once CLOSED).  Sleeps on dev->rad_waitq when there is nothing to do.
 *
 * NOTE(review): extraction has dropped brace-only lines and some
 * statements (apparently the reschedule/schedule() calls and the local
 * declarations for 'name', 'wait', 'conn', 'busy_loops'); the code is
 * left byte-identical, and the fused leading numbers are original line
 * numbers -- confirm against the pristine source.
 */
1784 kranal_scheduler (void *arg)
1786 kra_device_t *dev = (kra_device_t *)arg;
1790 unsigned long flags;
/* Become a kernel daemon named after the device index */
1793 snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1794 kportal_daemonize(name);
1795 kportal_blockallsigs();
1797 dev->rad_scheduler = current;
1798 init_waitqueue_entry(&wait, current);
/* rad_lock is held at the top of every loop iteration below */
1800 spin_lock_irqsave(&dev->rad_lock, flags);
1802 while (!kranal_data.kra_shutdown) {
1803 /* Safe: kra_shutdown only set when quiescent */
/* Drop the lock and yield periodically so we don't hog the CPU */
1805 if (busy_loops++ >= RANAL_RESCHED) {
1806 spin_unlock_irqrestore(&dev->rad_lock, flags);
1811 spin_lock_irqsave(&dev->rad_lock, flags);
1814 if (dev->rad_ready) {
1815 /* Device callback fired since I last checked it */
/* service completion queues with the lock dropped */
1817 spin_unlock_irqrestore(&dev->rad_lock, flags);
1819 kranal_check_rdma_cq(dev);
1820 kranal_check_fma_cq(dev);
1822 spin_lock_irqsave(&dev->rad_lock, flags);
1825 if (!list_empty(&dev->rad_connq)) {
1826 /* Connection needs attention */
1827 conn = list_entry(dev->rad_connq.next,
1828 kra_conn_t, rac_schedlist);
1829 list_del_init(&conn->rac_schedlist);
1830 LASSERT (conn->rac_scheduled);
1831 conn->rac_scheduled = 0;
1832 spin_unlock_irqrestore(&dev->rad_lock, flags);
1834 kranal_check_fma_rx(conn);
1835 kranal_process_fmaq(conn);
1837 if (conn->rac_state == RANAL_CONN_CLOSED)
1838 kranal_complete_closed_conn(conn);
/* drop the scheduler's ref taken in kranal_schedule_conn() */
1840 kranal_conn_decref(conn);
1842 spin_lock_irqsave(&dev->rad_lock, flags);
/* Nothing to do: sleep on rad_waitq until the device callback or
 * kranal_schedule_conn() wakes us (standard wait-queue dance) */
1846 add_wait_queue(&dev->rad_waitq, &wait);
1847 set_current_state(TASK_INTERRUPTIBLE);
1849 spin_unlock_irqrestore(&dev->rad_lock, flags);
1854 set_current_state(TASK_RUNNING);
1855 remove_wait_queue(&dev->rad_waitq, &wait);
1857 spin_lock_irqsave(&dev->rad_lock, flags);
/* Shutdown: release the lock, clear the scheduler handle and exit */
1860 spin_unlock_irqrestore(&dev->rad_lock, flags);
1862 dev->rad_scheduler = NULL;
1863 kranal_thread_fini();
/*
 * Portals lib-NAL switch for the RapidArray NAL: connects the generic
 * Portals library to this driver's send/receive/distance entry points.
 * Uses the GNU 'label:' initializer syntax (pre-C99 designated
 * initializers), matching the rest of this codebase.
 * NOTE(review): the terminating "};" is not visible in this extract --
 * confirm against the pristine source.
 */
1868 lib_nal_t kranal_lib = {
1869 libnal_data: &kranal_data, /* NAL private data */
1870 libnal_send: kranal_send,
1871 libnal_send_pages: kranal_send_pages,
1872 libnal_recv: kranal_recv,
1873 libnal_recv_pages: kranal_recv_pages,
1874 libnal_dist: kranal_dist