/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */
int
kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
{
        /* I would guess that if kranal_get_peer (nid) == NULL,
         * and we're not routing, then 'nid' is very distant :) */
        if (nal->libnal_ni.ni_pid.nid == nid) {
                *dist = 0;
        } else {
                *dist = 1;
        }

        return 0;
}
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        unsigned long flags;
        int           i;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t *dev = conn->rac_device;
        unsigned long flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}
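
/* Transmit descriptors live on two idle lists: kra_idle_txs, which callers
 * may block waiting on, and the kra_idle_nblk_txs reserve for callers that
 * must not block.  kranal_get_idle_tx() below only dips into the reserve
 * when may_block is zero, so exhausting it is reported loudly rather than
 * risking a deadlock in atomic context. */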
kra_tx_t *
kranal_get_idle_tx (int may_block)
{
        unsigned long  flags;
        kra_tx_t      *tx = NULL;

        for (;;) {
                spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

                /* "normal" descriptor is free */
                if (!list_empty(&kranal_data.kra_idle_txs)) {
                        tx = list_entry(kranal_data.kra_idle_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                if (!may_block) {
                        /* may dip into reserve pool */
                        if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
                                CERROR("reserved tx desc pool exhausted\n");
                                break;
                        }

                        tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
                                        kra_tx_t, tx_list);
                        break;
                }

                /* block for idle tx */
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

                wait_event(kranal_data.kra_idle_tx_waitq,
                           !list_empty(&kranal_data.kra_idle_txs));
        }

        if (tx != NULL) {
                list_del(&tx->tx_list);

                /* Allocate a new completion cookie.  It might not be
                 * needed, but we've got a lock right now... */
                tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

                LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
                LASSERT (tx->tx_conn == NULL);
                LASSERT (tx->tx_libmsg[0] == NULL);
                LASSERT (tx->tx_libmsg[1] == NULL);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        return tx;
}
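
/* Note: tx_cookie values come from a single counter advanced under
 * kra_tx_lock, so every tx in flight carries a distinct cookie.
 * kranal_match_reply() relies on this to pair PUT_ACK/GET_DONE style
 * completions with the tx waiting on the conn's replyq. */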
void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
        /* ram_connstamp gets set when FMA is sent */
}
kra_tx_t *
kranal_new_tx_msg (int may_block, int type)
{
        kra_tx_t *tx = kranal_get_idle_tx(may_block);

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}
int
kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (nob >= 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}
int
kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = kranal_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
                        CERROR("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = kranal_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
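
/* The checks above mean a physical payload may start anywhere within its
 * first page but must then run in whole, gapless pages: e.g. 9000 bytes
 * starting 200 bytes into page 0 maps as three RAP_PHYS_REGIONs, while a
 * middle page with a non-zero kiov_offset (or a short middle page) would
 * leave a hole in the I/O virtual mapping and is rejected with -EINVAL. */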
int
kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
                          struct iovec *iov, ptl_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}
void
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t   *conn = tx->tx_conn;
        kra_device_t *dev = conn->rac_device;
        RAP_RETURN    rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                break;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                break;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                break;
        }
}
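
/* Mapping lifecycle: buffers are registered with the RapidArray device only
 * while the conn's scheduler thread owns the tx (hence the rad_scheduler
 * LASSERTs), and kranal_unmap_buffer() below reverses whichever
 * registration was made, so a tx always returns to the idle pools
 * unmapped. */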
void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t *dev;
        RAP_RETURN    rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);
                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        ptl_err_t     ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
        unsigned long flags;
        int           i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        for (i = 0; i < 2; i++) {
                /* tx may have up to 2 libmsgs to finalise */
                if (tx->tx_libmsg[i] == NULL)
                        continue;

                lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
                tx->tx_libmsg[i] = NULL;
        }

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (tx->tx_isnblk) {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
        } else {
                list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
                wake_up(&kranal_data.kra_idle_tx_waitq);
        }

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
}
kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}
void
kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
{
        unsigned long  flags;
        kra_peer_t    *peer;
        kra_conn_t    *conn;
        unsigned long  now;
        rwlock_t      *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);          /* only set when assigned a conn */

        read_lock(g_lock);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                read_unlock(g_lock);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                kranal_post_fma(conn, tx);
                read_unlock(g_lock);
                return;
        }

        /* Making one or more connections; I'll need a write lock... */
        read_unlock(g_lock);
        write_lock_irqsave(g_lock, flags);

        peer = kranal_find_peer_locked(nid);
        if (peer == NULL) {
                write_unlock_irqrestore(g_lock, flags);
                kranal_tx_done(tx, -EHOSTUNREACH);
                return;
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                now = CURRENT_SECONDS;
                if (now < peer->rap_reconnect_time) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
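
/* Locking dance above: the common case (peer and conn already exist) runs
 * entirely under the shared read lock.  Only when a connection must be
 * initiated is the write lock taken, and the peer and conn are then looked
 * up again, since either may have appeared or vanished while no lock was
 * held. */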
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with the scheduler sending CLOSE (this thread is
         * the scheduler) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}
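
/* A zero-length RDMA (nob == 0) never reaches RapkPostRdma(): its
 * completion message goes straight onto the fmaq, so the peer sees
 * PUT_DONE/GET_DONE without a data phase.  Otherwise the tx parks on
 * rac_rdmaq until kranal_check_rdma_cq() reaps the descriptor. */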
int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from "LPX64
                      ": expected %d, got %d\n",
                      conn->rac_peer->rap_nid, nob, nob_received);
                return -EPROTO;
        }

        return 0;
}
ptl_err_t
kranal_do_send (lib_nal_t    *nal,
                void         *private,
                lib_msg_t    *libmsg,
                ptl_hdr_t    *hdr,
                int           type,
                ptl_nid_t     nid,
                ptl_pid_t     pid,
                unsigned int  niov,
                struct iovec *iov,
                ptl_kiov_t   *kiov,
                int           offset,
                int           nob)
{
        kra_conn_t *conn;
        kra_tx_t   *tx;
        int         rc;

        /* NB 'private' is different depending on what we're sending... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
               nob, niov, nid, pid);

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= PTL_MD_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        switch(type) {
        default:
                LBUG();

        case PTL_MSG_REPLY: {
                /* reply's 'private' is the conn that received the GET_REQ */
                conn = private;
                LASSERT (conn->rac_rxmsg != NULL);

                if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
                        if (nob > RANAL_FMA_MAX_DATA) {
                                CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
                                       nob, nid);
                                return PTL_FAIL;
                        }
                        break;                  /* RDMA not expected */
                }

                /* Incoming message consistent with RDMA? */
                if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
                        CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
                               nid, conn->rac_rxmsg->ram_type);
                        return PTL_FAIL;
                }

                tx = kranal_get_idle_tx(0);
                if (tx == NULL)
                        return PTL_FAIL;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                tx->tx_libmsg[0] = libmsg;

                kranal_map_buffer(tx);
                kranal_rdma(tx, RANAL_MSG_GET_DONE,
                            &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
                            conn->rac_rxmsg->ram_u.get.ragm_cookie);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }

        case PTL_MSG_GET:
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside libmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
                    libmsg->md->length <= RANAL_FMA_MAX_DATA &&
                    libmsg->md->length <= kranal_tunables.kra_max_immediate)
                        break;

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                if ((libmsg->md->options & PTL_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.iov,
                                                      0, libmsg->md->length);
                else
                        rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
                                                      libmsg->md->md_iov.kiov,
                                                      0, libmsg->md->length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
                if (tx->tx_libmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to "LPX64"\n", nid);
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;

        case PTL_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case PTL_MSG_PUT:
                if (kiov == NULL &&              /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_libmsg[0] = libmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, nid);
                return PTL_OK;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
                                 type == PTL_MSG_REPLY ||
                                 in_interrupt()),
                               RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return PTL_NO_SPACE;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return PTL_FAIL;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_libmsg[0] = libmsg;
        kranal_launch_tx(tx, nid);
        return PTL_OK;
}
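
/* Protocol summary: payloads that fit both RANAL_FMA_MAX_DATA and the
 * kra_max_immediate tunable travel inline in a single IMMEDIATE FMA;
 * larger PUTs and GETs send only a request header and move the data by
 * RDMA once the peer has mapped and advertised a buffer descriptor. */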
ptl_err_t
kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
             ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, iov, NULL,
                              offset, len);
}
ptl_err_t
kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
                   ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t len)
{
        return kranal_do_send(nal, private, cookie,
                              hdr, type, nid, pid,
                              niov, NULL, kiov,
                              offset, len);
}
ptl_err_t
kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
                unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
                int offset, int mlen, int rlen)
{
        kra_conn_t *conn = private;
        kra_msg_t  *rxmsg = conn->rac_rxmsg;
        kra_tx_t   *tx;
        void       *buffer;
        int         rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);

        if (libmsg == NULL) {
                /* GET or ACK or portals is discarding */
                kranal_consume_rxmsg(conn, NULL, 0);
                lib_finalize(nal, NULL, libmsg, PTL_OK);
                return PTL_OK;
        }

        switch(rxmsg->ram_type) {
        default:
                LBUG();
                return PTL_FAIL;

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return PTL_FAIL;
                } else {
                        /* skip complete frags */
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                iov++;
                                niov--;
                                LASSERT (niov > 0);
                        }

                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return PTL_FAIL;
                        }

                        buffer = ((char *)iov->iov_base) + offset;
                }

                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
                return PTL_OK;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
                if (tx == NULL)
                        return PTL_NO_SPACE;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return PTL_FAIL;
                }

                tx->tx_conn = conn;
                kranal_map_buffer(tx);

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);

                /* flag matched by consuming rx message */
                kranal_consume_rxmsg(conn, NULL, 0);
                return PTL_OK;
        }
}
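
/* PUT handshake, as implemented here and in kranal_check_fma_rx(): the
 * sender's PUT_REQ carries just the header; the receiver maps its sink
 * buffer and answers with PUT_ACK carrying an RDMA descriptor (key,
 * address, nob); the sender RDMAs the data and finishes with PUT_DONE, at
 * which point each side finalizes its lib_msg_t. */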
ptl_err_t
kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
             unsigned int niov, struct iovec *iov,
             size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, iov, NULL,
                              offset, mlen, rlen);
}
ptl_err_t
kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
                   unsigned int niov, ptl_kiov_t *kiov,
                   size_t offset, size_t mlen, size_t rlen)
{
        return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
                              offset, mlen, rlen);
}
int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}
void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t         *tx;
        struct list_head *ttmp;
        unsigned long     flags;
        long              timeout;
        unsigned long     now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
                       conn, conn->rac_peer->rap_nid);
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from "LPX64" within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
                               conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head *conns = &kranal_data.kra_conns[idx];
        struct list_head *ctmp;
        kra_conn_t       *conn;
        unsigned long     flags;
        int               rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp )
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp )
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to "LPX64", cqid %d timed out\n",
                       conn->rac_peer->rap_nid, conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}
int
kranal_connd (void *arg)
{
        long              id = (long)arg;
        char              name[16];
        wait_queue_t      wait;
        unsigned long     flags;
        kra_peer_t       *peer;
        kra_acceptsock_t *ras;
        int               did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        kportal_daemonize(name);
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}
void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}
int
kranal_reaper (void *arg)
{
        wait_queue_t  wait;
        unsigned long flags;
        long          timeout;
        int           i;
        int           conn_entries = kranal_data.kra_conn_hash_size;
        int           conn_index = 0;
        int           base_index = conn_entries - 1;
        unsigned long next_check_time = jiffies;
        long          next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long          current_min_timeout = 1;

        kportal_daemonize("kranal_reaper");
        kportal_blockallsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                        schedule_timeout(timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                current_min_timeout = kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
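                /* Worked example: with p = 1 and n = 3, a 64-entry conn
                 * hash and a global minimum timeout of 60s gives
                 * chunk = (64 * 3 * 1) / 60 = 3 buckets per wakeup, so the
                 * whole table is visited about 3 times per minimum timeout
                 * interval, as intended. */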
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}
void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t       *conn;
        RAP_RETURN        rrc;
        __u32             cqid;
        __u32             event_type;
        struct list_head *conns;
        struct list_head *tmp;
        int               i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive*HZ))
                        CDEBUG(D_WARNING, "EAGAIN sending %02x (idle %lu secs)\n",
                               msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
                return -EAGAIN;
        }
}
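
/* Messages with RANAL_MSG_FENCE set in their type are sent with
 * RapkFmaSyncSend() so they cannot overtake a preceding RDMA; the receive
 * path (kranal_check_fma_rx) calls RapkFmaSyncWait() on fenced messages
 * before trusting that the RDMA data has landed. */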
void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       close anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> "LPX64" (%p idle %lu(%ld))\n",
                               conn->rac_peer->rap_nid, conn,
                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                kranal_map_buffer(tx);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                kranal_map_buffer(tx);
                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        if (!expect_reply) {
                kranal_tx_done(tx, 0);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}
void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from "LPX64"\n",
                              tx->tx_msg.ram_type, type,
                              conn->rac_peer->rap_nid);
                        return NULL;
                }

                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
              type, cookie, conn->rac_peer->rap_nid);
        return NULL;
}
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from "LPX64"\n",
                               msg->ram_magic, peer->rap_nid);
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from "LPX64"\n",
                       msg->ram_version, peer->rap_nid);
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer "LPX64" from "LPX64"\n",
                       msg->ram_srcnid, peer->rap_nid);
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from "LPX64"\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       peer->rap_nid);
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from "
                       LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                LASSERT (rrc == RAP_SUCCESS);
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
                       msg->ram_type, conn->rac_peer->rap_nid);
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie =
                        msg->ram_u.putreq.raprm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);

                if (conn->rac_rxmsg == NULL)    /* lib_parse matched something */
                        break;

                tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
                if (tx == NULL)
                        break;

                tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
                kranal_post_fma(conn, tx);
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t *tx;
        int       nfma;
        int       nreplies;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));

        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        CDEBUG(D_WARNING, "Closed conn %p -> "LPX64": nmsg %d nreplies %d\n",
               conn, conn->rac_peer->rap_nid, nfma, nreplies);
}
int
kranal_process_new_conn (kra_conn_t *conn)
{
        RAP_RETURN rrc;

        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
        if (rrc == RAP_SUCCESS)
                return 0;

        LASSERT (rrc == RAP_NOT_DONE);

        if (!time_after_eq(jiffies, conn->rac_last_tx +
                           conn->rac_timeout * HZ))
                return -EAGAIN;

        /* Too late */
        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
        LASSERT (rrc == RAP_SUCCESS);
        return -ETIMEDOUT;
}
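
/* This appears to drive the RapidArray end of connection completion:
 * RapkCompleteSync(rihandle, 1) is retried until the handshake completes,
 * and once the conn has sat past its timeout the same call with 0 abandons
 * it.  The precise Rapk semantics are the vendor's; what matters locally is
 * that the -EAGAIN return makes kranal_scheduler() below poll with
 * backoff. */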
int
kranal_scheduler (void *arg)
{
        kra_device_t     *dev = (kra_device_t *)arg;
        wait_queue_t      wait;
        char              name[16];
        kra_conn_t       *conn;
        unsigned long     flags;
        unsigned long     deadline;
        unsigned long     soonest;
        int               nsoonest;
        long              timeout;
        struct list_head *tmp;
        struct list_head *nxt;
        int               rc;
        int               dropped_lock;
        int               busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        kportal_daemonize(name);
        kportal_blockallsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        our_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (time_after_eq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <= HZ)
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive += HZ;
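                                /* e.g. rac_keepalive doubles 1, 2, 4, ...
                                 * jiffies each retry until it passes HZ,
                                 * then grows by HZ (one second) per retry,
                                 * bounding how often an unresponsive new
                                 * conn is re-polled. */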
                                deadline = conn->rac_last_tx + conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !time_after_eq(deadline, soonest))
                                soonest = deadline;
                }

                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        busy_loops = 0;
                        schedule();
                } else {
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                schedule_timeout(timeout);
                        }
                }

                remove_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}
lib_nal_t kranal_lib = {
        libnal_data:       &kranal_data,      /* NAL private data */
        libnal_send:       kranal_send,
        libnal_send_pages: kranal_send_pages,
        libnal_recv:       kranal_recv,
        libnal_recv_pages: kranal_recv_pages,
        libnal_dist:       kranal_dist
};