4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
27 * Copyright (c) 2004, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
31 * This file is part of Lustre, http://www.lustre.org/
32 * Lustre is a trademark of Sun Microsystems, Inc.
34 * lnet/klnds/ralnd/ralnd_cb.c
36 * Author: Eric Barton <eric@bartonsoftware.com>
/*
 * RapidArray device event callback: scan the device table for the
 * device whose rad_id matches 'devid' and, if it is not already
 * marked ready, wake its scheduler via rad_waitq.
 * NOTE(review): this listing is an incomplete extraction — interior
 * lines (declarations, continue/return, closing braces) are missing;
 * confirm control flow against the full source.
 */
42 kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
48 CDEBUG(D_NET, "callback for device %d\n", devid);
50 for (i = 0; i < kranal_data.kra_ndevs; i++) {
52 dev = &kranal_data.kra_devices[i];
53 if (dev->rad_id != devid)
56 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
58 if (!dev->rad_ready) {
60 cfs_waitq_signal(&dev->rad_waitq);
63 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Reached only when no device in the table matched 'devid' */
67 CWARN("callback for unknown device %d\n", devid);
/*
 * Queue 'conn' on its device's ready list and wake the device
 * scheduler.  rac_scheduled guards against double-queueing; the
 * first scheduling takes an extra conn ref on the scheduler's behalf.
 */
71 kranal_schedule_conn(kra_conn_t *conn)
73 kra_device_t *dev = conn->rac_device;
76 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
78 if (!conn->rac_scheduled) {
79 kranal_conn_addref(conn); /* +1 ref for scheduler */
80 conn->rac_scheduled = 1;
81 cfs_list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
82 cfs_waitq_signal(&dev->rad_waitq);
85 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
/*
 * Take a transmit descriptor off the global idle-tx free list,
 * assigning it a fresh completion cookie while the tx lock is held.
 * The asserts verify the descriptor was returned to the pool clean.
 * NOTE(review): the empty-list branch presumably returns NULL — the
 * return statement is omitted in this listing; verify in full source.
 */
89 kranal_get_idle_tx (void)
94 cfs_spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
96 if (cfs_list_empty(&kranal_data.kra_idle_txs)) {
97 cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
101 tx = cfs_list_entry(kranal_data.kra_idle_txs.next, kra_tx_t, tx_list);
102 cfs_list_del(&tx->tx_list);
104 /* Allocate a new completion cookie. It might not be needed, but we've
105 * got a lock right now... */
106 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
108 cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
110 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
111 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
112 LASSERT (tx->tx_conn == NULL);
113 LASSERT (tx->tx_lntmsg[0] == NULL);
114 LASSERT (tx->tx_lntmsg[1] == NULL);
/*
 * Fill in the common header fields of an FMA message: magic, protocol
 * version, message type and source NID.  The connection stamp is set
 * later, at the moment the message is actually sent.
 */
120 kranal_init_msg(kra_msg_t *msg, int type)
122 msg->ram_magic = RANAL_MSG_MAGIC;
123 msg->ram_version = RANAL_MSG_VERSION;
124 msg->ram_type = type;
125 msg->ram_srcnid = kranal_data.kra_ni->ni_nid;
126 /* ram_connstamp gets set when FMA is sent */
/*
 * Convenience wrapper: allocate an idle tx descriptor and initialise
 * its embedded message header for 'type'.
 * NOTE(review): the NULL-check on tx is not visible in this listing.
 */
130 kranal_new_tx_msg (int type)
132 kra_tx_t *tx = kranal_get_idle_tx();
135 kranal_init_msg(&tx->tx_msg, type);
/*
 * Describe the payload for an IMMEDIATE send: skip 'offset' bytes
 * into the iov, then point tx_buffer at the single contiguous vaddr
 * fragment.  Payloads spanning more than one fragment are rejected.
 */
141 kranal_setup_immediate_buffer (kra_tx_t *tx,
142 unsigned int niov, struct iovec *iov,
146 /* For now this is almost identical to kranal_setup_virt_buffer, but we
147 * could "flatten" the payload into a single contiguous buffer ready
148 * for sending direct over an FMA if we ever needed to. */
150 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
154 tx->tx_buffer = NULL;
/* Walk past whole iov entries consumed by 'offset' */
158 while (offset >= iov->iov_len) {
159 offset -= iov->iov_len;
165 if (nob > iov->iov_len - offset) {
166 CERROR("Can't handle multiple vaddr fragments\n");
170 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
173 tx->tx_buftype = RANAL_BUF_IMMEDIATE;
/*
 * Describe a virtual-memory RDMA buffer: skip 'offset' into the iov
 * and record the single contiguous fragment as an unmapped virtual
 * buffer.  Multi-fragment payloads are rejected.
 */
179 kranal_setup_virt_buffer (kra_tx_t *tx,
180 unsigned int niov, struct iovec *iov,
186 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* Walk past whole iov entries consumed by 'offset' */
188 while (offset >= iov->iov_len) {
189 offset -= iov->iov_len;
195 if (nob > iov->iov_len - offset) {
196 CERROR("Can't handle multiple vaddr fragments\n");
200 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
202 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/*
 * Describe a physical-page (kiov) RDMA buffer: skip 'offset' into the
 * kiov, then build tx_phys[] — one RAP_PHYS_REGION per page — for the
 * whole payload.  Pages after the first must be I/O-contiguous (no
 * leading offset, full length when more remains) and the region list
 * is bounded by LNET_MAX_IOV.
 * NOTE(review): loop structure over the remaining pages is partially
 * omitted in this listing; verify against the full source.
 */
207 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
210 RAP_PHYS_REGION *phys = tx->tx_phys;
213 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
217 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
/* Walk past whole kiov entries consumed by 'offset' */
219 while (offset >= kiov->kiov_len) {
220 offset -= kiov->kiov_len;
226 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
228 tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
230 phys->Address = lnet_page2phys(kiov->kiov_page);
233 resid = nob - (kiov->kiov_len - offset);
239 if (kiov->kiov_offset != 0 ||
240 ((resid > PAGE_SIZE) &&
241 kiov->kiov_len < PAGE_SIZE)) {
242 /* Can't have gaps */
243 CERROR("Can't make payload contiguous in I/O VM:"
244 "page %d, offset %d, len %d \n",
245 (int)(phys - tx->tx_phys),
246 kiov->kiov_offset, kiov->kiov_len);
250 if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
251 CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
255 phys->Address = lnet_page2phys(kiov->kiov_page);
261 tx->tx_phys_npages = phys - tx->tx_phys;
/*
 * Dispatch RDMA buffer setup: exactly one of iov/kiov must be given;
 * kiov payloads use the physical-page path, iov payloads the virtual
 * path.
 */
266 kranal_setup_rdma_buffer (kra_tx_t *tx, unsigned int niov,
267 struct iovec *iov, lnet_kiov_t *kiov,
270 LASSERT ((iov == NULL) != (kiov == NULL));
273 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
275 return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/*
 * Register the tx's buffer with the RapidArray device so it can be
 * used for RDMA.  Physical buffers go through RapkRegisterPhys,
 * virtual ones through RapkRegisterMemory; in either case the device
 * mapping counters are updated and tx_buftype moves to *_MAPPED.
 * Registration failure is reported as -ENOMEM.  Must run on the
 * device's scheduler thread (asserted).
 */
279 kranal_map_buffer (kra_tx_t *tx)
281 kra_conn_t *conn = tx->tx_conn;
282 kra_device_t *dev = conn->rac_device;
285 LASSERT (current == dev->rad_scheduler);
287 switch (tx->tx_buftype) {
/* Already usable as-is: nothing to register */
292 case RANAL_BUF_IMMEDIATE:
293 case RANAL_BUF_PHYS_MAPPED:
294 case RANAL_BUF_VIRT_MAPPED:
297 case RANAL_BUF_PHYS_UNMAPPED:
298 rrc = RapkRegisterPhys(dev->rad_handle,
299 tx->tx_phys, tx->tx_phys_npages,
301 if (rrc != RAP_SUCCESS) {
302 CERROR ("Can't map %d pages: dev %d "
303 "phys %u pp %u, virt %u nob %lu\n",
304 tx->tx_phys_npages, dev->rad_id,
305 dev->rad_nphysmap, dev->rad_nppphysmap,
306 dev->rad_nvirtmap, dev->rad_nobvirtmap);
307 return -ENOMEM; /* assume insufficient resources */
311 dev->rad_nppphysmap += tx->tx_phys_npages;
313 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
316 case RANAL_BUF_VIRT_UNMAPPED:
317 rrc = RapkRegisterMemory(dev->rad_handle,
318 tx->tx_buffer, tx->tx_nob,
320 if (rrc != RAP_SUCCESS) {
321 CERROR ("Can't map %d bytes: dev %d "
322 "phys %u pp %u, virt %u nob %lu\n",
323 tx->tx_nob, dev->rad_id,
324 dev->rad_nphysmap, dev->rad_nppphysmap,
325 dev->rad_nvirtmap, dev->rad_nobvirtmap);
326 return -ENOMEM; /* assume insufficient resources */
330 dev->rad_nobvirtmap += tx->tx_nob;
332 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/*
 * Undo kranal_map_buffer(): deregister the tx's mapped memory with
 * the device, roll back the mapping counters, and return tx_buftype
 * to its *_UNMAPPED state.  Unmapped/immediate buffers need no work.
 * Must run on the device's scheduler thread (asserted).
 */
338 kranal_unmap_buffer (kra_tx_t *tx)
343 switch (tx->tx_buftype) {
/* Nothing was registered for these */
348 case RANAL_BUF_IMMEDIATE:
349 case RANAL_BUF_PHYS_UNMAPPED:
350 case RANAL_BUF_VIRT_UNMAPPED:
353 case RANAL_BUF_PHYS_MAPPED:
354 LASSERT (tx->tx_conn != NULL);
355 dev = tx->tx_conn->rac_device;
356 LASSERT (current == dev->rad_scheduler);
357 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
359 LASSERT (rrc == RAP_SUCCESS);
362 dev->rad_nppphysmap -= tx->tx_phys_npages;
364 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
367 case RANAL_BUF_VIRT_MAPPED:
368 LASSERT (tx->tx_conn != NULL);
369 dev = tx->tx_conn->rac_device;
370 LASSERT (current == dev->rad_scheduler);
371 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
373 LASSERT (rrc == RAP_SUCCESS);
376 dev->rad_nobvirtmap -= tx->tx_nob;
378 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
/*
 * Complete a tx: unmap its buffer, detach any LNet messages, reset
 * the descriptor and return it to the global idle list — then
 * finalize the LNet messages with 'completion' status.  Finalizing
 * only after the tx is back on the free list avoids deadlock if
 * lnet_finalize() triggers a new send.  Must not run in interrupt
 * context (asserted).
 */
384 kranal_tx_done (kra_tx_t *tx, int completion)
386 lnet_msg_t *lnetmsg[2];
390 LASSERT (!cfs_in_interrupt());
392 kranal_unmap_buffer(tx);
394 lnetmsg[0] = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
395 lnetmsg[1] = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;
397 tx->tx_buftype = RANAL_BUF_NONE;
398 tx->tx_msg.ram_type = RANAL_MSG_NONE;
401 cfs_spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
403 cfs_list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
405 cfs_spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
407 /* finalize AFTER freeing lnet msgs */
408 for (i = 0; i < 2; i++) {
409 if (lnetmsg[i] == NULL)
412 lnet_finalize(kranal_data.kra_ni, lnetmsg[i], completion);
/*
 * Return the first connection on the peer's conn list (caller holds
 * the global lock).  NULL-return path for an empty list is not
 * visible in this listing.
 */
417 kranal_find_conn_locked (kra_peer_t *peer)
421 /* just return the first connection */
422 cfs_list_for_each (tmp, &peer->rap_conns) {
423 return cfs_list_entry(tmp, kra_conn_t, rac_list);
/*
 * Queue 'tx' on the connection's FMA send queue, stamp its queue
 * time for stall detection, and schedule the connection so its
 * device scheduler will transmit it.
 */
430 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
436 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
437 cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
438 tx->tx_qtime = jiffies;
439 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
441 kranal_schedule_conn(conn);
/*
 * Launch 'tx' towards 'nid'.  Fast path: under the shared global
 * lock, find the peer and an established conn and post directly.
 * Otherwise retake the lock exclusively and either (a) fail the tx
 * with -EHOSTUNREACH when the peer cannot be found/created or a
 * reconnect is rate-limited, (b) post on a conn that appeared in the
 * meantime, or (c) queue the tx on the peer and hand the peer to the
 * connd to establish a connection.  Once here we are committed: any
 * failure completes the tx with an error.
 * NOTE(review): the retry-loop back-edge and some branch closures
 * are omitted in this listing; verify control flow in full source.
 */
445 kranal_launch_tx (kra_tx_t *tx, lnet_nid_t nid)
452 cfs_rwlock_t *g_lock = &kranal_data.kra_global_lock;
454 /* If I get here, I've committed to send, so I complete the tx with
455 * failure on any problems */
457 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
459 for (retry = 0; ; retry = 1) {
461 cfs_read_lock(g_lock);
463 peer = kranal_find_peer_locked(nid);
465 conn = kranal_find_conn_locked(peer);
467 kranal_post_fma(conn, tx);
468 cfs_read_unlock(g_lock);
473 /* Making connections; I'll need a write lock... */
474 cfs_read_unlock(g_lock);
475 cfs_write_lock_irqsave(g_lock, flags);
477 peer = kranal_find_peer_locked(nid);
481 cfs_write_unlock_irqrestore(g_lock, flags);
484 CERROR("Can't find peer %s\n", libcfs_nid2str(nid));
485 kranal_tx_done(tx, -EHOSTUNREACH);
/* First failure to find the peer: create a persistent one and retry */
489 rc = kranal_add_persistent_peer(nid, LNET_NIDADDR(nid),
490 lnet_acceptor_port());
492 CERROR("Can't add peer %s: %d\n",
493 libcfs_nid2str(nid), rc);
494 kranal_tx_done(tx, rc);
499 conn = kranal_find_conn_locked(peer);
501 /* Connection exists; queue message on it */
502 kranal_post_fma(conn, tx);
503 cfs_write_unlock_irqrestore(g_lock, flags);
507 LASSERT (peer->rap_persistence > 0);
509 if (!peer->rap_connecting) {
510 LASSERT (cfs_list_empty(&peer->rap_tx_queue));
/* Respect the reconnect backoff interval */
512 if (!(peer->rap_reconnect_interval == 0 || /* first attempt */
513 cfs_time_aftereq(jiffies, peer->rap_reconnect_time))) {
514 cfs_write_unlock_irqrestore(g_lock, flags);
515 kranal_tx_done(tx, -EHOSTUNREACH);
519 peer->rap_connecting = 1;
520 kranal_peer_addref(peer); /* extra ref for connd */
522 cfs_spin_lock(&kranal_data.kra_connd_lock);
524 cfs_list_add_tail(&peer->rap_connd_list,
525 &kranal_data.kra_connd_peers);
526 cfs_waitq_signal(&kranal_data.kra_connd_waitq);
528 cfs_spin_unlock(&kranal_data.kra_connd_lock);
531 /* A connection is being established; queue the message... */
532 cfs_list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
534 cfs_write_unlock_irqrestore(g_lock, flags);
/*
 * Start an RDMA of 'nob' bytes from the (already-mapped) tx buffer
 * to the remote 'sink' descriptor, and prepare the completion
 * message (carrying 'cookie') that will be sent when the RDMA is
 * done.  A zero-length transfer skips the RDMA and posts the
 * completion immediately.  Runs on the conn's scheduler thread
 * (asserted), so it cannot race the scheduler sending CLOSE.
 */
538 kranal_rdma(kra_tx_t *tx, int type,
539 kra_rdma_desc_t *sink, int nob, __u64 cookie)
541 kra_conn_t *conn = tx->tx_conn;
545 LASSERT (kranal_tx_mapped(tx));
546 LASSERT (nob <= sink->rard_nob);
547 LASSERT (nob <= tx->tx_nob);
549 /* No actual race with scheduler sending CLOSE (I'm she!) */
550 LASSERT (current == conn->rac_device->rad_scheduler);
552 memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
553 tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
554 tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
555 tx->tx_rdma_desc.DstPtr = sink->rard_addr;
556 tx->tx_rdma_desc.DstKey = sink->rard_key;
557 tx->tx_rdma_desc.Length = nob;
558 tx->tx_rdma_desc.AppPtr = tx;
560 /* prep final completion message */
561 kranal_init_msg(&tx->tx_msg, type);
562 tx->tx_msg.ram_u.completion.racm_cookie = cookie;
564 if (nob == 0) { /* Immediate completion */
565 kranal_post_fma(conn, tx);
569 LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
571 rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
572 LASSERT (rrc == RAP_SUCCESS);
/* Track the in-flight RDMA for stall detection / completion order */
574 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
575 cfs_list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
576 tx->tx_qtime = jiffies;
577 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/*
 * Copy the pending immediate payload out of the conn's FMA receive
 * buffer into 'buffer' (up to 'nob' bytes) and mark the rx message
 * consumed.  Warns when fewer bytes than expected arrived.
 * NOTE(review): the return value path is omitted in this listing.
 */
581 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
583 __u32 nob_received = nob;
586 LASSERT (conn->rac_rxmsg != NULL);
587 CDEBUG(D_NET, "Consuming %p\n", conn);
589 rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
590 &nob_received, sizeof(kra_msg_t));
591 LASSERT (rrc == RAP_SUCCESS);
593 conn->rac_rxmsg = NULL;
595 if (nob_received < nob) {
596 CWARN("Incomplete immediate msg from %s: expected %d, got %d\n",
597 libcfs_nid2str(conn->rac_peer->rap_nid),
/*
 * LND send entry point (lnd_send).  Dispatches on the LNet message
 * type: GETs whose sink buffer is small enough go as IMMEDIATE,
 * otherwise a GET_REQ describing the sink is sent; PUTs small enough
 * go IMMEDIATE, otherwise a PUT_REQ; everything else falls through to
 * an IMMEDIATE message carrying the payload inline.  Routing is not
 * supported.
 * NOTE(review): switch/case labels and some early-return lines are
 * omitted in this listing; verify dispatch structure in full source.
 */
606 kranal_send (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
608 lnet_hdr_t *hdr = &lntmsg->msg_hdr;
609 int type = lntmsg->msg_type;
610 lnet_process_id_t target = lntmsg->msg_target;
611 int target_is_router = lntmsg->msg_target_is_router;
612 int routing = lntmsg->msg_routing;
613 unsigned int niov = lntmsg->msg_niov;
614 struct iovec *iov = lntmsg->msg_iov;
615 lnet_kiov_t *kiov = lntmsg->msg_kiov;
616 unsigned int offset = lntmsg->msg_offset;
617 unsigned int nob = lntmsg->msg_len;
621 /* NB 'private' is different depending on what we're sending.... */
623 CDEBUG(D_NET, "sending %d bytes in %d frags to %s\n",
624 nob, niov, libcfs_id2str(target));
626 LASSERT (nob == 0 || niov > 0);
627 LASSERT (niov <= LNET_MAX_IOV);
629 LASSERT (!cfs_in_interrupt());
630 /* payload is either all vaddrs or all pages */
631 LASSERT (!(kiov != NULL && iov != NULL));
634 CERROR ("Can't route\n");
649 /* We have to consider the eventual sink buffer rather than any
650 * payload passed here (there isn't any, and strictly, looking
651 * inside lntmsg is a layering violation). We send a simple
652 * IMMEDIATE GET if the sink buffer is mapped already and small
655 if (routing || target_is_router)
656 break; /* send IMMEDIATE */
/* Small non-paged sink: IMMEDIATE GET is cheaper than RDMA setup */
658 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0 &&
659 lntmsg->msg_md->md_length <= RANAL_FMA_MAX_DATA &&
660 lntmsg->msg_md->md_length <= *kranal_tunables.kra_max_immediate)
661 break; /* send IMMEDIATE */
663 tx = kranal_new_tx_msg(RANAL_MSG_GET_REQ);
/* Describe the GET sink so the peer can RDMA into it */
667 if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
668 rc = kranal_setup_virt_buffer(tx, lntmsg->msg_md->md_niov,
669 lntmsg->msg_md->md_iov.iov,
670 0, lntmsg->msg_md->md_length);
672 rc = kranal_setup_phys_buffer(tx, lntmsg->msg_md->md_niov,
673 lntmsg->msg_md->md_iov.kiov,
674 0, lntmsg->msg_md->md_length);
676 kranal_tx_done(tx, rc);
/* GET needs a reply msg to finalize when the data lands */
680 tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
681 if (tx->tx_lntmsg[1] == NULL) {
682 CERROR("Can't create reply for GET to %s\n",
683 libcfs_nid2str(target.nid));
684 kranal_tx_done(tx, rc);
688 tx->tx_lntmsg[0] = lntmsg;
689 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
690 /* rest of tx_msg is setup just before it is sent */
691 kranal_launch_tx(tx, target.nid);
/* PUT path: small non-paged payloads go IMMEDIATE */
696 if (kiov == NULL && /* not paged */
697 nob <= RANAL_FMA_MAX_DATA && /* small enough */
698 nob <= *kranal_tunables.kra_max_immediate)
699 break; /* send IMMEDIATE */
701 tx = kranal_new_tx_msg(RANAL_MSG_PUT_REQ);
705 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
707 kranal_tx_done(tx, rc);
711 tx->tx_lntmsg[0] = lntmsg;
712 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
713 /* rest of tx_msg is setup just before it is sent */
714 kranal_launch_tx(tx, target.nid);
/* IMMEDIATE fallback: payload must be vaddr-based and small */
720 LASSERT (kiov == NULL);
721 LASSERT (nob <= RANAL_FMA_MAX_DATA);
723 tx = kranal_new_tx_msg(RANAL_MSG_IMMEDIATE);
727 rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
729 kranal_tx_done(tx, rc);
733 tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
734 tx->tx_lntmsg[0] = lntmsg;
735 kranal_launch_tx(tx, target.nid);
/*
 * Satisfy a peer's GET_REQ: allocate a tx, describe and map the
 * reply payload, then RDMA it straight into the sink described in
 * the received GET request, completing with GET_DONE.  On failure
 * the tx and the LNet message are both finalized with -EIO.
 */
740 kranal_reply(lnet_ni_t *ni, kra_conn_t *conn, lnet_msg_t *lntmsg)
742 kra_msg_t *rxmsg = conn->rac_rxmsg;
743 unsigned int niov = lntmsg->msg_niov;
744 struct iovec *iov = lntmsg->msg_iov;
745 lnet_kiov_t *kiov = lntmsg->msg_kiov;
746 unsigned int offset = lntmsg->msg_offset;
747 unsigned int nob = lntmsg->msg_len;
751 tx = kranal_get_idle_tx();
755 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
761 rc = kranal_map_buffer(tx);
765 tx->tx_lntmsg[0] = lntmsg;
767 kranal_rdma(tx, RANAL_MSG_GET_DONE,
768 &rxmsg->ram_u.get.ragm_desc, nob,
769 rxmsg->ram_u.get.ragm_cookie);
/* Error path: fail both the tx and the LNet-level message */
773 kranal_tx_done(tx, -EIO);
775 lnet_finalize(ni, lntmsg, -EIO);
/*
 * LND eager-receive hook: this LND has no buffers to stash a message
 * in, so the message is dropped with a console error identifying the
 * sending peer.
 */
779 kranal_eager_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
782 kra_conn_t *conn = (kra_conn_t *)private;
784 LCONSOLE_ERROR_MSG(0x12b, "Dropping message from %s: no buffers free.\n",
785 libcfs_nid2str(conn->rac_peer->rap_nid));
/*
 * LND receive entry point (lnd_recv): dispatch on the pending rx
 * message type.  IMMEDIATE: copy the inline payload into the (vaddr)
 * receive buffer.  PUT_REQ: build and map a sink buffer, send a
 * PUT_ACK describing it, finalize on the later RDMA_DONE.  GET_REQ:
 * reply with the requested data, or GET_NAK when LNet matched
 * nothing.  The rx message is always consumed before returning.
 * NOTE(review): some case labels, error-return lines and closing
 * braces are omitted in this listing; verify against full source.
 */
791 kranal_recv (lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
792 int delayed, unsigned int niov,
793 struct iovec *iov, lnet_kiov_t *kiov,
794 unsigned int offset, unsigned int mlen, unsigned int rlen)
796 kra_conn_t *conn = private;
797 kra_msg_t *rxmsg = conn->rac_rxmsg;
802 LASSERT (mlen <= rlen);
803 LASSERT (!cfs_in_interrupt());
804 /* Either all pages or all vaddrs */
805 LASSERT (!(kiov != NULL && iov != NULL));
807 CDEBUG(D_NET, "conn %p, rxmsg %p, lntmsg %p\n", conn, rxmsg, lntmsg);
809 switch(rxmsg->ram_type) {
813 case RANAL_MSG_IMMEDIATE:
816 } else if (kiov != NULL) {
817 CERROR("Can't recv immediate into paged buffer\n");
/* Skip 'offset' bytes worth of whole iov entries */
821 while (offset >= iov->iov_len) {
822 offset -= iov->iov_len;
827 if (mlen > iov->iov_len - offset) {
828 CERROR("Can't handle immediate frags\n");
831 buffer = ((char *)iov->iov_base) + offset;
833 rc = kranal_consume_rxmsg(conn, buffer, mlen);
834 lnet_finalize(ni, lntmsg, (rc == 0) ? 0 : -EIO);
837 case RANAL_MSG_PUT_REQ:
838 tx = kranal_new_tx_msg(RANAL_MSG_PUT_ACK);
840 kranal_consume_rxmsg(conn, NULL, 0);
844 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
846 kranal_tx_done(tx, rc);
847 kranal_consume_rxmsg(conn, NULL, 0);
852 rc = kranal_map_buffer(tx);
854 kranal_tx_done(tx, rc);
855 kranal_consume_rxmsg(conn, NULL, 0);
/* Describe the mapped sink in the PUT_ACK for the peer's RDMA */
859 tx->tx_msg.ram_u.putack.rapam_src_cookie =
860 conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
861 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
862 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
863 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
864 (__u64)((unsigned long)tx->tx_buffer);
865 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
867 tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
869 kranal_post_fma(conn, tx);
870 kranal_consume_rxmsg(conn, NULL, 0);
873 case RANAL_MSG_GET_REQ:
874 if (lntmsg != NULL) {
876 kranal_reply(ni, conn, lntmsg);
/* No match for the GET: NAK it back with the request cookie */
879 tx = kranal_new_tx_msg(RANAL_MSG_GET_NAK);
881 tx->tx_msg.ram_u.completion.racm_cookie =
882 rxmsg->ram_u.get.ragm_cookie;
883 kranal_post_fma(conn, tx);
886 kranal_consume_rxmsg(conn, NULL, 0);
/*
 * Spawn a kernel thread running fn(arg) and count it in
 * kra_nthreads.  Error path for a failed create is not visible in
 * this listing.
 */
892 kranal_thread_start (int(*fn)(void *arg), void *arg)
894 long pid = cfs_create_thread(fn, arg, 0);
899 cfs_atomic_inc(&kranal_data.kra_nthreads);
/* Called by each daemon thread on exit: drop the running-thread count. */
904 kranal_thread_fini (void)
906 cfs_atomic_dec(&kranal_data.kra_nthreads);
/*
 * Health-check one connection: (1) schedule a keepalive if nothing
 * has been sent within the keepalive interval, (2) flag an error if
 * nothing (or no CLOSE) was received within the timeout, and
 * (3) "belt+braces" — flag any tx stuck on the fmaq/rdmaq/replyq
 * longer than the timeout, in case hardware/software errors leave
 * the conn looking responsive while its queues stall.
 * NOTE(review): the error-return statements after each CERROR are
 * omitted in this listing; verify in full source.
 */
910 kranal_check_conn_timeouts (kra_conn_t *conn)
916 unsigned long now = jiffies;
918 LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
919 conn->rac_state == RANAL_CONN_CLOSING);
921 if (!conn->rac_close_sent &&
922 cfs_time_aftereq(now, conn->rac_last_tx + conn->rac_keepalive *
924 /* not sent in a while; schedule conn so scheduler sends a keepalive */
925 CDEBUG(D_NET, "Scheduling keepalive %p->%s\n",
926 conn, libcfs_nid2str(conn->rac_peer->rap_nid));
927 kranal_schedule_conn(conn);
930 timeout = conn->rac_timeout * CFS_HZ;
932 if (!conn->rac_close_recvd &&
933 cfs_time_aftereq(now, conn->rac_last_rx + timeout)) {
934 CERROR("%s received from %s within %lu seconds\n",
935 (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
936 "Nothing" : "CLOSE not",
937 libcfs_nid2str(conn->rac_peer->rap_nid),
938 (now - conn->rac_last_rx)/CFS_HZ);
942 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
945 /* Check the conn's queues are moving. These are "belt+braces" checks,
946 * in case of hardware/software errors that make this conn seem
947 * responsive even though it isn't progressing its message queues. */
949 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
951 cfs_list_for_each (ttmp, &conn->rac_fmaq) {
952 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
954 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
955 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
956 CERROR("tx on fmaq for %s blocked %lu seconds\n",
957 libcfs_nid2str(conn->rac_peer->rap_nid),
958 (now - tx->tx_qtime)/CFS_HZ);
963 cfs_list_for_each (ttmp, &conn->rac_rdmaq) {
964 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
966 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
967 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
968 CERROR("tx on rdmaq for %s blocked %lu seconds\n",
969 libcfs_nid2str(conn->rac_peer->rap_nid),
970 (now - tx->tx_qtime)/CFS_HZ);
975 cfs_list_for_each (ttmp, &conn->rac_replyq) {
976 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
978 if (cfs_time_aftereq(now, tx->tx_qtime + timeout)) {
979 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
980 CERROR("tx on replyq for %s blocked %lu seconds\n",
981 libcfs_nid2str(conn->rac_peer->rap_nid),
982 (now - tx->tx_qtime)/CFS_HZ);
987 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/*
 * Reaper scan of one conn-hash bucket: track the minimum timeout /
 * keepalive seen (via *min_timeoutp), and when a conn fails its
 * timeout checks, take a ref, drop the shared lock, and close or
 * terminate it under the exclusive lock depending on its state.
 * After dropping the lock the bucket scan restarts from the top.
 */
992 kranal_reaper_check (int idx, unsigned long *min_timeoutp)
994 cfs_list_t *conns = &kranal_data.kra_conns[idx];
1001 /* NB. We expect to check all the conns and not find any problems, so
1002 * we just use a shared lock while we take a look... */
1003 cfs_read_lock(&kranal_data.kra_global_lock);
1005 cfs_list_for_each (ctmp, conns) {
1006 conn = cfs_list_entry(ctmp, kra_conn_t, rac_hashlist);
1008 if (conn->rac_timeout < *min_timeoutp )
1009 *min_timeoutp = conn->rac_timeout;
1010 if (conn->rac_keepalive < *min_timeoutp )
1011 *min_timeoutp = conn->rac_keepalive;
1013 rc = kranal_check_conn_timeouts(conn);
/* Conn timed out: hold a ref across the lock upgrade */
1017 kranal_conn_addref(conn);
1018 cfs_read_unlock(&kranal_data.kra_global_lock);
1020 CERROR("Conn to %s, cqid %d timed out\n",
1021 libcfs_nid2str(conn->rac_peer->rap_nid),
1024 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1026 switch (conn->rac_state) {
1030 case RANAL_CONN_ESTABLISHED:
1031 kranal_close_conn_locked(conn, -ETIMEDOUT);
1034 case RANAL_CONN_CLOSING:
1035 kranal_terminate_conn_locked(conn);
1039 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
1042 kranal_conn_decref(conn);
1044 /* start again now I've dropped the lock */
1048 cfs_read_unlock(&kranal_data.kra_global_lock);
/*
 * Connection daemon thread: loops until shutdown, alternating between
 * (a) handshaking sockets queued on kra_connd_acceptq and (b) actively
 * connecting to peers queued on kra_connd_peers, dropping the connd
 * lock around the (blocking) handshake/connect work.  When both
 * queues are empty it sleeps exclusively on kra_connd_waitq.
 */
1052 kranal_connd (void *arg)
1054 long id = (long)arg;
1056 cfs_waitlink_t wait;
1057 unsigned long flags;
1059 kra_acceptsock_t *ras;
1062 snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
1063 cfs_daemonize(name);
1064 cfs_block_allsigs();
1066 cfs_waitlink_init(&wait);
1068 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1070 while (!kranal_data.kra_shutdown) {
/* Passive side: handshake an accepted socket */
1073 if (!cfs_list_empty(&kranal_data.kra_connd_acceptq)) {
1074 ras = cfs_list_entry(kranal_data.kra_connd_acceptq.next,
1075 kra_acceptsock_t, ras_list);
1076 cfs_list_del(&ras->ras_list);
1078 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1081 CDEBUG(D_NET,"About to handshake someone\n");
1083 kranal_conn_handshake(ras->ras_sock, NULL);
1084 kranal_free_acceptsock(ras);
1086 CDEBUG(D_NET,"Finished handshaking someone\n");
1088 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock,
/* Active side: connect out to a queued peer (connd holds the ref) */
1093 if (!cfs_list_empty(&kranal_data.kra_connd_peers)) {
1094 peer = cfs_list_entry(kranal_data.kra_connd_peers.next,
1095 kra_peer_t, rap_connd_list);
1097 cfs_list_del_init(&peer->rap_connd_list);
1098 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock,
1101 kranal_connect(peer);
1102 kranal_peer_decref(peer);
1104 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock,
/* Nothing to do: sleep until signalled */
1112 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
1113 cfs_waitq_add_exclusive(&kranal_data.kra_connd_waitq, &wait);
1115 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1117 cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);
1119 cfs_set_current_state(CFS_TASK_RUNNING);
1120 cfs_waitq_del(&kranal_data.kra_connd_waitq, &wait);
1122 cfs_spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1125 cfs_spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1127 kranal_thread_fini();
/*
 * Tell the reaper a (possibly) smaller timeout now exists: lower
 * kra_new_min_timeout under the reaper lock so the reaper's next
 * pass rescales its scanning rate.
 */
1132 kranal_update_reaper_timeout(long timeout)
1134 unsigned long flags;
1136 LASSERT (timeout > 0);
1138 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1140 if (timeout < kranal_data.kra_new_min_timeout)
1141 kranal_data.kra_new_min_timeout = timeout;
1143 cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/*
 * Reaper thread: wakes periodically and walks a computed "chunk" of
 * the conn hash table each pass, sized so every conn is checked
 * several times ('n') per global minimum timeout/keepalive interval.
 * Tracks the running minimum timeout across a full table sweep and
 * restarts the sweep whenever kranal_update_reaper_timeout() posts a
 * new, smaller minimum.
 * NOTE(review): several loop-variable declarations and the values of
 * 'n' and 'p' are omitted in this listing; verify in full source.
 */
1147 kranal_reaper (void *arg)
1149 cfs_waitlink_t wait;
1150 unsigned long flags;
1153 int conn_entries = kranal_data.kra_conn_hash_size;
1155 int base_index = conn_entries - 1;
1156 unsigned long next_check_time = jiffies;
1157 long next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
1158 long current_min_timeout = 1;
1160 cfs_daemonize("kranal_reaper");
1161 cfs_block_allsigs();
1163 cfs_waitlink_init(&wait);
1165 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1167 while (!kranal_data.kra_shutdown) {
1168 /* I wake up every 'p' seconds to check for timeouts on some
1169 * more peers. I try to check every connection 'n' times
1170 * within the global minimum of all keepalive and timeout
1171 * intervals, to ensure I attend to every connection within
1172 * (n+1)/n times its timeout intervals. */
1175 unsigned long min_timeout;
1178 /* careful with the jiffy wrap... */
1179 timeout = (long)(next_check_time - jiffies);
/* Sleep (holding no lock) until the next check time */
1181 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
1182 cfs_waitq_add(&kranal_data.kra_reaper_waitq, &wait);
1184 cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock,
1187 cfs_waitq_timedwait(&wait, CFS_TASK_INTERRUPTIBLE,
1190 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock,
1193 cfs_set_current_state(CFS_TASK_RUNNING);
1194 cfs_waitq_del(&kranal_data.kra_reaper_waitq, &wait);
/* Pick up a newly-posted minimum timeout, if any */
1198 if (kranal_data.kra_new_min_timeout !=
1199 CFS_MAX_SCHEDULE_TIMEOUT) {
1200 /* new min timeout set: restart min timeout scan */
1201 next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
1202 base_index = conn_index - 1;
1204 base_index = conn_entries - 1;
1206 if (kranal_data.kra_new_min_timeout <
1207 current_min_timeout) {
1208 current_min_timeout =
1209 kranal_data.kra_new_min_timeout;
1210 CDEBUG(D_NET, "Set new min timeout %ld\n",
1211 current_min_timeout);
1214 kranal_data.kra_new_min_timeout =
1215 CFS_MAX_SCHEDULE_TIMEOUT;
1217 min_timeout = current_min_timeout;
1219 cfs_spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1221 LASSERT (min_timeout > 0);
1223 /* Compute how many table entries to check now so I get round
1224 * the whole table fast enough given that I do this at fixed
1225 * intervals of 'p' seconds) */
1226 chunk = conn_entries;
1227 if (min_timeout > n * p)
1228 chunk = (chunk * n * p) / min_timeout;
1232 for (i = 0; i < chunk; i++) {
1233 kranal_reaper_check(conn_index,
1235 conn_index = (conn_index + 1) % conn_entries;
1238 next_check_time += p * CFS_HZ;
1240 cfs_spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
/* Did this pass cross base_index, i.e. finish a full sweep?
 * (two ranges handle the modular wrap of conn_index) */
1242 if (((conn_index - chunk <= base_index &&
1243 base_index < conn_index) ||
1244 (conn_index - conn_entries - chunk <= base_index &&
1245 base_index < conn_index - conn_entries))) {
1247 /* Scanned all conns: set current_min_timeout... */
1248 if (current_min_timeout != next_min_timeout) {
1249 current_min_timeout = next_min_timeout;
1250 CDEBUG(D_NET, "Set new min timeout %ld\n",
1251 current_min_timeout);
1254 /* ...and restart min timeout scan */
1255 next_min_timeout = CFS_MAX_SCHEDULE_TIMEOUT;
1256 base_index = conn_index - 1;
1258 base_index = conn_entries - 1;
1262 kranal_thread_fini();
/*
 * Drain one completion from the device's RDMA completion queue: map
 * the CQID back to its conn, pop the completed RDMA's descriptor,
 * move the head tx from rac_rdmaq to rac_fmaq (its completion
 * message must now be sent) and schedule the conn.  A failed CQID
 * lookup means the conn was already destroyed — just skip it.
 * NOTE(review): the outer polling loop around RapkCQDone is omitted
 * in this listing.
 */
1267 kranal_check_rdma_cq (kra_device_t *dev)
1272 unsigned long flags;
1273 RAP_RDMA_DESCRIPTOR *desc;
1278 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
1279 if (rrc == RAP_NOT_DONE) {
1280 CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
1284 LASSERT (rrc == RAP_SUCCESS);
1285 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1287 cfs_read_lock(&kranal_data.kra_global_lock);
1289 conn = kranal_cqid2conn_locked(cqid);
1291 /* Conn was destroyed? */
1292 CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
1293 cfs_read_unlock(&kranal_data.kra_global_lock);
1297 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1298 LASSERT (rrc == RAP_SUCCESS);
1300 CDEBUG(D_NET, "Completed %p\n",
1301 cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
1303 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
/* RDMAs complete in posting order: the head of rac_rdmaq is ours */
1305 LASSERT (!cfs_list_empty(&conn->rac_rdmaq));
1306 tx = cfs_list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1307 cfs_list_del(&tx->tx_list);
1309 LASSERT(desc->AppPtr == (void *)tx);
1310 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1311 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1313 cfs_list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1314 tx->tx_qtime = jiffies;
1316 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1318 /* Get conn's fmaq processed, now I've just put something
1320 kranal_schedule_conn(conn);
1322 cfs_read_unlock(&kranal_data.kra_global_lock);
/*
 * Drain one event from the device's FMA completion queue and
 * schedule the conn it belongs to.  On CQ overrun the CQID is
 * unreliable, so instead every conn on this device is scheduled
 * (walking the whole conn hash, releasing the shared lock between
 * buckets so writers aren't starved).
 * NOTE(review): the outer polling loop around RapkCQDone is omitted
 * in this listing.
 */
1327 kranal_check_fma_cq (kra_device_t *dev)
1338 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
1339 if (rrc == RAP_NOT_DONE) {
1340 CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
1344 LASSERT (rrc == RAP_SUCCESS);
1346 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1348 cfs_read_lock(&kranal_data.kra_global_lock);
1350 conn = kranal_cqid2conn_locked(cqid);
1352 CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
1355 CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
1357 kranal_schedule_conn(conn);
1360 cfs_read_unlock(&kranal_data.kra_global_lock);
1364 /* FMA CQ has overflowed: check ALL conns */
1365 CWARN("FMA CQ overflow: scheduling ALL conns on device %d\n",
1368 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1370 cfs_read_lock(&kranal_data.kra_global_lock);
1372 conns = &kranal_data.kra_conns[i];
1374 cfs_list_for_each (tmp, conns) {
1375 conn = cfs_list_entry(tmp, kra_conn_t,
1378 if (conn->rac_device == dev)
1379 kranal_schedule_conn(conn);
1382 /* don't block write lockers for too long... */
1383 cfs_read_unlock(&kranal_data.kra_global_lock);
/*
 * Transmit one FMA message (optionally with an inline immediate
 * payload) on 'conn'.  Messages whose type carries RANAL_MSG_FENCE
 * use the synchronous send.  Stamps the connstamp and sequence
 * number just before sending, updates rac_last_tx on success, and
 * warns if the fabric keeps returning EAGAIN past the keepalive
 * interval.  NOTE(review): the return-value handling between the
 * visible branches is omitted in this listing.
 */
1389 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1390 void *immediate, int immediatenob)
1392 int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1395 CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1396 conn, msg, msg->ram_type, sync ? "(sync)" : "",
1397 immediate, immediatenob);
1399 LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1400 LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1401 immediatenob <= RANAL_FMA_MAX_DATA :
1404 msg->ram_connstamp = conn->rac_my_connstamp;
1405 msg->ram_seq = conn->rac_tx_seq;
1408 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1409 immediate, immediatenob,
1412 rrc = RapkFmaSend(conn->rac_rihandle,
1413 immediate, immediatenob,
1421 conn->rac_last_tx = jiffies;
/* Persistent EAGAIN: complain if we've been idle past keepalive */
1426 if (cfs_time_aftereq(jiffies,
1427 conn->rac_last_tx + conn->rac_keepalive *
1429 CWARN("EAGAIN sending %02x (idle %lu secs)\n",
1431 (jiffies - conn->rac_last_tx)/CFS_HZ);
/*
 * Service the FMA send queue of 'conn': send keepalive NOOPs, the CLOSE
 * handshake when the connection is shutting down, and queued tx messages
 * (IMMEDIATE, PUT_*, GET_*) from rac_fmaq.  A tx that expects a reply is
 * moved to rac_replyq; otherwise it is completed via kranal_tx_done().
 * On -EAGAIN (out of FMA credits) the tx is put back at the head of the
 * queue for a later reschedule.  Runs only in the device scheduler
 * thread (asserted below).
 */
1437 kranal_process_fmaq (kra_conn_t *conn)
1439 unsigned long flags;
1445 /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1446 * However I will be rescheduled by an FMA completion event
1447 * when I eventually get some.
1448 * NB 2. Sampling rac_state here races with setting it elsewhere.
1449 * But it doesn't matter if I try to send a "real" message just
1450 * as I start closing because I'll get scheduled to send the
1453 /* Not racing with incoming message processing! */
1454 LASSERT (current == conn->rac_device->rad_scheduler);
/* Connection is closing: drain RDMAs, then run the CLOSE handshake */
1456 if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1457 if (!cfs_list_empty(&conn->rac_rdmaq)) {
1458 /* RDMAs in progress */
1459 LASSERT (!conn->rac_close_sent);
/* Keep the peer alive while RDMAs complete */
1461 if (cfs_time_aftereq(jiffies,
1463 conn->rac_keepalive * CFS_HZ)) {
1464 CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
1465 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1466 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
/* CLOSE already sent: nothing further to transmit on this conn */
1471 if (conn->rac_close_sent)
1474 CWARN("sending CLOSE to %s\n",
1475 libcfs_nid2str(conn->rac_peer->rap_nid));
1476 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1477 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1481 conn->rac_close_sent = 1;
1482 if (!conn->rac_close_recvd)
/* Both CLOSE sent and received: terminate under the global lock */
1485 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1487 if (conn->rac_state == RANAL_CONN_CLOSING)
1488 kranal_terminate_conn_locked(conn);
1490 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
/* Established path: pop the next tx off the FMA queue */
1495 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1497 if (cfs_list_empty(&conn->rac_fmaq)) {
1499 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Idle connection: send a keepalive NOOP if we have been quiet
 * longer than the keepalive interval */
1501 if (cfs_time_aftereq(jiffies,
1502 conn->rac_last_tx + conn->rac_keepalive *
1504 CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%ld))\n",
1505 libcfs_nid2str(conn->rac_peer->rap_nid), conn,
1506 (jiffies - conn->rac_last_tx)/CFS_HZ,
1507 conn->rac_keepalive);
1508 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1509 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1514 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1515 cfs_list_del(&tx->tx_list);
1516 more_to_do = !cfs_list_empty(&conn->rac_fmaq);
1518 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1521 CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
1522 tx, tx->tx_msg.ram_type, tx->tx_cookie);
/* Dispatch on message type; REQ/ACK types that expect a reply
 * presumably set expect_reply on lines lost in extraction */
1523 switch (tx->tx_msg.ram_type) {
1527 case RANAL_MSG_IMMEDIATE:
1528 rc = kranal_sendmsg(conn, &tx->tx_msg,
1529 tx->tx_buffer, tx->tx_nob);
1532 case RANAL_MSG_PUT_NAK:
1533 case RANAL_MSG_PUT_DONE:
1534 case RANAL_MSG_GET_NAK:
1535 case RANAL_MSG_GET_DONE:
1536 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1539 case RANAL_MSG_PUT_REQ:
/* Buffer is already reserved, so mapping cannot EAGAIN here */
1540 rc = kranal_map_buffer(tx);
1541 LASSERT (rc != -EAGAIN);
1545 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1546 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1550 case RANAL_MSG_PUT_ACK:
1551 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1555 case RANAL_MSG_GET_REQ:
1556 rc = kranal_map_buffer(tx);
1557 LASSERT (rc != -EAGAIN);
/* Advertise the mapped sink buffer so the peer can RDMA into it */
1561 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1562 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1563 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1564 (__u64)((unsigned long)tx->tx_buffer);
1565 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1566 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1571 if (rc == -EAGAIN) {
1572 /* I need credits to send this. Replace tx at the head of the
1573 * fmaq and I'll get rescheduled when credits appear */
1574 CDEBUG(D_NET, "EAGAIN on %p\n", conn);
1575 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1576 cfs_list_add(&tx->tx_list, &conn->rac_fmaq);
1577 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* No reply expected (or send failed): complete the tx now.
 * Otherwise park it on the reply queue to await the peer */
1581 if (!expect_reply || rc != 0) {
1582 kranal_tx_done(tx, rc);
1584 /* LASSERT(current) above ensures this doesn't race with reply
1586 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1587 cfs_list_add_tail(&tx->tx_list, &conn->rac_replyq);
1588 tx->tx_qtime = jiffies;
1589 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* More queued txs: put the conn back on the scheduler's list */
1593 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
1594 kranal_schedule_conn(conn);
/*
 * Byte-swap every field of an RDMA descriptor received from a peer with
 * opposite endianness (called from the RX path after the message magic
 * indicates a swab is needed).
 */
1599 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1601 __swab64s(&d->rard_key.Key);
1602 __swab16s(&d->rard_key.Cookie);
1603 __swab16s(&d->rard_key.MdHandle);
1604 __swab32s(&d->rard_key.Flags);
1605 __swab64s(&d->rard_addr.AddressBits);
1606 __swab32s(&d->rard_nob);
/*
 * Find the tx on conn->rac_replyq whose cookie matches 'cookie', remove
 * it from the queue and (presumably) return it -- the return statements
 * were lost in extraction; TODO confirm against the unabridged source.
 * A cookie match with the wrong message type, or no match at all, is
 * warned about; in those cases no tx is dequeued.
 */
1610 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1614 unsigned long flags;
1616 cfs_spin_lock_irqsave(&conn->rac_lock, flags);
1618 cfs_list_for_each(ttmp, &conn->rac_replyq) {
1619 tx = cfs_list_entry(ttmp, kra_tx_t, tx_list);
1621 CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
1622 tx, tx->tx_msg.ram_type, tx->tx_cookie);
/* Cookie is the primary key; skip non-matching entries */
1624 if (tx->tx_cookie != cookie)
/* Cookie matched but the reply type disagrees: protocol error */
1627 if (tx->tx_msg.ram_type != type) {
1628 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1629 CWARN("Unexpected type %x (%x expected) "
1630 "matched reply from %s\n",
1631 tx->tx_msg.ram_type, type,
1632 libcfs_nid2str(conn->rac_peer->rap_nid));
/* Good match: take the tx off the reply queue */
1636 cfs_list_del(&tx->tx_list);
1637 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
/* No entry matched the cookie */
1641 cfs_spin_unlock_irqrestore(&conn->rac_lock, flags);
1642 CWARN("Unmatched reply %02x/"LPX64" from %s\n",
1643 type, cookie, libcfs_nid2str(conn->rac_peer->rap_nid));
/*
 * Poll 'conn' for an incoming FMA message and process it: validate the
 * prefix (magic/version/srcnid/connstamp/sequence, byte-swapping if the
 * peer has opposite endianness), honour the CLOSE handshake, then
 * dispatch on message type -- handing IMMEDIATE/PUT_REQ/GET_REQ headers
 * to lnet_parse(), matching completion messages against rac_replyq, and
 * starting the PUT_DONE RDMA on PUT_ACK.  On protocol error the
 * connection is closed.  Runs in the device scheduler thread.
 */
1648 kranal_check_fma_rx (kra_conn_t *conn)
1650 unsigned long flags;
1655 RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1656 kra_peer_t *peer = conn->rac_peer;
/* Nothing waiting on the wire */
1660 if (rrc == RAP_NOT_DONE)
1663 CDEBUG(D_NET, "RX on %p\n", conn);
1665 LASSERT (rrc == RAP_SUCCESS);
1666 conn->rac_last_rx = jiffies;
1667 seq = conn->rac_rx_seq++;
1668 msg = (kra_msg_t *)prefix;
1670 /* stash message for portals callbacks they'll NULL
1671 * rac_rxmsg if they consume it */
1672 LASSERT (conn->rac_rxmsg == NULL);
1673 conn->rac_rxmsg = msg;
/* Wrong-endian magic means the peer byte-swaps; anything else is junk */
1675 if (msg->ram_magic != RANAL_MSG_MAGIC) {
1676 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1677 CERROR("Unexpected magic %08x from %s\n",
1678 msg->ram_magic, libcfs_nid2str(peer->rap_nid));
/* Swab the common header ... */
1683 __swab32s(&msg->ram_magic);
1684 __swab16s(&msg->ram_version);
1685 __swab16s(&msg->ram_type);
1686 __swab64s(&msg->ram_srcnid);
1687 __swab64s(&msg->ram_connstamp);
1688 __swab32s(&msg->ram_seq);
1690 /* NB message type checked below; NOT here... */
/* ... and the type-specific RDMA descriptors */
1691 switch (msg->ram_type) {
1692 case RANAL_MSG_PUT_ACK:
1693 kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1696 case RANAL_MSG_GET_REQ:
1697 kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
/* Header sanity checks: protocol version, claimed source NID,
 * connection stamp, and in-order sequence number */
1705 if (msg->ram_version != RANAL_MSG_VERSION) {
1706 CERROR("Unexpected protocol version %d from %s\n",
1707 msg->ram_version, libcfs_nid2str(peer->rap_nid));
1712 if (msg->ram_srcnid != peer->rap_nid) {
1713 CERROR("Unexpected peer %s from %s\n",
1714 libcfs_nid2str(msg->ram_srcnid),
1715 libcfs_nid2str(peer->rap_nid));
1720 if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1721 CERROR("Unexpected connstamp "LPX64"("LPX64
1722 " expected) from %s\n",
1723 msg->ram_connstamp, conn->rac_peer_connstamp,
1724 libcfs_nid2str(peer->rap_nid));
1729 if (msg->ram_seq != seq) {
1730 CERROR("Unexpected sequence number %d(%d expected) from %s\n",
1731 msg->ram_seq, seq, libcfs_nid2str(peer->rap_nid));
1736 if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1737 /* This message signals RDMA completion... */
1738 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1739 if (rrc != RAP_SUCCESS) {
1740 CERROR("RapkFmaSyncWait failed: %d\n", rrc);
/* Peer already said CLOSE; nothing further is legal from it */
1746 if (conn->rac_close_recvd) {
1747 CERROR("Unexpected message %d after CLOSE from %s\n",
1748 msg->ram_type, libcfs_nid2str(conn->rac_peer->rap_nid));
1753 if (msg->ram_type == RANAL_MSG_CLOSE) {
1754 CWARN("RX CLOSE from %s\n", libcfs_nid2str(conn->rac_peer->rap_nid));
1755 conn->rac_close_recvd = 1;
1756 cfs_write_lock_irqsave(&kranal_data.kra_global_lock, flags);
/* First CLOSE starts the shutdown; if we already sent ours,
 * the handshake is complete and the conn can be terminated */
1758 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1759 kranal_close_conn_locked(conn, 0);
1760 else if (conn->rac_state == RANAL_CONN_CLOSING &&
1761 conn->rac_close_sent)
1762 kranal_terminate_conn_locked(conn);
1764 cfs_write_unlock_irqrestore(&kranal_data.kra_global_lock,
/* Only an established conn handles payload-bearing messages */
1769 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1772 switch (msg->ram_type) {
1773 case RANAL_MSG_NOOP:
1774 /* Nothing to do; just a keepalive */
1775 CDEBUG(D_NET, "RX NOOP on %p\n", conn);
1778 case RANAL_MSG_IMMEDIATE:
1779 CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
1780 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.immediate.raim_hdr,
1781 msg->ram_srcnid, conn, 0);
1785 case RANAL_MSG_PUT_REQ:
1786 CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
1787 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.putreq.raprm_hdr,
1788 msg->ram_srcnid, conn, 1);
1792 case RANAL_MSG_PUT_NAK:
/* Peer rejected our PUT_REQ: fail the matching tx */
1793 CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
1794 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1795 msg->ram_u.completion.racm_cookie);
1799 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1800 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1801 kranal_tx_done(tx, -ENOENT); /* no match */
1804 case RANAL_MSG_PUT_ACK:
/* Peer accepted our PUT_REQ: RDMA the payload to its sink
 * buffer, signalling completion with PUT_DONE */
1805 CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
1806 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1807 msg->ram_u.putack.rapam_src_cookie);
1811 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1812 &msg->ram_u.putack.rapam_desc,
1813 msg->ram_u.putack.rapam_desc.rard_nob,
1814 msg->ram_u.putack.rapam_dst_cookie);
1817 case RANAL_MSG_PUT_DONE:
1818 CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
1819 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1820 msg->ram_u.completion.racm_cookie);
1824 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1825 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1826 kranal_tx_done(tx, 0);
1829 case RANAL_MSG_GET_REQ:
1830 CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
1831 rc = lnet_parse(kranal_data.kra_ni, &msg->ram_u.get.ragm_hdr,
1832 msg->ram_srcnid, conn, 1);
1836 case RANAL_MSG_GET_NAK:
1837 CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
1838 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1839 msg->ram_u.completion.racm_cookie);
1843 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1844 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1845 kranal_tx_done(tx, -ENOENT); /* no match */
1848 case RANAL_MSG_GET_DONE:
1849 CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
1850 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1851 msg->ram_u.completion.racm_cookie);
1855 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1856 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1858 /* completion message should send rdma length if we ever allow
1860 lnet_set_reply_msg_len(kranal_data.kra_ni, tx->tx_lntmsg[1], ???);
1862 kranal_tx_done(tx, 0);
/* Common exit: a negative rc above means protocol/comms failure */
1867 if (rc < 0) /* protocol/comms error */
1868 kranal_close_conn (conn, rc);
/* Discard the stashed rxmsg if no callback consumed it */
1870 if (repost && conn->rac_rxmsg != NULL)
1871 kranal_consume_rxmsg(conn, NULL, 0);
1873 /* check again later */
1874 kranal_schedule_conn(conn);
/*
 * Final teardown for a connection that has reached RANAL_CONN_CLOSED and
 * been unlinked from all global lists: fail every tx still queued on the
 * FMA and reply queues with -ECONNABORTED.  The RDMA queue must already
 * be empty.
 */
1878 kranal_complete_closed_conn (kra_conn_t *conn)
1884 LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1885 LASSERT (cfs_list_empty(&conn->rac_list));
1886 LASSERT (cfs_list_empty(&conn->rac_hashlist));
/* Abort all unsent FMA messages */
1888 for (nfma = 0; !cfs_list_empty(&conn->rac_fmaq); nfma++) {
1889 tx = cfs_list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1891 cfs_list_del(&tx->tx_list);
1892 kranal_tx_done(tx, -ECONNABORTED);
1895 LASSERT (cfs_list_empty(&conn->rac_rdmaq));
/* Abort all txs still waiting for a peer reply */
1897 for (nreplies = 0; !cfs_list_empty(&conn->rac_replyq); nreplies++) {
1898 tx = cfs_list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1900 cfs_list_del(&tx->tx_list);
1901 kranal_tx_done(tx, -ECONNABORTED);
1904 CWARN("Closed conn %p -> %s: nmsg %d nreplies %d\n",
1905 conn, libcfs_nid2str(conn->rac_peer->rap_nid), nfma, nreplies);
/*
 * Drive completion of a newly created connection's RapidArray handshake.
 * Polls RapkCompleteSync(.., 1); while it reports RAP_NOT_DONE the
 * caller retries (presumably via -EAGAIN -- the return statements are
 * not visible in this extraction; TODO confirm).  Once rac_timeout has
 * elapsed since rac_last_tx, the attempt is abandoned with
 * RapkCompleteSync(.., 0).
 */
1909 kranal_process_new_conn (kra_conn_t *conn)
1913 rrc = RapkCompleteSync(conn->rac_rihandle, 1);
1914 if (rrc == RAP_SUCCESS)
1917 LASSERT (rrc == RAP_NOT_DONE);
/* Still within the timeout window: keep trying */
1918 if (!cfs_time_aftereq(jiffies, conn->rac_last_tx +
1919 conn->rac_timeout * CFS_HZ))
/* Timed out: tell RapidArray to abandon the handshake */
1923 rrc = RapkCompleteSync(conn->rac_rihandle, 0);
1924 LASSERT (rrc == RAP_SUCCESS);
/*
 * Per-device scheduler thread.  Loops until shutdown, doing three jobs
 * under dev->rad_lock (dropping it around any real work):
 *   1. service RDMA and FMA completion queues when the device callback
 *      has set rad_ready;
 *   2. run RX/TX processing for each conn on rad_ready_conns, completing
 *      conns that have reached RANAL_CONN_CLOSED;
 *   3. progress handshakes for conns on rad_new_conns, with exponential
 *      backoff (rac_keepalive doubling up to ~HZ) between retries.
 * Sleeps on rad_waitq -- indefinitely if no new conn needs attention,
 * otherwise until the soonest retry deadline.
 */
1929 kranal_scheduler (void *arg)
1931 kra_device_t *dev = (kra_device_t *)arg;
1932 cfs_waitlink_t wait;
1935 unsigned long flags;
1936 unsigned long deadline;
1937 unsigned long soonest;
1946 snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1947 cfs_daemonize(name);
1948 cfs_block_allsigs();
1950 dev->rad_scheduler = current;
1951 cfs_waitlink_init(&wait);
1953 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1955 while (!kranal_data.kra_shutdown) {
1956 /* Safe: kra_shutdown only set when quiescent */
/* Yield periodically so we don't hog the CPU */
1958 if (busy_loops++ >= RANAL_RESCHED) {
1959 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1964 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
1969 if (dev->rad_ready) {
1970 /* Device callback fired since I last checked it */
1972 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1975 kranal_check_rdma_cq(dev);
1976 kranal_check_fma_cq(dev);
1978 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
/* Service every conn the callbacks have scheduled */
1981 cfs_list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
1982 conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
1984 cfs_list_del_init(&conn->rac_schedlist);
1985 LASSERT (conn->rac_scheduled);
1986 conn->rac_scheduled = 0;
1987 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
1990 kranal_check_fma_rx(conn);
1991 kranal_process_fmaq(conn);
1993 if (conn->rac_state == RANAL_CONN_CLOSED)
1994 kranal_complete_closed_conn(conn);
/* Drop the scheduler's ref taken in kranal_schedule_conn() */
1996 kranal_conn_decref(conn);
1997 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
/* Progress new-connection handshakes whose retry time has come */
2003 cfs_list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
2004 conn = cfs_list_entry(tmp, kra_conn_t, rac_schedlist);
2006 deadline = conn->rac_last_tx + conn->rac_keepalive;
2007 if (cfs_time_aftereq(jiffies, deadline)) {
2008 /* Time to process this new conn */
2009 cfs_spin_unlock_irqrestore(&dev->rad_lock,
2013 rc = kranal_process_new_conn(conn);
2014 if (rc != -EAGAIN) {
2015 /* All done with this conn */
2016 cfs_spin_lock_irqsave(&dev->rad_lock,
2018 cfs_list_del_init(&conn->rac_schedlist);
2019 cfs_spin_unlock_irqrestore(&dev-> \
2023 kranal_conn_decref(conn);
2024 cfs_spin_lock_irqsave(&dev->rad_lock,
2029 /* retry with exponential backoff until HZ */
2030 if (conn->rac_keepalive == 0)
2031 conn->rac_keepalive = 1;
2032 else if (conn->rac_keepalive <= CFS_HZ)
2033 conn->rac_keepalive *= 2;
2035 conn->rac_keepalive += CFS_HZ;
2037 deadline = conn->rac_last_tx + conn->rac_keepalive;
2038 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
2041 /* Does this conn need attention soonest? */
2042 if (nsoonest++ == 0 ||
2043 !cfs_time_aftereq(deadline, soonest))
2047 if (dropped_lock) /* may sleep iff I didn't drop the lock */
/* Nothing to do: block on the device waitqueue, bounded by the
 * soonest new-conn retry deadline if one exists */
2050 cfs_set_current_state(CFS_TASK_INTERRUPTIBLE);
2051 cfs_waitq_add_exclusive(&dev->rad_waitq, &wait);
2052 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
2054 if (nsoonest == 0) {
2056 cfs_waitq_wait(&wait, CFS_TASK_INTERRUPTIBLE);
2058 timeout = (long)(soonest - jiffies);
2061 cfs_waitq_timedwait(&wait,
2062 CFS_TASK_INTERRUPTIBLE,
2067 cfs_waitq_del(&dev->rad_waitq, &wait);
2068 cfs_set_current_state(CFS_TASK_RUNNING);
2069 cfs_spin_lock_irqsave(&dev->rad_lock, flags);
2072 cfs_spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Shutdown: detach from the device and exit */
2074 dev->rad_scheduler = NULL;
2075 kranal_thread_fini();