/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/klnds/ralnd/ralnd_cb.c
 *
 * Author: Eric Barton <eric@bartonsoftware.com>
 */

#include "ralnd.h"
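/* Structure of this file: transmit descriptors (kra_tx_t) are taken from a
 * global idle pool (kra_idle_txs), attached to a connection, mapped for RDMA
 * when needed, and returned to the pool by kranal_tx_done().  Three kinds of
 * daemon drive the LND: connd (handshakes and connection attempts), the
 * reaper (keepalives and timeouts), and one scheduler per device that
 * services CQ events and each connection's FMA queue. */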
void
kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
{
        kra_device_t  *dev;
        int            i;
        unsigned long  flags;

        CDEBUG(D_NET, "callback for device %d\n", devid);

        for (i = 0; i < kranal_data.kra_ndevs; i++) {

                dev = &kranal_data.kra_devices[i];
                if (dev->rad_id != devid)
                        continue;

                spin_lock_irqsave(&dev->rad_lock, flags);

                if (!dev->rad_ready) {
                        dev->rad_ready = 1;
                        wake_up(&dev->rad_waitq);
                }

                spin_unlock_irqrestore(&dev->rad_lock, flags);
                return;
        }

        CWARN("callback for unknown device %d\n", devid);
}
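/* Schedule a connection for attention from its device's scheduler thread.
 * The +1 conn ref taken here is owned by the scheduler and dropped once
 * kranal_check_fma_rx()/kranal_process_fmaq() have run; rac_scheduled
 * prevents double-queuing on rad_ready_conns. */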
void
kranal_schedule_conn(kra_conn_t *conn)
{
        kra_device_t  *dev = conn->rac_device;
        unsigned long  flags;

        spin_lock_irqsave(&dev->rad_lock, flags);

        if (!conn->rac_scheduled) {
                kranal_conn_addref(conn);       /* +1 ref for scheduler */
                conn->rac_scheduled = 1;
                cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
                wake_up(&dev->rad_waitq);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);
}
kra_tx_t *
kranal_get_idle_tx (void)
{
        unsigned long  flags;
        kra_tx_t      *tx;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
                spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
                return NULL;
        }

        tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);

        /* Allocate a new completion cookie.  It might not be needed, but we've
         * got a lock right now... */
        tx->tx_cookie = kranal_data.kra_next_tx_cookie++;

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
        LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
        LASSERT (tx->tx_conn == NULL);
        LASSERT (tx->tx_lntmsg[0] == NULL);
        LASSERT (tx->tx_lntmsg[1] == NULL);

        return tx;
}
void
kranal_init_msg(kra_msg_t *msg, int type)
{
        msg->ram_magic = RANAL_MSG_MAGIC;
        msg->ram_version = RANAL_MSG_VERSION;
        msg->ram_type = type;
        msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
        /* ram_connstamp gets set when FMA is sent */
}
kra_tx_t *
kranal_new_tx_msg (int type)
{
        kra_tx_t *tx = kranal_get_idle_tx();

        if (tx != NULL)
                kranal_init_msg(&tx->tx_msg, type);

        return tx;
}
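/* Payload setup: a tx describes its payload in one of three ways.
 * RANAL_BUF_IMMEDIATE points at a single virtually-contiguous fragment that
 * is copied inline with the FMA message; RANAL_BUF_VIRT_* describes the same
 * kind of single fragment destined for RDMA; RANAL_BUF_PHYS_* collects page
 * physical addresses into tx_phys[] for paged (kiov) payloads. */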
int
kranal_setup_immediate_buffer (kra_tx_t *tx,
                               unsigned int niov, struct iovec *iov,
                               int offset, int nob)
{
        /* For now this is almost identical to kranal_setup_virt_buffer, but we
         * could "flatten" the payload into a single contiguous buffer ready
         * for sending direct over an FMA if we ever needed to. */

        LASSERT (nob >= 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        if (nob == 0) {
                tx->tx_buffer = NULL;
        } else {
                LASSERT (niov > 0);

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;
                        niov--;
                        iov++;
                        LASSERT (niov > 0);
                }

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");
                        return -EMSGSIZE;
                }

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        }

        tx->tx_buftype = RANAL_BUF_IMMEDIATE;
        tx->tx_nob = nob;
        return 0;
}
int
kranal_setup_virt_buffer (kra_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          int offset, int nob)
{
        LASSERT (nob > 0);
        LASSERT (niov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;
                niov--;
                iov++;
                LASSERT (niov > 0);
        }

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");
                return -EMSGSIZE;
        }

        tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
        return 0;
}
int
kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        RAP_PHYS_REGION *phys = tx->tx_phys;
        int              resid;

        CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);

        LASSERT (nob > 0);
        LASSERT (nkiov > 0);
        LASSERT (tx->tx_buftype == RANAL_BUF_NONE);

        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;
                nkiov--;
                kiov++;
                LASSERT (nkiov > 0);
        }

        tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
        tx->tx_nob = nob;
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        phys->Address = page_to_phys(kiov->kiov_page);
        phys++;

        resid = nob - (kiov->kiov_len - offset);
        while (resid > 0) {
                kiov++;
                nkiov--;
                LASSERT (nkiov > 0);

                if (kiov->kiov_offset != 0 ||
                    ((resid > PAGE_SIZE) &&
                     kiov->kiov_len < PAGE_SIZE)) {
                        /* Can't have gaps */
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %d, len %d\n",
                               (int)(phys - tx->tx_phys),
                               kiov->kiov_offset, kiov->kiov_len);
                        return -EINVAL;
                }

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
                        return -EMSGSIZE;
                }

                phys->Address = page_to_phys(kiov->kiov_page);
                phys++;

                resid -= PAGE_SIZE;
        }

        tx->tx_phys_npages = phys - tx->tx_phys;
        return 0;
}
static int
kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          int offset, int nob)
{
        LASSERT ((iov == NULL) != (kiov == NULL));

        if (kiov != NULL)
                return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);

        return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
}
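/* Buffer type state machine: RANAL_BUF_NONE -> *_UNMAPPED (setup above) ->
 * *_MAPPED (kranal_map_buffer() registers the memory with the RapidArray
 * device and fills tx_map_key) -> *_UNMAPPED again in kranal_unmap_buffer().
 * Both mapping and unmapping must run on the device's scheduler thread. */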
int
kranal_map_buffer (kra_tx_t *tx)
{
        kra_conn_t   *conn = tx->tx_conn;
        kra_device_t *dev = conn->rac_device;
        RAP_RETURN    rrc;

        LASSERT (current == dev->rad_scheduler);

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_MAPPED:
        case RANAL_BUF_VIRT_MAPPED:
                return 0;

        case RANAL_BUF_PHYS_UNMAPPED:
                rrc = RapkRegisterPhys(dev->rad_handle,
                                       tx->tx_phys, tx->tx_phys_npages,
                                       &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d pages: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_phys_npages, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nphysmap++;
                dev->rad_nppphysmap += tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
                return 0;

        case RANAL_BUF_VIRT_UNMAPPED:
                rrc = RapkRegisterMemory(dev->rad_handle,
                                         tx->tx_buffer, tx->tx_nob,
                                         &tx->tx_map_key);
                if (rrc != RAP_SUCCESS) {
                        CERROR ("Can't map %d bytes: dev %d "
                                "phys %u pp %u, virt %u nob %lu\n",
                                tx->tx_nob, dev->rad_id,
                                dev->rad_nphysmap, dev->rad_nppphysmap,
                                dev->rad_nvirtmap, dev->rad_nobvirtmap);
                        return -ENOMEM; /* assume insufficient resources */
                }

                dev->rad_nvirtmap++;
                dev->rad_nobvirtmap += tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
                return 0;
        }
}
void
kranal_unmap_buffer (kra_tx_t *tx)
{
        kra_device_t *dev;
        RAP_RETURN    rrc;

        switch (tx->tx_buftype) {
        default:
                LBUG();

        case RANAL_BUF_NONE:
        case RANAL_BUF_IMMEDIATE:
        case RANAL_BUF_PHYS_UNMAPPED:
        case RANAL_BUF_VIRT_UNMAPPED:
                break;

        case RANAL_BUF_PHYS_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nphysmap--;
                dev->rad_nppphysmap -= tx->tx_phys_npages;

                tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
                break;

        case RANAL_BUF_VIRT_MAPPED:
                LASSERT (tx->tx_conn != NULL);
                dev = tx->tx_conn->rac_device;
                LASSERT (current == dev->rad_scheduler);
                rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
                                           &tx->tx_map_key);
                LASSERT (rrc == RAP_SUCCESS);

                dev->rad_nvirtmap--;
                dev->rad_nobvirtmap -= tx->tx_nob;

                tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
                break;
        }
}
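/* Completion: unmap the payload, detach the (up to two) LNET messages so
 * they can be finalized without holding the tx lock, reset the tx to its
 * idle state and only then call lnet_finalize(), presumably because
 * finalizing may trigger further sends that need idle txs from the pool. */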
void
kranal_tx_done (kra_tx_t *tx, int completion)
{
        lnet_msg_t   *lnetmsg[2];
        unsigned long flags;
        int           i;

        LASSERT (!in_interrupt());

        kranal_unmap_buffer(tx);

        lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

        tx->tx_buftype = RANAL_BUF_NONE;
        tx->tx_msg.ram_type = RANAL_MSG_NONE;
        tx->tx_conn = NULL;

        spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);

        cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);

        spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);

        /* finalize AFTER freeing lnet msgs */
        for (i = 0; i < 2; i++) {
                if (lnetmsg[i] == NULL)
                        continue;

                lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
        }
}
kra_conn_t *
kranal_find_conn_locked (kra_peer_t *peer)
{
        cfs_list_t *tmp;

        /* just return the first connection */
        cfs_list_for_each (tmp, &peer->rap_conns) {
                return cfs_list_entry(tmp, kra_conn_t, rac_list);
        }

        return NULL;
}
void
kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
{
        unsigned long flags;

        tx->tx_conn = conn;

        spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);

        kranal_schedule_conn(conn);
}
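/* Launching a tx: the fast path takes the global lock shared and posts on an
 * existing conn.  Failing that, the lock is retaken exclusively to create a
 * persistent peer and/or hand the peer to connd for connection establishment;
 * 'retry' distinguishes the second pass after kranal_add_persistent_peer(). */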
void
kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
{
        kra_peer_t    *peer;
        kra_conn_t    *conn;
        int            rc;
        int            retry;
        unsigned long  flags;
        rwlock_t      *g_lock = &kranal_data.kra_global_lock;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        LASSERT (tx->tx_conn == NULL);      /* only set when assigned a conn */

        for (retry = 0; ; retry = 1) {

                read_lock(g_lock);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL) {
                        conn = kranal_find_conn_locked(peer);
                        if (conn != NULL) {
                                kranal_post_fma(conn, tx);
                                read_unlock(g_lock);
                                return;
                        }
                }

                /* Making connections; I'll need a write lock... */
                read_unlock(g_lock);
                write_lock_irqsave(g_lock, flags);

                peer = kranal_find_peer_locked(nid);
                if (peer != NULL)
                        break;

                write_unlock_irqrestore(g_lock, flags);

                if (retry) {
                        CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
                                                lnet_acceptor_port());
                if (rc != 0) {
                        CERROR("Can't add peer %s: %d\n",
                               libcfs_nid2str(nid), rc);
                        kranal_tx_done(tx, rc);
                        return;
                }
        }

        conn = kranal_find_conn_locked(peer);
        if (conn != NULL) {
                /* Connection exists; queue message on it */
                kranal_post_fma(conn, tx);
                write_unlock_irqrestore(g_lock, flags);
                return;
        }

        LASSERT (peer->rap_persistence > 0);

        if (!peer->rap_connecting) {
                LASSERT (cfs_list_empty(&peer->rap_tx_queue));

                if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
                      cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
                        write_unlock_irqrestore(g_lock, flags);
                        kranal_tx_done(tx, -EHOSTUNREACH);
                        return;
                }

                peer->rap_connecting = 1;
                kranal_peer_addref(peer); /* extra ref for connd */

                spin_lock(&kranal_data.kra_connd_lock);

                cfs_list_add_tail(&peer->rap_connd_list,
                                  &kranal_data.kra_connd_peers);
                wake_up(&kranal_data.kra_connd_waitq);

                spin_unlock(&kranal_data.kra_connd_lock);
        }

        /* A connection is being established; queue the message... */
        cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);

        write_unlock_irqrestore(g_lock, flags);
}
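/* Post the RDMA described by the peer's sink descriptor.  The matching
 * completion message (GET_DONE or PUT_DONE carrying 'cookie' so the peer can
 * match it on its replyq) is prepared here, but it only reaches the fmaq via
 * kranal_check_rdma_cq() after the RDMA itself completes, which preserves
 * data-before-completion ordering. */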
static void
kranal_rdma(kra_tx_t *tx, int type,
            kra_rdma_desc_t *sink, int nob, __u64 cookie)
{
        kra_conn_t   *conn = tx->tx_conn;
        RAP_RETURN    rrc;
        unsigned long flags;

        LASSERT (kranal_tx_mapped(tx));
        LASSERT (nob <= sink->rard_nob);
        LASSERT (nob <= tx->tx_nob);

        /* No actual race with scheduler sending CLOSE (I'm she!) */
        LASSERT (current == conn->rac_device->rad_scheduler);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
        tx->tx_rdma_desc.DstPtr = sink->rard_addr;
        tx->tx_rdma_desc.DstKey = sink->rard_key;
        tx->tx_rdma_desc.Length = nob;
        tx->tx_rdma_desc.AppPtr = tx;

        /* prep final completion message */
        kranal_init_msg(&tx->tx_msg, type);
        tx->tx_msg.ram_u.completion.racm_cookie = cookie;

        if (nob == 0) { /* Immediate completion */
                kranal_post_fma(conn, tx);
                return;
        }

        LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */

        rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
        LASSERT (rrc == RAP_SUCCESS);

        spin_lock_irqsave(&conn->rac_lock, flags);
        cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
        tx->tx_qtime = jiffies;
        spin_unlock_irqrestore(&conn->rac_lock, flags);
}
int
kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
{
        __u32      nob_received = nob;
        RAP_RETURN rrc;

        LASSERT (conn->rac_rxmsg != NULL);
        CDEBUG(D_NET, "Consuming %p\n", conn);

        rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
                             &nob_received, sizeof(kra_msg_t));
        LASSERT (rrc == RAP_SUCCESS);

        conn->rac_rxmsg = NULL;

        if (nob_received < nob) {
                CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid),
                      nob, nob_received);
                return -EPROTO;
        }

        return 0;
}
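/* LNET send path.  Payloads no larger than RANAL_FMA_MAX_DATA (and the
 * kra_max_immediate tunable) travel inline as IMMEDIATE messages.  A large
 * PUT sends PUT_REQ; the peer maps a sink buffer and returns PUT_ACK with
 * its RDMA descriptor, then the source RDMAs the data and finishes with
 * PUT_DONE.  A GET maps the sink buffer first and sends its descriptor in
 * GET_REQ, so the peer can RDMA the reply data directly into place and
 * finish with GET_DONE. */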
int
kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
{
        lnet_hdr_t       *hdr = &lntmsg->msg_hdr;
        int               type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int               target_is_router = lntmsg->msg_target_is_router;
        int               routing = lntmsg->msg_routing;
        unsigned int      niov = lntmsg->msg_niov;
        struct iovec     *iov = lntmsg->msg_iov;
        lnet_kiov_t      *kiov = lntmsg->msg_kiov;
        unsigned int      offset = lntmsg->msg_offset;
        unsigned int      nob = lntmsg->msg_len;
        kra_tx_t         *tx;
        int               rc;

        /* NB 'private' is different depending on what we're sending.... */

        CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
               nob, niov, libcfs_id2str(target));

        LASSERT (nob == 0 || niov > 0);
        LASSERT (niov <= LNET_MAX_IOV);

        LASSERT (!in_interrupt());
        /* payload is either all vaddrs or all pages */
        LASSERT (!(kiov != NULL && iov != NULL));

        if (routing) {
                CERROR ("Can't route\n");
                return -EIO;
        }

        switch(type) {
        default:
                LBUG();

        case LNET_MSG_ACK:
                LASSERT (nob == 0);
                break;

        case LNET_MSG_GET:
                /* We have to consider the eventual sink buffer rather than any
                 * payload passed here (there isn't any, and strictly, looking
                 * inside lntmsg is a layering violation).  We send a simple
                 * IMMEDIATE GET if the sink buffer is mapped already and small
                 * enough. */
                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
                    lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
                    lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.iov,
                                                      0, lntmsg->msg_md->md_length);
                else
                        rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
                                                      lntmsg->msg_md->md_iov.kiov,
                                                      0, lntmsg->msg_md->md_length);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;

        case LNET_MSG_REPLY:
        case LNET_MSG_PUT:
                if (kiov == NULL &&              /* not paged */
                    nob <= RANAL_FMA_MAX_DATA && /* small enough */
                    nob <= *kranal_tunables.kra_max_immediate)
                        break;                  /* send IMMEDIATE */

                tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
                if (tx == NULL)
                        return -ENOMEM;

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        return -EIO;
                }

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kranal_launch_tx(tx, target.nid);
                return 0;
        }

        /* send IMMEDIATE */

        LASSERT (kiov == NULL);
        LASSERT (nob <= RANAL_FMA_MAX_DATA);

        tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
        if (tx == NULL)
                return -ENOMEM;

        rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
        if (rc != 0) {
                kranal_tx_done(tx, rc);
                return -EIO;
        }

        tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kranal_launch_tx(tx, target.nid);
        return 0;
}
void
kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
{
        kra_msg_t    *rxmsg = conn->rac_rxmsg;
        unsigned int  niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t  *kiov = lntmsg->msg_kiov;
        unsigned int  offset = lntmsg->msg_offset;
        unsigned int  nob = lntmsg->msg_len;
        kra_tx_t     *tx;
        int           rc;

        tx = kranal_get_idle_tx();
        if (tx == NULL)
                goto failed_0;

        rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
        if (rc != 0)
                goto failed_1;

        tx->tx_conn = conn;

        rc = kranal_map_buffer(tx);
        if (rc != 0)
                goto failed_1;

        tx->tx_lntmsg[0] = lntmsg;

        kranal_rdma(tx, RANAL_MSG_GET_DONE,
                    &rxmsg->ram_u.get.ragm_desc, nob,
                    rxmsg->ram_u.get.ragm_cookie);
        return;

 failed_1:
        kranal_tx_done(tx, -EIO);
 failed_0:
        lnet_finalize(ni, lntmsg, -EIO);
}
int
kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
                   void **new_private)
{
        kra_conn_t *conn = (kra_conn_t *)private;

        LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
                           libcfs_nid2str(conn->rac_peer->rap_nid));

        return -EDEADLK;
}
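/* LNET receive path, called against the rxmsg stashed by
 * kranal_check_fma_rx().  IMMEDIATE payloads are copied out of the FMA
 * prefix buffer; PUT_REQ maps the sink and answers with PUT_ACK; GET_REQ
 * triggers the RDMA reply (or a GET_NAK when LNET found no match).  Every
 * branch must consume the rxmsg exactly once to release the FMA buffer. */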
int
kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
             int delayed, unsigned int niov,
             struct iovec *iov, lnet_kiov_t *kiov,
             unsigned int offset, unsigned int mlen, unsigned int rlen)
{
        kra_conn_t *conn = private;
        kra_msg_t  *rxmsg = conn->rac_rxmsg;
        kra_tx_t   *tx;
        void       *buffer;
        int         rc;

        LASSERT (mlen <= rlen);
        LASSERT (!in_interrupt());
        /* Either all pages or all vaddrs */
        LASSERT (!(kiov != NULL && iov != NULL));

        CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);

        switch(rxmsg->ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                if (mlen == 0) {
                        buffer = NULL;
                } else if (kiov != NULL) {
                        CERROR("Can't recv immediate into paged buffer\n");
                        return -EIO;
                } else {
                        LASSERT (niov > 0);

                        while (offset >= iov->iov_len) {
                                offset -= iov->iov_len;
                                niov--;
                                iov++;
                                LASSERT (niov > 0);
                        }

                        if (mlen > iov->iov_len - offset) {
                                CERROR("Can't handle immediate frags\n");
                                return -EIO;
                        }

                        buffer = ((char *)iov->iov_base) + offset;
                }

                rc = kranal_consume_rxmsg(conn, buffer, mlen);
                lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
                return 0;

        case RANAL_MSG_PUT_REQ:
                tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
                if (tx == NULL) {
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -ENOMEM;
                }

                rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_conn = conn;
                rc = kranal_map_buffer(tx);
                if (rc != 0) {
                        kranal_tx_done(tx, rc);
                        kranal_consume_rxmsg(conn, NULL, 0);
                        return -EIO;
                }

                tx->tx_msg.ram_u.putack.rapam_src_cookie =
                        conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
                tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;

                tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */

                kranal_post_fma(conn, tx);
                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;

        case RANAL_MSG_GET_REQ:
                if (lntmsg != NULL) {
                        /* Matched! */
                        kranal_reply(ni, conn, lntmsg);
                } else {
                        /* No match */
                        tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
                        if (tx != NULL) {
                                tx->tx_msg.ram_u.completion.racm_cookie =
                                        rxmsg->ram_u.get.ragm_cookie;
                                kranal_post_fma(conn, tx);
                        }
                }

                kranal_consume_rxmsg(conn, NULL, 0);
                return 0;
        }
}
int
kranal_thread_start(int (*fn)(void *arg), void *arg, char *name)
{
        struct task_struct *task = cfs_thread_run(fn, arg, name);

        if (!IS_ERR(task))
                atomic_inc(&kranal_data.kra_nthreads);
        return PTR_ERR(task);
}

void
kranal_thread_fini (void)
{
        atomic_dec(&kranal_data.kra_nthreads);
}
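/* Per-conn health checks for the reaper: schedule a keepalive NOOP if the
 * conn has been idle on the tx side for rac_keepalive seconds, and declare
 * the conn dead (-ETIMEDOUT) if nothing has been received for rac_timeout
 * seconds or if any queued tx has been stuck longer than that. */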
int
kranal_check_conn_timeouts (kra_conn_t *conn)
{
        kra_tx_t      *tx;
        cfs_list_t    *ttmp;
        unsigned long  flags;
        long           timeout;
        unsigned long  now = jiffies;

        LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
                 conn->rac_state == RANAL_CONN_CLOSING);

        if (!conn->rac_close_sent &&
            cfs_time_aftereq(now, conn->rac_last_tx +
                             msecs_to_jiffies(conn->rac_keepalive *
                                              MSEC_PER_SEC))) {
                /* not sent in a while; schedule conn so scheduler sends a keepalive */
                CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
                       conn, libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_schedule_conn(conn);
        }

        timeout = msecs_to_jiffies(conn->rac_timeout * MSEC_PER_SEC);

        if (!conn->rac_close_recvd &&
            cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
                CERROR("%s received from %s within %lu seconds\n",
                       (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
                       "Nothing" : "CLOSE not",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       jiffies_to_msecs(now - conn->rac_last_rx)/MSEC_PER_SEC);
                return -ETIMEDOUT;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                return 0;

        /* Check the conn's queues are moving.  These are "belt+braces" checks,
         * in case of hardware/software errors that make this conn seem
         * responsive even though it isn't progressing its message queues. */

        spin_lock_irqsave(&conn->rac_lock, flags);

        cfs_list_for_each (ttmp, &conn->rac_fmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on fmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on rdmaq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
                        return -ETIMEDOUT;
                }
        }

        cfs_list_for_each (ttmp, &conn->rac_replyq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CERROR("tx on replyq for %s blocked %lu seconds\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid),
                               jiffies_to_msecs(now-tx->tx_qtime)/MSEC_PER_SEC);
                        return -ETIMEDOUT;
                }
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        return 0;
}
void
kranal_reaper_check (int idx, unsigned long *min_timeoutp)
{
        cfs_list_t    *conns = &kranal_data.kra_conns[idx];
        cfs_list_t    *ctmp;
        kra_conn_t    *conn;
        unsigned long  flags;
        int            rc;

 again:
        /* NB. We expect to check all the conns and not find any problems, so
         * we just use a shared lock while we take a look... */
        read_lock(&kranal_data.kra_global_lock);

        cfs_list_for_each (ctmp, conns) {
                conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);

                if (conn->rac_timeout < *min_timeoutp)
                        *min_timeoutp = conn->rac_timeout;
                if (conn->rac_keepalive < *min_timeoutp)
                        *min_timeoutp = conn->rac_keepalive;

                rc = kranal_check_conn_timeouts(conn);
                if (rc == 0)
                        continue;

                kranal_conn_addref(conn);
                read_unlock(&kranal_data.kra_global_lock);

                CERROR("Conn to %s, cqid %d timed out\n",
                       libcfs_nid2str(conn->rac_peer->rap_nid),
                       conn->rac_cqid);

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                switch (conn->rac_state) {
                default:
                        LBUG();

                case RANAL_CONN_ESTABLISHED:
                        kranal_close_conn_locked(conn, -ETIMEDOUT);
                        break;

                case RANAL_CONN_CLOSING:
                        kranal_terminate_conn_locked(conn);
                        break;
                }

                write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                        flags);

                kranal_conn_decref(conn);

                /* start again now I've dropped the lock */
                goto again;
        }

        read_unlock(&kranal_data.kra_global_lock);
}
int
kranal_connd (void *arg)
{
        long               id = (long)arg;
        wait_queue_t       wait;
        unsigned long      flags;
        kra_peer_t        *peer;
        kra_acceptsock_t  *ras;
        int                did_something;

        cfs_block_allsigs();

        init_waitqueue_entry_current(&wait);

        spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);

        while (!kranal_data.kra_shutdown) {
                did_something = 0;

                if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
                        ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
                                             kra_acceptsock_t, ras_list);
                        cfs_list_del(&ras->ras_list);

                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                               flags);

                        CDEBUG(D_NET,"About to handshake someone\n");

                        kranal_conn_handshake(ras->ras_sock, NULL);
                        kranal_free_acceptsock(ras);

                        CDEBUG(D_NET,"Finished handshaking someone\n");

                        spin_lock_irqsave(&kranal_data.kra_connd_lock,
                                          flags);
                        did_something = 1;
                }

                if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
                        peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
                                              kra_peer_t, rap_connd_list);

                        cfs_list_del_init(&peer->rap_connd_list);
                        spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
                                               flags);

                        kranal_connect(peer);
                        kranal_peer_decref(peer);

                        spin_lock_irqsave(&kranal_data.kra_connd_lock,
                                          flags);
                        did_something = 1;
                }

                if (did_something)
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&kranal_data.kra_connd_waitq, &wait);

                spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

                waitq_wait(&wait, TASK_INTERRUPTIBLE);

                set_current_state(TASK_RUNNING);
                remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);

                spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
        }

        spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);

        kranal_thread_fini();
        return 0;
}
void
kranal_update_reaper_timeout(long timeout)
{
        unsigned long flags;

        LASSERT (timeout > 0);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        if (timeout < kranal_data.kra_new_min_timeout)
                kranal_data.kra_new_min_timeout = timeout;

        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
}
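/* Worked example of the reaper's scan rate (illustrative numbers only): with
 * conn_entries = 256 hash chains, n = 4 checks per interval, p = 1 second
 * wakeups and min_timeout = 60s, chunk = 256*4*1/60 = 17 chains per wakeup.
 * The whole table is then covered in about 256/17 = 15 wakeups (15 seconds),
 * i.e. roughly 4 times per timeout interval, as the comment below intends. */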
int
kranal_reaper (void *arg)
{
        wait_queue_t       wait;
        unsigned long      flags;
        long               timeout;
        int                i;
        int                conn_entries = kranal_data.kra_conn_hash_size;
        int                conn_index = 0;
        int                base_index = conn_entries - 1;
        unsigned long      next_check_time = jiffies;
        long               next_min_timeout = MAX_SCHEDULE_TIMEOUT;
        long               current_min_timeout = 1;

        cfs_block_allsigs();

        init_waitqueue_entry_current(&wait);

        spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* I wake up every 'p' seconds to check for timeouts on some
                 * more peers.  I try to check every connection 'n' times
                 * within the global minimum of all keepalive and timeout
                 * intervals, to ensure I attend to every connection within
                 * (n+1)/n times its timeout intervals. */
                const int     p = 1;
                const int     n = 4;
                unsigned long min_timeout;
                int           chunk;

                /* careful with the jiffy wrap... */
                timeout = (long)(next_check_time - jiffies);
                if (timeout > 0) {
                        set_current_state(TASK_INTERRUPTIBLE);
                        add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);

                        spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
                                               flags);

                        waitq_timedwait(&wait, TASK_INTERRUPTIBLE,
                                        timeout);

                        spin_lock_irqsave(&kranal_data.kra_reaper_lock,
                                          flags);

                        set_current_state(TASK_RUNNING);
                        remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
                        continue;
                }

                if (kranal_data.kra_new_min_timeout !=
                    MAX_SCHEDULE_TIMEOUT) {
                        /* new min timeout set: restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;

                        if (kranal_data.kra_new_min_timeout <
                            current_min_timeout) {
                                current_min_timeout =
                                        kranal_data.kra_new_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        kranal_data.kra_new_min_timeout =
                                MAX_SCHEDULE_TIMEOUT;
                }
                min_timeout = current_min_timeout;

                spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);

                LASSERT (min_timeout > 0);

                /* Compute how many table entries to check now so I get round
                 * the whole table fast enough given that I do this at fixed
                 * intervals of 'p' seconds */
                chunk = conn_entries;
                if (min_timeout > n * p)
                        chunk = (chunk * n * p) / min_timeout;
                if (chunk == 0)
                        chunk = 1;

                for (i = 0; i < chunk; i++) {
                        kranal_reaper_check(conn_index,
                                            &next_min_timeout);
                        conn_index = (conn_index + 1) % conn_entries;
                }

                next_check_time += msecs_to_jiffies(p * MSEC_PER_SEC);

                spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);

                if (((conn_index - chunk <= base_index &&
                      base_index < conn_index) ||
                     (conn_index - conn_entries - chunk <= base_index &&
                      base_index < conn_index - conn_entries))) {

                        /* Scanned all conns: set current_min_timeout... */
                        if (current_min_timeout != next_min_timeout) {
                                current_min_timeout = next_min_timeout;
                                CDEBUG(D_NET, "Set new min timeout %ld\n",
                                       current_min_timeout);
                        }

                        /* ...and restart min timeout scan */
                        next_min_timeout = MAX_SCHEDULE_TIMEOUT;
                        base_index = conn_index - 1;
                        if (base_index < 0)
                                base_index = conn_entries - 1;
                }
        }

        kranal_thread_fini();
        return 0;
}
void
kranal_check_rdma_cq (kra_device_t *dev)
{
        kra_conn_t          *conn;
        kra_tx_t            *tx;
        RAP_RETURN           rrc;
        unsigned long        flags;
        RAP_RDMA_DESCRIPTOR *desc;
        __u32                cqid;
        __u32                event_type;

        for (;;) {
                rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);
                LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);

                read_lock(&kranal_data.kra_global_lock);

                conn = kranal_cqid2conn_locked(cqid);
                if (conn == NULL) {
                        /* Conn was destroyed? */
                        CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
                LASSERT (rrc == RAP_SUCCESS);

                CDEBUG(D_NET, "Completed %p\n",
                       cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));

                spin_lock_irqsave(&conn->rac_lock, flags);

                LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
                tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
                cfs_list_del(&tx->tx_list);

                LASSERT (desc->AppPtr == (void *)tx);
                LASSERT (tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
                         tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);

                cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
                tx->tx_qtime = jiffies;

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                /* Get conn's fmaq processed, now I've just put something
                 * on it */
                kranal_schedule_conn(conn);

                read_unlock(&kranal_data.kra_global_lock);
        }
}
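/* Drain the FMA completion queue.  Unlike the RDMA CQ, this one can overrun
 * (RAPK_CQ_EVENT_OVERRUN); since a lost event only means a missed wakeup,
 * the recovery is simply to schedule every conn on this device so each one
 * re-polls its FMA state. */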
void
kranal_check_fma_cq (kra_device_t *dev)
{
        kra_conn_t *conn;
        RAP_RETURN  rrc;
        __u32       cqid;
        __u32       event_type;
        cfs_list_t *conns;
        cfs_list_t *tmp;
        int         i;

        for (;;) {
                rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
                if (rrc == RAP_NOT_DONE) {
                        CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
                        return;
                }

                LASSERT (rrc == RAP_SUCCESS);

                if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {

                        read_lock(&kranal_data.kra_global_lock);

                        conn = kranal_cqid2conn_locked(cqid);
                        if (conn == NULL) {
                                CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
                                       cqid);
                        } else {
                                CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
                                       conn, cqid);
                                kranal_schedule_conn(conn);
                        }

                        read_unlock(&kranal_data.kra_global_lock);
                        continue;
                }

                /* FMA CQ has overflowed: check ALL conns */
                CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
                      dev->rad_id);

                for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {

                        read_lock(&kranal_data.kra_global_lock);

                        conns = &kranal_data.kra_conns[i];

                        cfs_list_for_each (tmp, conns) {
                                conn = cfs_list_entry(tmp, kra_conn_t,
                                                      rac_hashlist);

                                if (conn->rac_device == dev)
                                        kranal_schedule_conn(conn);
                        }

                        /* don't block write lockers for too long... */
                        read_unlock(&kranal_data.kra_global_lock);
                }
        }
}
int
kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
               void *immediate, int immediatenob)
{
        int        sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
        RAP_RETURN rrc;

        CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
               conn, msg, msg->ram_type, sync ? "(sync)" : "",
               immediate, immediatenob);

        LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
        LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
                 immediatenob <= RANAL_FMA_MAX_DATA :
                 immediatenob == 0);

        msg->ram_connstamp = conn->rac_my_connstamp;
        msg->ram_seq = conn->rac_tx_seq;

        if (sync)
                rrc = RapkFmaSyncSend(conn->rac_rihandle,
                                      immediate, immediatenob,
                                      msg, sizeof(*msg));
        else
                rrc = RapkFmaSend(conn->rac_rihandle,
                                  immediate, immediatenob,
                                  msg, sizeof(*msg));

        switch (rrc) {
        default:
                LBUG();

        case RAP_SUCCESS:
                conn->rac_last_tx = jiffies;
                conn->rac_tx_seq++;
                return 0;

        case RAP_NOT_DONE:
                if (cfs_time_aftereq(jiffies,
                                     conn->rac_last_tx +
                                     msecs_to_jiffies(conn->rac_keepalive *
                                                      MSEC_PER_SEC)))
                        CWARN("EAGAIN sending %02x (idle %lu secs)\n",
                              msg->ram_type,
                              jiffies_to_msecs(jiffies - conn->rac_last_tx) /
                              MSEC_PER_SEC);
                return -EAGAIN;
        }
}
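/* Send-side work for one conn: on a closing conn, keep NOOPs flowing while
 * RDMAs drain, then send CLOSE once the rdmaq is idle.  On an established
 * conn, pop one tx off the fmaq per call (or send a keepalive NOOP when it
 * is empty); -EAGAIN from kranal_sendmsg() means no FMA credits, so the tx
 * goes back on the head of the queue, while successfully sent PUT_REQ,
 * PUT_ACK and GET_REQ txs park on the replyq until the peer replies. */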
void
kranal_process_fmaq (kra_conn_t *conn)
{
        unsigned long flags;
        int           more_to_do;
        kra_tx_t     *tx;
        int           rc;
        int           expect_reply;

        /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
         *       However I will be rescheduled by an FMA completion event
         *       when I eventually get some.
         * NB 2. Sampling rac_state here races with setting it elsewhere.
         *       But it doesn't matter if I try to send a "real" message just
         *       as I start closing because I'll get scheduled to send the
         *       close anyway. */

        /* Not racing with incoming message processing! */
        LASSERT (current == conn->rac_device->rad_scheduler);

        if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
                if (!cfs_list_empty(&conn->rac_rdmaq)) {
                        /* RDMAs in progress */
                        LASSERT (!conn->rac_close_sent);

                        if (cfs_time_aftereq(jiffies,
                                             conn->rac_last_tx +
                                             msecs_to_jiffies(conn->rac_keepalive *
                                                              MSEC_PER_SEC))) {
                                CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
                                kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                                kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                        }
                        return;
                }

                if (conn->rac_close_sent)
                        return;

                CWARN("sending CLOSE to %s\n",
                      libcfs_nid2str(conn->rac_peer->rap_nid));
                kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
                rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                if (rc != 0)
                        return;

                conn->rac_close_sent = 1;
                if (!conn->rac_close_recvd)
                        return;

                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_CLOSING)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                        flags);
                return;
        }

        spin_lock_irqsave(&conn->rac_lock, flags);

        if (cfs_list_empty(&conn->rac_fmaq)) {

                spin_unlock_irqrestore(&conn->rac_lock, flags);

                if (cfs_time_aftereq(jiffies,
                                     conn->rac_last_tx +
                                     msecs_to_jiffies(conn->rac_keepalive *
                                                      MSEC_PER_SEC))) {
                        CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
                               libcfs_nid2str(conn->rac_peer->rap_nid), conn,
                               jiffies_to_msecs(jiffies - conn->rac_last_tx) /
                               MSEC_PER_SEC,
                               conn->rac_keepalive);
                        kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
                        kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
                }
                return;
        }

        tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
        cfs_list_del(&tx->tx_list);
        more_to_do = !cfs_list_empty(&conn->rac_fmaq);

        spin_unlock_irqrestore(&conn->rac_lock, flags);

        expect_reply = 0;
        CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
               tx, tx->tx_msg.ram_type, tx->tx_cookie);
        switch (tx->tx_msg.ram_type) {
        default:
                LBUG();

        case RANAL_MSG_IMMEDIATE:
                rc = kranal_sendmsg(conn, &tx->tx_msg,
                                    tx->tx_buffer, tx->tx_nob);
                break;

        case RANAL_MSG_PUT_NAK:
        case RANAL_MSG_PUT_DONE:
        case RANAL_MSG_GET_NAK:
        case RANAL_MSG_GET_DONE:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                break;

        case RANAL_MSG_PUT_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_PUT_ACK:
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;

        case RANAL_MSG_GET_REQ:
                rc = kranal_map_buffer(tx);
                LASSERT (rc != -EAGAIN);
                if (rc != 0)
                        break;

                tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
                tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
                tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
                        (__u64)((unsigned long)tx->tx_buffer);
                tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
                rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
                expect_reply = 1;
                break;
        }

        if (rc == -EAGAIN) {
                /* I need credits to send this.  Replace tx at the head of the
                 * fmaq and I'll get rescheduled when credits appear */
                CDEBUG(D_NET, "EAGAIN on %p\n", conn);
                spin_lock_irqsave(&conn->rac_lock, flags);
                cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return;
        }

        if (!expect_reply || rc != 0) {
                kranal_tx_done(tx, rc);
        } else {
                /* LASSERT(current) above ensures this doesn't race with reply
                 * processing */
                spin_lock_irqsave(&conn->rac_lock, flags);
                cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
                tx->tx_qtime = jiffies;
                spin_unlock_irqrestore(&conn->rac_lock, flags);
        }

        if (more_to_do) {
                CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
                kranal_schedule_conn(conn);
        }
}
static void
kranal_swab_rdma_desc (kra_rdma_desc_t *d)
{
        __swab64s(&d->rard_key.Key);
        __swab16s(&d->rard_key.Cookie);
        __swab16s(&d->rard_key.MdHandle);
        __swab32s(&d->rard_key.Flags);
        __swab64s(&d->rard_addr.AddressBits);
        __swab32s(&d->rard_nob);
}
kra_tx_t *
kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
{
        cfs_list_t    *ttmp;
        kra_tx_t      *tx;
        unsigned long  flags;

        spin_lock_irqsave(&conn->rac_lock, flags);

        cfs_list_for_each(ttmp, &conn->rac_replyq) {
                tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);

                CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
                       tx, tx->tx_msg.ram_type, tx->tx_cookie);

                if (tx->tx_cookie != cookie)
                        continue;

                if (tx->tx_msg.ram_type != type) {
                        spin_unlock_irqrestore(&conn->rac_lock, flags);
                        CWARN("Unexpected type %x (%x expected) "
                              "matched reply from %s\n",
                              tx->tx_msg.ram_type, type,
                              libcfs_nid2str(conn->rac_peer->rap_nid));
                        return NULL;
                }

                cfs_list_del(&tx->tx_list);
                spin_unlock_irqrestore(&conn->rac_lock, flags);
                return tx;
        }

        spin_unlock_irqrestore(&conn->rac_lock, flags);
        CWARN("Unmatched reply %02x/"LPX64" from %s\n",
              type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
        return NULL;
}
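/* Receive-side work for one conn: pull the next FMA prefix and validate it
 * in order (magic/byte-swap, version, source NID, connection stamp, then the
 * strictly-increasing sequence number) before dispatching on message type;
 * any validation failure is a protocol error that closes the conn. */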
void
kranal_check_fma_rx (kra_conn_t *conn)
{
        unsigned long flags;
        __u32         seq;
        kra_tx_t     *tx;
        kra_msg_t    *msg;
        void         *prefix;
        RAP_RETURN    rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
        kra_peer_t   *peer = conn->rac_peer;
        int           rc = 0;
        int           repost = 1;

        if (rrc == RAP_NOT_DONE)
                return;

        CDEBUG(D_NET, "RX on %p\n", conn);

        LASSERT (rrc == RAP_SUCCESS);
        conn->rac_last_rx = jiffies;
        seq = conn->rac_rx_seq++;
        msg = (kra_msg_t *)prefix;

        /* stash message for portals callbacks; they'll NULL
         * rac_rxmsg if they consume it */
        LASSERT (conn->rac_rxmsg == NULL);
        conn->rac_rxmsg = msg;

        if (msg->ram_magic != RANAL_MSG_MAGIC) {
                if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
                        CERROR("Unexpected magic %08x from %s\n",
                               msg->ram_magic, libcfs_nid2str(peer->rap_nid));
                        rc = -EPROTO;
                        goto out;
                }

                __swab32s(&msg->ram_magic);
                __swab16s(&msg->ram_version);
                __swab16s(&msg->ram_type);
                __swab64s(&msg->ram_srcnid);
                __swab64s(&msg->ram_connstamp);
                __swab32s(&msg->ram_seq);

                /* NB message type checked below; NOT here... */
                switch (msg->ram_type) {
                case RANAL_MSG_PUT_ACK:
                        kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
                        break;

                case RANAL_MSG_GET_REQ:
                        kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
                        break;

                default:
                        break;
                }
        }

        if (msg->ram_version != RANAL_MSG_VERSION) {
                CERROR("Unexpected protocol version %d from %s\n",
                       msg->ram_version, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_srcnid != peer->rap_nid) {
                CERROR("Unexpected peer %s from %s\n",
                       libcfs_nid2str(msg->ram_srcnid),
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_connstamp != conn->rac_peer_connstamp) {
                CERROR("Unexpected connstamp "LPX64"("LPX64
                       " expected) from %s\n",
                       msg->ram_connstamp, conn->rac_peer_connstamp,
                       libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_seq != seq) {
                CERROR("Unexpected sequence number %d(%d expected) from %s\n",
                       msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
                /* This message signals RDMA completion... */
                rrc = RapkFmaSyncWait(conn->rac_rihandle);
                if (rrc != RAP_SUCCESS) {
                        CERROR("RapkFmaSyncWait failed: %d\n", rrc);
                        rc = -ENETDOWN;
                        goto out;
                }
        }

        if (conn->rac_close_recvd) {
                CERROR("Unexpected message %d after CLOSE from %s\n",
                       msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
                rc = -EPROTO;
                goto out;
        }

        if (msg->ram_type == RANAL_MSG_CLOSE) {
                CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
                conn->rac_close_recvd = 1;
                write_lock_irqsave(&kranal_data.kra_global_lock, flags);

                if (conn->rac_state == RANAL_CONN_ESTABLISHED)
                        kranal_close_conn_locked(conn, 0);
                else if (conn->rac_state == RANAL_CONN_CLOSING &&
                         conn->rac_close_sent)
                        kranal_terminate_conn_locked(conn);

                write_unlock_irqrestore(&kranal_data.kra_global_lock,
                                        flags);
                goto out;
        }

        if (conn->rac_state != RANAL_CONN_ESTABLISHED)
                goto out;

        switch (msg->ram_type) {
        case RANAL_MSG_NOOP:
                /* Nothing to do; just a keepalive */
                CDEBUG(D_NET, "RX NOOP on %p\n", conn);
                break;

        case RANAL_MSG_IMMEDIATE:
                CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
                                msg->ram_srcnid, conn, 0);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_REQ:
                CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_PUT_NAK:
                CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_PUT_ACK:
                CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
                                        msg->ram_u.putack.rapam_src_cookie);
                if (tx == NULL)
                        break;

                kranal_rdma(tx, RANAL_MSG_PUT_DONE,
                            &msg->ram_u.putack.rapam_desc,
                            msg->ram_u.putack.rapam_desc.rard_nob,
                            msg->ram_u.putack.rapam_dst_cookie);
                break;

        case RANAL_MSG_PUT_DONE:
                CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, 0);
                break;

        case RANAL_MSG_GET_REQ:
                CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
                rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
                                msg->ram_srcnid, conn, 1);
                repost = rc < 0;
                break;

        case RANAL_MSG_GET_NAK:
                CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
                kranal_tx_done(tx, -ENOENT);    /* no match */
                break;

        case RANAL_MSG_GET_DONE:
                CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
                tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
                                        msg->ram_u.completion.racm_cookie);
                if (tx == NULL)
                        break;

                LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
                         tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
#if 0
                /* completion message should send rdma length if we ever allow
                 * GET truncation */
                lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
#endif
                kranal_tx_done(tx, 0);
                break;
        }

 out:
        if (rc < 0)                             /* protocol/comms error */
                kranal_close_conn (conn, rc);

        if (repost && conn->rac_rxmsg != NULL)
                kranal_consume_rxmsg(conn, NULL, 0);

        /* check again later */
        kranal_schedule_conn(conn);
}
void
kranal_complete_closed_conn (kra_conn_t *conn)
{
        kra_tx_t *tx;
        int       nfma;
        int       nreplies;

        LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
        LASSERT (cfs_list_empty(&conn->rac_list));
        LASSERT (cfs_list_empty(&conn->rac_hashlist));

        for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
                tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);

                cfs_list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        LASSERT (cfs_list_empty(&conn->rac_rdmaq));

        for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
                tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);

                cfs_list_del(&tx->tx_list);
                kranal_tx_done(tx, -ECONNABORTED);
        }

        CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
              conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
}
int kranal_process_new_conn (kra_conn_t *conn)
{
        RAP_RETURN rrc;

        rrc = RapkCompleteSync(conn->rac_rihandle, 1);
        if (rrc == RAP_SUCCESS)
                return 0;

        LASSERT (rrc == RAP_NOT_DONE);
        if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
                              msecs_to_jiffies(conn->rac_timeout * MSEC_PER_SEC)))
                return -EAGAIN;

        /* Too late */
        rrc = RapkCompleteSync(conn->rac_rihandle, 0);
        LASSERT (rrc == RAP_SUCCESS);
        return -ETIMEDOUT;
}
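/* Per-device scheduler thread: polls both CQs when the device callback set
 * rad_ready, services scheduled conns (RX then fmaq), and nurses new conns
 * through RapkCompleteSync() with exponential backoff, sleeping until the
 * soonest deadline when a pass makes no progress.  dropped_lock records
 * whether rad_lock was released during the pass; sleeping is only safe
 * after a pass that did no work at all. */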
int
kranal_scheduler (void *arg)
{
        kra_device_t  *dev = (kra_device_t *)arg;
        wait_queue_t   wait;
        kra_conn_t    *conn;
        unsigned long  flags;
        unsigned long  deadline;
        unsigned long  soonest;
        int            nsoonest;
        long           timeout;
        cfs_list_t    *tmp;
        cfs_list_t    *nxt;
        int            rc;
        int            dropped_lock;
        int            busy_loops = 0;

        cfs_block_allsigs();

        dev->rad_scheduler = current;
        init_waitqueue_entry_current(&wait);

        spin_lock_irqsave(&dev->rad_lock, flags);

        while (!kranal_data.kra_shutdown) {
                /* Safe: kra_shutdown only set when quiescent */

                if (busy_loops++ >= RANAL_RESCHED) {
                        spin_unlock_irqrestore(&dev->rad_lock, flags);

                        cond_resched();
                        busy_loops = 0;

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                dropped_lock = 0;

                if (dev->rad_ready) {
                        /* Device callback fired since I last checked it */
                        dev->rad_ready = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_rdma_cq(dev);
                        kranal_check_fma_cq(dev);

                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
                        conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);

                        cfs_list_del_init(&conn->rac_schedlist);
                        LASSERT (conn->rac_scheduled);
                        conn->rac_scheduled = 0;
                        spin_unlock_irqrestore(&dev->rad_lock, flags);
                        dropped_lock = 1;

                        kranal_check_fma_rx(conn);
                        kranal_process_fmaq(conn);

                        if (conn->rac_state == RANAL_CONN_CLOSED)
                                kranal_complete_closed_conn(conn);

                        kranal_conn_decref(conn);
                        spin_lock_irqsave(&dev->rad_lock, flags);
                }

                nsoonest = 0;
                soonest = jiffies;

                cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
                        conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);

                        deadline = conn->rac_last_tx + conn->rac_keepalive;
                        if (cfs_time_aftereq(jiffies, deadline)) {
                                /* Time to process this new conn */
                                spin_unlock_irqrestore(&dev->rad_lock,
                                                       flags);
                                dropped_lock = 1;

                                rc = kranal_process_new_conn(conn);
                                if (rc != -EAGAIN) {
                                        /* All done with this conn */
                                        spin_lock_irqsave(&dev->rad_lock,
                                                          flags);
                                        cfs_list_del_init(&conn->rac_schedlist);
                                        spin_unlock_irqrestore(&dev->rad_lock,
                                                               flags);

                                        kranal_conn_decref(conn);
                                        spin_lock_irqsave(&dev->rad_lock,
                                                          flags);
                                        continue;
                                }

                                /* retry with exponential backoff until HZ */
                                if (conn->rac_keepalive == 0)
                                        conn->rac_keepalive = 1;
                                else if (conn->rac_keepalive <=
                                         msecs_to_jiffies(MSEC_PER_SEC))
                                        conn->rac_keepalive *= 2;
                                else
                                        conn->rac_keepalive +=
                                                msecs_to_jiffies(MSEC_PER_SEC);

                                deadline = conn->rac_last_tx + conn->rac_keepalive;
                                spin_lock_irqsave(&dev->rad_lock, flags);
                        }

                        /* Does this conn need attention soonest? */
                        if (nsoonest++ == 0 ||
                            !cfs_time_aftereq(deadline, soonest))
                                soonest = deadline;
                }

                if (dropped_lock)       /* may sleep iff I didn't drop the lock */
                        continue;

                set_current_state(TASK_INTERRUPTIBLE);
                add_wait_queue_exclusive(&dev->rad_waitq, &wait);
                spin_unlock_irqrestore(&dev->rad_lock, flags);

                if (nsoonest == 0) {
                        busy_loops = 0;
                        waitq_wait(&wait, TASK_INTERRUPTIBLE);
                } else {
                        timeout = (long)(soonest - jiffies);
                        if (timeout > 0) {
                                busy_loops = 0;
                                waitq_timedwait(&wait,
                                                TASK_INTERRUPTIBLE,
                                                timeout);
                        }
                }

                remove_wait_queue(&dev->rad_waitq, &wait);
                set_current_state(TASK_RUNNING);
                spin_lock_irqsave(&dev->rad_lock, flags);
        }

        spin_unlock_irqrestore(&dev->rad_lock, flags);

        dev->rad_scheduler = NULL;
        kranal_thread_fini();
        return 0;
}