4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
30 * Copyright (c) 2012, Intel Corporation.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/klnds/ralnd/ralnd_cb.c
38 * Author: Eric Barton <eric@bartonsoftware.com>
/*
 * Device event callback (device id, opaque arg). Looks up the entry in
 * kranal_data.kra_devices[] whose rad_id equals devid. Holding rad_lock
 * with IRQs saved, it wakes the threads waiting on rad_waitq if the device
 * is not yet flagged rad_ready.
 * NOTE(review): the line(s) between the rad_ready test and wake_up() are
 * not visible here; presumably they set rad_ready - confirm.
 */
44 kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
50 CDEBUG(D_NET, "callback for device %d\n", devid);
52 for (i = 0; i < kranal_data.kra_ndevs; i++) {
54 dev = &kranal_data.kra_devices[i];
/* Not the device this event is for; keep looking. */
55 if (dev->rad_id != devid)
58 spin_lock_irqsave(&dev->rad_lock, flags);
60 if (!dev->rad_ready) {
62 wake_up(&dev->rad_waitq);
65 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Presumably reached only when no device matched devid (the early return
 * after a match is on a line not visible here). */
69 CWARN("callback for unknown device %d\n", devid);
/*
 * Hand conn to its device's scheduler thread. If conn is not already
 * scheduled, this function:
 *   - takes a reference on behalf of the scheduler;
 *   - sets rac_scheduled;
 *   - appends conn to dev->rad_ready_conns;
 *   - wakes dev->rad_waitq.
 * All of this happens under dev->rad_lock with IRQs saved. Repeated calls
 * therefore queue conn only once until rac_scheduled is cleared elsewhere.
 */
73 kranal_schedule_conn(kra_conn_t *conn)
75 kra_device_t *dev = conn->rac_device;
78 spin_lock_irqsave(&dev->rad_lock, flags);
80 if (!conn->rac_scheduled) {
81 kranal_conn_addref(conn); /* +1 ref for scheduler */
82 conn->rac_scheduled = 1;
83 cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
84 wake_up(&dev->rad_waitq);
87 spin_unlock_irqrestore(&dev->rad_lock, flags);
/*
 * Take a tx descriptor off the global idle list (kra_idle_txs), holding
 * kra_tx_lock with IRQs saved. Every tx handed out gets a fresh completion
 * cookie and is asserted to be in its reset state.
 * If the idle list is empty, the lock is dropped and the function returns
 * early. The value returned on that path is not visible here (presumably
 * NULL, which callers must check).
 */
91 kranal_get_idle_tx (void)
96 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
98 if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
99 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
103 tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
104 cfs_list_del(&tx->tx_list);
106 /* Allocate a new completion cookie. It might not be needed, but we've
107 * got a lock right now... */
108 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
110 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* A recycled tx must have been reset when it was last completed
 * (kranal_tx_done() clears the buffer type, message type and LNet msgs). */
112 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
113 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
114 LASSERT (tx->tx_conn == NULL);
115 LASSERT (tx->tx_lntmsg[0] == NULL);
116 LASSERT (tx->tx_lntmsg[1] == NULL);
/*
 * Initialise the common header of an outgoing message. Sets the magic,
 * protocol version, message type, and this NI's NID as the source.
 * Per-connection fields (ram_connstamp, ram_seq) are stamped later by
 * kranal_sendmsg().
 */
122 kranal_init_msg(kra_msg_t *msg, int type)
124 msg->ram_magic = RANAL_MSG_MAGIC;
125 msg->ram_version = RANAL_MSG_VERSION;
126 msg->ram_type = type;
127 msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
128 /* ram_connstamp gets set when FMA is sent */
/*
 * Allocate an idle tx and initialise its message header with 'type'.
 * NOTE(review): kranal_get_idle_tx() can run out of descriptors. The NULL
 * check and the return statement are on lines not visible here - confirm
 * tx is tested before kranal_init_msg() dereferences it.
 */
132 kranal_new_tx_msg (int type)
134 kra_tx_t *tx = kranal_get_idle_tx();
137 kranal_init_msg(&tx->tx_msg, type);
/*
 * Attach an iovec payload to tx for an IMMEDIATE send, i.e. data carried
 * inline in the FMA message rather than moved by RDMA.
 * - Fragments wholly covered by 'offset' are skipped.
 * - The remaining 'nob' bytes must fit in a single fragment. Otherwise an
 *   error is logged; the error code returned is on a line not visible here.
 * - tx_buffer becomes the virtual address of the first payload byte, and
 *   tx_buftype becomes RANAL_BUF_IMMEDIATE.
 * No memory registration is done for this buffer type.
 */
143 kranal_setup_immediate_buffer (kra_tx_t *tx,
144 unsigned int niov, struct iovec *iov,
148 /* For now this is almost identical to kranal_setup_virt_buffer, but we
149 * could "flatten" the payload into a single contiguous buffer ready
150 * for sending direct over an FMA if we ever needed to. */
152 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* Presumably the empty-payload (nob == 0) case: nothing to point at. */
156 tx->tx_buffer = NULL;
/* Skip fragments that lie entirely before 'offset'. */
160 while (offset >= iov->iov_len) {
161 offset -= iov->iov_len;
/* The payload must be contiguous within one fragment. */
167 if (nob > iov->iov_len - offset) {
168 CERROR("Can't handle multiple vaddr fragments\n");
172 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
175 tx->tx_buftype = RANAL_BUF_IMMEDIATE;
/*
 * Attach an iovec payload to tx for RDMA.
 * - Fragments wholly covered by 'offset' are skipped.
 * - The 'nob' bytes that follow must lie within a single fragment.
 * - tx_buffer becomes the virtual address of the first payload byte.
 * The buffer is left RANAL_BUF_VIRT_UNMAPPED; kranal_map_buffer() later
 * registers it with RapkRegisterMemory().
 */
181 kranal_setup_virt_buffer (kra_tx_t *tx,
182 unsigned int niov, struct iovec *iov,
188 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* Skip fragments that lie entirely before 'offset'. */
190 while (offset >= iov->iov_len) {
191 offset -= iov->iov_len;
/* Only a single virtually-contiguous fragment can be registered. */
197 if (nob > iov->iov_len - offset) {
198 CERROR("Can't handle multiple vaddr fragments\n");
202 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
204 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/*
 * Attach a page (kiov) payload to tx for RDMA. One RAP_PHYS_REGION is
 * filled in per page in tx->tx_phys, ready for RapkRegisterPhys() in
 * kranal_map_buffer().
 * - Pages wholly covered by 'offset' are skipped.
 * - The pages are mapped as one contiguous range, so each subsequent page
 *   must start at offset 0. Unless it is the last page needed, it must
 *   also be a full page.
 * - The number of regions is bounded by LNET_MAX_IOV.
 * On success, tx_buftype is RANAL_BUF_PHYS_UNMAPPED and tx_phys_npages
 * records how many regions were filled in.
 * NOTE(review): the visible checks do not verify that the FIRST page runs
 * to the end of its page when more pages follow. Confirm that such a
 * layout cannot leave a gap.
 */
209 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
212 RAP_PHYS_REGION *phys = tx->tx_phys;
215 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
219 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* Skip pages that lie entirely before 'offset'. */
221 while (offset >= kiov->kiov_len) {
222 offset -= kiov->kiov_len;
228 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
/* Not a virtual address: the byte offset of the payload within the first
 * page. It is used as the local RDMA address once the pages are registered. */
230 tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
232 phys->Address = lnet_page2phys(kiov->kiov_page);
/* Bytes still to be described after the first page. */
235 resid = nob - (kiov->kiov_len - offset);
/* Subsequent page: must start at offset 0, and must be full unless the
 * remaining bytes fit in it (i.e. it is the last page). */
241 if (kiov->kiov_offset != 0 ||
242 ((resid > PAGE_SIZE) &&
243 kiov->kiov_len < PAGE_SIZE)) {
244 /* Can't have gaps */
245 CERROR("Can't make payload contiguous in I/O VM:"
246 "page %d, offset %d, len %d \n",
247 (int)(phys - tx->tx_phys),
248 kiov->kiov_offset, kiov->kiov_len);
/* Refuse payloads needing more regions than tx_phys can hold. */
252 if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
253 CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
257 phys->Address = lnet_page2phys(kiov->kiov_page);
263 tx->tx_phys_npages = phys - tx->tx_phys;
/*
 * Set up tx's RDMA buffer from exactly one of iov (virtual fragments) or
 * kiov (pages). Dispatches to kranal_setup_phys_buffer() or
 * kranal_setup_virt_buffer() and returns its result.
 */
268 kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
269 struct iovec *iov, lnet_kiov_t *kiov,
/* Exactly one payload description must be supplied. */
272 LASSERT ((iov == NULL) != (kiov == NULL));
275 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
277 return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/*
 * Make tx's payload RDMA-able by registering it with tx->tx_conn's device.
 * tx_conn must already be set, and this must run on that device's
 * scheduler thread (asserted).
 * - IMMEDIATE and already *_MAPPED buffers are presumably left as they are
 *   (their shared case body is not visible).
 * - PHYS_UNMAPPED pages are registered with RapkRegisterPhys().
 * - VIRT_UNMAPPED buffers are registered with RapkRegisterMemory().
 * On success the device's mapping statistics are updated and tx moves to
 * the matching *_MAPPED state. A registration failure is logged with the
 * device's current mapping counts and reported as -ENOMEM.
 */
281 kranal_map_buffer (kra_tx_t *tx)
283 kra_conn_t *conn = tx->tx_conn;
284 kra_device_t *dev = conn->rac_device;
287 LASSERT (current == dev->rad_scheduler);
289 switch (tx->tx_buftype) {
294 case RANAL_BUF_IMMEDIATE:
295 case RANAL_BUF_PHYS_MAPPED:
296 case RANAL_BUF_VIRT_MAPPED:
/* Page list: register the physical regions built by setup_phys_buffer. */
299 case RANAL_BUF_PHYS_UNMAPPED:
300 rrc = RapkRegisterPhys(dev->rad_handle,
301 tx->tx_phys, tx->tx_phys_npages,
303 if (rrc != RAP_SUCCESS) {
304 CERROR ("Can't map %d pages: dev %d "
305 "phys %u pp %u, virt %u nob %lu\n",
306 tx->tx_phys_npages, dev->rad_id,
307 dev->rad_nphysmap, dev->rad_nppphysmap,
308 dev->rad_nvirtmap, dev->rad_nobvirtmap);
309 return -ENOMEM; /* assume insufficient resources */
313 dev->rad_nppphysmap += tx->tx_phys_npages;
315 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
/* Single virtual fragment: register it by address and length. */
318 case RANAL_BUF_VIRT_UNMAPPED:
319 rrc = RapkRegisterMemory(dev->rad_handle,
320 tx->tx_buffer, tx->tx_nob,
322 if (rrc != RAP_SUCCESS) {
323 CERROR ("Can't map %d bytes: dev %d "
324 "phys %u pp %u, virt %u nob %lu\n",
325 tx->tx_nob, dev->rad_id,
326 dev->rad_nphysmap, dev->rad_nppphysmap,
327 dev->rad_nvirtmap, dev->rad_nobvirtmap);
328 return -ENOMEM; /* assume insufficient resources */
332 dev->rad_nobvirtmap += tx->tx_nob;
334 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/*
 * Undo kranal_map_buffer(). A *_MAPPED buffer is deregistered with
 * RapkDeregisterMemory() (asserted to succeed), the device's mapping
 * statistics are reduced, and tx returns to the matching *_UNMAPPED state.
 * IMMEDIATE and unmapped buffers are presumably left alone (their case body
 * is not visible). Deregistration requires tx_conn to be set and must run
 * on that device's scheduler thread (both asserted).
 */
340 kranal_unmap_buffer (kra_tx_t *tx)
345 switch (tx->tx_buftype) {
350 case RANAL_BUF_IMMEDIATE:
351 case RANAL_BUF_PHYS_UNMAPPED:
352 case RANAL_BUF_VIRT_UNMAPPED:
355 case RANAL_BUF_PHYS_MAPPED:
356 LASSERT (tx->tx_conn != NULL);
357 dev = tx->tx_conn->rac_device;
358 LASSERT (current == dev->rad_scheduler);
/* Physical registrations are released with a NULL address (they were
 * registered from a page list, not a virtual address). */
359 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
361 LASSERT (rrc == RAP_SUCCESS);
364 dev->rad_nppphysmap -= tx->tx_phys_npages;
366 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
369 case RANAL_BUF_VIRT_MAPPED:
370 LASSERT (tx->tx_conn != NULL);
371 dev = tx->tx_conn->rac_device;
372 LASSERT (current == dev->rad_scheduler);
373 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
375 LASSERT (rrc == RAP_SUCCESS);
378 dev->rad_nobvirtmap -= tx->tx_nob;
380 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
/*
 * Finish with tx, reporting 'completion' (0 or a negative errno) to LNet.
 * Steps, in order:
 *   1. unmap the buffer;
 *   2. detach up to two LNet messages;
 *   3. reset the buffer and message types;
 *   4. return tx to the global idle list;
 *   5. lnet_finalize() each detached message.
 * Must not be called in interrupt context (asserted). If tx is still
 * mapped, kranal_unmap_buffer() also requires the device's scheduler thread.
 */
386 kranal_tx_done (kra_tx_t *tx, int completion)
388 lnet_msg_t *lnetmsg[2];
392 LASSERT (!in_interrupt());
394 kranal_unmap_buffer(tx);
/* Detach the LNet messages so tx can be recycled before they are finalized. */
396 lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
397 lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
399 tx->tx_buftype = RANAL_BUF_NONE;
400 tx->tx_msg.ram_type = RANAL_MSG_NONE;
403 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
405 cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
407 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
409 /* finalize AFTER freeing lnet msgs */
/* i.e. lnet_finalize() runs only once tx is back on the idle list and no
 * tx lock is held. */
410 for (i = 0; i < 2; i++) {
411 if (lnetmsg[i] == NULL)
414 lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
/*
 * Return the first connection on peer->rap_conns. The return for an empty
 * list is not visible here (presumably NULL). As the _locked suffix and
 * the callers suggest, kra_global_lock must be held.
 */
419 kranal_find_conn_locked (kra_peer_t *peer)
423 /* just return the first connection */
424 cfs_list_for_each (tmp, &peer->rap_conns) {
425 return cfs_list_entry(tmp, kra_conn_t, rac_list);
/*
 * Queue tx at the tail of conn's FMA send queue and schedule conn so its
 * device's scheduler thread will send it. tx_qtime is stamped because
 * kranal_check_conn_timeouts() uses it to detect stuck queues.
 */
432 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
438 spin_lock_irqsave(&conn->rac_lock, flags);
439 cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
440 tx->tx_qtime = jiffies;
441 spin_unlock_irqrestore(&conn->rac_lock, flags);
443 kranal_schedule_conn(conn);
/*
 * Send tx to nid. Once called, tx is always consumed: every failure
 * completes it via kranal_tx_done() with an error.
 * - Fast path (lock calls partly not visible): if the peer already has a
 *   connection, queue tx on its FMA queue.
 * - Otherwise take kra_global_lock for write and look again. If there is
 *   still no peer, add a persistent peer and retry (at most once).
 * - If the peer now has a connection, queue tx on it.
 * - Otherwise queue tx on peer->rap_tx_queue. If a connection attempt is
 *   not already in progress, hand the peer to the connd thread, subject to
 *   the reconnect back-off.
 */
447 kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
454 rwlock_t *g_lock = &kranal_data.kra_global_lock;
456 /* If I get here, I've committed to send, so I complete the tx with
457 * failure on any problems */
459 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
/* 'retry' limits the add-peer-and-look-again path to one extra pass. */
461 for (retry = 0; ; retry = 1) {
465 peer = kranal_find_peer_locked(nid);
467 conn = kranal_find_conn_locked(peer);
469 kranal_post_fma(conn, tx);
475 /* Making connections; I'll need a write lock... */
477 write_lock_irqsave(g_lock, flags);
479 peer = kranal_find_peer_locked(nid);
/* No peer: drop the lock. Presumably fail if this is already the retry,
 * else add a persistent peer and loop. */
483 write_unlock_irqrestore(g_lock, flags);
486 CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
487 kranal_tx_done(tx, -EHOSTUNREACH);
491 rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
492 lnet_acceptor_port());
494 CERROR("Can't add peer %s: %d\n",
495 libcfs_nid2str(nid), rc);
496 kranal_tx_done(tx, rc);
501 conn = kranal_find_conn_locked(peer);
503 /* Connection exists; queue message on it */
504 kranal_post_fma(conn, tx);
505 write_unlock_irqrestore(g_lock, flags);
509 LASSERT (peer->rap_persistence > 0);
511 if (!peer->rap_connecting) {
512 LASSERT (cfs_list_empty(&peer->rap_tx_queue));
/* Not the first attempt, and the reconnect back-off has not expired yet:
 * fail now rather than queue behind a connect that will not start. */
514 if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
515 cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
516 write_unlock_irqrestore(g_lock, flags);
517 kranal_tx_done(tx, -EHOSTUNREACH);
521 peer->rap_connecting = 1;
522 kranal_peer_addref(peer); /* extra ref for connd */
/* g_lock is held with IRQs disabled (write_lock_irqsave above), so a plain
 * spin_lock() is used here without saving IRQ state. */
524 spin_lock(&kranal_data.kra_connd_lock);
526 cfs_list_add_tail(&peer->rap_connd_list,
527 &kranal_data.kra_connd_peers);
528 wake_up(&kranal_data.kra_connd_waitq);
530 spin_unlock(&kranal_data.kra_connd_lock);
533 /* A connection is being established; queue the message... */
534 cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
536 write_unlock_irqrestore(g_lock, flags);
/*
 * Transfer nob bytes from tx's mapped buffer into the remote 'sink'
 * described by the peer.
 * - tx_msg is prepared as the completion message of 'type', carrying the
 *   peer's 'cookie'.
 * - A zero-length transfer posts no RDMA; the completion goes straight to
 *   the FMA queue.
 * - Otherwise the RDMA is posted (asserted to succeed) and tx waits on
 *   rac_rdmaq. kranal_check_rdma_cq() later moves it to the FMA queue.
 * Must run on the conn's device scheduler thread (asserted).
 */
540 kranal_rdma(kra_tx_t *tx, int type,
541 kra_rdma_desc_t *sink, int nob, __u64 cookie)
543 kra_conn_t *conn = tx->tx_conn;
547 LASSERT (kranal_tx_mapped(tx));
548 LASSERT (nob <= sink->rard_nob);
549 LASSERT (nob <= tx->tx_nob);
551 /* No actual race with scheduler sending CLOSE (I'm she!) */
552 LASSERT (current == conn->rac_device->rad_scheduler);
554 memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
555 tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
556 tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
557 tx->tx_rdma_desc.DstPtr = sink->rard_addr;
558 tx->tx_rdma_desc.DstKey = sink->rard_key;
559 tx->tx_rdma_desc.Length = nob;
/* Lets kranal_check_rdma_cq() verify the completed descriptor belongs to
 * the tx at the head of rac_rdmaq. */
560 tx->tx_rdma_desc.AppPtr = tx;
562 /* prep final completion message */
563 kranal_init_msg(&tx->tx_msg, type);
564 tx->tx_msg.ram_u.completion.racm_cookie = cookie;
566 if (nob == 0) { /* Immediate completion */
567 kranal_post_fma(conn, tx);
571 LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
573 rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
574 LASSERT (rrc == RAP_SUCCESS);
576 spin_lock_irqsave(&conn->rac_lock, flags);
577 cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
578 tx->tx_qtime = jiffies;
579 spin_unlock_irqrestore(&conn->rac_lock, flags);
/*
 * Finish with the message currently being received on conn (rac_rxmsg).
 * - Copies up to nob bytes of its payload into buffer with
 *   RapkFmaCopyOut(). sizeof(kra_msg_t) is passed, presumably as the size
 *   of the header that precedes the payload.
 * - Clears rac_rxmsg.
 * - Warns if fewer than nob bytes arrived.
 * Callers pass (NULL, 0) just to release the message. The return values
 * are on lines not visible; kranal_recv() treats non-zero as failure.
 * NOTE(review): nob_received is __u32 but the warning prints "got %d"; the
 * argument line is not visible - confirm it should be %u.
 */
583 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
585 __u32 nob_received = nob;
588 LASSERT (conn->rac_rxmsg != NULL);
589 CDEBUG(D_NET, "Consuming %p\n", conn);
591 rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
592 &nob_received, sizeof(kra_msg_t));
593 LASSERT (rrc == RAP_SUCCESS);
595 conn->rac_rxmsg = NULL;
/* nob_received now holds the number of bytes actually copied. */
597 if (nob_received < nob) {
598 CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
599 libcfs_nid2str(conn->rac_peer->rap_nid),
/*
 * LND send entry point: choose how lntmsg reaches its target. Case labels
 * are largely not visible; the branches below are inferred from the
 * message types each path creates.
 * - GET: send a GET_REQ describing the sink MD for the peer to RDMA the
 *   reply into. Routed traffic, and small non-paged sinks, fall back to
 *   IMMEDIATE.
 * - PUT/REPLY: send a PUT_REQ and wait for the peer's PUT_ACK, unless the
 *   payload is small and not paged, in which case it goes IMMEDIATE.
 * - IMMEDIATE: copy the payload inline into the FMA message.
 * NOTE(review): when lnet_create_reply_msg() fails, tx is completed with
 * 'rc', which is presumably 0 (the preceding setup succeeded). No LNet
 * message is attached yet, so nothing is finalized with that status, but
 * an explicit error (e.g. -EIO) would be clearer - confirm.
 */
608 kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
610 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
611 int type = lntmsg->msg_type;
612 lnet_process_id_t target = lntmsg->msg_target;
613 int target_is_router = lntmsg->msg_target_is_router;
614 int routing = lntmsg->msg_routing;
615 unsigned int niov = lntmsg->msg_niov;
616 struct iovec *iov = lntmsg->msg_iov;
617 lnet_kiov_t *kiov = lntmsg->msg_kiov;
618 unsigned int offset = lntmsg->msg_offset;
619 unsigned int nob = lntmsg->msg_len;
623 /* NB 'private' is different depending on what we're sending.... */
625 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
626 nob, niov, libcfs_id2str(target));
628 LASSERT (nob == 0 || niov > 0);
629 LASSERT (niov <= LNET_MAX_IOV);
631 LASSERT (!in_interrupt());
632 /* payload is either all vaddrs or all pages */
633 LASSERT (!(kiov != NULL && iov != NULL));
636 CERROR ("Can't route\n");
/* GET (presumably): decide between an inline GET and a GET_REQ whose sink
 * buffer the peer RDMAs into. A GET_REQ takes two LNet messages: the GET
 * itself in tx_lntmsg[0] and the REPLY created up front in tx_lntmsg[1];
 * kranal_tx_done() finalizes both. */
651 /* We have to consider the eventual sink buffer rather than any
652 * payload passed here (there isn't any, and strictly, looking
653 * inside lntmsg is a layering violation). We send a simple
654 * IMMEDIATE GET if the sink buffer is mapped already and small
657 if (routing || target_is_router)
658 break; /* send IMMEDIATE */
660 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
661 lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
662 lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
663 break; /* send IMMEDIATE */
665 tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
669 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
670 rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
671 lntmsg->msg_md->md_iov.iov,
672 0, lntmsg->msg_md->md_length);
674 rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
675 lntmsg->msg_md->md_iov.kiov,
676 0, lntmsg->msg_md->md_length);
678 kranal_tx_done(tx, rc);
682 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
683 if (tx->tx_lntmsg[1] == NULL) {
684 CERROR("Can't create reply for GET to %s\n",
685 libcfs_nid2str(target.nid));
686 kranal_tx_done(tx, rc);
690 tx->tx_lntmsg[0] = lntmsg;
691 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
692 /* rest of tx_msg is setup just before it is sent */
693 kranal_launch_tx(tx, target.nid);
/* PUT/REPLY (presumably; case labels not visible): small non-paged
 * payloads go inline, larger ones are announced with a PUT_REQ. */
698 if (kiov == NULL && /* not paged */
699 nob <= RANAL_FMA_MAX_DATA && /* small enough */
700 nob <= *kranal_tunables.kra_max_immediate)
701 break; /* send IMMEDIATE */
703 tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
707 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
709 kranal_tx_done(tx, rc);
713 tx->tx_lntmsg[0] = lntmsg;
714 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
715 /* rest of tx_msg is setup just before it is sent */
716 kranal_launch_tx(tx, target.nid);
/* IMMEDIATE send (the common tail for the 'break's above, presumably): the
 * payload travels inline, so it must be virtually addressed and fit in
 * one FMA message. */
722 LASSERT (kiov == NULL);
723 LASSERT (nob <= RANAL_FMA_MAX_DATA);
725 tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
729 rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
731 kranal_tx_done(tx, rc);
735 tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
736 tx->tx_lntmsg[0] = lntmsg;
737 kranal_launch_tx(tx, target.nid);
/*
 * Answer the GET_REQ currently being received on conn (conn->rac_rxmsg),
 * for which LNet supplied the REPLY message lntmsg.
 * - Set up and map the reply payload.
 * - RDMA it into the requester's sink described in the GET_REQ. The
 *   completion is a GET_DONE carrying the requester's cookie.
 * On failure (goto targets not visible), tx is completed with -EIO and
 * lntmsg is finalized with -EIO.
 */
742 kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
744 kra_msg_t *rxmsg = conn->rac_rxmsg;
745 unsigned int niov = lntmsg->msg_niov;
746 struct iovec *iov = lntmsg->msg_iov;
747 lnet_kiov_t *kiov = lntmsg->msg_kiov;
748 unsigned int offset = lntmsg->msg_offset;
749 unsigned int nob = lntmsg->msg_len;
753 tx = kranal_get_idle_tx();
757 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
763 rc = kranal_map_buffer(tx);
/* From here on lntmsg is finalized when tx completes. */
767 tx->tx_lntmsg[0] = lntmsg;
769 kranal_rdma(tx, RANAL_MSG_GET_DONE,
770 &rxmsg->ram_u.get.ragm_desc, nob,
771 rxmsg->ram_u.get.ragm_cookie);
/* Error paths (presumably reached only on setup/map/alloc failure). */
775 kranal_tx_done(tx, -EIO);
777 lnet_finalize(ni, lntmsg, -EIO);
/*
 * LNet eager-receive hook. The visible code only logs, on the console,
 * that the message from the conn's peer is being dropped for lack of
 * buffers. The return value is on a line not visible here.
 */
781 kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
784 kra_conn_t *conn = (kra_conn_t *)private;
786 LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
787 libcfs_nid2str(conn->rac_peer->rap_nid));
/*
 * LND receive entry point for the message currently held in
 * conn->rac_rxmsg ('private' is the conn). Dispatches on its type:
 * - IMMEDIATE: copy mlen bytes straight into a single virtual fragment,
 *   then finalize lntmsg.
 * - PUT_REQ: reply with a PUT_ACK describing a mapped sink buffer that the
 *   peer will RDMA into.
 * - GET_REQ: RDMA the reply via kranal_reply(), or send a GET_NAK when
 *   there is no lntmsg.
 * Every path ends by consuming the received message.
 */
793 kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
794 int delayed, unsigned int niov,
795 struct iovec *iov, lnet_kiov_t *kiov,
796 unsigned int offset, unsigned int mlen, unsigned int rlen)
798 kra_conn_t *conn = private;
799 kra_msg_t *rxmsg = conn->rac_rxmsg;
804 LASSERT (mlen <= rlen);
805 LASSERT (!in_interrupt());
806 /* Either all pages or all vaddrs */
807 LASSERT (!(kiov != NULL && iov != NULL));
809 CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);
811 switch(rxmsg->ram_type) {
/* Inline payload: must land in one virtually-contiguous fragment. */
815 case RANAL_MSG_IMMEDIATE:
818 } else if (kiov != NULL) {
819 CERROR("Can't recv immediate into paged buffer\n");
823 while (offset >= iov->iov_len) {
824 offset -= iov->iov_len;
829 if (mlen > iov->iov_len - offset) {
830 CERROR("Can't handle immediate frags\n");
833 buffer = ((char *)iov->iov_base) + offset;
835 rc = kranal_consume_rxmsg(conn, buffer, mlen);
836 lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
/* Peer wants to RDMA a PUT to us: build a PUT_ACK naming our sink buffer.
 * Each failure path releases the received message before bailing out. */
839 case RANAL_MSG_PUT_REQ:
840 tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
842 kranal_consume_rxmsg(conn, NULL, 0);
846 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
848 kranal_tx_done(tx, rc);
849 kranal_consume_rxmsg(conn, NULL, 0);
854 rc = kranal_map_buffer(tx);
856 kranal_tx_done(tx, rc);
857 kranal_consume_rxmsg(conn, NULL, 0);
/* src cookie echoes the peer's PUT_REQ; dst cookie identifies this tx. */
861 tx->tx_msg.ram_u.putack.rapam_src_cookie =
862 conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
863 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
864 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
865 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
866 (__u64)((unsigned long)tx->tx_buffer);
867 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
869 tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
871 kranal_post_fma(conn, tx);
872 kranal_consume_rxmsg(conn, NULL, 0);
/* Peer wants data: RDMA a reply if LNet supplied one (lntmsg != NULL),
 * otherwise send a GET_NAK carrying the peer's cookie. */
875 case RANAL_MSG_GET_REQ:
876 if (lntmsg != NULL) {
878 kranal_reply(ni, conn, lntmsg);
881 tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
883 tx->tx_msg.ram_u.completion.racm_cookie =
884 rxmsg->ram_u.get.ragm_cookie;
885 kranal_post_fma(conn, tx);
888 kranal_consume_rxmsg(conn, NULL, 0);
/*
 * Start a kernel thread running fn(arg) under 'name' and count it in
 * kra_nthreads.
 * NOTE(review): the line(s) between cfs_thread_run() and atomic_inc() are
 * not visible. If the increment is unconditional, failed starts are
 * counted too. Either way, on success this returns PTR_ERR(task), i.e. the
 * task pointer converted to an integer, which is non-zero. Callers that
 * test rc != 0 would treat a successful start as a failure. Returning 0 on
 * success (e.g. PTR_ERR_OR_ZERO(task)) looks intended - confirm against
 * callers.
 */
894 kranal_thread_start(int(*fn)(void *arg), void *arg, char *name)
896 struct task_struct *task = cfs_thread_run(fn, arg, name);
899 atomic_inc(&kranal_data.kra_nthreads);
900 return PTR_ERR(task);
/*
 * Called by each LND thread as it exits (e.g. the connd and reaper threads).
 * Drops it from the kra_nthreads count maintained by kranal_thread_start().
 */
904 kranal_thread_fini (void)
906 atomic_dec(&kranal_data.kra_nthreads);
/*
 * Reaper check for one ESTABLISHED or CLOSING conn.
 * - If nothing was sent for a keepalive interval and CLOSE has not been
 *   sent, schedule conn so the scheduler sends a keepalive.
 * - If nothing (or, when closing, no CLOSE) was received within rac_timeout
 *   seconds, log an error.
 * - For ESTABLISHED conns only, report any tx that has sat on the fma,
 *   rdma or reply queue longer than the timeout.
 * The return statements are not visible. Presumably the result is non-zero
 * for a timed-out conn and 0 otherwise, which is how kranal_reaper_check()
 * uses it.
 */
910 kranal_check_conn_timeouts (kra_conn_t *conn)
916 unsigned long now = jiffies;
918 LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
919 conn->rac_state == RANAL_CONN_CLOSING);
921 if (!conn->rac_close_sent &&
922 cfs_time_aftereq(now, conn->rac_last_tx + conn->rac_keepalive *
924 /* not sent in a while; schedule conn so scheduler sends a keepalive */
925 CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
926 conn, libcfs_nid2str(conn->rac_peer->rap_nid));
927 kranal_schedule_conn(conn);
/* rac_timeout is in seconds; convert to jiffies for the comparisons below. */
930 timeout = conn->rac_timeout * HZ;
932 if (!conn->rac_close_recvd &&
933 cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
934 CERROR("%s received from %s within %lu seconds\n",
935 (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
936 "Nothing" : "CLOSE not",
937 libcfs_nid2str(conn->rac_peer->rap_nid),
938 (now - conn->rac_last_rx)/HZ);
/* The queue checks below apply only to ESTABLISHED conns. */
942 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
945 /* Check the conn's queues are moving. These are "belt+braces" checks,
946 * in case of hardware/software errors that make this conn seem
947 * responsive even though it isn't progressing its message queues. */
949 spin_lock_irqsave(&conn->rac_lock, flags);
951 cfs_list_for_each (ttmp, &conn->rac_fmaq) {
952 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
954 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
955 spin_unlock_irqrestore(&conn->rac_lock, flags);
956 CERROR("tx on fmaq for %s blocked %lu seconds\n",
957 libcfs_nid2str(conn->rac_peer->rap_nid),
958 (now - tx->tx_qtime)/HZ);
963 cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
964 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
966 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
967 spin_unlock_irqrestore(&conn->rac_lock, flags);
968 CERROR("tx on rdmaq for %s blocked %lu seconds\n",
969 libcfs_nid2str(conn->rac_peer->rap_nid),
970 (now - tx->tx_qtime)/HZ);
975 cfs_list_for_each (ttmp, &conn->rac_replyq) {
976 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
978 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
979 spin_unlock_irqrestore(&conn->rac_lock, flags);
980 CERROR("tx on replyq for %s blocked %lu seconds\n",
981 libcfs_nid2str(conn->rac_peer->rap_nid),
982 (now - tx->tx_qtime)/HZ);
987 spin_unlock_irqrestore(&conn->rac_lock, flags);
/*
 * Check every conn in hash bucket idx for timeouts. Along the way,
 * *min_timeoutp is lowered to the smallest rac_timeout / rac_keepalive
 * seen (both in seconds).
 * The scan runs under the shared global lock. For a timed-out conn it:
 *   1. takes a ref and drops the read lock;
 *   2. re-takes the global lock for write;
 *   3. closes (ESTABLISHED) or terminates (CLOSING) the conn;
 *   4. drops the ref and rescans the bucket from the start.
 */
992 kranal_reaper_check (int idx, unsigned long *min_timeoutp)
994 cfs_list_t *conns = &kranal_data.kra_conns[idx];
1001 /* NB. We expect to check all the conns and not find any problems, so
1002 * we just use a shared lock while we take a look... */
1003 read_lock(&kranal_data.kra_global_lock);
1005 cfs_list_for_each (ctmp, conns) {
1006 conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);
/* Feed the reaper's estimate of the smallest interval it must honour. */
1008 if (conn->rac_timeout < *min_timeoutp )
1009 *min_timeoutp = conn->rac_timeout;
1010 if (conn->rac_keepalive < *min_timeoutp )
1011 *min_timeoutp = conn->rac_keepalive;
1013 rc = kranal_check_conn_timeouts(conn);
/* Timed out (presumably rc != 0): pin conn before dropping the read lock. */
1017 kranal_conn_addref(conn);
1018 read_unlock(&kranal_data.kra_global_lock);
1020 CERROR("Conn to %s, cqid %d timed out\n",
1021 libcfs_nid2str(conn->rac_peer->rap_nid),
1024 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
/* Re-examine the state under the write lock; it may have changed while
 * no lock was held. */
1026 switch (conn->rac_state) {
1030 case RANAL_CONN_ESTABLISHED:
1031 kranal_close_conn_locked(conn, -ETIMEDOUT);
1034 case RANAL_CONN_CLOSING:
1035 kranal_terminate_conn_locked(conn);
1039 write_unlock_irqrestore(&kranal_data.kra_global_lock,
1042 kranal_conn_decref(conn);
1044 /* start again now I've dropped the lock */
1048 read_unlock(&kranal_data.kra_global_lock);
/*
 * Connection daemon thread body. Runs until kra_shutdown and services two
 * queues under kra_connd_lock:
 * - accepted sockets: passive handshakes;
 * - peers queued by kranal_launch_tx(): active connects. The connd
 *   reference taken when the peer was queued is dropped afterwards.
 * When there is nothing to do (condition on lines not visible), it sleeps
 * exclusively on kra_connd_waitq. Calls kranal_thread_fini() on exit.
 */
1052 kranal_connd (void *arg)
1054 long id = (long)arg;
1056 unsigned long flags;
1058 kra_acceptsock_t *ras;
1061 cfs_block_allsigs();
1063 init_waitqueue_entry_current(&wait);
1065 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1067 while (!kranal_data.kra_shutdown) {
/* Passive side: handshake on a socket accepted by the listener. */
1070 if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
1071 ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
1072 kra_acceptsock_t, ras_list);
1073 cfs_list_del(&ras->ras_list);
/* The handshake may block, so do it without connd_lock held. */
1075 spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1078 CDEBUG(D_NET,"About to handshake someone\n");
1080 kranal_conn_handshake(ras->ras_sock, NULL);
1081 kranal_free_acceptsock(ras);
1083 CDEBUG(D_NET,"Finished handshaking someone\n");
1085 spin_lock_irqsave(&kranal_data.kra_connd_lock,
/* Active side: connect to a peer with queued traffic. */
1090 if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
1091 peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
1092 kra_peer_t, rap_connd_list);
1094 cfs_list_del_init(&peer->rap_connd_list);
1095 spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1098 kranal_connect(peer);
1099 kranal_peer_decref(peer);
1101 spin_lock_irqsave(&kranal_data.kra_connd_lock,
/* Task state is set and the waiter queued before connd_lock is dropped,
 * so a wake_up() issued after the unlock is not missed. */
1109 set_current_state(TASK_INTERRUPTIBLE);
1110 add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);
1112 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1114 waitq_wait(&wait, TASK_INTERRUPTIBLE);
1116 set_current_state(TASK_RUNNING);
1117 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1119 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1122 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1124 kranal_thread_fini();
/*
 * Tell the reaper about a (positive) timeout it may now have to honour.
 * kra_new_min_timeout is lowered to 'timeout' if that is smaller, under
 * kra_reaper_lock. kranal_reaper() picks the value up on its next pass and
 * resets it.
 */
1129 kranal_update_reaper_timeout(long timeout)
1131 unsigned long flags;
1133 LASSERT (timeout > 0);
1135 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1137 if (timeout < kranal_data.kra_new_min_timeout)
1138 kranal_data.kra_new_min_timeout = timeout;
1140 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/*
 * Reaper thread body. Every 'p' seconds it checks a chunk of the conn hash
 * table with kranal_reaper_check(). n and p are set on lines not visible.
 * - Chunk size: chosen so the whole table is covered n times within
 *   current_min_timeout, the smallest keepalive/timeout interval known.
 * - Steady state: the minimum seen over a full pass (next_min_timeout) is
 *   adopted once the scan passes base_index.
 * - A smaller value posted via kranal_update_reaper_timeout() is adopted
 *   at once and restarts that pass.
 */
1144 kranal_reaper (void *arg)
1147 unsigned long flags;
1150 int conn_entries = kranal_data.kra_conn_hash_size;
1152 int base_index = conn_entries - 1;
1153 unsigned long next_check_time = jiffies;
1154 long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1155 long current_min_timeout = 1;
1157 cfs_block_allsigs();
1159 init_waitqueue_entry_current(&wait);
1161 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1163 while (!kranal_data.kra_shutdown) {
1164 /* I wake up every 'p' seconds to check for timeouts on some
1165 * more peers. I try to check every connection 'n' times
1166 * within the global minimum of all keepalive and timeout
1167 * intervals, to ensure I attend to every connection within
1168 * (n+1)/n times its timeout intervals. */
1171 unsigned long min_timeout;
1174 /* careful with the jiffy wrap... */
1175 timeout = (long)(next_check_time - jiffies);
/* Sleep on kra_reaper_waitq until next_check_time, releasing reaper_lock
 * while asleep. */
1177 set_current_state(TASK_INTERRUPTIBLE);
1178 add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1180 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
1183 waitq_timedwait(&wait, TASK_INTERRUPTIBLE,
1186 spin_lock_irqsave(&kranal_data.kra_reaper_lock,
1189 set_current_state(TASK_RUNNING);
1190 remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
/* Pick up any smaller timeout posted by kranal_update_reaper_timeout(). */
1194 if (kranal_data.kra_new_min_timeout !=
1195 MAX_SCHEDULE_TIMEOUT) {
1196 /* new min timeout set: restart min timeout scan */
1197 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1198 base_index = conn_index - 1;
1200 base_index = conn_entries - 1;
1202 if (kranal_data.kra_new_min_timeout <
1203 current_min_timeout) {
1204 current_min_timeout =
1205 kranal_data.kra_new_min_timeout;
1206 CDEBUG(D_NET, "Set new min timeout %ld\n",
1207 current_min_timeout);
1210 kranal_data.kra_new_min_timeout =
1211 MAX_SCHEDULE_TIMEOUT;
1213 min_timeout = current_min_timeout;
1215 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1217 LASSERT (min_timeout > 0);
1219 /* Compute how many table entries to check now so I get round
1220 * the whole table fast enough given that I do this at fixed
1221 * intervals of 'p' seconds) */
1222 chunk = conn_entries;
1223 if (min_timeout > n * p)
1224 chunk = (chunk * n * p) / min_timeout;
/* NOTE(review): the second argument to kranal_reaper_check() is on a line
 * not visible. If it is &next_min_timeout (a long), it does not match the
 * callee's 'unsigned long *' parameter - confirm. */
1228 for (i = 0; i < chunk; i++) {
1229 kranal_reaper_check(conn_index,
1231 conn_index = (conn_index + 1) % conn_entries;
1234 next_check_time += p * HZ;
1236 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
/* Did the chunk just scanned cover base_index, i.e. complete a full pass?
 * The second clause handles conn_index having wrapped to the table start. */
1238 if (((conn_index - chunk <= base_index &&
1239 base_index < conn_index) ||
1240 (conn_index - conn_entries - chunk <= base_index &&
1241 base_index < conn_index - conn_entries))) {
1243 /* Scanned all conns: set current_min_timeout... */
1244 if (current_min_timeout != next_min_timeout) {
1245 current_min_timeout = next_min_timeout;
1246 CDEBUG(D_NET, "Set new min timeout %ld\n",
1247 current_min_timeout);
1250 /* ...and restart min timeout scan */
1251 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1252 base_index = conn_index - 1;
1254 base_index = conn_entries - 1;
1258 kranal_thread_fini();
/*
 * Drain dev's RDMA completion queue (loop header not visible). For each
 * completion:
 *   1. look up the conn by cqid under the global read lock; completions
 *      for an unknown cqid (conn destroyed) are skipped;
 *   2. collect the finished descriptor with RapkRdmaDone();
 *   3. move the tx at the head of rac_rdmaq to rac_fmaq so its
 *      PUT_DONE/GET_DONE gets sent. The descriptor's AppPtr must be that
 *      tx, so RDMAs are expected to complete in posting order;
 *   4. schedule the conn.
 * Overrun events are not expected on this CQ (asserted).
 * NOTE(review): the debug print reads rac_rdmaq.next before rac_lock is
 * taken and before the non-empty assertion. Only a pointer is printed, but
 * moving it under the lock would be tidier.
 */
1263 kranal_check_rdma_cq (kra_device_t *dev)
1268 unsigned long flags;
1269 RAP_RDMA_DESCRIPTOR *desc;
1274 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
1275 if (rrc == RAP_NOT_DONE) {
1276 CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
1280 LASSERT (rrc == RAP_SUCCESS);
1281 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1283 read_lock(&kranal_data.kra_global_lock);
1285 conn = kranal_cqid2conn_locked(cqid);
1287 /* Conn was destroyed? */
1288 CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
1289 read_unlock(&kranal_data.kra_global_lock);
1293 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1294 LASSERT (rrc == RAP_SUCCESS);
1296 CDEBUG(D_NET, "Completed %p\n",
1297 cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
1299 spin_lock_irqsave(&conn->rac_lock, flags);
1301 LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
1302 tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1303 cfs_list_del(&tx->tx_list);
/* The completed descriptor must belong to the oldest outstanding RDMA. */
1305 LASSERT(desc->AppPtr == (void *)tx);
1306 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1307 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1309 cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1310 tx->tx_qtime = jiffies;
1312 spin_unlock_irqrestore(&conn->rac_lock, flags);
1314 /* Get conn's fmaq processed, now I've just put something
1316 kranal_schedule_conn(conn);
1318 read_unlock(&kranal_data.kra_global_lock);
/*
 * Drain dev's FMA completion queue (loop header not visible).
 * - Normal event: schedule the conn that owns the cqid, found under the
 *   global read lock. Unknown cqids are ignored.
 * - Overrun: completions may have been lost, so every conn on this device
 *   is scheduled. The read lock is dropped between hash buckets so writers
 *   are not starved.
 */
1323 kranal_check_fma_cq (kra_device_t *dev)
1334 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
1335 if (rrc == RAP_NOT_DONE) {
1336 CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
1340 LASSERT (rrc == RAP_SUCCESS);
/* Normal completion: only the owning conn needs attention. */
1342 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1344 read_lock(&kranal_data.kra_global_lock);
1346 conn = kranal_cqid2conn_locked(cqid);
1348 CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
1351 CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
1353 kranal_schedule_conn(conn);
1356 read_unlock(&kranal_data.kra_global_lock);
1360 /* FMA CQ has overflowed: check ALL conns */
1361 CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
1364 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1366 read_lock(&kranal_data.kra_global_lock);
1368 conns = &kranal_data.kra_conns[i];
1370 cfs_list_for_each (tmp, conns) {
1371 conn = cfs_list_entry(tmp, kra_conn_t,
1374 if (conn->rac_device == dev)
1375 kranal_schedule_conn(conn);
1378 /* don't block write lockers for too long... */
1379 read_unlock(&kranal_data.kra_global_lock);
/*
 * Send msg, plus an optional immediate payload, on conn's FMA channel.
 * - msg is stamped with conn's rac_my_connstamp and current rac_tx_seq.
 * - RapkFmaSyncSend() is used when msg->ram_type carries RANAL_MSG_FENCE;
 *   otherwise RapkFmaSend().
 * - On success rac_last_tx is updated. Any sequence-number bump is on
 *   lines not visible.
 * - When the send cannot proceed (presumably out of credits, reported as
 *   -EAGAIN), a warning is logged if the conn has been idle past its
 *   keepalive interval.
 * Only IMMEDIATE messages may carry a payload, of at most
 * RANAL_FMA_MAX_DATA bytes.
 */
1385 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1386 void *immediate, int immediatenob)
1388 int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1391 CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1392 conn, msg, msg->ram_type, sync ? "(sync)" : "",
1393 immediate, immediatenob);
/* The whole header must fit in the FMA prefix area. */
1395 LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1396 LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1397 immediatenob <= RANAL_FMA_MAX_DATA :
1400 msg->ram_connstamp = conn->rac_my_connstamp;
1401 msg->ram_seq = conn->rac_tx_seq;
1404 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1405 immediate, immediatenob,
1408 rrc = RapkFmaSend(conn->rac_rihandle,
1409 immediate, immediatenob,
1417 conn->rac_last_tx = jiffies;
/* Send refused: only worth a warning once the conn has gone a keepalive
 * interval without transmitting. */
1422 if (cfs_time_aftereq(jiffies,
1423 conn->rac_last_tx + conn->rac_keepalive *
1425 CWARN("EAGAIN sending %02x (idle %lu secs)\n",
1427 (jiffies - conn->rac_last_tx)/HZ);
1433 kranal_process_fmaq (kra_conn_t *conn)
1435 unsigned long flags;
1441 /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1442 * However I will be rescheduled by an FMA completion event
1443 * when I eventually get some.
1444 * NB 2. Sampling rac_state here races with setting it elsewhere.
1445 * But it doesn't matter if I try to send a "real" message just
1446 * as I start closing because I'll get scheduled to send the
1449 /* Not racing with incoming message processing! */
1450 LASSERT (current == conn->rac_device->rad_scheduler);
1452 if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1453 if (!cfs_list_empty(&conn->rac_rdmaq)) {
1454 /* RDMAs in progress */
1455 LASSERT (!conn->rac_close_sent);
1457 if (cfs_time_aftereq(jiffies,
1459 conn->rac_keepalive * HZ)) {
1460 CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
1461 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1462 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1467 if (conn->rac_close_sent)
1470 CWARN("sending CLOSE to %s\n",
1471 libcfs_nid2str(conn->rac_peer->rap_nid));
1472 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1473 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1477 conn->rac_close_sent = 1;
1478 if (!conn->rac_close_recvd)
1481 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1483 if (conn->rac_state == RANAL_CONN_CLOSING)
1484 kranal_terminate_conn_locked(conn);
1486 write_unlock_irqrestore(&kranal_data.kra_global_lock,
1491 spin_lock_irqsave(&conn->rac_lock, flags);
1493 if (cfs_list_empty(&conn->rac_fmaq)) {
1495 spin_unlock_irqrestore(&conn->rac_lock, flags);
1497 if (cfs_time_aftereq(jiffies,
1498 conn->rac_last_tx + conn->rac_keepalive *
1500 CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
1501 libcfs_nid2str(conn->rac_peer->rap_nid), conn,
1502 (jiffies - conn->rac_last_tx)/HZ,
1503 conn->rac_keepalive);
1504 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1505 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1510 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1511 cfs_list_del(&tx->tx_list);
1512 more_to_do = !cfs_list_empty(&conn->rac_fmaq);
1514 spin_unlock_irqrestore(&conn->rac_lock, flags);
1517 CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
1518 tx, tx->tx_msg.ram_type, tx->tx_cookie);
1519 switch (tx->tx_msg.ram_type) {
1523 case RANAL_MSG_IMMEDIATE:
1524 rc = kranal_sendmsg(conn, &tx->tx_msg,
1525 tx->tx_buffer, tx->tx_nob);
1528 case RANAL_MSG_PUT_NAK:
1529 case RANAL_MSG_PUT_DONE:
1530 case RANAL_MSG_GET_NAK:
1531 case RANAL_MSG_GET_DONE:
1532 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1535 case RANAL_MSG_PUT_REQ:
1536 rc = kranal_map_buffer(tx);
1537 LASSERT (rc != -EAGAIN);
1541 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1542 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1546 case RANAL_MSG_PUT_ACK:
1547 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1551 case RANAL_MSG_GET_REQ:
1552 rc = kranal_map_buffer(tx);
1553 LASSERT (rc != -EAGAIN);
1557 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1558 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1559 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1560 (__u64)((unsigned long)tx->tx_buffer);
1561 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1562 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1567 if (rc == -EAGAIN) {
1568 /* I need credits to send this. Replace tx at the head of the
1569 * fmaq and I'll get rescheduled when credits appear */
1570 CDEBUG(D_NET, "EAGAIN on %p\n", conn);
1571 spin_lock_irqsave(&conn->rac_lock, flags);
1572 cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
1573 spin_unlock_irqrestore(&conn->rac_lock, flags);
1577 if (!expect_reply || rc != 0) {
1578 kranal_tx_done(tx, rc);
1580 /* LASSERT(current) above ensures this doesn't race with reply
1582 spin_lock_irqsave(&conn->rac_lock, flags);
1583 cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
1584 tx->tx_qtime = jiffies;
1585 spin_unlock_irqrestore(&conn->rac_lock, flags);
1589 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
1590 kranal_schedule_conn(conn);
1595 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1597 __swab64s(&d->rard_key.Key);
1598 __swab16s(&d->rard_key.Cookie);
1599 __swab16s(&d->rard_key.MdHandle);
1600 __swab32s(&d->rard_key.Flags);
1601 __swab64s(&d->rard_addr.AddressBits);
1602 __swab32s(&d->rard_nob);
1606 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1610 unsigned long flags;
1612 spin_lock_irqsave(&conn->rac_lock, flags);
1614 cfs_list_for_each(ttmp, &conn->rac_replyq) {
1615 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
1617 CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
1618 tx, tx->tx_msg.ram_type, tx->tx_cookie);
1620 if (tx->tx_cookie != cookie)
1623 if (tx->tx_msg.ram_type != type) {
1624 spin_unlock_irqrestore(&conn->rac_lock, flags);
1625 CWARN("Unexpected type %x (%x expected) "
1626 "matched reply from %s\n",
1627 tx->tx_msg.ram_type, type,
1628 libcfs_nid2str(conn->rac_peer->rap_nid));
1632 cfs_list_del(&tx->tx_list);
1633 spin_unlock_irqrestore(&conn->rac_lock, flags);
1637 spin_unlock_irqrestore(&conn->rac_lock, flags);
1638 CWARN("Unmatched reply %02x/"LPX64" from %s\n",
1639 type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
1644 kranal_check_fma_rx (kra_conn_t *conn)
1646 unsigned long flags;
1651 RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1652 kra_peer_t *peer = conn->rac_peer;
1656 if (rrc == RAP_NOT_DONE)
1659 CDEBUG(D_NET, "RX on %p\n", conn);
1661 LASSERT (rrc == RAP_SUCCESS);
1662 conn->rac_last_rx = jiffies;
1663 seq = conn->rac_rx_seq++;
1664 msg = (kra_msg_t *)prefix;
1666 /* stash message for portals callbacks they'll NULL
1667 * rac_rxmsg if they consume it */
1668 LASSERT (conn->rac_rxmsg == NULL);
1669 conn->rac_rxmsg = msg;
1671 if (msg->ram_magic != RANAL_MSG_MAGIC) {
1672 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1673 CERROR("Unexpected magic %08x from %s\n",
1674 msg->ram_magic, libcfs_nid2str(peer->rap_nid));
1679 __swab32s(&msg->ram_magic);
1680 __swab16s(&msg->ram_version);
1681 __swab16s(&msg->ram_type);
1682 __swab64s(&msg->ram_srcnid);
1683 __swab64s(&msg->ram_connstamp);
1684 __swab32s(&msg->ram_seq);
1686 /* NB message type checked below; NOT here... */
1687 switch (msg->ram_type) {
1688 case RANAL_MSG_PUT_ACK:
1689 kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1692 case RANAL_MSG_GET_REQ:
1693 kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
1701 if (msg->ram_version != RANAL_MSG_VERSION) {
1702 CERROR("Unexpected protocol version %d from %s\n",
1703 msg->ram_version, libcfs_nid2str(peer->rap_nid));
1708 if (msg->ram_srcnid != peer->rap_nid) {
1709 CERROR("Unexpected peer %s from %s\n",
1710 libcfs_nid2str(msg->ram_srcnid),
1711 libcfs_nid2str(peer->rap_nid));
1716 if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1717 CERROR("Unexpected connstamp "LPX64"("LPX64
1718 " expected) from %s\n",
1719 msg->ram_connstamp, conn->rac_peer_connstamp,
1720 libcfs_nid2str(peer->rap_nid));
1725 if (msg->ram_seq != seq) {
1726 CERROR("Unexpected sequence number %d(%d expected) from %s\n",
1727 msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
1732 if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1733 /* This message signals RDMA completion... */
1734 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1735 if (rrc != RAP_SUCCESS) {
1736 CERROR("RapkFmaSyncWait failed: %d\n", rrc);
1742 if (conn->rac_close_recvd) {
1743 CERROR("Unexpected message %d after CLOSE from %s\n",
1744 msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
1749 if (msg->ram_type == RANAL_MSG_CLOSE) {
1750 CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
1751 conn->rac_close_recvd = 1;
1752 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1754 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1755 kranal_close_conn_locked(conn, 0);
1756 else if (conn->rac_state == RANAL_CONN_CLOSING &&
1757 conn->rac_close_sent)
1758 kranal_terminate_conn_locked(conn);
1760 write_unlock_irqrestore(&kranal_data.kra_global_lock,
1765 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1768 switch (msg->ram_type) {
1769 case RANAL_MSG_NOOP:
1770 /* Nothing to do; just a keepalive */
1771 CDEBUG(D_NET, "RX NOOP on %p\n", conn);
1774 case RANAL_MSG_IMMEDIATE:
1775 CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
1776 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
1777 msg->ram_srcnid, conn, 0);
1781 case RANAL_MSG_PUT_REQ:
1782 CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
1783 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
1784 msg->ram_srcnid, conn, 1);
1788 case RANAL_MSG_PUT_NAK:
1789 CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
1790 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1791 msg->ram_u.completion.racm_cookie);
1795 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1796 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1797 kranal_tx_done(tx, -ENOENT); /* no match */
1800 case RANAL_MSG_PUT_ACK:
1801 CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
1802 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1803 msg->ram_u.putack.rapam_src_cookie);
1807 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1808 &msg->ram_u.putack.rapam_desc,
1809 msg->ram_u.putack.rapam_desc.rard_nob,
1810 msg->ram_u.putack.rapam_dst_cookie);
1813 case RANAL_MSG_PUT_DONE:
1814 CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
1815 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1816 msg->ram_u.completion.racm_cookie);
1820 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1821 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1822 kranal_tx_done(tx, 0);
1825 case RANAL_MSG_GET_REQ:
1826 CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
1827 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
1828 msg->ram_srcnid, conn, 1);
1832 case RANAL_MSG_GET_NAK:
1833 CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
1834 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1835 msg->ram_u.completion.racm_cookie);
1839 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1840 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1841 kranal_tx_done(tx, -ENOENT); /* no match */
1844 case RANAL_MSG_GET_DONE:
1845 CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
1846 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1847 msg->ram_u.completion.racm_cookie);
1851 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1852 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1854 /* completion message should send rdma length if we ever allow
1856 lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
1858 kranal_tx_done(tx, 0);
1863 if (rc < 0) /* protocol/comms error */
1864 kranal_close_conn (conn, rc);
1866 if (repost && conn->rac_rxmsg != NULL)
1867 kranal_consume_rxmsg(conn, NULL, 0);
1869 /* check again later */
1870 kranal_schedule_conn(conn);
1874 kranal_complete_closed_conn (kra_conn_t *conn)
1880 LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1881 LASSERT (cfs_list_empty(&conn->rac_list));
1882 LASSERT (cfs_list_empty(&conn->rac_hashlist));
1884 for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
1885 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1887 cfs_list_del(&tx->tx_list);
1888 kranal_tx_done(tx, -ECONNABORTED);
1891 LASSERT (cfs_list_empty(&conn->rac_rdmaq));
1893 for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
1894 tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1896 cfs_list_del(&tx->tx_list);
1897 kranal_tx_done(tx, -ECONNABORTED);
1900 CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
1901 conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
1904 int kranal_process_new_conn (kra_conn_t *conn)
1908 rrc = RapkCompleteSync(conn->rac_rihandle, 1);
1909 if (rrc == RAP_SUCCESS)
1912 LASSERT (rrc == RAP_NOT_DONE);
1913 if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
1914 conn->rac_timeout * HZ))
1918 rrc = RapkCompleteSync(conn->rac_rihandle, 0);
1919 LASSERT (rrc == RAP_SUCCESS);
1924 kranal_scheduler (void *arg)
1926 kra_device_t *dev = (kra_device_t *)arg;
1929 unsigned long flags;
1930 unsigned long deadline;
1931 unsigned long soonest;
1940 cfs_block_allsigs();
1942 dev->rad_scheduler = current;
1943 init_waitqueue_entry_current(&wait);
1945 spin_lock_irqsave(&dev->rad_lock, flags);
1947 while (!kranal_data.kra_shutdown) {
1948 /* Safe: kra_shutdown only set when quiescent */
1950 if (busy_loops++ >= RANAL_RESCHED) {
1951 spin_unlock_irqrestore(&dev->rad_lock, flags);
1956 spin_lock_irqsave(&dev->rad_lock, flags);
1961 if (dev->rad_ready) {
1962 /* Device callback fired since I last checked it */
1964 spin_unlock_irqrestore(&dev->rad_lock, flags);
1967 kranal_check_rdma_cq(dev);
1968 kranal_check_fma_cq(dev);
1970 spin_lock_irqsave(&dev->rad_lock, flags);
1973 cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
1974 conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
1976 cfs_list_del_init(&conn->rac_schedlist);
1977 LASSERT (conn->rac_scheduled);
1978 conn->rac_scheduled = 0;
1979 spin_unlock_irqrestore(&dev->rad_lock, flags);
1982 kranal_check_fma_rx(conn);
1983 kranal_process_fmaq(conn);
1985 if (conn->rac_state == RANAL_CONN_CLOSED)
1986 kranal_complete_closed_conn(conn);
1988 kranal_conn_decref(conn);
1989 spin_lock_irqsave(&dev->rad_lock, flags);
1995 cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
1996 conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
1998 deadline = conn->rac_last_tx + conn->rac_keepalive;
1999 if (cfs_time_aftereq(jiffies, deadline)) {
2000 /* Time to process this new conn */
2001 spin_unlock_irqrestore(&dev->rad_lock,
2005 rc = kranal_process_new_conn(conn);
2006 if (rc != -EAGAIN) {
2007 /* All done with this conn */
2008 spin_lock_irqsave(&dev->rad_lock,
2010 cfs_list_del_init(&conn->rac_schedlist);
2011 spin_unlock_irqrestore(&dev-> \
2015 kranal_conn_decref(conn);
2016 spin_lock_irqsave(&dev->rad_lock,
2021 /* retry with exponential backoff until HZ */
2022 if (conn->rac_keepalive == 0)
2023 conn->rac_keepalive = 1;
2024 else if (conn->rac_keepalive <= HZ)
2025 conn->rac_keepalive *= 2;
2027 conn->rac_keepalive += HZ;
2029 deadline = conn->rac_last_tx + conn->rac_keepalive;
2030 spin_lock_irqsave(&dev->rad_lock, flags);
2033 /* Does this conn need attention soonest? */
2034 if (nsoonest++ == 0 ||
2035 !cfs_time_aftereq(deadline, soonest))
2039 if (dropped_lock) /* may sleep iff I didn't drop the lock */
2042 set_current_state(TASK_INTERRUPTIBLE);
2043 add_wait_queue_exclusive(&dev->rad_waitq, &wait);
2044 spin_unlock_irqrestore(&dev->rad_lock, flags);
2046 if (nsoonest == 0) {
2048 waitq_wait(&wait, TASK_INTERRUPTIBLE);
2050 timeout = (long)(soonest - jiffies);
2053 waitq_timedwait(&wait,
2059 remove_wait_queue(&dev->rad_waitq, &wait);
2060 set_current_state(TASK_RUNNING);
2061 spin_lock_irqsave(&dev->rad_lock, flags);
2064 spin_unlock_irqrestore(&dev->rad_lock, flags);
2066 dev->rad_scheduler = NULL;
2067 kranal_thread_fini();