/* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
 * vim:expandtab:shiftwidth=8:tabstop=8:
 *
 * Copyright (C) 2004 Cluster File Systems, Inc.
 * Author: Eric Barton <eric@bartonsoftware.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "ranal.h"
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t  *dev;
        int            i;
        unsigned long  flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t  *dev = conn->rac_device;
        unsigned long  flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}
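/* NB the rac_scheduled flag collapses redundant wakeups: however many times
 * a conn is scheduled, it sits on rad_ready_conns at most once.  The conn
 * ref taken above is owned by the scheduler thread, which drops it with
 * kranal_conn_decref() in kranal_scheduler() once the conn's RX and FMA
 * queues have been processed. */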
kra_tx_t *
kranal_get_idle_tx (void)
{
        unsigned long  flags;
        kra_tx_t      *tx;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (list_empty(&kranal_data.kra_idle_txs)) {
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
                return NULL;
        }

        tx = list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed, but we've
         * got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}
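/* NB tx_cookie is the RPC matching token: a PUT_REQ/PUT_ACK/GET_REQ carries
 * its tx_cookie on the wire and the peer echoes it back in the corresponding
 * ACK/NAK/DONE, where kranal_match_reply() uses it to find this tx again on
 * rac_replyq. */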
void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
        /* ram_connstamp gets set when FMA is sent */
}
kra_tx_t *
kranal_new_tx_msg (int type)
{
        kra_tx_t *tx = kranal_get_idle_tx();

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}
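/* Typical caller pattern (illustration only; compare kranal_send() and
 * kranal_recv() below, which follow exactly this shape): */
#if 0
        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;         /* no idle tx; caller fails the send */

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        kranal_launch_tx(tx, target.nid);
#endif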
int
kranal_setup_immediate_buffer (kra_tx_t *tx,
                               unsigned int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (nob >= 0);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}
int
kranal_setup_virt_buffer (kra_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = lnet_page2phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM: "
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = lnet_page2phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
static int
kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}
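/* NB the tx buffer state machine implemented by the setup/map/unmap helpers:
 *
 *   RANAL_BUF_NONE --setup_immediate_buffer--> RANAL_BUF_IMMEDIATE
 *   RANAL_BUF_NONE --setup_virt_buffer-------> RANAL_BUF_VIRT_UNMAPPED
 *   RANAL_BUF_NONE --setup_phys_buffer-------> RANAL_BUF_PHYS_UNMAPPED
 *   *_UNMAPPED -----kranal_map_buffer--------> *_MAPPED   (RapkRegister*)
 *   *_MAPPED -------kranal_unmap_buffer------> *_UNMAPPED (RapkDeregisterMemory)
 *
 * kranal_tx_done() resets a tx to RANAL_BUF_NONE via kranal_unmap_buffer()
 * before returning it to the idle pool. */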
int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t   *conn = tx->tx_conn;
        kra_device_t *dev = conn->rac_device;
        RAP_RETURN    rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}
void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t *dev;
        RAP_RETURN    rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        lnet_msg_t    *lnetmsg[2];
        unsigned long  flags;
        int            i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}
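/* NB the ordering above is deliberate: the tx goes back on kra_idle_txs
 * before lnet_finalize() runs, presumably because finalizing can re-enter
 * the LND (e.g. to send an ACK) and need an idle tx; finalizing first could
 * otherwise starve the tx pool. */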
kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        struct list_head *tmp;

        /* just return the first connection */
        list_for_each (tmp, &peer->rap_conns) {
                return list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}
void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        kra_peer_t    *peer;
        kra_conn_t    *conn;
        unsigned long  flags;
        int            rc;
        int            retry;
        rwlock_t      *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        for (retry = 0; ; retry = 1) {

                read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (list_empty(&peer->rap_tx_queue));

                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      time_after_eq(jiffies, peer->rap_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                list_add_tail(&peer->rap_connd_list,
                              &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
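/* NB the locking dance above: the common case (peer and conn both exist)
 * completes entirely under the shared read lock; only when a conn must be
 * created does the code retry under the write lock.  The retry flag
 * distinguishes the first pass, where an unknown peer is auto-added as a
 * persistent peer, from the second, where a still-missing peer is a hard
 * -EHOSTUNREACH failure. */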
void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}
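/* NB lifecycle of an RDMA tx: it parks on rac_rdmaq until the device's RDMA
 * completion queue fires, then kranal_check_rdma_cq() moves it to rac_fmaq
 * so the scheduler sends the PUT_DONE/GET_DONE completion message prepared
 * above (a FENCE message, so the peer can order it after the data). */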
int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}
int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch(type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough for FMA */

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&              /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}
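/* Worked example of the immediate-vs-RDMA decision above (the threshold is
 * illustrative; kra_max_immediate is a module tunable): with
 * kra_max_immediate = 2048, a 1K PUT with a virtual (iov) payload goes as a
 * single RANAL_MSG_IMMEDIATE carrying the payload inline, while a 1M or
 * paged (kiov) PUT sends PUT_REQ and waits for the peer's PUT_ACK to
 * describe the sink buffer before RDMAing the data and completing with
 * PUT_DONE. */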
void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t     *rxmsg = conn->rac_rxmsg;
        unsigned int   niov = lntmsg->msg_niov;
        struct iovec  *iov = lntmsg->msg_iov;
        lnet_kiov_t   *kiov = lntmsg->msg_kiov;
        unsigned int   offset = lntmsg->msg_offset;
        unsigned int   nob = lntmsg->msg_len;
        kra_tx_t      *tx;
        int            rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,
                    &rxmsg->ram_u.get.ragm_desc, nob,
                    rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}
int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR("Dropping message from %s: no buffers free.\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid));

        return -EDEADLK;
}
int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t  *conn = private;
        kra_msg_t   *rxmsg = conn->rac_rxmsg;
        kra_tx_t    *tx;
        void        *buffer;
        int          rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch(rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        LASSERT (niov > 0);

                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                niov--;
                                iov++;
                                LASSERT (niov > 0);
                        }

                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }

                        buffer = ((char *)iov->iov_base) + offset;
                }

                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }

                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}
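/* NB the receive-side PUT protocol as implemented above: PUT_REQ arrives and
 * lnet_parse() eventually calls kranal_recv(), which maps the sink buffer,
 * posts a PUT_ACK describing it (map key, address bits, dst cookie) and
 * stashes lntmsg on the tx; the peer RDMAs the payload and sends PUT_DONE,
 * whereupon kranal_check_fma_rx() matches the tx on rac_replyq and
 * kranal_tx_done() finalizes lntmsg. */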
int
kranal_thread_start (int(*fn)(void *arg), void *arg)
{
        long pid = kernel_thread(fn, arg, 0);

        if (pid < 0)
                return (int)pid;

        atomic_inc(&kranal_data.kra_nthreads);
        return 0;
}
void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t          *tx;
        struct list_head  *ttmp;
        unsigned long      flags;
        long               timeout;
        unsigned long      now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = conn->rac_timeout * HZ;

        if (!conn->rac_close_recvd &&
            time_after_eq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       (now - conn->rac_last_rx)/HZ);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each (ttmp, &conn->rac_fmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        list_for_each (ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                if (time_after_eq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               (now - tx->tx_qtime)/HZ);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}
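/* NB all the time comparisons above use time_after_eq() on raw jiffies, so
 * they stay correct across jiffy counter wrap.  rac_lock is dropped before
 * the CERROR on each stall path; once a stall is detected the queues are
 * not touched again and the caller closes the conn. */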
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        struct list_head  *conns = &kranal_data.kra_conns[idx];
        struct list_head  *ctmp;
        kra_conn_t        *conn;
        unsigned long      flags;
        int                rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        list_for_each (ctmp, conns) {
                conn = list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp )
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp )
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}
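/* NB the 'goto again' above: once the read lock has been dropped to handle
 * a timed-out conn, the hash chain may have changed under us, so the scan
 * restarts from the head of the chain rather than trusting a stale list
 * cursor. */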
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        char               name[16];
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
        cfs_daemonize(name);
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = list_entry(kranal_data.kra_connd_acceptq.next,
                                         kra_acceptsock_t, ras_list);
                        list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (!list_empty(&kranal_data.kra_connd_peers)) {
                        peer = list_entry(kranal_data.kra_connd_peers.next,
                                          kra_peer_t, rap_connd_list);

                        list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                schedule();

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}
void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        cfs_daemonize("kranal_reaper");
        cfs_block_allsigs();

        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;    /* NB values of p and n assumed */
                const int     n = 3;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                        schedule_timeout(timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout < current_min_timeout) {
                                current_min_timeout = kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += p * HZ;

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

        kranal_thread_fini();
        return 0;
}
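/* Worked example of the chunk arithmetic above, assuming the values p = 1,
 * n = 3 used here: with conn_entries = 256 hash chains and a minimum conn
 * timeout of 64s, chunk = 256 * 3 * 1 / 64 = 12 chains per wakeup, so the
 * whole table is covered in ~22 wakeups (~22s), i.e. roughly 3 times within
 * the minimum timeout, as intended.  When min_timeout <= n * p the whole
 * table is checked on every wakeup. */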
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!list_empty(&conn->rac_rdmaq));
                tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                list_del(&tx->tx_list);

                LASSERT(desc->AppPtr == (void *)tx);
                LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                        tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * there */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}
void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t       *conn;
        RAP_RETURN        rrc;
        __u32             cqid;
        __u32             event_type;
        struct list_head *conns;
        struct list_head *tmp;
        int               i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
                      dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        list_for_each (tmp, conns) {
                                conn = list_entry(tmp, kra_conn_t,
                                                  rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive*HZ))
                        CWARN("EAGAIN sending %02x (idle %lu secs)\n",
                              msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
                return -EAGAIN;
        }
}
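/* NB RAP_NOT_DONE here means the FMA transmit is out of credits; it is
 * mapped to -EAGAIN and the caller (kranal_process_fmaq()) puts the message
 * back at the head of rac_fmaq.  The conn is rescheduled by the FMA
 * completion event that signals credits have been replenished, so no
 * polling is needed. */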
void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       CLOSE anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (time_after_eq(jiffies,
                                          conn->rac_last_tx +
                                          conn->rac_keepalive * HZ)) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to %s\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (time_after_eq(jiffies,
                                  conn->rac_last_tx + conn->rac_keepalive * HZ)) {
                        CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid), conn,
                               (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        list_del(&tx->tx_list);
        more_to_do = !list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        if (!expect_reply || rc != 0) {
                kranal_tx_done(tx, rc);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}
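/* NB txs sent with expect_reply set (PUT_REQ, PUT_ACK, GET_REQ) park on
 * rac_replyq until kranal_match_reply() pairs them with the peer's
 * ACK/NAK/DONE by cookie; if the conn dies first they are reaped with
 * -ECONNABORTED in kranal_complete_closed_conn(). */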
void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        struct list_head *ttmp;
        kra_tx_t         *tx;
        unsigned long     flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        list_for_each(ttmp, &conn->rac_replyq) {
                tx = list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from %s\n",
                              tx->tx_msg.ram_type, type,
                              libcfs_nid2str(conn->rac_peer->rap_nid));
                        return NULL;
                }

                list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from %s\n",
              type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
        return NULL;
}
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;
        int           rc = 0;
        int           repost = 1;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks (they'll NULL
         * rac_rxmsg if they consume it) */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->ram_magic, libcfs_nid2str(peer->rap_nid));
                        rc = -EPROTO;
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from %s\n",
                       msg->ram_version, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer %s from %s\n",
                       libcfs_nid2str(msg->ram_srcnid),
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from %s\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from %s\n",
                       msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                if (rrc != RAP_SUCCESS) {
                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);
                        rc = -ENETDOWN;
                        goto out;
                }
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from %s\n",
                       msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
                                msg->ram_srcnid, conn, 0);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
#if 0
                /* completion message should send rdma length if we ever allow
                 * GET truncation */
                lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
#endif
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (rc < 0)                             /* protocol/comms error */
                kranal_close_conn (conn, rc);

        if (repost && conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t *tx;
        int       nfma;
        int       nreplies;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (list_empty(&conn->rac_list));
        LASSERT (list_empty(&conn->rac_hashlist));

        for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
                tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (list_empty(&conn->rac_rdmaq));

        for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
                tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
              conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
}
int
kranal_process_new_conn (kra_conn_t *conn)
{
        RAP_RETURN rrc;

        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
        if (rrc == RAP_SUCCESS)
                return 0;

        LASSERT (rrc == RAP_NOT_DONE);
        if (!time_after_eq(jiffies, conn->rac_last_tx +
                           conn->rac_timeout * HZ))
                return -EAGAIN;

        /* Too late */
        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
        LASSERT (rrc == RAP_SUCCESS);
        return -ETIMEDOUT;
}
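/* NB RapkCompleteSync(rihandle, 1) returning RAP_NOT_DONE means the
 * connection sync hasn't completed yet; the scheduler then keeps this conn
 * on rad_new_conns and retries with the exponential backoff implemented in
 * kranal_scheduler() below, until rac_timeout expires and the sync is
 * abandoned with RapkCompleteSync(rihandle, 0). */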
int
kranal_scheduler (void *arg)
{
        kra_device_t     *dev = (kra_device_t *)arg;
        wait_queue_t      wait;
        char              name[16];
        kra_conn_t       *conn;
        unsigned long     flags;
        unsigned long     deadline;
        unsigned long     soonest;
        int               nsoonest;
        long              timeout;
        struct list_head *tmp;
        struct list_head *nxt;
        int               rc;
        int               dropped_lock;
        int               busy_loops = 0;

        snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
        cfs_daemonize(name);
        cfs_block_allsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry(&wait, current);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        cfs_cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (time_after_eq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock, flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock, flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <= HZ)
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive += HZ;

                                deadline = conn->rac_last_tx + conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !time_after_eq(deadline, soonest))
                                soonest = deadline;
                }

                if (dropped_lock)               /* may sleep iff I didn't drop the lock */
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        busy_loops = 0;
                        schedule();
                } else {
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                schedule_timeout(timeout);
                        }
                }

                remove_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}