1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Portals NAL "distance" callback: report how far 'nid' is from this node.
 * NOTE(review): interior lines are elided in this extract; only the
 * self-NID comparison is visible. */
27 kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
29 /* I would guess that if kranal_get_peer (nid) == NULL,
30 and we're not routing, then 'nid' is very distant :) */
31 if ( nal->libnal_ni.ni_pid.nid == nid ) {
/* RapidArray device event callback: find the kra_device_t matching 'devid'
 * and wake its scheduler (under rad_lock) if it is not already marked ready.
 * Falls through to a warning when no registered device matches. */
41 kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
47 CDEBUG(D_NET, "callback for device %d\n", devid);
49 for (i = 0; i < kranal_data.kra_ndevs; i++) {
51 dev = &kranal_data.kra_devices[i];
52 if (dev->rad_id != devid)
55 spin_lock_irqsave(&dev->rad_lock, flags);
57 if (!dev->rad_ready) {
59 wake_up(&dev->rad_waitq);
62 spin_unlock_irqrestore(&dev->rad_lock, flags);
66 CWARN("callback for unknown device %d\n", devid);
/* Queue 'conn' for its device scheduler thread.  Idempotent: if the conn is
 * already scheduled this is a no-op; otherwise take a ref for the scheduler,
 * append to the device's connq and wake the scheduler.  All under rad_lock. */
70 kranal_schedule_conn(kra_conn_t *conn)
72 kra_device_t *dev = conn->rac_device;
75 spin_lock_irqsave(&dev->rad_lock, flags);
77 if (!conn->rac_scheduled) {
78 kranal_conn_addref(conn); /* +1 ref for scheduler */
79 conn->rac_scheduled = 1;
80 list_add_tail(&conn->rac_schedlist, &dev->rad_connq);
81 wake_up(&dev->rad_waitq);
84 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Grab an idle tx descriptor.  Preference order visible here:
 * 1. the normal idle pool;
 * 2. if !may_block, the reserve ("nblk") pool, erroring when exhausted;
 * 3. otherwise block on kra_idle_tx_waitq until the normal pool refills.
 * A fresh completion cookie is assigned while kra_tx_lock is still held.
 * NOTE(review): lines are elided in this extract, so the retry/branch
 * structure between the three cases is not fully visible. */
88 kranal_get_idle_tx (int may_block)
94 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
96 /* "normal" descriptor is free */
97 if (!list_empty(&kranal_data.kra_idle_txs)) {
98 tx = list_entry(kranal_data.kra_idle_txs.next,
104 /* may dip into reserve pool */
105 if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
106 CERROR("reserved tx desc pool exhausted\n");
110 tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
115 /* block for idle tx */
116 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
118 wait_event(kranal_data.kra_idle_tx_waitq,
119 !list_empty(&kranal_data.kra_idle_txs));
123 list_del(&tx->tx_list);
125 /* Allocate a new completion cookie. It might not be
126 * needed, but we've got a lock right now... */
127 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
129 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
130 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
131 LASSERT (tx->tx_conn == NULL);
132 LASSERT (tx->tx_libmsg[0] == NULL);
133 LASSERT (tx->tx_libmsg[1] == NULL);
136 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Initialise the wire-message header fields common to every FMA message:
 * magic, protocol version, message type and our source NID.  The per-send
 * fields (connstamp, seq) are filled in later by the send path. */
142 kranal_init_msg(kra_msg_t *msg, int type)
144 msg->ram_magic = RANAL_MSG_MAGIC;
145 msg->ram_version = RANAL_MSG_VERSION;
146 msg->ram_type = type;
147 msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
148 /* ram_connstamp gets set when FMA is sent */
/* Convenience wrapper: allocate an idle tx (blocking policy per 'may_block')
 * and initialise its embedded message header with 'type'.
 * NOTE(review): the NULL-tx failure path is elided in this extract. */
152 kranal_new_tx_msg (int may_block, int type)
154 kra_tx_t *tx = kranal_get_idle_tx(may_block);
159 kranal_init_msg(&tx->tx_msg, type);
/* Point tx->tx_buffer at the single iovec fragment holding the payload
 * (skipping 'offset' bytes) and mark the tx RANAL_BUF_IMMEDIATE.  Only one
 * contiguous vaddr fragment is supported; spanning fragments is an error. */
164 kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
168 /* For now this is almost identical to kranal_setup_virt_buffer, but we
169 * could "flatten" the payload into a single contiguous buffer ready
170 * for sending direct over an FMA if we ever needed to. */
172 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
176 tx->tx_buffer = NULL;
180 while (offset >= iov->iov_len) {
181 offset -= iov->iov_len;
187 if (nob > iov->iov_len - offset) {
188 CERROR("Can't handle multiple vaddr fragments\n");
192 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
195 tx->tx_buftype = RANAL_BUF_IMMEDIATE;
/* Set up tx for RDMA from a virtually-contiguous buffer: locate the iovec
 * fragment containing 'offset', require the whole 'nob' bytes to fit in that
 * one fragment, and mark the tx RANAL_BUF_VIRT_UNMAPPED (mapped later). */
201 kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
207 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
209 while (offset >= iov->iov_len) {
210 offset -= iov->iov_len;
216 if (nob > iov->iov_len - offset) {
217 CERROR("Can't handle multiple vaddr fragments\n");
221 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
223 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Set up tx for RDMA from a kiov (page) vector: walk past 'offset', then
 * build the tx_phys array of physical page addresses covering 'nob' bytes.
 * Pages after the first must be gap-free (full pages starting at offset 0)
 * so the region is contiguous in I/O virtual memory; the fragment count is
 * capped at PTL_MD_MAX_IOV.  Marks the tx RANAL_BUF_PHYS_UNMAPPED.
 * NOTE(review): the loop structure over subsequent pages is elided in this
 * extract; only the per-page checks and bookkeeping are visible. */
228 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
231 RAP_PHYS_REGION *phys = tx->tx_phys;
234 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
238 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
240 while (offset >= kiov->kiov_len) {
241 offset -= kiov->kiov_len;
247 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
249 tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
251 phys->Address = kranal_page2phys(kiov->kiov_page);
254 resid = nob - (kiov->kiov_len - offset);
260 if (kiov->kiov_offset != 0 ||
261 ((resid > PAGE_SIZE) &&
262 kiov->kiov_len < PAGE_SIZE)) {
263 /* Can't have gaps */
264 CERROR("Can't make payload contiguous in I/O VM:"
265 "page %d, offset %d, len %d \n",
266 (int)(phys - tx->tx_phys),
267 kiov->kiov_offset, kiov->kiov_len);
271 if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
272 CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
276 phys->Address = kranal_page2phys(kiov->kiov_page);
282 tx->tx_phys_npages = phys - tx->tx_phys;
/* Dispatch to the physical (kiov) or virtual (iovec) buffer setup routine.
 * Exactly one of iov/kiov must be non-NULL (asserted). */
287 kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
288 struct iovec *iov, ptl_kiov_t *kiov,
291 LASSERT ((iov == NULL) != (kiov == NULL));
294 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
296 return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/* Register the tx's buffer with the RapidArray device so it can be used for
 * RDMA, advancing *_UNMAPPED -> *_MAPPED.  Must run on the conn's scheduler
 * thread (asserted); already-mapped and immediate buffers need no work. */
300 kranal_map_buffer (kra_tx_t *tx)
302 kra_conn_t *conn = tx->tx_conn;
303 kra_device_t *dev = conn->rac_device;
306 LASSERT (current == dev->rad_scheduler);
308 switch (tx->tx_buftype) {
313 case RANAL_BUF_IMMEDIATE:
314 case RANAL_BUF_PHYS_MAPPED:
315 case RANAL_BUF_VIRT_MAPPED:
318 case RANAL_BUF_PHYS_UNMAPPED:
319 rrc = RapkRegisterPhys(dev->rad_handle,
320 tx->tx_phys, tx->tx_phys_npages,
322 LASSERT (rrc == RAP_SUCCESS);
323 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
326 case RANAL_BUF_VIRT_UNMAPPED:
327 rrc = RapkRegisterMemory(dev->rad_handle,
328 tx->tx_buffer, tx->tx_nob,
330 LASSERT (rrc == RAP_SUCCESS);
331 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/* Inverse of kranal_map_buffer: deregister a mapped buffer with the device,
 * reverting *_MAPPED -> *_UNMAPPED.  Must run on the scheduler thread of the
 * tx's conn (asserted); unmapped and immediate buffers are left alone. */
337 kranal_unmap_buffer (kra_tx_t *tx)
342 switch (tx->tx_buftype) {
347 case RANAL_BUF_IMMEDIATE:
348 case RANAL_BUF_PHYS_UNMAPPED:
349 case RANAL_BUF_VIRT_UNMAPPED:
352 case RANAL_BUF_PHYS_MAPPED:
353 LASSERT (tx->tx_conn != NULL);
354 dev = tx->tx_conn->rac_device;
355 LASSERT (current == dev->rad_scheduler);
356 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
358 LASSERT (rrc == RAP_SUCCESS);
359 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
362 case RANAL_BUF_VIRT_MAPPED:
363 LASSERT (tx->tx_conn != NULL);
364 dev = tx->tx_conn->rac_device;
365 LASSERT (current == dev->rad_scheduler);
366 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
368 LASSERT (rrc == RAP_SUCCESS);
369 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
/* Complete a tx: unmap its buffer, finalise up to two attached lib messages
 * (PTL_OK on success, PTL_FAIL otherwise), reset the descriptor and return
 * it to the appropriate idle pool, waking any blocked allocator.
 * 'completion' == 0 means success.  Must not be called in interrupt context. */
375 kranal_tx_done (kra_tx_t *tx, int completion)
377 ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
381 LASSERT (!in_interrupt());
383 kranal_unmap_buffer(tx);
385 for (i = 0; i < 2; i++) {
386 /* tx may have up to 2 libmsgs to finalise */
387 if (tx->tx_libmsg[i] == NULL)
390 lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
391 tx->tx_libmsg[i] = NULL;
394 tx->tx_buftype = RANAL_BUF_NONE;
395 tx->tx_msg.ram_type = RANAL_MSG_NONE;
398 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
401 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
403 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
404 wake_up(&kranal_data.kra_idle_tx_waitq);
407 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Return the peer's first connection, or (per the elided fall-through) none
 * when the list is empty.  Caller must hold the lock protecting rap_conns. */
411 kranal_find_conn_locked (kra_peer_t *peer)
413 struct list_head *tmp;
415 /* just return the first connection */
416 list_for_each (tmp, &peer->rap_conns) {
417 return list_entry(tmp, kra_conn_t, rac_list);
/* Queue 'tx' on the conn's FMA send queue (timestamping it for timeout
 * checks) and schedule the conn so its scheduler thread sends it. */
424 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
430 spin_lock_irqsave(&conn->rac_lock, flags);
431 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
432 tx->tx_qtime = jiffies;
433 spin_unlock_irqrestore(&conn->rac_lock, flags);
435 kranal_schedule_conn(conn);
/* Commit to sending 'tx' to 'nid': look up the peer, post on an existing
 * conn if there is one, otherwise (under the global write lock) either queue
 * behind an in-progress connection attempt or hand the peer to the connd to
 * establish one.  Any failure completes the tx with -EHOSTUNREACH; a recent
 * failed connect (rap_reconnect_time in the future) also fails fast.
 * NOTE(review): the initial read-lock acquisition and several branch/return
 * lines are elided in this extract. */
439 kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
445 rwlock_t *g_lock = &kranal_data.kra_global_lock;
447 /* If I get here, I've committed to send, so I complete the tx with
448 * failure on any problems */
450 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
454 peer = kranal_find_peer_locked(nid);
457 kranal_tx_done(tx, -EHOSTUNREACH);
461 conn = kranal_find_conn_locked(peer);
463 kranal_post_fma(conn, tx);
468 /* Making one or more connections; I'll need a write lock... */
470 write_lock_irqsave(g_lock, flags);
472 peer = kranal_find_peer_locked(nid);
474 write_unlock_irqrestore(g_lock, flags);
475 kranal_tx_done(tx, -EHOSTUNREACH);
479 conn = kranal_find_conn_locked(peer);
481 /* Connection exists; queue message on it */
482 kranal_post_fma(conn, tx);
483 write_unlock_irqrestore(g_lock, flags);
487 LASSERT (peer->rap_persistence > 0);
489 if (!peer->rap_connecting) {
490 LASSERT (list_empty(&peer->rap_tx_queue));
492 now = CURRENT_SECONDS;
493 if (now < peer->rap_reconnect_time) {
494 write_unlock_irqrestore(g_lock, flags);
495 kranal_tx_done(tx, -EHOSTUNREACH);
499 peer->rap_connecting = 1;
500 kranal_peer_addref(peer); /* extra ref for connd */
502 spin_lock(&kranal_data.kra_connd_lock);
504 list_add_tail(&peer->rap_connd_list,
505 &kranal_data.kra_connd_peers);
506 wake_up(&kranal_data.kra_connd_waitq);
508 spin_unlock(&kranal_data.kra_connd_lock);
511 /* A connection is being established; queue the message... */
512 list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
514 write_unlock_irqrestore(g_lock, flags);
/* Start an RDMA of 'nob' bytes from the tx's (already mapped) buffer into
 * the peer's sink descriptor, and prep the completion message ('type' with
 * 'cookie') to be sent when the RDMA finishes.  A zero-length transfer just
 * posts the completion FMA immediately; otherwise the descriptor is posted
 * to the hardware and the tx parked on rac_rdmaq until the CQ fires.
 * Runs only on the conn's scheduler thread (asserted). */
518 kranal_rdma(kra_tx_t *tx, int type,
519 kra_rdma_desc_t *sink, int nob, __u64 cookie)
521 kra_conn_t *conn = tx->tx_conn;
525 LASSERT (kranal_tx_mapped(tx));
526 LASSERT (nob <= sink->rard_nob);
527 LASSERT (nob <= tx->tx_nob);
529 /* No actual race with scheduler sending CLOSE (I'm she!) */
530 LASSERT (current == conn->rac_device->rad_scheduler);
532 memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
533 tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
534 tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
535 tx->tx_rdma_desc.DstPtr = sink->rard_addr;
536 tx->tx_rdma_desc.DstKey = sink->rard_key;
537 tx->tx_rdma_desc.Length = nob;
538 tx->tx_rdma_desc.AppPtr = tx;
540 /* prep final completion message */
541 kranal_init_msg(&tx->tx_msg, type);
542 tx->tx_msg.ram_u.completion.racm_cookie = cookie;
544 if (nob == 0) { /* Immediate completion */
545 kranal_post_fma(conn, tx);
549 LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
551 rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
552 LASSERT (rrc == RAP_SUCCESS);
554 spin_lock_irqsave(&conn->rac_lock, flags);
555 list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
556 tx->tx_qtime = jiffies;
557 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Copy the pending receive out of the FMA prefix into 'buffer' (up to 'nob'
 * bytes) and clear rac_rxmsg to mark the rx consumed.  Warns when fewer
 * bytes than expected arrived; the return value line is elided here. */
561 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
563 __u32 nob_received = nob;
566 LASSERT (conn->rac_rxmsg != NULL);
567 CDEBUG(D_NET, "Consuming %p\n", conn);
569 rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
570 &nob_received, sizeof(kra_msg_t));
571 LASSERT (rrc == RAP_SUCCESS);
573 conn->rac_rxmsg = NULL;
575 if (nob_received < nob) {
576 CWARN("Incomplete immediate msg from "LPX64
577 ": expected %d, got %d\n",
578 conn->rac_peer->rap_nid, nob, nob_received);
/* Common send path for both iovec and kiov payloads (called from
 * kranal_send / kranal_send_pages).  Dispatches on Portals message type:
 * - PTL_MSG_REPLY to a GET_REQ: RDMA the payload straight back to the
 *   requester's sink and send GET_DONE (small replies to an IMMEDIATE
 *   request fall through to the immediate path);
 * - PTL_MSG_GET: send GET_REQ describing our sink buffer, unless the sink
 *   is small and unpaged (then a plain IMMEDIATE GET suffices);
 * - PTL_MSG_PUT: send PUT_REQ unless small enough for IMMEDIATE;
 * - default: send the payload inline in an IMMEDIATE message.
 * Each path queues via kranal_launch_tx()/kranal_rdma() as appropriate.
 * NOTE(review): many branch/return/error lines are elided in this extract;
 * the case labels and intermediate checks are only partially visible. */
586 kranal_do_send (lib_nal_t *nal,
603 /* NB 'private' is different depending on what we're sending.... */
605 CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
606 nob, niov, nid, pid);
608 LASSERT (nob == 0 || niov > 0);
609 LASSERT (niov <= PTL_MD_MAX_IOV);
611 LASSERT (!in_interrupt());
612 /* payload is either all vaddrs or all pages */
613 LASSERT (!(kiov != NULL && iov != NULL));
619 case PTL_MSG_REPLY: {
620 /* reply's 'private' is the conn that received the GET_REQ */
622 LASSERT (conn->rac_rxmsg != NULL);
624 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
625 if (nob > RANAL_FMA_MAX_DATA) {
626 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
630 break; /* RDMA not expected */
633 /* Incoming message consistent with immediate reply? */
634 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
635 CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
636 nid, conn->rac_rxmsg->ram_type);
640 tx = kranal_get_idle_tx(0);
644 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
646 kranal_tx_done(tx, rc);
651 tx->tx_libmsg[0] = libmsg;
653 kranal_map_buffer(tx);
654 kranal_rdma(tx, RANAL_MSG_GET_DONE,
655 &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
656 conn->rac_rxmsg->ram_u.get.ragm_cookie);
658 /* flag matched by consuming rx message */
659 kranal_consume_rxmsg(conn, NULL, 0);
666 /* We have to consider the eventual sink buffer rather than any
667 * payload passed here (there isn't any, and strictly, looking
668 * inside libmsg is a layering violation). We send a simple
669 * IMMEDIATE GET if the sink buffer is mapped already and small
672 if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
673 libmsg->md->length <= RANAL_FMA_MAX_DATA &&
674 libmsg->md->length <= kranal_tunables.kra_max_immediate)
677 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
681 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
682 rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
683 libmsg->md->md_iov.iov,
684 0, libmsg->md->length);
686 rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
687 libmsg->md->md_iov.kiov,
688 0, libmsg->md->length);
690 kranal_tx_done(tx, rc);
694 tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
695 if (tx->tx_libmsg[1] == NULL) {
696 CERROR("Can't create reply for GET to "LPX64"\n", nid);
697 kranal_tx_done(tx, rc);
701 tx->tx_libmsg[0] = libmsg;
702 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
703 /* rest of tx_msg is setup just before it is sent */
704 kranal_launch_tx(tx, nid);
712 if (kiov == NULL && /* not paged */
713 nob <= RANAL_FMA_MAX_DATA && /* small enough */
714 nob <= kranal_tunables.kra_max_immediate)
715 break; /* send IMMEDIATE */
717 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
721 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
723 kranal_tx_done(tx, rc);
727 tx->tx_libmsg[0] = libmsg;
728 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
729 /* rest of tx_msg is setup just before it is sent */
730 kranal_launch_tx(tx, nid);
734 LASSERT (kiov == NULL);
735 LASSERT (nob <= RANAL_FMA_MAX_DATA);
737 tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
738 type == PTL_MSG_REPLY ||
740 RANAL_MSG_IMMEDIATE);
744 rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
746 kranal_tx_done(tx, rc);
750 tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
751 tx->tx_libmsg[0] = libmsg;
752 kranal_launch_tx(tx, nid);
/* lib_nal send callback for iovec (vaddr) payloads: thin wrapper that
 * forwards to kranal_do_send() with kiov == NULL. */
757 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
758 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
759 unsigned int niov, struct iovec *iov,
760 size_t offset, size_t len)
762 return kranal_do_send(nal, private, cookie,
/* lib_nal send callback for kiov (page) payloads: thin wrapper that
 * forwards to kranal_do_send() with iov == NULL. */
769 kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
770 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
771 unsigned int niov, ptl_kiov_t *kiov,
772 size_t offset, size_t len)
774 return kranal_do_send(nal, private, cookie,
/* Common receive path ('private' is the conn holding the pending rxmsg).
 * libmsg == NULL means Portals is discarding; finalise immediately.
 * Otherwise dispatch on the received message type:
 * - IMMEDIATE: copy the inline payload into the (single, unpaged) iovec
 *   fragment and finalise;
 * - PUT_REQ: map a sink buffer, reply with PUT_ACK describing it, and
 *   finalise the libmsg later on RDMA_DONE.
 * NOTE(review): several branch/return lines are elided in this extract. */
781 kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
782 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
783 int offset, int mlen, int rlen)
785 kra_conn_t *conn = private;
786 kra_msg_t *rxmsg = conn->rac_rxmsg;
791 LASSERT (mlen <= rlen);
792 LASSERT (!in_interrupt());
793 /* Either all pages or all vaddrs */
794 LASSERT (!(kiov != NULL && iov != NULL));
796 CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);
798 if (libmsg == NULL) {
799 /* GET or ACK or portals is discarding */
801 lib_finalize(nal, NULL, libmsg, PTL_OK);
805 switch(rxmsg->ram_type) {
810 case RANAL_MSG_IMMEDIATE:
813 } else if (kiov != NULL) {
814 CERROR("Can't recv immediate into paged buffer\n");
818 while (offset >= iov->iov_len) {
819 offset -= iov->iov_len;
824 if (mlen > iov->iov_len - offset) {
825 CERROR("Can't handle immediate frags\n");
828 buffer = ((char *)iov->iov_base) + offset;
830 rc = kranal_consume_rxmsg(conn, buffer, mlen);
831 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
834 case RANAL_MSG_PUT_REQ:
835 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
839 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
841 kranal_tx_done(tx, rc);
846 kranal_map_buffer(tx);
848 tx->tx_msg.ram_u.putack.rapam_src_cookie =
849 conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
850 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
851 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
852 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
853 (__u64)((unsigned long)tx->tx_buffer);
854 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
856 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
858 kranal_post_fma(conn, tx);
860 /* flag matched by consuming rx message */
861 kranal_consume_rxmsg(conn, NULL, 0);
/* lib_nal receive callback for iovec (vaddr) buffers: thin wrapper that
 * forwards to kranal_do_recv() with kiov == NULL. */
867 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
868 unsigned int niov, struct iovec *iov,
869 size_t offset, size_t mlen, size_t rlen)
871 return kranal_do_recv(nal, private, msg, niov, iov, NULL,
/* lib_nal receive callback for kiov (page) buffers: thin wrapper that
 * forwards to kranal_do_recv() with iov == NULL. */
876 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
877 unsigned int niov, ptl_kiov_t *kiov,
878 size_t offset, size_t mlen, size_t rlen)
880 return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
/* Spawn a kernel thread running fn(arg) and bump the live-thread count.
 * NOTE(review): the error check on the kernel_thread() pid is elided here. */
885 kranal_thread_start (int(*fn)(void *arg), void *arg)
887 long pid = kernel_thread(fn, arg, 0);
892 atomic_inc(&kranal_data.kra_nthreads);
/* Called by each NAL thread on exit: decrement the live-thread count
 * (paired with the atomic_inc in kranal_thread_start). */
897 kranal_thread_fini (void)
899 atomic_dec(&kranal_data.kra_nthreads);
/* Reaper-side health check for one conn (ESTABLISHED or CLOSING):
 * - schedule a keepalive if nothing has been sent within rac_keepalive;
 * - flag a dead peer if nothing (or no CLOSE) received within rac_timeout;
 * - for ESTABLISHED conns, verify no tx has been stuck on the fmaq, rdmaq
 *   or replyq longer than the timeout ("belt+braces" against a conn that
 *   looks alive but isn't making progress).
 * NOTE(review): the return statements for the failure paths are elided in
 * this extract; presumably each timeout branch reports failure to
 * kranal_reaper_check() — confirm against the full source. */
903 kranal_check_conn_timeouts (kra_conn_t *conn)
906 struct list_head *ttmp;
909 unsigned long now = jiffies;
911 LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
912 conn->rac_state == RANAL_CONN_CLOSING);
914 if (!conn->rac_close_sent &&
915 time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
916 /* not sent in a while; schedule conn so scheduler sends a keepalive */
917 CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
918 conn, conn->rac_peer->rap_nid);
919 kranal_schedule_conn(conn);
922 timeout = conn->rac_timeout * HZ;
924 if (!conn->rac_close_recvd &&
925 time_after_eq(now, conn->rac_last_rx + timeout)) {
926 CERROR("%s received from "LPX64" within %lu seconds\n",
927 (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
928 "Nothing" : "CLOSE not",
929 conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
933 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
936 /* Check the conn's queues are moving. These are "belt+braces" checks,
937 * in case of hardware/software errors that make this conn seem
938 * responsive even though it isn't progressing its message queues. */
940 spin_lock_irqsave(&conn->rac_lock, flags);
942 list_for_each (ttmp, &conn->rac_fmaq) {
943 tx = list_entry(ttmp, kra_tx_t, tx_list);
945 if (time_after_eq(now, tx->tx_qtime + timeout)) {
946 spin_unlock_irqrestore(&conn->rac_lock, flags);
947 CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
948 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
953 list_for_each (ttmp, &conn->rac_rdmaq) {
954 tx = list_entry(ttmp, kra_tx_t, tx_list);
956 if (time_after_eq(now, tx->tx_qtime + timeout)) {
957 spin_unlock_irqrestore(&conn->rac_lock, flags);
958 CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
959 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
964 list_for_each (ttmp, &conn->rac_replyq) {
965 tx = list_entry(ttmp, kra_tx_t, tx_list);
967 if (time_after_eq(now, tx->tx_qtime + timeout)) {
968 spin_unlock_irqrestore(&conn->rac_lock, flags);
969 CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
970 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
975 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Scan one conn hash chain under the shared global lock, tracking the
 * smallest timeout/keepalive seen in *min_timeoutp.  When a conn fails
 * its timeout check: take a ref, drop the read lock, and under the write
 * lock close (ESTABLISHED) or terminate (CLOSING) it, then restart the
 * chain scan since the lock was dropped. */
982 kranal_reaper_check (int idx, unsigned long *min_timeoutp)
983 struct list_head *conns = &kranal_data.kra_conns[idx];
984 struct list_head *ctmp;
989 /* NB. We expect to check all the conns and not find any problems, so
990 * we just use a shared lock while we take a look... */
991 read_lock(&kranal_data.kra_global_lock);
993 list_for_each (ctmp, conns) {
994 conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
996 if (conn->rac_timeout < *min_timeoutp )
997 *min_timeoutp = conn->rac_timeout;
998 if (conn->rac_keepalive < *min_timeoutp )
999 *min_timeoutp = conn->rac_keepalive;
1001 rc = kranal_check_conn_timeouts(conn);
1005 kranal_conn_addref(conn);
1006 read_unlock(&kranal_data.kra_global_lock);
1008 CERROR("Conn to "LPX64", cqid %d timed out\n",
1009 conn->rac_peer->rap_nid, conn->rac_cqid);
1011 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1013 switch (conn->rac_state) {
1017 case RANAL_CONN_ESTABLISHED:
1018 kranal_close_conn_locked(conn, -ETIMEDOUT);
1021 case RANAL_CONN_CLOSING:
1022 kranal_terminate_conn_locked(conn);
1026 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1028 kranal_conn_decref(conn);
1030 /* start again now I've dropped the lock */
1034 read_unlock(&kranal_data.kra_global_lock);
/* Connection daemon thread (one of possibly several, numbered by 'id').
 * Loops until shutdown, under kra_connd_lock:
 * - drains kra_connd_acceptq: handshake each accepted socket, then free it;
 * - drains kra_connd_peers: actively connect to each queued peer (dropping
 *   the connd's peer ref afterwards);
 * - otherwise sleeps interruptibly on kra_connd_waitq.
 * The lock is dropped around the blocking handshake/connect calls. */
1038 kranal_connd (void *arg)
1040 long id = (long)arg;
1043 unsigned long flags;
1045 kra_acceptsock_t *ras;
1048 snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
1049 kportal_daemonize(name);
1050 kportal_blockallsigs();
1052 init_waitqueue_entry(&wait, current);
1054 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1056 while (!kranal_data.kra_shutdown) {
1059 if (!list_empty(&kranal_data.kra_connd_acceptq)) {
1060 ras = list_entry(kranal_data.kra_connd_acceptq.next,
1061 kra_acceptsock_t, ras_list);
1062 list_del(&ras->ras_list);
1064 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1066 CDEBUG(D_NET,"About to handshake someone\n");
1068 kranal_conn_handshake(ras->ras_sock, NULL);
1069 kranal_free_acceptsock(ras);
1071 CDEBUG(D_NET,"Finished handshaking someone\n");
1073 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1077 if (!list_empty(&kranal_data.kra_connd_peers)) {
1078 peer = list_entry(kranal_data.kra_connd_peers.next,
1079 kra_peer_t, rap_connd_list);
1081 list_del_init(&peer->rap_connd_list);
1082 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1084 kranal_connect(peer);
1085 kranal_peer_decref(peer);
1087 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1094 set_current_state(TASK_INTERRUPTIBLE);
1095 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1097 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1101 set_current_state(TASK_RUNNING);
1102 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1104 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1107 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1109 kranal_thread_fini();
/* Tell the reaper a (possibly) smaller conn timeout now exists, so it can
 * restart its min-timeout scan.  Only ever lowers kra_new_min_timeout. */
1114 kranal_update_reaper_timeout(long timeout)
1116 unsigned long flags;
1118 LASSERT (timeout > 0);
1120 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1122 if (timeout < kranal_data.kra_new_min_timeout)
1123 kranal_data.kra_new_min_timeout = timeout;
1125 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/* Reaper thread: periodically walks the conn hash table checking timeouts
 * via kranal_reaper_check().  It sizes each pass ('chunk' hash entries) so
 * the whole table is covered n times within the global minimum of all conn
 * keepalive/timeout intervals, sleeping between passes.  A new (smaller)
 * kra_new_min_timeout posted by kranal_update_reaper_timeout() restarts
 * the min-timeout scan; completing a full sweep of the table promotes the
 * per-sweep minimum (next_min_timeout) to current_min_timeout.
 * NOTE(review): lines are elided in this extract, so the exact loop nesting
 * and the wrap-handling around base_index/conn_index is only partially
 * visible — jiffy-wrap arithmetic is handled with signed differences. */
1129 kranal_reaper (void *arg)
1132 unsigned long flags;
1135 int conn_entries = kranal_data.kra_conn_hash_size;
1137 int base_index = conn_entries - 1;
1138 unsigned long next_check_time = jiffies;
1139 long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1140 long current_min_timeout = 1;
1142 kportal_daemonize("kranal_reaper");
1143 kportal_blockallsigs();
1145 init_waitqueue_entry(&wait, current);
1147 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1149 while (!kranal_data.kra_shutdown) {
1150 /* I wake up every 'p' seconds to check for timeouts on some
1151 * more peers. I try to check every connection 'n' times
1152 * within the global minimum of all keepalive and timeout
1153 * intervals, to ensure I attend to every connection within
1154 * (n+1)/n times its timeout intervals. */
1157 unsigned long min_timeout;
1160 /* careful with the jiffy wrap... */
1161 timeout = (long)(next_check_time - jiffies);
1163 set_current_state(TASK_INTERRUPTIBLE);
1164 add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1166 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1168 schedule_timeout(timeout);
1170 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1172 set_current_state(TASK_RUNNING);
1173 remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1177 if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
1178 /* new min timeout set: restart min timeout scan */
1179 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1180 base_index = conn_index - 1;
1182 base_index = conn_entries - 1;
1184 if (kranal_data.kra_new_min_timeout < current_min_timeout) {
1185 current_min_timeout = kranal_data.kra_new_min_timeout;
1186 CDEBUG(D_NET, "Set new min timeout %ld\n",
1187 current_min_timeout);
1190 kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1192 min_timeout = current_min_timeout;
1194 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1196 LASSERT (min_timeout > 0);
1198 /* Compute how many table entries to check now so I get round
1199 * the whole table fast enough given that I do this at fixed
1200 * intervals of 'p' seconds) */
1201 chunk = conn_entries;
1202 if (min_timeout > n * p)
1203 chunk = (chunk * n * p) / min_timeout;
1207 for (i = 0; i < chunk; i++) {
1208 kranal_reaper_check(conn_index,
1210 conn_index = (conn_index + 1) % conn_entries;
1213 next_check_time += p * HZ;
1215 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1217 if (((conn_index - chunk <= base_index &&
1218 base_index < conn_index) ||
1219 (conn_index - conn_entries - chunk <= base_index &&
1220 base_index < conn_index - conn_entries))) {
1222 /* Scanned all conns: set current_min_timeout... */
1223 if (current_min_timeout != next_min_timeout) {
1224 current_min_timeout = next_min_timeout;
1225 CDEBUG(D_NET, "Set new min timeout %ld\n",
1226 current_min_timeout);
1229 /* ...and restart min timeout scan */
1230 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1231 base_index = conn_index - 1;
1233 base_index = conn_entries - 1;
1237 kranal_thread_fini();
/* Service the device's RDMA completion queue.  For each completion: map the
 * CQID back to its conn (it may have been destroyed meanwhile), pop the
 * completed descriptor, and move the tx at the head of rac_rdmaq onto the
 * fmaq so its completion message (PUT_DONE/GET_DONE) gets sent; then
 * schedule the conn.  RDMA completions on a conn are in-order, hence the
 * head-of-rdmaq assertion against desc->AppPtr.
 * NOTE(review): the outer loop draining the CQ is elided in this extract. */
1242 kranal_check_rdma_cq (kra_device_t *dev)
1247 unsigned long flags;
1248 RAP_RDMA_DESCRIPTOR *desc;
1253 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
1254 if (rrc == RAP_NOT_DONE) {
1255 CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
1259 LASSERT (rrc == RAP_SUCCESS);
1260 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1262 read_lock(&kranal_data.kra_global_lock);
1264 conn = kranal_cqid2conn_locked(cqid);
1266 /* Conn was destroyed? */
1267 CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
1268 read_unlock(&kranal_data.kra_global_lock);
1272 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1273 LASSERT (rrc == RAP_SUCCESS);
1275 CDEBUG(D_NET, "Completed %p\n",
1276 list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
1278 spin_lock_irqsave(&conn->rac_lock, flags);
1280 LASSERT (!list_empty(&conn->rac_rdmaq));
1281 tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1282 list_del(&tx->tx_list);
1284 LASSERT(desc->AppPtr == (void *)tx);
1285 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1286 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1288 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1289 tx->tx_qtime = jiffies;
1291 spin_unlock_irqrestore(&conn->rac_lock, flags);
1293 /* Get conn's fmaq processed, now I've just put something
1295 kranal_schedule_conn(conn);
1297 read_unlock(&kranal_data.kra_global_lock);
/* Service the device's FMA completion queue.  Normal case: map each CQID to
 * its conn and schedule it (destroyed conns just log).  On CQ overrun the
 * per-conn events were lost, so conservatively schedule every conn on this
 * device, walking the whole conn hash table a chain at a time (re-taking
 * the read lock per chain so writers aren't starved).
 * NOTE(review): the outer drain loop is elided in this extract. */
1302 kranal_check_fma_cq (kra_device_t *dev)
1308 struct list_head *conns;
1309 struct list_head *tmp;
1313 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
1314 if (rrc == RAP_NOT_DONE) {
1315 CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
1319 LASSERT (rrc == RAP_SUCCESS);
1321 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1323 read_lock(&kranal_data.kra_global_lock);
1325 conn = kranal_cqid2conn_locked(cqid);
1327 CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
1330 CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
1332 kranal_schedule_conn(conn);
1335 read_unlock(&kranal_data.kra_global_lock);
1339 /* FMA CQ has overflowed: check ALL conns */
1340 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
1342 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1344 read_lock(&kranal_data.kra_global_lock);
1346 conns = &kranal_data.kra_conns[i];
1348 list_for_each (tmp, conns) {
1349 conn = list_entry(tmp, kra_conn_t,
1352 if (conn->rac_device == dev)
1353 kranal_schedule_conn(conn);
1356 /* don't block write lockers for too long... */
1357 read_unlock(&kranal_data.kra_global_lock);
/* Push one message (plus optional immediate payload) out over the conn's
 * FMA channel.  Messages whose type has RANAL_MSG_FENCE set use the sync
 * send variant.  Stamps the message with the conn's connstamp and tx seq,
 * and updates rac_last_tx on success (feeds the keepalive logic).
 * NOTE(review): the rrc -> return-code mapping (e.g. the -EAGAIN seen by
 * callers in kranal_process_fmaq) is elided in this extract. */
1363 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1364 void *immediate, int immediatenob)
1366 int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1369 CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1370 conn, msg, msg->ram_type, sync ? "(sync)" : "",
1371 immediate, immediatenob);
1373 LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1374 LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1375 immediatenob <= RANAL_FMA_MAX_DATA :
1378 msg->ram_connstamp = conn->rac_my_connstamp;
1379 msg->ram_seq = conn->rac_tx_seq;
1382 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1383 immediate, immediatenob,
1386 rrc = RapkFmaSend(conn->rac_rihandle,
1387 immediate, immediatenob,
1395 conn->rac_last_tx = jiffies;
/* Scheduler-thread workhorse: make progress on the conn's FMA queue.
 * If the conn is not ESTABLISHED: wait for in-flight RDMAs (sending NOOP
 * keepalives meanwhile), then send CLOSE once, and terminate the conn when
 * both CLOSE directions are done.  Otherwise pop the next tx off rac_fmaq
 * (or send a NOOP if idle past the keepalive interval), finish any
 * per-type setup (cookies, buffer mapping, rdma descriptors for PUT_ACK /
 * GET_REQ) and send it.  -EAGAIN (out of FMA credits) pushes the tx back
 * to the head of the queue to retry when a completion frees credits.
 * Sent txs that expect a matching reply are parked on rac_replyq;
 * the rest complete immediately.  Reschedules itself if more remains.
 * NOTE(review): lines are elided in this extract — in particular the
 * setting of 'expect_reply' per message type is not visible. */
1405 kranal_process_fmaq (kra_conn_t *conn)
1407 unsigned long flags;
1413 /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1414 * However I will be rescheduled some by an FMA completion event
1415 * when I eventually get some.
1416 * NB 2. Sampling rac_state here races with setting it elsewhere.
1417 * But it doesn't matter if I try to send a "real" message just
1418 * as I start closing because I'll get scheduled to send the
1421 /* Not racing with incoming message processing! */
1422 LASSERT (current == conn->rac_device->rad_scheduler);
1424 if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1425 if (!list_empty(&conn->rac_rdmaq)) {
1426 /* RDMAs in progress */
1427 LASSERT (!conn->rac_close_sent);
1429 if (time_after_eq(jiffies,
1431 conn->rac_keepalive * HZ)) {
1432 CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
1433 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1434 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1439 if (conn->rac_close_sent)
1442 CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
1443 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1444 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1448 conn->rac_close_sent = 1;
1449 if (!conn->rac_close_recvd)
1452 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1454 if (conn->rac_state == RANAL_CONN_CLOSING)
1455 kranal_terminate_conn_locked(conn);
1457 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1461 spin_lock_irqsave(&conn->rac_lock, flags);
1463 if (list_empty(&conn->rac_fmaq)) {
1465 spin_unlock_irqrestore(&conn->rac_lock, flags);
1467 if (time_after_eq(jiffies,
1468 conn->rac_last_tx + conn->rac_keepalive * HZ)) {
1469 CDEBUG(D_NET, "sending NOOP (idle)\n");
1470 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1471 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1476 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1477 list_del(&tx->tx_list);
1478 more_to_do = !list_empty(&conn->rac_fmaq);
1480 spin_unlock_irqrestore(&conn->rac_lock, flags);
1483 CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
1484 tx, tx->tx_msg.ram_type, tx->tx_cookie);
1485 switch (tx->tx_msg.ram_type) {
1489 case RANAL_MSG_IMMEDIATE:
1490 rc = kranal_sendmsg(conn, &tx->tx_msg,
1491 tx->tx_buffer, tx->tx_nob);
1495 case RANAL_MSG_PUT_NAK:
1496 case RANAL_MSG_PUT_DONE:
1497 case RANAL_MSG_GET_NAK:
1498 case RANAL_MSG_GET_DONE:
1499 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1503 case RANAL_MSG_PUT_REQ:
1504 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1505 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1506 kranal_map_buffer(tx);
1510 case RANAL_MSG_PUT_ACK:
1511 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1515 case RANAL_MSG_GET_REQ:
1516 kranal_map_buffer(tx);
1517 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1518 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1519 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1520 (__u64)((unsigned long)tx->tx_buffer);
1521 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1522 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1527 if (rc == -EAGAIN) {
1528 /* I need credits to send this. Replace tx at the head of the
1529 * fmaq and I'll get rescheduled when credits appear */
1530 CDEBUG(D_NET, "EAGAIN on %p\n", conn);
1531 spin_lock_irqsave(&conn->rac_lock, flags);
1532 list_add(&tx->tx_list, &conn->rac_fmaq);
1533 spin_unlock_irqrestore(&conn->rac_lock, flags);
1539 if (!expect_reply) {
1540 kranal_tx_done(tx, 0);
1542 /* LASSERT(current) above ensures this doesn't race with reply
1544 spin_lock_irqsave(&conn->rac_lock, flags);
1545 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1546 tx->tx_qtime = jiffies;
1547 spin_unlock_irqrestore(&conn->rac_lock, flags);
1551 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
1552 kranal_schedule_conn(conn);
1557 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1559 __swab64s(&d->rard_key.Key);
1560 __swab16s(&d->rard_key.Cookie);
1561 __swab16s(&d->rard_key.MdHandle);
1562 __swab32s(&d->rard_key.Flags);
1563 __swab64s(&d->rard_addr.AddressBits);
1564 __swab32s(&d->rard_nob);
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
        /* Find and unlink the tx on 'conn's reply queue whose cookie
         * matches 'cookie'; the caller takes ownership of the returned
         * tx.  A cookie that matches a tx of the wrong message type is
         * treated as a peer protocol error.
         * NOTE(review): the continue/return statements terminating these
         * branches fall outside this excerpt — confirm against the full
         * file. */
        struct list_head *ttmp,
        unsigned long flags;

        /* rac_lock protects the reply queue */
        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                /* the cookie is the match key; keep scanning otherwise */
                if (tx->tx_cookie != cookie)

                /* right cookie, wrong type: protocol error from the peer.
                 * Drop the lock before warning. */
                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);

                /* matched: unlink while still holding the lock */
                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);

        /* nothing on the reply queue wanted this reply */
        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
              type, cookie, conn->rac_peer->rap_nid);
kranal_check_fma_rx (kra_conn_t *conn)
        /* Poll 'conn' for one incoming FMA (small) message, validate its
         * envelope (magic/version/source NID/connstamp/sequence) and
         * dispatch on its type.  The LASSERT in the caller's context
         * implies this runs only in the device scheduler thread, so RX
         * handling is single-threaded per connection. */
        unsigned long flags;
        RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t *peer = conn->rac_peer;

        /* mailbox empty: nothing to do */
        if (rrc == RAP_NOT_DONE)

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;    /* restart the keepalive/timeout clock */
        seq = conn->rac_rx_seq++;       /* sequence number this RX must carry */
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        /* byte-swap the envelope if the peer is opposite-endian */
        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);

        /* envelope sanity: version, source NID, connection stamp and
         * sequence number must all be what this conn expects */
        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);

        /* no message may legitimately follow a CLOSE */
        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                       msg->ram_type, conn->rac_peer->rap_nid);

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        /* both directions have now said CLOSE */
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

        /* only dispatch payload messages on a healthy connection */
        if (conn->rac_state != RANAL_CONN_ESTABLISHED)

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */

                /* no local match: NAK the PUT, echoing its cookie */
                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);

                /* the ACK carries the peer's buffer descriptor; RDMA the
                 * payload there, completing with PUT_DONE */
                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */

                /* no local match: NAK the GET, echoing its cookie */
                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);

                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);

        /* if the portals callbacks didn't consume the payload, skip it */
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
1829 kranal_complete_closed_conn (kra_conn_t *conn)
1833 LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1834 LASSERT (list_empty(&conn->rac_list));
1835 LASSERT (list_empty(&conn->rac_hashlist));
1837 while (!list_empty(&conn->rac_fmaq)) {
1838 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1840 list_del(&tx->tx_list);
1841 kranal_tx_done(tx, -ECONNABORTED);
1844 LASSERT (list_empty(&conn->rac_rdmaq));
1846 while (!list_empty(&conn->rac_replyq)) {
1847 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1849 list_del(&tx->tx_list);
1850 kranal_tx_done(tx, -ECONNABORTED);
kranal_scheduler (void *arg)
        /* Per-device scheduler thread: services the device's completion
         * queues and its queue of scheduled connections until module
         * shutdown.  dev->rad_lock is held at the top of every loop
         * iteration and dropped around the actual work. */
        kra_device_t *dev = (kra_device_t *)arg;
        unsigned long flags;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;   /* advertise myself; checked by LASSERTs elsewhere */
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                /* drop the lock and yield periodically so a busy device
                 * can't monopolise the CPU.
                 * NOTE(review): the schedule() call and busy_loops reset
                 * fall outside this excerpt — confirm in the full file. */
                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        spin_lock_irqsave(&dev->rad_lock, flags);

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */

                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);

                if (!list_empty(&dev->rad_connq)) {
                        /* Connection needs attention */
                        conn = list_entry(dev->rad_connq.next,
                                          kra_conn_t, rac_schedlist);
                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;        /* may be rescheduled while we work on it */
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);       /* drop the scheduler's ref */

                        spin_lock_irqsave(&dev->rad_lock, flags);

                /* recheck device callback fired before sleeping */
                add_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_INTERRUPTIBLE);

                spin_unlock_irqrestore(&dev->rad_lock, flags);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&dev->rad_waitq, &wait);

                spin_lock_irqsave(&dev->rad_lock, flags);

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;      /* no more LASSERT(current == rad_scheduler) */
        kranal_thread_fini();
1943 lib_nal_t kranal_lib = {
1944 libnal_data: &kranal_data, /* NAL private data */
1945 libnal_send: kranal_send,
1946 libnal_send_pages: kranal_send_pages,
1947 libnal_recv: kranal_recv,
1948 libnal_recv_pages: kranal_recv_pages,
1949 libnal_dist: kranal_dist