1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
6 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
8 * This program is free software; you can redistribute it and/or modify
9 * it under the terms of the GNU General Public License version 2 only,
10 * as published by the Free Software Foundation.
12 * This program is distributed in the hope that it will be useful, but
13 * WITHOUT ANY WARRANTY; without even the implied warranty of
14 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
15 * General Public License version 2 for more details (a copy is included
16 * in the LICENSE file that accompanied this code).
18 * You should have received a copy of the GNU General Public License
19 * version 2 along with this program; If not, see
20 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
22 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
23 * CA 95054 USA or visit www.sun.com if you need additional information or
29 * Copyright 2008 Sun Microsystems, Inc. All rights reserved
30 * Use is subject to license terms.
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
36 * lnet/klnds/ralnd/ralnd_cb.c
38 * Author: Eric Barton <eric@bartonsoftware.com>
/* Device interrupt/event callback from the RapidArray driver: find the
 * kra_device_t whose rad_id matches 'devid' and wake its scheduler thread
 * via rad_waitq (only if it is not already marked ready).
 * NOTE(review): this listing is sampled - declarations, braces and the
 * early-return after a matching device are not visible here. */
44 kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
50 CDEBUG(D_NET, "callback for device %d\n", devid);
52 for (i = 0; i < kranal_data.kra_ndevs; i++) {
54 dev = &kranal_data.kra_devices[i];
55 if (dev->rad_id != devid)
58 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
60 if (!dev->rad_ready) {
62 cfs_waitq_signal(&dev->rad_waitq);
/* rad_lock is IRQ-safe: this callback can run in interrupt context */
65 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
/* fell through the device loop: no device with this id is configured */
69 CWARN("callback for unknown device %d\n", devid);
/* Queue 'conn' on its device's ready-conns list and wake the device
 * scheduler.  Idempotent: if the conn is already scheduled this is a
 * no-op.  Takes a conn ref on behalf of the scheduler, which the
 * scheduler drops when it is done with the conn. */
73 kranal_schedule_conn(kra_conn_t *conn)
75 kra_device_t *dev = conn->rac_device;
78 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
80 if (!conn->rac_scheduled) {
81 kranal_conn_addref(conn); /* +1 ref for scheduler */
82 conn->rac_scheduled = 1;
83 cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
84 cfs_waitq_signal(&dev->rad_waitq);
87 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Take a tx descriptor off the global idle list and stamp it with a new
 * completion cookie.  Presumably returns NULL when the idle list is
 * empty (the early-unlock path below) - the return statements are not
 * visible in this sampled listing; confirm against the full source. */
91 kranal_get_idle_tx (void)
96 cfs_spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
98 if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
99 cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
103 tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
104 cfs_list_del(&tx->tx_list);
106 /* Allocate a new completion cookie. It might not be needed, but we've
107 * got a lock right now... */
108 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
110 cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* idle txs must have been fully cleaned up by kranal_tx_done() */
112 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
113 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
114 LASSERT (tx->tx_conn == NULL);
115 LASSERT (tx->tx_lntmsg[0] == NULL);
116 LASSERT (tx->tx_lntmsg[1] == NULL);
/* Initialise the common header fields of an FMA message: magic, protocol
 * version, message type and our source NID. */
122 kranal_init_msg(kra_msg_t *msg, int type)
124 msg->ram_magic = RANAL_MSG_MAGIC;
125 msg->ram_version = RANAL_MSG_VERSION;
126 msg->ram_type = type;
127 msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
128 /* ram_connstamp gets set when FMA is sent */
/* Grab an idle tx and initialise its embedded message header for 'type'.
 * NOTE(review): the NULL check on the idle-tx result is not visible in
 * this sampled listing - confirm it exists in the full source. */
132 kranal_new_tx_msg (int type)
134 kra_tx_t *tx = kranal_get_idle_tx();
137 kranal_init_msg(&tx->tx_msg, type);
/* Point tx->tx_buffer at the (single) iovec fragment holding the payload
 * so it can be sent inline in an FMA.  Fails (error path not visible in
 * this sampled listing) if the payload spans more than one fragment. */
143 kranal_setup_immediate_buffer (kra_tx_t *tx,
144 unsigned int niov, struct iovec *iov,
148 /* For now this is almost identical to kranal_setup_virt_buffer, but we
149 * could "flatten" the payload into a single contiguous buffer ready
150 * for sending direct over an FMA if we ever needed to. */
152 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
156 tx->tx_buffer = NULL;
/* skip whole iovec entries that lie entirely before 'offset' */
160 while (offset >= iov->iov_len) {
161 offset -= iov->iov_len;
167 if (nob > iov->iov_len - offset) {
168 CERROR("Can't handle multiple vaddr fragments\n");
172 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
175 tx->tx_buftype = RANAL_BUF_IMMEDIATE;
/* Describe a virtually-addressed RDMA buffer: single iovec fragment only,
 * recorded as RANAL_BUF_VIRT_UNMAPPED for later registration in
 * kranal_map_buffer().  Multi-fragment payloads are rejected. */
181 kranal_setup_virt_buffer (kra_tx_t *tx,
182 unsigned int niov, struct iovec *iov,
188 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* advance to the iovec entry containing 'offset' */
190 while (offset >= iov->iov_len) {
191 offset -= iov->iov_len;
197 if (nob > iov->iov_len - offset) {
198 CERROR("Can't handle multiple vaddr fragments\n");
202 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
204 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Build the tx_phys physical-page table for a kiov (paged) payload so it
 * can be registered for RDMA.  Pages after the first must tile the
 * buffer with no gaps (full pages starting at offset 0); the table is
 * bounded by LNET_MAX_IOV entries.  Ends RANAL_BUF_PHYS_UNMAPPED. */
209 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
212 RAP_PHYS_REGION *phys = tx->tx_phys;
215 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
219 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* skip pages wholly before 'offset' */
221 while (offset >= kiov->kiov_len) {
222 offset -= kiov->kiov_len;
228 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
230 tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
232 phys->Address = lnet_page2phys(kiov->kiov_page);
/* resid = bytes still needed beyond the first fragment */
235 resid = nob - (kiov->kiov_len - offset);
241 if (kiov->kiov_offset != 0 ||
242 ((resid > PAGE_SIZE) &&
243 kiov->kiov_len < PAGE_SIZE)) {
244 /* Can't have gaps */
245 CERROR("Can't make payload contiguous in I/O VM:"
246 "page %d, offset %d, len %d \n",
247 (int)(phys - tx->tx_phys),
248 kiov->kiov_offset, kiov->kiov_len);
252 if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
253 CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
257 phys->Address = lnet_page2phys(kiov->kiov_page);
263 tx->tx_phys_npages = phys - tx->tx_phys;
/* Dispatch to the physical (kiov) or virtual (iov) RDMA buffer setup.
 * Exactly one of iov/kiov must be non-NULL. */
268 kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
269 struct iovec *iov, lnet_kiov_t *kiov,
272 LASSERT ((iov == NULL) != (kiov == NULL));
275 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
277 return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/* Register the tx's buffer with the device for RDMA, converting
 * *_UNMAPPED buftypes to *_MAPPED and updating the per-device mapping
 * accounting.  Immediate/already-mapped buffers need no work.  Returns
 * -ENOMEM when the RapidArray registration call fails.  Must run on the
 * device scheduler thread (asserted below). */
281 kranal_map_buffer (kra_tx_t *tx)
283 kra_conn_t *conn = tx->tx_conn;
284 kra_device_t *dev = conn->rac_device;
287 LASSERT (current == dev->rad_scheduler);
289 switch (tx->tx_buftype) {
/* nothing to do for these */
294 case RANAL_BUF_IMMEDIATE:
295 case RANAL_BUF_PHYS_MAPPED:
296 case RANAL_BUF_VIRT_MAPPED:
299 case RANAL_BUF_PHYS_UNMAPPED:
300 rrc = RapkRegisterPhys(dev->rad_handle,
301 tx->tx_phys, tx->tx_phys_npages,
303 if (rrc != RAP_SUCCESS) {
304 CERROR ("Can't map %d pages: dev %d "
305 "phys %u pp %u, virt %u nob %lu\n",
306 tx->tx_phys_npages, dev->rad_id,
307 dev->rad_nphysmap, dev->rad_nppphysmap,
308 dev->rad_nvirtmap, dev->rad_nobvirtmap);
309 return -ENOMEM; /* assume insufficient resources */
313 dev->rad_nppphysmap += tx->tx_phys_npages;
315 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
318 case RANAL_BUF_VIRT_UNMAPPED:
319 rrc = RapkRegisterMemory(dev->rad_handle,
320 tx->tx_buffer, tx->tx_nob,
322 if (rrc != RAP_SUCCESS) {
323 CERROR ("Can't map %d bytes: dev %d "
324 "phys %u pp %u, virt %u nob %lu\n",
325 tx->tx_nob, dev->rad_id,
326 dev->rad_nphysmap, dev->rad_nppphysmap,
327 dev->rad_nvirtmap, dev->rad_nobvirtmap);
328 return -ENOMEM; /* assume insufficient resources */
332 dev->rad_nobvirtmap += tx->tx_nob;
334 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/* Undo kranal_map_buffer(): deregister a mapped buffer with the device,
 * revert the buftype to *_UNMAPPED and roll back the per-device mapping
 * accounting.  No-op for immediate/unmapped buffers.  Must run on the
 * device scheduler thread (asserted below). */
340 kranal_unmap_buffer (kra_tx_t *tx)
345 switch (tx->tx_buftype) {
/* nothing mapped: nothing to undo */
350 case RANAL_BUF_IMMEDIATE:
351 case RANAL_BUF_PHYS_UNMAPPED:
352 case RANAL_BUF_VIRT_UNMAPPED:
355 case RANAL_BUF_PHYS_MAPPED:
356 LASSERT (tx->tx_conn != NULL);
357 dev = tx->tx_conn->rac_device;
358 LASSERT (current == dev->rad_scheduler);
359 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
361 LASSERT (rrc == RAP_SUCCESS);
364 dev->rad_nppphysmap -= tx->tx_phys_npages;
366 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
369 case RANAL_BUF_VIRT_MAPPED:
370 LASSERT (tx->tx_conn != NULL);
371 dev = tx->tx_conn->rac_device;
372 LASSERT (current == dev->rad_scheduler);
373 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
375 LASSERT (rrc == RAP_SUCCESS);
378 dev->rad_nobvirtmap -= tx->tx_nob;
380 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
/* Complete a tx: unmap its buffer, reset it to the idle state, return it
 * to the global idle list, and only then finalize any attached lnet
 * messages with 'completion' status.  Must not be called from interrupt
 * context (lnet_finalize may sleep). */
386 kranal_tx_done (kra_tx_t *tx, int completion)
388 lnet_msg_t *lnetmsg[2];
392 LASSERT (!cfs_in_interrupt());
394 kranal_unmap_buffer(tx);
/* detach the lnet messages before recycling the tx */
396 lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
397 lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
399 tx->tx_buftype = RANAL_BUF_NONE;
400 tx->tx_msg.ram_type = RANAL_MSG_NONE;
403 cfs_spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
405 cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
407 cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
409 /* finalize AFTER freeing lnet msgs */
410 for (i = 0; i < 2; i++) {
411 if (lnetmsg[i] == NULL)
414 lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
/* Return the peer's first connection, or (implicitly, on the path not
 * visible here) NULL if it has none.  Caller must hold the global lock. */
419 kranal_find_conn_locked (kra_peer_t *peer)
423 /* just return the first connection */
424 cfs_list_for_each (tmp, &peer->rap_conns) {
425 return cfs_list_entry(tmp, kra_conn_t, rac_list);
/* Queue 'tx' on the conn's FMA send queue (timestamped for timeout
 * checks) and schedule the conn so its device scheduler sends it. */
432 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
438 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
439 cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
440 tx->tx_qtime = jiffies;
441 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
443 kranal_schedule_conn(conn);
/* Commit to sending 'tx' to 'nid': post it on an existing conn if one is
 * found; otherwise (under the write lock) create a persistent peer on
 * first attempt, queue the tx on the peer and kick the connection daemon
 * to establish a conn.  On any unrecoverable problem the tx is completed
 * here with failure via kranal_tx_done().
 * NOTE(review): sampled listing - retry/exit control flow between the
 * read-lock fast path and the write-lock slow path is not fully visible. */
447 kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
454 cfs_rwlock_t *g_lock = &kranal_data.kra_global_lock;
456 /* If I get here, I've committed to send, so I complete the tx with
457 * failure on any problems */
459 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
461 for (retry = 0; ; retry = 1) {
/* fast path: existing conn found under the shared lock */
463 cfs_read_lock(g_lock);
465 peer = kranal_find_peer_locked(nid);
467 conn = kranal_find_conn_locked(peer);
469 kranal_post_fma(conn, tx);
470 cfs_read_unlock(g_lock);
475 /* Making connections; I'll need a write lock... */
476 cfs_read_unlock(g_lock);
477 cfs_write_lock_irqsave(g_lock, flags);
479 peer = kranal_find_peer_locked(nid);
483 cfs_write_unlock_irqrestore(g_lock, flags);
/* already retried peer creation once: give up */
486 CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
487 kranal_tx_done(tx, -EHOSTUNREACH);
491 rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
492 lnet_acceptor_port());
494 CERROR("Can't add peer %s: %d\n",
495 libcfs_nid2str(nid), rc);
496 kranal_tx_done(tx, rc);
/* re-check under the write lock: a conn may have appeared meanwhile */
501 conn = kranal_find_conn_locked(peer);
503 /* Connection exists; queue message on it */
504 kranal_post_fma(conn, tx);
505 cfs_write_unlock_irqrestore(g_lock, flags);
509 LASSERT (peer->rap_persistence > 0);
511 if (!peer->rap_connecting) {
512 LASSERT (cfs_list_empty(&peer->rap_tx_queue));
/* respect the reconnect backoff interval */
514 if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
515 cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
516 cfs_write_unlock_irqrestore(g_lock, flags);
517 kranal_tx_done(tx, -EHOSTUNREACH);
521 peer->rap_connecting = 1;
522 kranal_peer_addref(peer); /* extra ref for connd */
524 cfs_spin_lock(&kranal_data.kra_connd_lock);
526 cfs_list_add_tail(&peer->rap_connd_list,
527 &kranal_data.kra_connd_peers);
528 cfs_waitq_signal(&kranal_data.kra_connd_waitq);
530 cfs_spin_unlock(&kranal_data.kra_connd_lock);
533 /* A connection is being established; queue the message... */
534 cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
536 cfs_write_unlock_irqrestore(g_lock, flags);
/* Start an RDMA of 'nob' bytes from the (mapped) tx buffer to the remote
 * 'sink' descriptor, prime the tx's completion message ('type' + cookie)
 * and queue the tx on rac_rdmaq to await the RDMA completion event.
 * Zero-length transfers skip the RDMA and post the completion directly.
 * Runs only on the conn's device scheduler thread. */
540 kranal_rdma(kra_tx_t *tx, int type,
541 kra_rdma_desc_t *sink, int nob, __u64 cookie)
543 kra_conn_t *conn = tx->tx_conn;
547 LASSERT (kranal_tx_mapped(tx));
548 LASSERT (nob <= sink->rard_nob);
549 LASSERT (nob <= tx->tx_nob);
551 /* No actual race with scheduler sending CLOSE (I'm she!) */
552 LASSERT (current == conn->rac_device->rad_scheduler);
554 memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
555 tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
556 tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
557 tx->tx_rdma_desc.DstPtr = sink->rard_addr;
558 tx->tx_rdma_desc.DstKey = sink->rard_key;
559 tx->tx_rdma_desc.Length = nob;
560 tx->tx_rdma_desc.AppPtr = tx;
562 /* prep final completion message */
563 kranal_init_msg(&tx->tx_msg, type);
564 tx->tx_msg.ram_u.completion.racm_cookie = cookie;
566 if (nob == 0) { /* Immediate completion */
567 kranal_post_fma(conn, tx);
571 LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
573 rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
574 LASSERT (rrc == RAP_SUCCESS);
576 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
577 cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
578 tx->tx_qtime = jiffies;
579 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Copy the pending immediate payload out of the conn's FMA receive
 * buffer into 'buffer' (up to 'nob' bytes) and release the rxmsg slot.
 * Warns if fewer bytes than expected arrived; the return value
 * presumably reflects that (return statements not visible here). */
583 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
585 __u32 nob_received = nob;
588 LASSERT (conn->rac_rxmsg != NULL);
589 CDEBUG(D_NET, "Consuming %p\n", conn);
591 rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
592 &nob_received, sizeof(kra_msg_t));
593 LASSERT (rrc == RAP_SUCCESS);
/* rxmsg consumed: allow the next incoming message to be processed */
595 conn->rac_rxmsg = NULL;
597 if (nob_received < nob) {
598 CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
599 libcfs_nid2str(conn->rac_peer->rap_nid),
/* LND send entry point: choose how to send 'lntmsg'.  Small non-paged
 * payloads go as a single IMMEDIATE FMA; larger GETs/PUTs allocate a tx,
 * set up an RDMA buffer describing the source/sink, attach the lnet
 * message(s) for later finalization, and launch via kranal_launch_tx().
 * NOTE(review): sampled listing - the switch over 'type' and several
 * error-exit lines are not visible; read with the full source. */
608 kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
610 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
611 int type = lntmsg->msg_type;
612 lnet_process_id_t target = lntmsg->msg_target;
613 int target_is_router = lntmsg->msg_target_is_router;
614 int routing = lntmsg->msg_routing;
615 unsigned int niov = lntmsg->msg_niov;
616 struct iovec *iov = lntmsg->msg_iov;
617 lnet_kiov_t *kiov = lntmsg->msg_kiov;
618 unsigned int offset = lntmsg->msg_offset;
619 unsigned int nob = lntmsg->msg_len;
623 /* NB 'private' is different depending on what we're sending.... */
625 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
626 nob, niov, libcfs_id2str(target));
628 LASSERT (nob == 0 || niov > 0);
629 LASSERT (niov <= LNET_MAX_IOV);
631 LASSERT (!cfs_in_interrupt());
632 /* payload is either all vaddrs or all pages */
633 LASSERT (!(kiov != NULL && iov != NULL));
/* this LND does not route */
636 CERROR ("Can't route\n");
651 /* We have to consider the eventual sink buffer rather than any
652 * payload passed here (there isn't any, and strictly, looking
653 * inside lntmsg is a layering violation). We send a simple
654 * IMMEDIATE GET if the sink buffer is mapped already and small
657 if (routing || target_is_router)
658 break; /* send IMMEDIATE */
660 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
661 lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
662 lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
663 break; /* send IMMEDIATE */
/* GET via RDMA: describe the local sink buffer from the MD */
665 tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
669 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
670 rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
671 lntmsg->msg_md->md_iov.iov,
672 0, lntmsg->msg_md->md_length);
674 rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
675 lntmsg->msg_md->md_iov.kiov,
676 0, lntmsg->msg_md->md_length);
678 kranal_tx_done(tx, rc);
682 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
683 if (tx->tx_lntmsg[1] == NULL) {
684 CERROR("Can't create reply for GET to %s\n",
685 libcfs_nid2str(target.nid));
686 kranal_tx_done(tx, rc);
690 tx->tx_lntmsg[0] = lntmsg;
691 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
692 /* rest of tx_msg is setup just before it is sent */
693 kranal_launch_tx(tx, target.nid);
/* PUT/REPLY path: small non-paged payloads go IMMEDIATE */
698 if (kiov == NULL && /* not paged */
699 nob <= RANAL_FMA_MAX_DATA && /* small enough */
700 nob <= *kranal_tunables.kra_max_immediate)
701 break; /* send IMMEDIATE */
703 tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
707 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
709 kranal_tx_done(tx, rc);
713 tx->tx_lntmsg[0] = lntmsg;
714 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
715 /* rest of tx_msg is setup just before it is sent */
716 kranal_launch_tx(tx, target.nid);
/* fallthrough/IMMEDIATE path: payload rides inside the FMA message */
722 LASSERT (kiov == NULL);
723 LASSERT (nob <= RANAL_FMA_MAX_DATA);
725 tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
729 rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
731 kranal_tx_done(tx, rc);
735 tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
736 tx->tx_lntmsg[0] = lntmsg;
737 kranal_launch_tx(tx, target.nid);
/* Reply to an incoming GET_REQ: set up and map an RDMA source buffer for
 * the reply payload, then push it to the requester's sink with a
 * GET_DONE completion carrying the request's cookie.  On failure the tx
 * (if allocated) and the lnet message are both finalized with -EIO. */
742 kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
744 kra_msg_t *rxmsg = conn->rac_rxmsg;
745 unsigned int niov = lntmsg->msg_niov;
746 struct iovec *iov = lntmsg->msg_iov;
747 lnet_kiov_t *kiov = lntmsg->msg_kiov;
748 unsigned int offset = lntmsg->msg_offset;
749 unsigned int nob = lntmsg->msg_len;
753 tx = kranal_get_idle_tx();
757 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
763 rc = kranal_map_buffer(tx);
767 tx->tx_lntmsg[0] = lntmsg;
769 kranal_rdma(tx, RANAL_MSG_GET_DONE,
770 &rxmsg->ram_u.get.ragm_desc, nob,
771 rxmsg->ram_u.get.ragm_cookie);
/* failure path: complete both the tx and the lnet-level message */
775 kranal_tx_done(tx, -EIO);
777 lnet_finalize(ni, lntmsg, -EIO);
/* LND eager_recv hook.  The visible body only logs a drop when no
 * receive buffers are free; the return path is not visible in this
 * sampled listing. */
781 kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
784 kra_conn_t *conn = (kra_conn_t *)private;
786 LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
787 libcfs_nid2str(conn->rac_peer->rap_nid));
/* LND receive entry point, dispatched on the pending rxmsg type:
 *  - IMMEDIATE: copy payload straight into the (non-paged) iov and
 *    finalize.
 *  - PUT_REQ:   set up, map and advertise a sink buffer via PUT_ACK;
 *    lnet finalization is deferred to RDMA_DONE.
 *  - GET_REQ:   hand off to kranal_reply(), or NAK if there is no
 *    matching lnet message.
 * The rxmsg slot is always consumed before returning on each path.
 * NOTE(review): sampled listing - some error-exit lines are missing. */
793 kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
794 int delayed, unsigned int niov,
795 struct iovec *iov, lnet_kiov_t *kiov,
796 unsigned int offset, unsigned int mlen, unsigned int rlen)
798 kra_conn_t *conn = private;
799 kra_msg_t *rxmsg = conn->rac_rxmsg;
804 LASSERT (mlen <= rlen);
805 LASSERT (!cfs_in_interrupt());
806 /* Either all pages or all vaddrs */
807 LASSERT (!(kiov != NULL && iov != NULL));
809 CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);
811 switch(rxmsg->ram_type) {
815 case RANAL_MSG_IMMEDIATE:
818 } else if (kiov != NULL) {
819 CERROR("Can't recv immediate into paged buffer\n");
/* locate the iov fragment containing 'offset' */
823 while (offset >= iov->iov_len) {
824 offset -= iov->iov_len;
829 if (mlen > iov->iov_len - offset) {
830 CERROR("Can't handle immediate frags\n");
833 buffer = ((char *)iov->iov_base) + offset;
835 rc = kranal_consume_rxmsg(conn, buffer, mlen);
836 lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
839 case RANAL_MSG_PUT_REQ:
840 tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
842 kranal_consume_rxmsg(conn, NULL, 0);
846 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
848 kranal_tx_done(tx, rc);
849 kranal_consume_rxmsg(conn, NULL, 0);
854 rc = kranal_map_buffer(tx);
856 kranal_tx_done(tx, rc);
857 kranal_consume_rxmsg(conn, NULL, 0);
/* advertise the mapped sink buffer back to the PUT sender */
861 tx->tx_msg.ram_u.putack.rapam_src_cookie =
862 conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
863 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
864 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
865 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
866 (__u64)((unsigned long)tx->tx_buffer);
867 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
869 tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
871 kranal_post_fma(conn, tx);
872 kranal_consume_rxmsg(conn, NULL, 0);
875 case RANAL_MSG_GET_REQ:
876 if (lntmsg != NULL) {
878 kranal_reply(ni, conn, lntmsg);
/* no matching lnet msg: NAK the GET with its cookie */
881 tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
883 tx->tx_msg.ram_u.completion.racm_cookie =
884 rxmsg->ram_u.get.ragm_cookie;
885 kranal_post_fma(conn, tx);
888 kranal_consume_rxmsg(conn, NULL, 0);
/* Spawn a kernel thread running fn(arg) and bump the global thread
 * count.  (The pid<0 error check is not visible in this sampled view.) */
894 kranal_thread_start (int(*fn)(void *arg), void *arg)
896 long pid = cfs_kernel_thread(fn, arg, 0);
901 cfs_atomic_inc(&kranal_data.kra_nthreads);
/* Per-thread teardown: drop the global thread count (shutdown waits for
 * it to reach zero). */
906 kranal_thread_fini (void)
908 cfs_atomic_dec(&kranal_data.kra_nthreads);
/* Health-check one conn: schedule a keepalive if we have been quiet too
 * long, and report (presumably returning failure - returns not visible)
 * if nothing/CLOSE has been received within the conn timeout or if any
 * tx has been stuck on the fmaq/rdmaq/replyq longer than the timeout. */
912 kranal_check_conn_timeouts (kra_conn_t *conn)
918 unsigned long now = jiffies;
920 LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
921 conn->rac_state == RANAL_CONN_CLOSING);
923 if (!conn->rac_close_sent &&
924 cfs_time_aftereq(now, conn->rac_last_tx + conn->rac_keepalive *
926 /* not sent in a while; schedule conn so scheduler sends a keepalive */
927 CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
928 conn, libcfs_nid2str(conn->rac_peer->rap_nid));
929 kranal_schedule_conn(conn);
932 timeout = conn->rac_timeout * CFS_HZ;
934 if (!conn->rac_close_recvd &&
935 cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
936 CERROR("%s received from %s within %lu seconds\n",
937 (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
938 "Nothing" : "CLOSE not",
939 libcfs_nid2str(conn->rac_peer->rap_nid),
940 (now - conn->rac_last_rx)/CFS_HZ);
/* only an established conn has queues worth auditing */
944 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
947 /* Check the conn's queues are moving. These are "belt+braces" checks,
948 * in case of hardware/software errors that make this conn seem
949 * responsive even though it isn't progressing its message queues. */
951 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
953 cfs_list_for_each (ttmp, &conn->rac_fmaq) {
954 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
956 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
957 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
958 CERROR("tx on fmaq for %s blocked %lu seconds\n",
959 libcfs_nid2str(conn->rac_peer->rap_nid),
960 (now - tx->tx_qtime)/CFS_HZ);
965 cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
966 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
968 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
969 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
970 CERROR("tx on rdmaq for %s blocked %lu seconds\n",
971 libcfs_nid2str(conn->rac_peer->rap_nid),
972 (now - tx->tx_qtime)/CFS_HZ);
977 cfs_list_for_each (ttmp, &conn->rac_replyq) {
978 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
980 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
981 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
982 CERROR("tx on replyq for %s blocked %lu seconds\n",
983 libcfs_nid2str(conn->rac_peer->rap_nid),
984 (now - tx->tx_qtime)/CFS_HZ);
989 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Reaper helper: walk one conn hash bucket under the shared lock, track
 * the minimum timeout/keepalive seen via *min_timeoutp, and when a conn
 * fails its timeout check, retake the lock exclusively to close or
 * terminate it (by state), then restart the bucket scan.
 * NOTE(review): sampled listing - loop restart/exit lines are missing. */
994 kranal_reaper_check (int idx, unsigned long *min_timeoutp)
996 cfs_list_t *conns = &kranal_data.kra_conns[idx];
1003 /* NB. We expect to check all the conns and not find any problems, so
1004 * we just use a shared lock while we take a look... */
1005 cfs_read_lock(&kranal_data.kra_global_lock);
1007 cfs_list_for_each (ctmp, conns) {
1008 conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);
1010 if (conn->rac_timeout < *min_timeoutp )
1011 *min_timeoutp = conn->rac_timeout;
1012 if (conn->rac_keepalive < *min_timeoutp )
1013 *min_timeoutp = conn->rac_keepalive;
1015 rc = kranal_check_conn_timeouts(conn);
/* conn timed out: hold a ref across the lock upgrade */
1019 kranal_conn_addref(conn);
1020 cfs_read_unlock(&kranal_data.kra_global_lock);
1022 CERROR("Conn to %s, cqid %d timed out\n",
1023 libcfs_nid2str(conn->rac_peer->rap_nid),
1026 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1028 switch (conn->rac_state) {
1032 case RANAL_CONN_ESTABLISHED:
1033 kranal_close_conn_locked(conn, -ETIMEDOUT);
1036 case RANAL_CONN_CLOSING:
1037 kranal_terminate_conn_locked(conn);
1041 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
1044 kranal_conn_decref(conn);
1046 /* start again now I've dropped the lock */
1050 cfs_read_unlock(&kranal_data.kra_global_lock);
/* Connection daemon thread: loops until shutdown, draining two work
 * queues under kra_connd_lock - accepted sockets (passive handshakes)
 * and peers needing an active connect.  Sleeps exclusively on
 * kra_connd_waitq when both queues are empty. */
1054 kranal_connd (void *arg)
1056 long id = (long)arg;
1058 cfs_waitlink_t wait;
1059 unsigned long flags;
1061 kra_acceptsock_t *ras;
1064 snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
1065 cfs_daemonize(name);
1066 cfs_block_allsigs();
1068 cfs_waitlink_init(&wait);
1070 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1072 while (!kranal_data.kra_shutdown) {
/* passive side: handshake an accepted socket */
1075 if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
1076 ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
1077 kra_acceptsock_t, ras_list);
1078 cfs_list_del(&ras->ras_list);
/* drop the lock while doing (blocking) handshake I/O */
1080 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1083 CDEBUG(D_NET,"About to handshake someone\n");
1085 kranal_conn_handshake(ras->ras_sock, NULL);
1086 kranal_free_acceptsock(ras);
1088 CDEBUG(D_NET,"Finished handshaking someone\n");
1090 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock,
/* active side: connect to a queued peer (holds the connd ref) */
1095 if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
1096 peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
1097 kra_peer_t, rap_connd_list);
1099 cfs_list_del_init(&peer->rap_connd_list);
1100 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1103 kranal_connect(peer);
1104 kranal_peer_decref(peer);
1106 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock,
/* nothing to do: sleep until new work is signalled */
1114 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
1115 cfs_waitq_add_exclusive(&kranal_data.kra_connd_waitq, &wait);
1117 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1119 cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);
1121 cfs_set_current_state(CFS_TASK_RUNNING);
1122 cfs_waitq_del(&kranal_data.kra_connd_waitq, &wait);
1124 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1127 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1129 kranal_thread_fini();
/* Tell the reaper a (possibly) smaller timeout is now in play, so it can
 * restart its minimum-timeout scan.  Only ever lowers kra_new_min_timeout. */
1134 kranal_update_reaper_timeout(long timeout)
1136 unsigned long flags;
1138 LASSERT (timeout > 0);
1140 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1142 if (timeout < kranal_data.kra_new_min_timeout)
1143 kranal_data.kra_new_min_timeout = timeout;
1145 cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/* Reaper thread: wakes periodically and sweeps a chunk of the conn hash
 * table per pass (chunk sized so the whole table is covered within the
 * current minimum timeout), calling kranal_reaper_check() on each
 * bucket.  Tracks the global minimum timeout across full sweeps and
 * restarts the scan when kranal_update_reaper_timeout() lowers it.
 * NOTE(review): sampled listing - declarations of n/p/chunk/conn_index
 * and some branch lines are not visible here. */
1149 kranal_reaper (void *arg)
1151 cfs_waitlink_t wait;
1152 unsigned long flags;
1155 int conn_entries = kranal_data.kra_conn_hash_size;
1157 int base_index = conn_entries - 1;
1158 unsigned long next_check_time = jiffies;
1159 long next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
1160 long current_min_timeout = 1;
1162 cfs_daemonize("kranal_reaper");
1163 cfs_block_allsigs();
1165 cfs_waitlink_init(&wait);
1167 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1169 while (!kranal_data.kra_shutdown) {
1170 /* I wake up every 'p' seconds to check for timeouts on some
1171 * more peers. I try to check every connection 'n' times
1172 * within the global minimum of all keepalive and timeout
1173 * intervals, to ensure I attend to every connection within
1174 * (n+1)/n times its timeout intervals. */
1177 unsigned long min_timeout;
1180 /* careful with the jiffy wrap... */
1181 timeout = (long)(next_check_time - jiffies);
/* sleep (releasing the reaper lock) until the next check time */
1183 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
1184 cfs_waitq_add(&kranal_data.kra_reaper_waitq, &wait);
1186 cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
1189 cfs_waitq_timedwait(&wait, CFS_TASK_INTERRUPTIBLE,
1192 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock,
1195 cfs_set_current_state(CFS_TASK_RUNNING);
1196 cfs_waitq_del(&kranal_data.kra_reaper_waitq, &wait);
1200 if (kranal_data.kra_new_min_timeout !=
1201 CFS_MAX_SCHEDULE_TIMEOUT) {
1202 /* new min timeout set: restart min timeout scan */
1203 next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
1204 base_index = conn_index - 1;
1206 base_index = conn_entries - 1;
1208 if (kranal_data.kra_new_min_timeout <
1209 current_min_timeout) {
1210 current_min_timeout =
1211 kranal_data.kra_new_min_timeout;
1212 CDEBUG(D_NET, "Set new min timeout %ld\n",
1213 current_min_timeout);
/* reset the "new minimum" latch now it has been absorbed */
1216 kranal_data.kra_new_min_timeout =
1217 CFS_MAX_SCHEDULE_TIMEOUT;
1219 min_timeout = current_min_timeout;
1221 cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1223 LASSERT (min_timeout > 0);
1225 /* Compute how many table entries to check now so I get round
1226 * the whole table fast enough given that I do this at fixed
1227 * intervals of 'p' seconds) */
1228 chunk = conn_entries;
1229 if (min_timeout > n * p)
1230 chunk = (chunk * n * p) / min_timeout;
1234 for (i = 0; i < chunk; i++) {
1235 kranal_reaper_check(conn_index,
1237 conn_index = (conn_index + 1) % conn_entries;
1240 next_check_time += p * CFS_HZ;
1242 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
/* detect wrap past base_index (with and without table wrap-around) */
1244 if (((conn_index - chunk <= base_index &&
1245 base_index < conn_index) ||
1246 (conn_index - conn_entries - chunk <= base_index &&
1247 base_index < conn_index - conn_entries))) {
1249 /* Scanned all conns: set current_min_timeout... */
1250 if (current_min_timeout != next_min_timeout) {
1251 current_min_timeout = next_min_timeout;
1252 CDEBUG(D_NET, "Set new min timeout %ld\n",
1253 current_min_timeout);
1256 /* ...and restart min timeout scan */
1257 next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
1258 base_index = conn_index - 1;
1260 base_index = conn_entries - 1;
1264 kranal_thread_fini();
/* Drain the device's RDMA completion queue: for each completion, map the
 * CQ id back to its conn, pop the oldest tx off rac_rdmaq (completions
 * arrive in posting order), move it to the fmaq so its PUT/GET_DONE
 * message gets sent, and reschedule the conn.  A failed cqid lookup
 * means the conn was already destroyed and the event is ignored. */
1269 kranal_check_rdma_cq (kra_device_t *dev)
1274 unsigned long flags;
1275 RAP_RDMA_DESCRIPTOR *desc;
1280 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
1281 if (rrc == RAP_NOT_DONE) {
1282 CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
1286 LASSERT (rrc == RAP_SUCCESS);
1287 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1289 cfs_read_lock(&kranal_data.kra_global_lock);
1291 conn = kranal_cqid2conn_locked(cqid);
1293 /* Conn was destroyed? */
1294 CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
1295 cfs_read_unlock(&kranal_data.kra_global_lock);
1299 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1300 LASSERT (rrc == RAP_SUCCESS);
1302 CDEBUG(D_NET, "Completed %p\n",
1303 cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
1305 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1307 LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
1308 tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1309 cfs_list_del(&tx->tx_list);
1311 LASSERT(desc->AppPtr == (void *)tx);
1312 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1313 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1315 cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1316 tx->tx_qtime = jiffies;
1318 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1320 /* Get conn's fmaq processed, now I've just put something
1322 kranal_schedule_conn(conn);
1324 cfs_read_unlock(&kranal_data.kra_global_lock);
/* Drain the device's FMA completion queue: normally just reschedule the
 * conn matching each completion's CQ id.  If the CQ overran (events were
 * lost) we cannot tell which conns completed, so conservatively schedule
 * every conn on this device, bucket by bucket, dropping the read lock
 * between buckets so write lockers are not starved. */
1329 kranal_check_fma_cq (kra_device_t *dev)
1340 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
1341 if (rrc == RAP_NOT_DONE) {
1342 CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
1346 LASSERT (rrc == RAP_SUCCESS);
1348 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1350 cfs_read_lock(&kranal_data.kra_global_lock);
1352 conn = kranal_cqid2conn_locked(cqid);
1354 CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
1357 CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
1359 kranal_schedule_conn(conn);
1362 cfs_read_unlock(&kranal_data.kra_global_lock);
1366 /* FMA CQ has overflowed: check ALL conns */
1367 CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
1370 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1372 cfs_read_lock(&kranal_data.kra_global_lock);
1374 conns = &kranal_data.kra_conns[i];
1376 cfs_list_for_each (tmp, conns) {
1377 conn = cfs_list_entry(tmp, kra_conn_t,
1380 if (conn->rac_device == dev)
1381 kranal_schedule_conn(conn);
1384 /* don't block write lockers for too long... */
1385 cfs_read_unlock(&kranal_data.kra_global_lock);
/* Push one FMA message (with optional inline payload) out on 'conn',
 * stamping it with our connstamp and tx sequence number.  Fenced message
 * types use the synchronous send variant.  On success rac_last_tx is
 * updated; EAGAIN (out of FMA credits) is logged when the conn has been
 * idle past its keepalive.  Return statements are not visible in this
 * sampled listing - presumably 0 / -EAGAIN; confirm in the full source. */
1391 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1392 void *immediate, int immediatenob)
1394 int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1397 CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1398 conn, msg, msg->ram_type, sync ? "(sync)" : "",
1399 immediate, immediatenob);
1401 LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1402 LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1403 immediatenob <= RANAL_FMA_MAX_DATA :
1406 msg->ram_connstamp = conn->rac_my_connstamp;
1407 msg->ram_seq = conn->rac_tx_seq;
1410 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1411 immediate, immediatenob,
1414 rrc = RapkFmaSend(conn->rac_rihandle,
1415 immediate, immediatenob,
1423 conn->rac_last_tx = jiffies;
/* out of credits: warn only if we've been starved past keepalive */
1428 if (cfs_time_aftereq(jiffies,
1429 conn->rac_last_tx + conn->rac_keepalive *
1431 CWARN("EAGAIN sending %02x (idle %lu secs)\n",
1433 (jiffies - conn->rac_last_tx)/CFS_HZ);
/*
 * Service 'conn's FMA transmit queue from the device scheduler thread.
 * On a closing connection: keep NOOPs flowing while RDMAs drain, then
 * send CLOSE exactly once and terminate when the peer's CLOSE has also
 * been seen.  On an established connection: send a keepalive NOOP when
 * idle, otherwise dequeue the next tx and transmit it according to its
 * message type.  NOTE(review): many lines are elided from this view;
 * comments describe only the code shown.
 */
1439 kranal_process_fmaq (kra_conn_t *conn)
1441 unsigned long flags;
1447 /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1448 * However I will be rescheduled by an FMA completion event
1449 * when I eventually get some.
1450 * NB 2. Sampling rac_state here races with setting it elsewhere.
1451 * But it doesn't matter if I try to send a "real" message just
1452 * as I start closing because I'll get scheduled to send the
1455 /* Not racing with incoming message processing! */
1456 LASSERT (current == conn->rac_device->rad_scheduler);
/* ---- connection is closing (not ESTABLISHED) ---- */
1458 if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1459 if (!cfs_list_empty(&conn->rac_rdmaq)) {
1460 /* RDMAs in progress */
1461 LASSERT (!conn->rac_close_sent);
/* keep the connection alive with NOOPs while RDMAs drain */
1463 if (cfs_time_aftereq(jiffies,
1465 conn->rac_keepalive * CFS_HZ)) {
1466 CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
1467 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1468 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
/* CLOSE already sent: nothing further to send on this path
 * (following lines elided in this view) */
1473 if (conn->rac_close_sent)
1476 CWARN("sending CLOSE to %s\n",
1477 libcfs_nid2str(conn->rac_peer->rap_nid));
1478 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1479 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
/* mark CLOSE as sent so it is never sent twice */
1483 conn->rac_close_sent = 1;
/* must also have received the peer's CLOSE before terminating
 * (the body of this 'if' is elided in this view) */
1484 if (!conn->rac_close_recvd)
1487 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
/* both CLOSEs exchanged: finish tearing the connection down */
1489 if (conn->rac_state == RANAL_CONN_CLOSING)
1490 kranal_terminate_conn_locked(conn);
1492 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
/* ---- connection is established ---- */
1497 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1499 if (cfs_list_empty(&conn->rac_fmaq)) {
/* nothing queued: drop the lock, then send a keepalive NOOP if
 * the link has been idle longer than the keepalive interval */
1501 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1503 if (cfs_time_aftereq(jiffies,
1504 conn->rac_last_tx + conn->rac_keepalive *
1506 CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
1507 libcfs_nid2str(conn->rac_peer->rap_nid), conn,
1508 (jiffies - conn->rac_last_tx)/CFS_HZ,
1509 conn->rac_keepalive);
1510 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1511 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
/* dequeue the next tx; remember whether more remain so we can
 * reschedule ourselves after sending this one */
1516 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1517 cfs_list_del(&tx->tx_list);
1518 more_to_do = !cfs_list_empty(&conn->rac_fmaq);
1520 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1523 CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
1524 tx, tx->tx_msg.ram_type, tx->tx_cookie);
/* transmit according to message type; cases that expect a reply
 * presumably set expect_reply in elided lines — TODO confirm */
1525 switch (tx->tx_msg.ram_type) {
1529 case RANAL_MSG_IMMEDIATE:
/* payload travels inline with the FMA message */
1530 rc = kranal_sendmsg(conn, &tx->tx_msg,
1531 tx->tx_buffer, tx->tx_nob);
1534 case RANAL_MSG_PUT_NAK:
1535 case RANAL_MSG_PUT_DONE:
1536 case RANAL_MSG_GET_NAK:
1537 case RANAL_MSG_GET_DONE:
/* pure control messages: header only, no payload */
1538 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1541 case RANAL_MSG_PUT_REQ:
/* map the local buffer before advertising the PUT */
1542 rc = kranal_map_buffer(tx);
1543 LASSERT (rc != -EAGAIN);
1547 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1548 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1552 case RANAL_MSG_PUT_ACK:
1553 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1557 case RANAL_MSG_GET_REQ:
/* map the local sink buffer and describe it to the peer so it
 * can RDMA the reply directly into it */
1558 rc = kranal_map_buffer(tx);
1559 LASSERT (rc != -EAGAIN);
1563 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1564 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1565 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1566 (__u64)((unsigned long)tx->tx_buffer);
1567 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1568 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1573 if (rc == -EAGAIN) {
1574 /* I need credits to send this. Replace tx at the head of the
1575 * fmaq and I'll get rescheduled when credits appear */
1576 CDEBUG(D_NET, "EAGAIN on %p\n", conn);
1577 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1578 cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
1579 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* no reply expected (or the send failed): the tx is finished now;
 * otherwise park it on the replyq until the matching reply arrives */
1583 if (!expect_reply || rc != 0) {
1584 kranal_tx_done(tx, rc);
1586 /* LASSERT(current) above ensures this doesn't race with reply
1588 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1589 cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
1590 tx->tx_qtime = jiffies;
1591 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* more txs queued: get scheduled again to send the next one */
1595 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
1596 kranal_schedule_conn(conn);
/*
 * Byte-swap every field of an RDMA descriptor in place, used when a
 * received message's magic indicates the peer has opposite endianness
 * (see the __swab32(ram_magic) check in kranal_check_fma_rx).
 */
1601 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
/* memory registration key */
1603 __swab64s(&d->rard_key.Key);
1604 __swab16s(&d->rard_key.Cookie);
1605 __swab16s(&d->rard_key.MdHandle);
1606 __swab32s(&d->rard_key.Flags);
/* buffer address and length */
1607 __swab64s(&d->rard_addr.AddressBits);
1608 __swab32s(&d->rard_nob);
/*
 * Find and remove the tx on 'conn's replyq whose cookie matches an
 * incoming reply of message type 'type'.  Returns the tx with it removed
 * from the queue, or (presumably NULL — the failure return is elided in
 * this view) when no tx matches.  A cookie match with the wrong message
 * type is reported and treated as unmatched.
 */
1612 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1616 unsigned long flags;
/* the replyq is shared with the send path: walk it under rac_lock */
1618 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1620 cfs_list_for_each(ttmp, &conn->rac_replyq) {
1621 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
1623 CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
1624 tx, tx->tx_msg.ram_type, tx->tx_cookie);
/* cookie mismatch: keep scanning (continue elided in this view) */
1626 if (tx->tx_cookie != cookie)
/* cookie matched but the reply is for the wrong request type:
 * protocol error from the peer */
1629 if (tx->tx_msg.ram_type != type) {
1630 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1631 CWARN("Unexpected type %x (%x expected) "
1632 "matched reply from %s\n",
1633 tx->tx_msg.ram_type, type,
1634 libcfs_nid2str(conn->rac_peer->rap_nid));
/* full match: detach the tx and hand it back to the caller */
1638 cfs_list_del(&tx->tx_list);
1639 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* nothing on the replyq matched this reply */
1643 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1644 CWARN("Unmatched reply %02x/"LPX64" from %s\n",
1645 type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
/*
 * Poll 'conn' for one incoming FMA message and process it: fix up
 * endianness if needed, validate the header (version, source NID,
 * connection stamp, sequence number), honour FENCE semantics, handle
 * CLOSE, then dispatch by message type.  Runs in the device scheduler
 * thread.  NOTE(review): error-path gotos and several lines are elided
 * from this view; comments describe only the code shown.
 */
1650 kranal_check_fma_rx (kra_conn_t *conn)
1652 unsigned long flags;
1657 RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1658 kra_peer_t *peer = conn->rac_peer;
/* nothing pending on this connection (return elided in this view) */
1662 if (rrc == RAP_NOT_DONE)
1665 CDEBUG(D_NET, "RX on %p\n", conn);
1667 LASSERT (rrc == RAP_SUCCESS);
/* record receive time for timeout accounting, and consume the next
 * expected rx sequence number */
1668 conn->rac_last_rx = jiffies;
1669 seq = conn->rac_rx_seq++;
1670 msg = (kra_msg_t *)prefix;
1672 /* stash message for portals callbacks they'll NULL
1673 * rac_rxmsg if they consume it */
1674 LASSERT (conn->rac_rxmsg == NULL);
1675 conn->rac_rxmsg = msg;
/* peer has opposite endianness if the magic only matches after a swab;
 * anything else is garbage */
1677 if (msg->ram_magic != RANAL_MSG_MAGIC) {
1678 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1679 CERROR("Unexpected magic %08x from %s\n",
1680 msg->ram_magic, libcfs_nid2str(peer->rap_nid));
/* byte-swap the common header... */
1685 __swab32s(&msg->ram_magic);
1686 __swab16s(&msg->ram_version);
1687 __swab16s(&msg->ram_type);
1688 __swab64s(&msg->ram_srcnid);
1689 __swab64s(&msg->ram_connstamp);
1690 __swab32s(&msg->ram_seq);
1692 /* NB message type checked below; NOT here... */
/* ...and the type-specific RDMA descriptors */
1693 switch (msg->ram_type) {
1694 case RANAL_MSG_PUT_ACK:
1695 kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1698 case RANAL_MSG_GET_REQ:
1699 kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
/* ---- header validation (error branches elided in this view) ---- */
1707 if (msg->ram_version != RANAL_MSG_VERSION) {
1708 CERROR("Unexpected protocol version %d from %s\n",
1709 msg->ram_version, libcfs_nid2str(peer->rap_nid));
1714 if (msg->ram_srcnid != peer->rap_nid) {
1715 CERROR("Unexpected peer %s from %s\n",
1716 libcfs_nid2str(msg->ram_srcnid),
1717 libcfs_nid2str(peer->rap_nid));
/* connstamp mismatch means the message is from a stale connection
 * instance */
1722 if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1723 CERROR("Unexpected connstamp "LPX64"("LPX64
1724 " expected) from %s\n",
1725 msg->ram_connstamp, conn->rac_peer_connstamp,
1726 libcfs_nid2str(peer->rap_nid));
/* out-of-sequence message: lost or reordered traffic */
1731 if (msg->ram_seq != seq) {
1732 CERROR("Unexpected sequence number %d(%d expected) from %s\n",
1733 msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
1738 if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1739 /* This message signals RDMA completion... */
1740 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1741 if (rrc != RAP_SUCCESS) {
1742 CERROR("RapkFmaSyncWait failed: %d\n", rrc);
/* nothing but CLOSE is legal once the peer has said CLOSE */
1748 if (conn->rac_close_recvd) {
1749 CERROR("Unexpected message %d after CLOSE from %s\n",
1750 msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
1755 if (msg->ram_type == RANAL_MSG_CLOSE) {
1756 CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
1757 conn->rac_close_recvd = 1;
1758 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
/* start closing, or finish if our CLOSE already went out */
1760 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1761 kranal_close_conn_locked(conn, 0);
1762 else if (conn->rac_state == RANAL_CONN_CLOSING &&
1763 conn->rac_close_sent)
1764 kranal_terminate_conn_locked(conn);
1766 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
/* ignore payload traffic on a connection that is no longer
 * established (branch body elided in this view) */
1771 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
/* ---- dispatch by message type ---- */
1774 switch (msg->ram_type) {
1775 case RANAL_MSG_NOOP:
1776 /* Nothing to do; just a keepalive */
1777 CDEBUG(D_NET, "RX NOOP on %p\n", conn);
1780 case RANAL_MSG_IMMEDIATE:
1781 CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
/* hand the inline header to LNet; final arg 0 = not RDMA */
1782 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
1783 msg->ram_srcnid, conn, 0);
1787 case RANAL_MSG_PUT_REQ:
1788 CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
1789 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
1790 msg->ram_srcnid, conn, 1);
1794 case RANAL_MSG_PUT_NAK:
1795 CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
/* peer rejected our PUT_REQ: find and fail the waiting tx */
1796 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1797 msg->ram_u.completion.racm_cookie);
1801 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1802 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1803 kranal_tx_done(tx, -ENOENT); /* no match */
1806 case RANAL_MSG_PUT_ACK:
1807 CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
/* peer accepted our PUT_REQ and supplied its sink descriptor:
 * start the RDMA transfer */
1808 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1809 msg->ram_u.putack.rapam_src_cookie);
1813 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1814 &msg->ram_u.putack.rapam_desc,
1815 msg->ram_u.putack.rapam_desc.rard_nob,
1816 msg->ram_u.putack.rapam_dst_cookie);
1819 case RANAL_MSG_PUT_DONE:
1820 CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
1821 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1822 msg->ram_u.completion.racm_cookie);
1826 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1827 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1828 kranal_tx_done(tx, 0);
1831 case RANAL_MSG_GET_REQ:
1832 CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
1833 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
1834 msg->ram_srcnid, conn, 1);
1838 case RANAL_MSG_GET_NAK:
1839 CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
1840 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1841 msg->ram_u.completion.racm_cookie);
1845 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1846 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1847 kranal_tx_done(tx, -ENOENT); /* no match */
1850 case RANAL_MSG_GET_DONE:
1851 CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
1852 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1853 msg->ram_u.completion.racm_cookie);
1857 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1858 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1860 /* completion message should send rdma length if we ever allow
1862 lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
1864 kranal_tx_done(tx, 0);
/* ---- common exit (label elided in this view) ---- */
1869 if (rc < 0) /* protocol/comms error */
1870 kranal_close_conn (conn, rc);
/* if no callback consumed the stashed rxmsg, discard it now */
1872 if (repost && conn->rac_rxmsg != NULL)
1873 kranal_consume_rxmsg(conn, NULL, 0);
1875 /* check again later */
1876 kranal_schedule_conn(conn);
/*
 * Final cleanup of a fully-closed connection: fail every tx still queued
 * on the fmaq and replyq with -ECONNABORTED, counting each for the
 * summary warning.  The connection must already be unlinked from the
 * global lists (asserted below).
 */
1880 kranal_complete_closed_conn (kra_conn_t *conn)
1886 LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1887 LASSERT (cfs_list_empty(&conn->rac_list));
1888 LASSERT (cfs_list_empty(&conn->rac_hashlist));
/* abort all unsent messages */
1890 for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
1891 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1893 cfs_list_del(&tx->tx_list);
1894 kranal_tx_done(tx, -ECONNABORTED);
/* the closing protocol is expected to have drained all RDMAs first */
1897 LASSERT (cfs_list_empty(&conn->rac_rdmaq));
/* abort all txs still waiting for a reply */
1899 for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
1900 tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1902 cfs_list_del(&tx->tx_list);
1903 kranal_tx_done(tx, -ECONNABORTED);
1906 CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
1907 conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
/*
 * Try to complete the RapidArray handshake for a newly-created connection.
 * Presumably returns 0 on completion, -EAGAIN while still in progress and
 * -ETIMEDOUT on timeout — the return statements are elided in this view.
 */
1911 kranal_process_new_conn (kra_conn_t *conn)
/* non-blocking attempt to complete the sync handshake */
1915 rrc = RapkCompleteSync(conn->rac_rihandle, 1);
1916 if (rrc == RAP_SUCCESS)
1919 LASSERT (rrc == RAP_NOT_DONE);
/* still within the connection timeout: let the caller retry later
 * (return elided in this view) */
1920 if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
1921 conn->rac_timeout * CFS_HZ))
/* timed out: abort the handshake (second arg 0) */
1925 rrc = RapkCompleteSync(conn->rac_rihandle, 0);
1926 LASSERT (rrc == RAP_SUCCESS);
/*
 * Per-device scheduler thread.  Loops until shutdown: drains the RDMA and
 * FMA completion queues when the device callback has fired, services every
 * connection on the ready list, completes handshakes for new connections
 * (with exponential backoff), and sleeps on the device waitq when there is
 * nothing to do.  NOTE(review): several lines of this function are elided
 * from this view; comments describe only the code shown.
 */
1931 kranal_scheduler (void *arg)
1933 kra_device_t *dev = (kra_device_t *)arg;
1934 cfs_waitlink_t wait;
1937 unsigned long flags;
1938 unsigned long deadline;
1939 unsigned long soonest;
/* one scheduler thread per device, named by device index */
1948 snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1949 cfs_daemonize(name);
1950 cfs_block_allsigs();
1952 dev->rad_scheduler = current;
1953 cfs_waitlink_init(&wait);
/* rad_lock is held at the top of every loop iteration */
1955 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1957 while (!kranal_data.kra_shutdown) {
1958 /* Safe: kra_shutdown only set when quiescent */
/* yield periodically so we don't monopolise the CPU */
1960 if (busy_loops++ >= RANAL_RESCHED) {
1961 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1966 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1971 if (dev->rad_ready) {
1972 /* Device callback fired since I last checked it */
/* drain both completion queues outside the lock */
1974 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1977 kranal_check_rdma_cq(dev);
1978 kranal_check_fma_cq(dev);
1980 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
/* service every connection scheduled as ready */
1983 cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
1984 conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
1986 cfs_list_del_init(&conn->rac_schedlist);
1987 LASSERT (conn->rac_scheduled);
1988 conn->rac_scheduled = 0;
1989 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
/* receive then transmit, then finish teardown if closed */
1992 kranal_check_fma_rx(conn);
1993 kranal_process_fmaq(conn);
1995 if (conn->rac_state == RANAL_CONN_CLOSED)
1996 kranal_complete_closed_conn(conn);
/* drop the ref taken when the conn was scheduled */
1998 kranal_conn_decref(conn);
1999 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
/* progress handshakes on new connections */
2005 cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
2006 conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
/* rac_keepalive doubles as the retry interval here */
2008 deadline = conn->rac_last_tx + conn->rac_keepalive;
2009 if (cfs_time_aftereq(jiffies, deadline)) {
2010 /* Time to process this new conn */
2011 cfs_spin_unlock_irqrestore(&dev->rad_lock,
2015 rc = kranal_process_new_conn(conn);
2016 if (rc != -EAGAIN) {
2017 /* All done with this conn */
2018 cfs_spin_lock_irqsave(&dev->rad_lock,
2020 cfs_list_del_init(&conn->rac_schedlist);
2021 cfs_spin_unlock_irqrestore(&dev-> \
2025 kranal_conn_decref(conn);
2026 cfs_spin_lock_irqsave(&dev->rad_lock,
2031 /* retry with exponential backoff until HZ */
2032 if (conn->rac_keepalive == 0)
2033 conn->rac_keepalive = 1;
2034 else if (conn->rac_keepalive <= CFS_HZ)
2035 conn->rac_keepalive *= 2;
2037 conn->rac_keepalive += CFS_HZ;
2039 deadline = conn->rac_last_tx + conn->rac_keepalive;
2040 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
2043 /* Does this conn need attention soonest? */
2044 if (nsoonest++ == 0 ||
2045 !cfs_time_aftereq(deadline, soonest))
2049 if (dropped_lock) /* may sleep iff I didn't drop the lock */
/* nothing to do: block on the device waitq, bounded by the
 * earliest new-conn deadline if there is one */
2052 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
2053 cfs_waitq_add_exclusive(&dev->rad_waitq, &wait);
2054 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
2056 if (nsoonest == 0) {
2058 cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);
2060 timeout = (long)(soonest - jiffies);
2063 cfs_waitq_timedwait(&wait,
2064 CFS_TASK_INTERRUPTIBLE,
2069 cfs_waitq_del(&dev->rad_waitq, &wait);
2070 cfs_set_current_state(CFS_TASK_RUNNING);
2071 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
2074 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
/* shutdown: detach from the device and exit */
2076 dev->rad_scheduler = NULL;
2077 kranal_thread_fini();