/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
 * Use is subject to license terms.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ralnd/ralnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "ralnd.h"

void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t *dev;
        int           i;
        unsigned long flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}
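
/* Queue 'conn' for attention from its device's scheduler thread, taking
 * a reference on it for the scheduler unless it is already queued. */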
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t *dev = conn->rac_device;
        unsigned long flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}

kra_tx_t *
kranal_get_idle_tx (void)
{
        unsigned long  flags;
        kra_tx_t      *tx;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (list_empty(&kranal_data.kra_idle_txs)) {
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
                return NULL;
        }

        tx = list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed, but we've
         * got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}

void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
        /* ram_connstamp gets set when FMA is sent */
}

kra_tx_t *
kranal_new_tx_msg (int type)
{
        kra_tx_t *tx = kranal_get_idle_tx();

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}

int
kranal_setup_immediate_buffer (kra_tx_t *tx,
                               unsigned int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0)
                tx->tx_buffer = NULL;
        else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}

int
kranal_setup_virt_buffer (kra_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}

int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = lnet_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = lnet_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
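
/* Set up tx's buffer for RDMA: paged (kiov) payloads are described
 * physically, iovec payloads virtually. */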
int
kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}
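
/* Register tx's buffer with its device so it can be the source or sink of
 * RDMA; must run in the device scheduler thread.  Buffers that are already
 * mapped (or need no mapping) pass straight through. */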
int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t   *conn = tx->tx_conn;
        kra_device_t *dev = conn->rac_device;
        RAP_RETURN    rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}

void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t *dev;
        RAP_RETURN    rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}
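
/* Return 'tx' to the idle pool, unmapping its buffer and finalizing its
 * LNet messages with the given completion status. */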
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        lnet_msg_t   *lnetmsg[2];
        unsigned long flags;
        int           i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}

kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}
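
/* Assign 'tx' to 'conn' and queue it on the conn's FMA queue for the
 * device scheduler to send. */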
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}
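
/* Commit 'tx' for sending to 'nid': queue it on an existing connection if
 * there is one, otherwise create the peer and/or start a connection attempt;
 * the tx is completed with failure if the peer proves unreachable. */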
void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        kra_peer_t    *peer;
        kra_conn_t    *conn;
        unsigned long  flags;
        int            rc;
        int            retry;
        rwlock_t      *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        for (retry = 0; ; retry = 1) {

                read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->rap_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
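
/* Post the RDMA described by 'sink' for 'tx' and queue it on the conn's
 * rdmaq; 'type' and 'cookie' prime the completion message that is sent
 * once the RDMA completes. */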
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}

int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}
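
/* lnd_send() handler: small payloads go as a single IMMEDIATE message;
 * larger PUTs and GETs negotiate an RDMA with the peer. */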
int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch(type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&             /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}

void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t    *rxmsg = conn->rac_rxmsg;
        unsigned int  niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t  *kiov = lntmsg->msg_kiov;
        unsigned int  offset = lntmsg->msg_offset;
        unsigned int  nob = lntmsg->msg_len;
        kra_tx_t     *tx;
        int           rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,
                    &rxmsg->ram_u.get.ragm_desc, nob,
                    rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}

int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
                           libcfs_nid2str(conn->rac_peer->rap_nid));

        return -EDEADLK;
}

int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t *conn = private;
        kra_msg_t  *rxmsg = conn->rac_rxmsg;
        kra_tx_t   *tx;
        void       *buffer;
        int         rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch(rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                niov--;
                                iov++;
                                LASSERT (niov > 0);
                        }

                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }

                        buffer = ((char *)iov->iov_base) + offset;
                }

                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }

                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}

int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}
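
/* Return non-zero if 'conn' appears dead: nothing received within its
 * timeout, or a tx stuck too long on its fmaq, rdmaq or replyq. */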
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t         *tx;
        struct list_head *ttmp;
        unsigned long     flags;
        long              timeout;
        unsigned long     now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}

void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head *conns = &kranal_data.kra_conns[idx];
        struct list_head *ctmp;
        kra_conn_t       *conn;
        unsigned long     flags;
        int               rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp )
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp )
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}

int
kranal_connd (void *arg)
{
        long              id = (long)arg;
        char              name[16];
        wait_queue_t      wait;
        unsigned long     flags;
        kra_peer_t       *peer;
        kra_acceptsock_t *ras;
        int               did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}

void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}
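
/* Reaper thread: scan a chunk of the connection hash table every 'p'
 * seconds and close any connection that kranal_check_conn_timeouts()
 * flags as dead. */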
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        cfs_daemonize("kranal_reaper");
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                        schedule_timeout(timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                current_min_timeout = kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}

void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}

void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t       *conn;
        RAP_RETURN        rrc;
        __u32             cqid;
        __u32             event_type;
        struct list_head *conns;
        struct list_head *tmp;
        int               i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
                      dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}
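
/* Stamp 'msg' with the connection stamp and next tx sequence number and
 * push it out over FMA; returns -EAGAIN when FMA has no credits, in which
 * case the caller is rescheduled by a later FMA completion event. */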
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive*HZ))
                        CWARN("EAGAIN sending %02x (idle %lu secs)\n",
                              msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
                return -EAGAIN;
        }
}

void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       CLOSE anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to %s\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid), conn,
                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        if (!expect_reply || rc != 0) {
                kranal_tx_done(tx, rc);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}

static void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}

kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from %s\n",
                              tx->tx_msg.ram_type, type,
                              libcfs_nid2str(conn->rac_peer->rap_nid));
                        return NULL;
                }

                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from %s\n",
              type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
        return NULL;
}
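
/* Pull the next message prefix off conn's FMA receive queue, validate it
 * (magic, version, source NID, connstamp, sequence number) and dispatch on
 * message type; any protocol error closes the connection. */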
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;
        int           rc = 0;
        int           repost = 1;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->ram_magic, libcfs_nid2str(peer->rap_nid));
                        rc = -EPROTO;
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from %s\n",
                       msg->ram_version, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer %s from %s\n",
                       libcfs_nid2str(msg->ram_srcnid),
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from %s\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from %s\n",
                       msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                if (rrc != RAP_SUCCESS) {
                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);
                        rc = -ENETDOWN;
                        goto out;
                }
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from %s\n",
                       msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
                                msg->ram_srcnid, conn, 0);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
#if 0
                /* completion message should send rdma length if we ever allow
                 * GET truncation */
                lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
#endif
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (rc < 0)                             /* protocol/comms error */
                kranal_close_conn (conn, rc);

        if (repost && conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}

void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t *tx;
        int       nfma;
        int       nreplies;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));

        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
              conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
}

int
kranal_process_new_conn (kra_conn_t *conn)
{
        RAP_RETURN rrc;

        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
        if (rrc == RAP_SUCCESS)
                return 0;

        LASSERT (rrc == RAP_NOT_DONE);
        if (!time_after_eq(jiffies, conn->rac_last_tx +
                           conn->rac_timeout * HZ))
                return -EAGAIN;

        /* Too late */
        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
        LASSERT (rrc == RAP_SUCCESS);
        return -ETIMEDOUT;
}
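
/* Per-device scheduler thread: drains CQ events, processes scheduled
 * conns' RX and FMA queues, and nurses new conns through handshake
 * completion, sleeping only when it has made no progress. */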
int
kranal_scheduler (void *arg)
{
        kra_device_t     *dev = (kra_device_t *)arg;
        wait_queue_t      wait;
        char              name[16];
        kra_conn_t       *conn;
        unsigned long     flags;
        unsigned long     deadline;
        unsigned long     soonest;
        int               nsoonest;
        long              timeout;
        struct list_head *tmp;
        struct list_head *nxt;
        int               rc;
        int               dropped_lock;
        int               busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        cfs_daemonize(name);
        cfs_block_allsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        cfs_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (time_after_eq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <= HZ)
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive += HZ;

                                deadline = conn->rac_last_tx + conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !time_after_eq(deadline, soonest))
                                soonest = deadline;
                }

                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        busy_loops = 0;
                        schedule();
                } else {
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                schedule_timeout(timeout);
                        }
                }

                remove_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}