1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
/* Report the "distance" to 'nid' for the Portals lib-NAL interface.
 * NOTE(review): this excerpt elides most of the body (embedded original
 * line numbers jump); only the self-NID comparison is visible. */
27 kranal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
29 /* I would guess that if kranal_get_peer (nid) == NULL,
30 and we're not routing, then 'nid' is very distant :) */
31 if ( nal->libnal_ni.ni_pid.nid == nid ) {
/* RAPK device-event callback: find the kra_device_t matching 'devid'
 * and wake its scheduler if it is not already marked ready.
 * NOTE(review): intermediate lines are elided in this excerpt. */
41 kranal_device_callback(RAP_INT32 devid, RAP_PVOID arg)
47 CDEBUG(D_NET, "callback for device %d\n", devid);
49 for (i = 0; i < kranal_data.kra_ndevs; i++) {
51 dev = &kranal_data.kra_devices[i];
52 if (dev->rad_id != devid)
55 spin_lock_irqsave(&dev->rad_lock, flags);
57 if (!dev->rad_ready) {
59 wake_up(&dev->rad_waitq);
62 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* falls out of the loop only when no device matched 'devid' */
66 CWARN("callback for unknown device %d\n", devid);
/* Queue 'conn' on its device's ready list (if not already scheduled)
 * and wake the device scheduler.  Takes a conn ref on behalf of the
 * scheduler; rad_lock protects rac_scheduled and rad_ready_conns. */
70 kranal_schedule_conn(kra_conn_t *conn)
72 kra_device_t *dev = conn->rac_device;
75 spin_lock_irqsave(&dev->rad_lock, flags);
77 if (!conn->rac_scheduled) {
78 kranal_conn_addref(conn); /* +1 ref for scheduler */
79 conn->rac_scheduled = 1;
80 list_add_tail(&conn->rac_schedlist, &dev->rad_ready_conns);
81 wake_up(&dev->rad_waitq);
84 spin_unlock_irqrestore(&dev->rad_lock, flags);
/* Grab an idle tx descriptor.  Preference order (visible here):
 *  1. the normal idle pool;
 *  2. the reserved "nblk" pool (callers that may not block);
 *  3. otherwise sleep until the normal pool is replenished.
 * Allocates a fresh completion cookie under kra_tx_lock.
 * NOTE(review): the may_block branch structure is partly elided. */
88 kranal_get_idle_tx (int may_block)
94 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
96 /* "normal" descriptor is free */
97 if (!list_empty(&kranal_data.kra_idle_txs)) {
98 tx = list_entry(kranal_data.kra_idle_txs.next,
104 /* may dip into reserve pool */
105 if (list_empty(&kranal_data.kra_idle_nblk_txs)) {
106 CERROR("reserved tx desc pool exhausted\n");
110 tx = list_entry(kranal_data.kra_idle_nblk_txs.next,
115 /* block for idle tx */
116 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
118 wait_event(kranal_data.kra_idle_tx_waitq,
119 !list_empty(&kranal_data.kra_idle_txs));
123 list_del(&tx->tx_list);
125 /* Allocate a new completion cookie. It might not be
126 * needed, but we've got a lock right now... */
127 tx->tx_cookie = kranal_data.kra_next_tx_cookie++;
/* sanity: descriptor must come back from the pool fully reset */
129 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
130 LASSERT (tx->tx_msg.ram_type == RANAL_MSG_NONE);
131 LASSERT (tx->tx_conn == NULL);
132 LASSERT (tx->tx_libmsg[0] == NULL);
133 LASSERT (tx->tx_libmsg[1] == NULL);
136 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Initialise the common header fields of an outgoing FMA message:
 * magic, protocol version, message type and our source NID. */
142 kranal_init_msg(kra_msg_t *msg, int type)
144 msg->ram_magic = RANAL_MSG_MAGIC;
145 msg->ram_version = RANAL_MSG_VERSION;
146 msg->ram_type = type;
147 msg->ram_srcnid = kranal_lib.libnal_ni.ni_pid.nid;
148 /* ram_connstamp gets set when FMA is sent */
/* Allocate an idle tx (possibly blocking) and initialise its embedded
 * message header for 'type'.  NOTE(review): NULL-check/return elided. */
152 kranal_new_tx_msg (int may_block, int type)
154 kra_tx_t *tx = kranal_get_idle_tx(may_block);
159 kranal_init_msg(&tx->tx_msg, type);
/* Point tx at a single contiguous vaddr fragment of the payload for
 * IMMEDIATE sends.  Fails (CERROR) if the requested [offset, offset+nob)
 * span crosses an iovec boundary.  NOTE(review): niov decrement /
 * iov++ advance inside the skip loop is elided in this excerpt. */
164 kranal_setup_immediate_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
168 /* For now this is almost identical to kranal_setup_virt_buffer, but we
169 * could "flatten" the payload into a single contiguous buffer ready
170 * for sending direct over an FMA if we ever needed to. */
172 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
176 tx->tx_buffer = NULL;
/* skip whole iovecs that precede 'offset' */
180 while (offset >= iov->iov_len) {
181 offset -= iov->iov_len;
187 if (nob > iov->iov_len - offset) {
188 CERROR("Can't handle multiple vaddr fragments\n");
192 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
195 tx->tx_buftype = RANAL_BUF_IMMEDIATE;
/* Describe a virtual-memory RDMA buffer: like the immediate case, the
 * payload must be one contiguous vaddr fragment; the buffer is left
 * UNMAPPED until kranal_map_buffer() registers it with the device. */
201 kranal_setup_virt_buffer (kra_tx_t *tx, int niov, struct iovec *iov,
207 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
209 while (offset >= iov->iov_len) {
210 offset -= iov->iov_len;
216 if (nob > iov->iov_len - offset) {
217 CERROR("Can't handle multiple vaddr fragments\n");
221 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
223 tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
/* Build the tx_phys page table (RAP_PHYS_REGION array) describing a
 * kiov (paged) payload for RDMA.  The pages must form one contiguous
 * I/O region: only the first page may start at a non-zero offset and
 * only the last may be short — gaps are rejected with CERROR.  Caps
 * the region at PTL_MD_MAX_IOV pages.
 * NOTE(review): loop structure / phys++ advances are elided here. */
228 kranal_setup_phys_buffer (kra_tx_t *tx, int nkiov, ptl_kiov_t *kiov,
231 RAP_PHYS_REGION *phys = tx->tx_phys;
234 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
238 LASSERT (tx->tx_buftype == RANAL_BUF_NONE);
240 while (offset >= kiov->kiov_len) {
241 offset -= kiov->kiov_len;
247 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
/* tx_buffer holds the within-page offset of the payload start */
249 tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));
251 phys->Address = kranal_page2phys(kiov->kiov_page);
254 resid = nob - (kiov->kiov_len - offset);
260 if (kiov->kiov_offset != 0 ||
261 ((resid > PAGE_SIZE) &&
262 kiov->kiov_len < PAGE_SIZE)) {
263 /* Can't have gaps */
264 CERROR("Can't make payload contiguous in I/O VM:"
265 "page %d, offset %d, len %d \n",
266 (int)(phys - tx->tx_phys),
267 kiov->kiov_offset, kiov->kiov_len);
271 if ((phys - tx->tx_phys) == PTL_MD_MAX_IOV) {
272 CERROR ("payload too big (%d)\n", (int)(phys - tx->tx_phys));
276 phys->Address = kranal_page2phys(kiov->kiov_page);
282 tx->tx_phys_npages = phys - tx->tx_phys;
/* Dispatch RDMA buffer setup: exactly one of iov (virtual) or kiov
 * (paged) must be supplied; route to the matching setup helper. */
287 kranal_setup_rdma_buffer (kra_tx_t *tx, int niov,
288 struct iovec *iov, ptl_kiov_t *kiov,
291 LASSERT ((iov == NULL) != (kiov == NULL));
294 return kranal_setup_phys_buffer(tx, niov, kiov, offset, nob);
296 return kranal_setup_virt_buffer(tx, niov, iov, offset, nob);
/* Register tx's buffer with the RAPK device so it can be RDMA'd.
 * Must run on the device's scheduler thread (asserted).  Already-mapped
 * and IMMEDIATE buffers are no-ops; UNMAPPED phys/virt buffers are
 * registered and flipped to their *_MAPPED state.  Returns -ENOMEM if
 * registration fails (assumed resource exhaustion).
 * NOTE(review): map-key arguments and accounting increments for
 * rad_nphysmap/rad_nvirtmap are partly elided in this excerpt. */
300 kranal_map_buffer (kra_tx_t *tx)
302 kra_conn_t *conn = tx->tx_conn;
303 kra_device_t *dev = conn->rac_device;
306 LASSERT (current == dev->rad_scheduler);
308 switch (tx->tx_buftype) {
313 case RANAL_BUF_IMMEDIATE:
314 case RANAL_BUF_PHYS_MAPPED:
315 case RANAL_BUF_VIRT_MAPPED:
318 case RANAL_BUF_PHYS_UNMAPPED:
319 rrc = RapkRegisterPhys(dev->rad_handle,
320 tx->tx_phys, tx->tx_phys_npages,
322 if (rrc != RAP_SUCCESS) {
323 CERROR ("Can't map %d pages: dev %d "
324 "phys %u pp %u, virt %u nob %lu\n",
325 tx->tx_phys_npages, dev->rad_id,
326 dev->rad_nphysmap, dev->rad_nppphysmap,
327 dev->rad_nvirtmap, dev->rad_nobvirtmap);
328 return -ENOMEM; /* assume insufficient resources */
332 dev->rad_nppphysmap += tx->tx_phys_npages;
334 tx->tx_buftype = RANAL_BUF_PHYS_MAPPED;
337 case RANAL_BUF_VIRT_UNMAPPED:
338 rrc = RapkRegisterMemory(dev->rad_handle,
339 tx->tx_buffer, tx->tx_nob,
341 if (rrc != RAP_SUCCESS) {
342 CERROR ("Can't map %d bytes: dev %d "
343 "phys %u pp %u, virt %u nob %lu\n",
344 tx->tx_nob, dev->rad_id,
345 dev->rad_nphysmap, dev->rad_nppphysmap,
346 dev->rad_nvirtmap, dev->rad_nobvirtmap);
347 return -ENOMEM; /* assume insufficient resources */
351 dev->rad_nobvirtmap += tx->tx_nob;
353 tx->tx_buftype = RANAL_BUF_VIRT_MAPPED;
/* Inverse of kranal_map_buffer(): deregister a mapped buffer from the
 * device and flip the buftype back to *_UNMAPPED.  Unmapped/IMMEDIATE
 * buffers are no-ops.  Must run on the device scheduler (asserted);
 * deregistration is asserted to succeed. */
359 kranal_unmap_buffer (kra_tx_t *tx)
364 switch (tx->tx_buftype) {
369 case RANAL_BUF_IMMEDIATE:
370 case RANAL_BUF_PHYS_UNMAPPED:
371 case RANAL_BUF_VIRT_UNMAPPED:
374 case RANAL_BUF_PHYS_MAPPED:
375 LASSERT (tx->tx_conn != NULL);
376 dev = tx->tx_conn->rac_device;
377 LASSERT (current == dev->rad_scheduler);
378 rrc = RapkDeregisterMemory(dev->rad_handle, NULL,
380 LASSERT (rrc == RAP_SUCCESS);
383 dev->rad_nppphysmap -= tx->tx_phys_npages;
385 tx->tx_buftype = RANAL_BUF_PHYS_UNMAPPED;
388 case RANAL_BUF_VIRT_MAPPED:
389 LASSERT (tx->tx_conn != NULL);
390 dev = tx->tx_conn->rac_device;
391 LASSERT (current == dev->rad_scheduler);
392 rrc = RapkDeregisterMemory(dev->rad_handle, tx->tx_buffer,
394 LASSERT (rrc == RAP_SUCCESS);
397 dev->rad_nobvirtmap -= tx->tx_nob;
399 tx->tx_buftype = RANAL_BUF_VIRT_UNMAPPED;
/* Complete a tx: unmap its buffer, finalise up to two attached lib
 * messages with PTL_OK/PTL_FAIL according to 'completion', reset the
 * descriptor, and return it to the appropriate idle pool (reserved
 * "nblk" pool vs normal pool — the selecting condition is elided in
 * this excerpt).  Must not be called in interrupt context. */
405 kranal_tx_done (kra_tx_t *tx, int completion)
407 ptl_err_t ptlrc = (completion == 0) ? PTL_OK : PTL_FAIL;
411 LASSERT (!in_interrupt());
413 kranal_unmap_buffer(tx);
415 for (i = 0; i < 2; i++) {
416 /* tx may have up to 2 libmsgs to finalise */
417 if (tx->tx_libmsg[i] == NULL)
420 lib_finalize(&kranal_lib, NULL, tx->tx_libmsg[i], ptlrc);
421 tx->tx_libmsg[i] = NULL;
424 tx->tx_buftype = RANAL_BUF_NONE;
425 tx->tx_msg.ram_type = RANAL_MSG_NONE;
428 spin_lock_irqsave(&kranal_data.kra_tx_lock, flags);
431 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_nblk_txs);
433 list_add_tail(&tx->tx_list, &kranal_data.kra_idle_txs);
434 wake_up(&kranal_data.kra_idle_tx_waitq);
437 spin_unlock_irqrestore(&kranal_data.kra_tx_lock, flags);
/* Return the first connection on 'peer' (any will do), or fall through
 * when the list is empty.  Caller must hold the global lock ("_locked"
 * suffix convention — the lock itself is not visible in this excerpt). */
441 kranal_find_conn_locked (kra_peer_t *peer)
443 struct list_head *tmp;
445 /* just return the first connection */
446 list_for_each (tmp, &peer->rap_conns) {
447 return list_entry(tmp, kra_conn_t, rac_list);
/* Queue tx on conn's FMA send queue (timestamping it for timeout
 * checks) and schedule the conn so the device scheduler sends it. */
454 kranal_post_fma (kra_conn_t *conn, kra_tx_t *tx)
460 spin_lock_irqsave(&conn->rac_lock, flags);
461 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
462 tx->tx_qtime = jiffies;
463 spin_unlock_irqrestore(&conn->rac_lock, flags);
465 kranal_schedule_conn(conn);
/* Commit to sending tx to 'nid'.  Fast path: under a read lock, find
 * the peer and an existing conn and post the tx.  Slow path: retake as
 * a write lock, re-check, and either queue the tx behind an in-progress
 * connection attempt or kick the connection daemon to connect (unless
 * within the reconnect backoff window).  Any failure completes the tx
 * with -EHOSTUNREACH.
 * NOTE(review): the initial read_lock of g_lock and several returns
 * are elided in this excerpt — verify the locking in the full source. */
469 kranal_launch_tx (kra_tx_t *tx, ptl_nid_t nid)
475 rwlock_t *g_lock = &kranal_data.kra_global_lock;
477 /* If I get here, I've committed to send, so I complete the tx with
478 * failure on any problems */
480 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
484 peer = kranal_find_peer_locked(nid);
487 kranal_tx_done(tx, -EHOSTUNREACH);
491 conn = kranal_find_conn_locked(peer);
493 kranal_post_fma(conn, tx);
498 /* Making one or more connections; I'll need a write lock... */
500 write_lock_irqsave(g_lock, flags);
/* peer may have vanished while the lock was dropped: look again */
502 peer = kranal_find_peer_locked(nid);
504 write_unlock_irqrestore(g_lock, flags);
505 kranal_tx_done(tx, -EHOSTUNREACH);
509 conn = kranal_find_conn_locked(peer);
511 /* Connection exists; queue message on it */
512 kranal_post_fma(conn, tx);
513 write_unlock_irqrestore(g_lock, flags);
517 LASSERT (peer->rap_persistence > 0);
519 if (!peer->rap_connecting) {
520 LASSERT (list_empty(&peer->rap_tx_queue));
522 now = CURRENT_SECONDS;
523 if (now < peer->rap_reconnect_time) {
/* still in reconnect backoff: fail immediately */
524 write_unlock_irqrestore(g_lock, flags);
525 kranal_tx_done(tx, -EHOSTUNREACH);
529 peer->rap_connecting = 1;
530 kranal_peer_addref(peer); /* extra ref for connd */
532 spin_lock(&kranal_data.kra_connd_lock);
534 list_add_tail(&peer->rap_connd_list,
535 &kranal_data.kra_connd_peers);
536 wake_up(&kranal_data.kra_connd_waitq);
538 spin_unlock(&kranal_data.kra_connd_lock);
541 /* A connection is being established; queue the message... */
542 list_add_tail(&tx->tx_list, &peer->rap_tx_queue);
544 write_unlock_irqrestore(g_lock, flags);
/* Initiate an RDMA of 'nob' bytes from tx's mapped buffer to the
 * peer's 'sink' descriptor, pre-staging the completion message ('type'
 * with 'cookie') to be FMA'd when the RDMA finishes.  nob == 0 skips
 * the RDMA and posts the completion immediately.  Runs only on the
 * conn's device scheduler (asserted — no race with CLOSE sending). */
548 kranal_rdma(kra_tx_t *tx, int type,
549 kra_rdma_desc_t *sink, int nob, __u64 cookie)
551 kra_conn_t *conn = tx->tx_conn;
555 LASSERT (kranal_tx_mapped(tx));
556 LASSERT (nob <= sink->rard_nob);
557 LASSERT (nob <= tx->tx_nob);
559 /* No actual race with scheduler sending CLOSE (I'm she!) */
560 LASSERT (current == conn->rac_device->rad_scheduler);
562 memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
563 tx->tx_rdma_desc.SrcPtr.AddressBits = (__u64)((unsigned long)tx->tx_buffer);
564 tx->tx_rdma_desc.SrcKey = tx->tx_map_key;
565 tx->tx_rdma_desc.DstPtr = sink->rard_addr;
566 tx->tx_rdma_desc.DstKey = sink->rard_key;
567 tx->tx_rdma_desc.Length = nob;
568 tx->tx_rdma_desc.AppPtr = tx;
570 /* prep final completion message */
571 kranal_init_msg(&tx->tx_msg, type);
572 tx->tx_msg.ram_u.completion.racm_cookie = cookie;
574 if (nob == 0) { /* Immediate completion */
575 kranal_post_fma(conn, tx);
579 LASSERT (!conn->rac_close_sent); /* Don't lie (CLOSE == RDMA idle) */
581 rrc = RapkPostRdma(conn->rac_rihandle, &tx->tx_rdma_desc);
582 LASSERT (rrc == RAP_SUCCESS);
/* park tx on the rdmaq until the RDMA CQ reports completion */
584 spin_lock_irqsave(&conn->rac_lock, flags);
585 list_add_tail(&tx->tx_list, &conn->rac_rdmaq);
586 tx->tx_qtime = jiffies;
587 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Copy the pending immediate payload out of the FMA prefix into
 * 'buffer' (up to 'nob' bytes) and clear conn->rac_rxmsg to mark the
 * rx message consumed.  Warns if fewer bytes arrived than expected.
 * NOTE(review): the return value on short receive is elided here. */
591 kranal_consume_rxmsg (kra_conn_t *conn, void *buffer, int nob)
593 __u32 nob_received = nob;
596 LASSERT (conn->rac_rxmsg != NULL);
597 CDEBUG(D_NET, "Consuming %p\n", conn);
599 rrc = RapkFmaCopyOut(conn->rac_rihandle, buffer,
600 &nob_received, sizeof(kra_msg_t));
601 LASSERT (rrc == RAP_SUCCESS);
603 conn->rac_rxmsg = NULL;
605 if (nob_received < nob) {
606 CWARN("Incomplete immediate msg from "LPX64
607 ": expected %d, got %d\n",
608 conn->rac_peer->rap_nid, nob, nob_received);
/* Common send path for kranal_send()/kranal_send_pages().  Visible
 * cases: PTL_MSG_REPLY replies to a GET via RDMA (GET_DONE) unless the
 * request was IMMEDIATE; GET sends a GET_REQ describing the local sink
 * buffer (small mapped sinks go IMMEDIATE instead); PUT sends PUT_REQ
 * for large/paged payloads; everything else small goes as an IMMEDIATE
 * message.  'private' meaning varies by message type (see NB below).
 * NOTE(review): switch/case labels, several returns and error paths
 * are elided in this excerpt — the structure above is inferred only
 * from the visible lines. */
616 kranal_do_send (lib_nal_t *nal,
633 /* NB 'private' is different depending on what we're sending.... */
635 CDEBUG(D_NET, "sending %d bytes in %d frags to nid:"LPX64" pid %d\n",
636 nob, niov, nid, pid);
638 LASSERT (nob == 0 || niov > 0);
639 LASSERT (niov <= PTL_MD_MAX_IOV);
641 LASSERT (!in_interrupt());
642 /* payload is either all vaddrs or all pages */
643 LASSERT (!(kiov != NULL && iov != NULL));
649 case PTL_MSG_REPLY: {
650 /* reply's 'private' is the conn that received the GET_REQ */
652 LASSERT (conn->rac_rxmsg != NULL);
654 if (conn->rac_rxmsg->ram_type == RANAL_MSG_IMMEDIATE) {
655 if (nob > RANAL_FMA_MAX_DATA) {
656 CERROR("Can't REPLY IMMEDIATE %d to "LPX64"\n",
660 break; /* RDMA not expected */
663 /* Incoming message consistent with RDMA? */
664 if (conn->rac_rxmsg->ram_type != RANAL_MSG_GET_REQ) {
665 CERROR("REPLY to "LPX64" bad msg type %x!!!\n",
666 nid, conn->rac_rxmsg->ram_type);
670 tx = kranal_get_idle_tx(0);
674 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
676 kranal_tx_done(tx, rc);
681 tx->tx_libmsg[0] = libmsg;
683 rc = kranal_map_buffer(tx);
685 kranal_tx_done(tx, rc);
689 kranal_rdma(tx, RANAL_MSG_GET_DONE,
690 &conn->rac_rxmsg->ram_u.get.ragm_desc, nob,
691 conn->rac_rxmsg->ram_u.get.ragm_cookie);
693 /* flag matched by consuming rx message */
694 kranal_consume_rxmsg(conn, NULL, 0);
701 /* We have to consider the eventual sink buffer rather than any
702 * payload passed here (there isn't any, and strictly, looking
703 * inside libmsg is a layering violation). We send a simple
704 * IMMEDIATE GET if the sink buffer is mapped already and small
707 if ((libmsg->md->options & PTL_MD_KIOV) == 0 &&
708 libmsg->md->length <= RANAL_FMA_MAX_DATA &&
709 libmsg->md->length <= kranal_tunables.kra_max_immediate)
712 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_GET_REQ);
716 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
717 rc = kranal_setup_virt_buffer(tx, libmsg->md->md_niov,
718 libmsg->md->md_iov.iov,
719 0, libmsg->md->length);
721 rc = kranal_setup_phys_buffer(tx, libmsg->md->md_niov,
722 libmsg->md->md_iov.kiov,
723 0, libmsg->md->length);
725 kranal_tx_done(tx, rc);
/* reply msg is finalised when the GET_DONE RDMA completes */
729 tx->tx_libmsg[1] = lib_create_reply_msg(&kranal_lib, nid, libmsg);
730 if (tx->tx_libmsg[1] == NULL) {
731 CERROR("Can't create reply for GET to "LPX64"\n", nid);
732 kranal_tx_done(tx, rc);
736 tx->tx_libmsg[0] = libmsg;
737 tx->tx_msg.ram_u.get.ragm_hdr = *hdr;
738 /* rest of tx_msg is setup just before it is sent */
739 kranal_launch_tx(tx, nid);
747 if (kiov == NULL && /* not paged */
748 nob <= RANAL_FMA_MAX_DATA && /* small enough */
749 nob <= kranal_tunables.kra_max_immediate)
750 break; /* send IMMEDIATE */
752 tx = kranal_new_tx_msg(!in_interrupt(), RANAL_MSG_PUT_REQ);
756 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
758 kranal_tx_done(tx, rc);
762 tx->tx_libmsg[0] = libmsg;
763 tx->tx_msg.ram_u.putreq.raprm_hdr = *hdr;
764 /* rest of tx_msg is setup just before it is sent */
765 kranal_launch_tx(tx, nid);
/* fall-out: small non-paged payload sent as a single IMMEDIATE FMA */
769 LASSERT (kiov == NULL);
770 LASSERT (nob <= RANAL_FMA_MAX_DATA);
772 tx = kranal_new_tx_msg(!(type == PTL_MSG_ACK ||
773 type == PTL_MSG_REPLY ||
775 RANAL_MSG_IMMEDIATE);
779 rc = kranal_setup_immediate_buffer(tx, niov, iov, offset, nob);
781 kranal_tx_done(tx, rc);
785 tx->tx_msg.ram_u.immediate.raim_hdr = *hdr;
786 tx->tx_libmsg[0] = libmsg;
787 kranal_launch_tx(tx, nid);
/* lib-NAL send entry point for iovec (virtual address) payloads;
 * thin wrapper forwarding to kranal_do_send() with kiov == NULL. */
792 kranal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
793 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
794 unsigned int niov, struct iovec *iov,
795 size_t offset, size_t len)
797 return kranal_do_send(nal, private, cookie,
/* lib-NAL send entry point for kiov (paged) payloads; thin wrapper
 * forwarding to kranal_do_send() with iov == NULL. */
804 kranal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
805 ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
806 unsigned int niov, ptl_kiov_t *kiov,
807 size_t offset, size_t len)
809 return kranal_do_send(nal, private, cookie,
/* Common receive path for kranal_recv()/kranal_recv_pages().
 * 'private' is the conn whose rac_rxmsg is pending.  libmsg == NULL
 * means Portals is discarding (finalise immediately).  IMMEDIATE
 * messages are copied into the (vaddr-only) sink; PUT_REQ triggers a
 * PUT_ACK describing our mapped sink buffer so the peer can RDMA.
 * NOTE(review): several returns/breaks and the rxmsg-consume on the
 * discard path are elided in this excerpt. */
816 kranal_do_recv (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
817 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
818 int offset, int mlen, int rlen)
820 kra_conn_t *conn = private;
821 kra_msg_t *rxmsg = conn->rac_rxmsg;
826 LASSERT (mlen <= rlen);
827 LASSERT (!in_interrupt());
828 /* Either all pages or all vaddrs */
829 LASSERT (!(kiov != NULL && iov != NULL));
831 CDEBUG(D_NET, "conn %p, rxmsg %p, libmsg %p\n", conn, rxmsg, libmsg);
833 if (libmsg == NULL) {
834 /* GET or ACK or portals is discarding */
836 lib_finalize(nal, NULL, libmsg, PTL_OK);
840 switch(rxmsg->ram_type) {
845 case RANAL_MSG_IMMEDIATE:
848 } else if (kiov != NULL) {
849 CERROR("Can't recv immediate into paged buffer\n");
/* skip iovecs preceding 'offset'; payload must fit one fragment */
853 while (offset >= iov->iov_len) {
854 offset -= iov->iov_len;
859 if (mlen > iov->iov_len - offset) {
860 CERROR("Can't handle immediate frags\n");
863 buffer = ((char *)iov->iov_base) + offset;
865 rc = kranal_consume_rxmsg(conn, buffer, mlen);
866 lib_finalize(nal, NULL, libmsg, (rc == 0) ? PTL_OK : PTL_FAIL);
869 case RANAL_MSG_PUT_REQ:
870 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_ACK);
874 rc = kranal_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
876 kranal_tx_done(tx, rc);
881 rc = kranal_map_buffer(tx);
883 kranal_tx_done(tx, rc);
/* tell the peer where to RDMA: echo its cookie, add ours and
 * the registered sink descriptor */
887 tx->tx_msg.ram_u.putack.rapam_src_cookie =
888 conn->rac_rxmsg->ram_u.putreq.raprm_cookie;
889 tx->tx_msg.ram_u.putack.rapam_dst_cookie = tx->tx_cookie;
890 tx->tx_msg.ram_u.putack.rapam_desc.rard_key = tx->tx_map_key;
891 tx->tx_msg.ram_u.putack.rapam_desc.rard_addr.AddressBits =
892 (__u64)((unsigned long)tx->tx_buffer);
893 tx->tx_msg.ram_u.putack.rapam_desc.rard_nob = mlen;
895 tx->tx_libmsg[0] = libmsg; /* finalize this on RDMA_DONE */
897 kranal_post_fma(conn, tx);
899 /* flag matched by consuming rx message */
900 kranal_consume_rxmsg(conn, NULL, 0);
/* lib-NAL receive entry point for iovec sinks; forwards to
 * kranal_do_recv() with kiov == NULL. */
906 kranal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
907 unsigned int niov, struct iovec *iov,
908 size_t offset, size_t mlen, size_t rlen)
910 return kranal_do_recv(nal, private, msg, niov, iov, NULL,
/* lib-NAL receive entry point for kiov (paged) sinks; forwards to
 * kranal_do_recv() with iov == NULL. */
915 kranal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
916 unsigned int niov, ptl_kiov_t *kiov,
917 size_t offset, size_t mlen, size_t rlen)
919 return kranal_do_recv(nal, private, msg, niov, NULL, kiov,
/* Spawn a kernel thread running fn(arg) and bump the global thread
 * count.  NOTE(review): the pid < 0 error check is elided here. */
924 kranal_thread_start (int(*fn)(void *arg), void *arg)
926 long pid = kernel_thread(fn, arg, 0);
931 atomic_inc(&kranal_data.kra_nthreads);
/* Called by each kranal thread on exit: drop the global thread count
 * (shutdown waits for it to reach zero — presumed, verify in full source). */
936 kranal_thread_fini (void)
938 atomic_dec(&kranal_data.kra_nthreads);
/* Reaper helper: check one conn for (a) keepalive due — schedule the
 * conn so its scheduler sends a NOOP; (b) rx timeout — nothing (or no
 * CLOSE) received within rac_timeout; (c) stuck tx on the fmaq, rdmaq
 * or replyq.  NOTE(review): the return values signalling "timed out"
 * to the caller are elided in this excerpt. */
942 kranal_check_conn_timeouts (kra_conn_t *conn)
945 struct list_head *ttmp;
948 unsigned long now = jiffies;
950 LASSERT (conn->rac_state == RANAL_CONN_ESTABLISHED ||
951 conn->rac_state == RANAL_CONN_CLOSING);
953 if (!conn->rac_close_sent &&
954 time_after_eq(now, conn->rac_last_tx + conn->rac_keepalive * HZ)) {
955 /* not sent in a while; schedule conn so scheduler sends a keepalive */
956 CDEBUG(D_NET, "Scheduling keepalive %p->"LPX64"\n",
957 conn, conn->rac_peer->rap_nid);
958 kranal_schedule_conn(conn);
961 timeout = conn->rac_timeout * HZ;
963 if (!conn->rac_close_recvd &&
964 time_after_eq(now, conn->rac_last_rx + timeout)) {
965 CERROR("%s received from "LPX64" within %lu seconds\n",
966 (conn->rac_state == RANAL_CONN_ESTABLISHED) ?
967 "Nothing" : "CLOSE not",
968 conn->rac_peer->rap_nid, (now - conn->rac_last_rx)/HZ);
972 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
975 /* Check the conn's queues are moving. These are "belt+braces" checks,
976 * in case of hardware/software errors that make this conn seem
977 * responsive even though it isn't progressing its message queues. */
979 spin_lock_irqsave(&conn->rac_lock, flags);
981 list_for_each (ttmp, &conn->rac_fmaq) {
982 tx = list_entry(ttmp, kra_tx_t, tx_list);
984 if (time_after_eq(now, tx->tx_qtime + timeout)) {
985 spin_unlock_irqrestore(&conn->rac_lock, flags);
986 CERROR("tx on fmaq for "LPX64" blocked %lu seconds\n",
987 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
992 list_for_each (ttmp, &conn->rac_rdmaq) {
993 tx = list_entry(ttmp, kra_tx_t, tx_list);
995 if (time_after_eq(now, tx->tx_qtime + timeout)) {
996 spin_unlock_irqrestore(&conn->rac_lock, flags);
997 CERROR("tx on rdmaq for "LPX64" blocked %lu seconds\n",
998 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
1003 list_for_each (ttmp, &conn->rac_replyq) {
1004 tx = list_entry(ttmp, kra_tx_t, tx_list);
1006 if (time_after_eq(now, tx->tx_qtime + timeout)) {
1007 spin_unlock_irqrestore(&conn->rac_lock, flags);
1008 CERROR("tx on replyq for "LPX64" blocked %lu seconds\n",
1009 conn->rac_peer->rap_nid, (now - tx->tx_qtime)/HZ);
1014 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Reaper helper: scan one conn hash chain ('idx') under the shared
 * global lock, tracking the minimum timeout/keepalive seen via
 * *min_timeoutp.  When a conn times out, take an extra ref, drop to
 * the write lock, close or terminate the conn according to its state,
 * then restart the scan of this chain.
 * NOTE(review): the rc test and restart jump are elided here. */
1019 kranal_reaper_check (int idx, unsigned long *min_timeoutp)
1021 struct list_head *conns = &kranal_data.kra_conns[idx];
1022 struct list_head *ctmp;
1024 unsigned long flags;
1028 /* NB. We expect to check all the conns and not find any problems, so
1029 * we just use a shared lock while we take a look... */
1030 read_lock(&kranal_data.kra_global_lock);
1032 list_for_each (ctmp, conns) {
1033 conn = list_entry(ctmp, kra_conn_t, rac_hashlist);
1035 if (conn->rac_timeout < *min_timeoutp )
1036 *min_timeoutp = conn->rac_timeout;
1037 if (conn->rac_keepalive < *min_timeoutp )
1038 *min_timeoutp = conn->rac_keepalive;
1040 rc = kranal_check_conn_timeouts(conn);
/* conn timed out: hold it across the lock upgrade */
1044 kranal_conn_addref(conn);
1045 read_unlock(&kranal_data.kra_global_lock);
1047 CERROR("Conn to "LPX64", cqid %d timed out\n",
1048 conn->rac_peer->rap_nid, conn->rac_cqid);
1050 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1052 switch (conn->rac_state) {
1056 case RANAL_CONN_ESTABLISHED:
1057 kranal_close_conn_locked(conn, -ETIMEDOUT);
1060 case RANAL_CONN_CLOSING:
1061 kranal_terminate_conn_locked(conn);
1065 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1067 kranal_conn_decref(conn);
1069 /* start again now I've dropped the lock */
1073 read_unlock(&kranal_data.kra_global_lock);
/* Connection daemon thread: loops under kra_connd_lock servicing two
 * work queues — (1) accepted sockets pending a passive handshake
 * (kra_connd_acceptq), (2) peers needing an active connect
 * (kra_connd_peers).  Sleeps interruptibly on kra_connd_waitq when
 * both are empty; exits on kra_shutdown.
 * NOTE(review): the schedule() between add/remove_wait_queue and
 * loop continues are elided in this excerpt. */
1077 kranal_connd (void *arg)
1079 long id = (long)arg;
1082 unsigned long flags;
1084 kra_acceptsock_t *ras;
1087 snprintf(name, sizeof(name), "kranal_connd_%02ld", id);
1088 kportal_daemonize(name);
1089 kportal_blockallsigs();
1091 init_waitqueue_entry(&wait, current);
1093 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1095 while (!kranal_data.kra_shutdown) {
1098 if (!list_empty(&kranal_data.kra_connd_acceptq)) {
1099 ras = list_entry(kranal_data.kra_connd_acceptq.next,
1100 kra_acceptsock_t, ras_list);
1101 list_del(&ras->ras_list);
/* handshake with the lock dropped: it can block */
1103 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1105 CDEBUG(D_NET,"About to handshake someone\n");
1107 kranal_conn_handshake(ras->ras_sock, NULL);
1108 kranal_free_acceptsock(ras);
1110 CDEBUG(D_NET,"Finished handshaking someone\n");
1112 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1116 if (!list_empty(&kranal_data.kra_connd_peers)) {
1117 peer = list_entry(kranal_data.kra_connd_peers.next,
1118 kra_peer_t, rap_connd_list);
1120 list_del_init(&peer->rap_connd_list);
1121 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1123 kranal_connect(peer);
1124 kranal_peer_decref(peer);
1126 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
/* nothing to do: sleep until woken */
1133 set_current_state(TASK_INTERRUPTIBLE);
1134 add_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1136 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1140 set_current_state(TASK_RUNNING);
1141 remove_wait_queue(&kranal_data.kra_connd_waitq, &wait);
1143 spin_lock_irqsave(&kranal_data.kra_connd_lock, flags);
1146 spin_unlock_irqrestore(&kranal_data.kra_connd_lock, flags);
1148 kranal_thread_fini();
/* Tell the reaper that a (smaller) timeout now exists: lower
 * kra_new_min_timeout under the reaper lock so the reaper rescales
 * its scan rate on its next pass. */
1153 kranal_update_reaper_timeout(long timeout)
1155 unsigned long flags;
1157 LASSERT (timeout > 0);
1159 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1161 if (timeout < kranal_data.kra_new_min_timeout)
1162 kranal_data.kra_new_min_timeout = timeout;
1164 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
/* Reaper thread: periodically sweeps a "chunk" of the conn hash table
 * with kranal_reaper_check(), sizing the chunk so every conn is
 * visited several times ('n') per minimum timeout interval, and
 * re-deriving that minimum from kra_new_min_timeout (set by
 * kranal_update_reaper_timeout) and from the scan itself.
 * NOTE(review): the 'n'/'p' tunables, conn_index declaration and
 * several branch lines are elided in this excerpt; the wrap-around
 * "scanned all conns" test at original lines 1256-1259 should be
 * verified against the full source. */
1168 kranal_reaper (void *arg)
1171 unsigned long flags;
1174 int conn_entries = kranal_data.kra_conn_hash_size;
1176 int base_index = conn_entries - 1;
1177 unsigned long next_check_time = jiffies;
1178 long next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1179 long current_min_timeout = 1;
1181 kportal_daemonize("kranal_reaper");
1182 kportal_blockallsigs();
1184 init_waitqueue_entry(&wait, current);
1186 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1188 while (!kranal_data.kra_shutdown) {
1189 /* I wake up every 'p' seconds to check for timeouts on some
1190 * more peers. I try to check every connection 'n' times
1191 * within the global minimum of all keepalive and timeout
1192 * intervals, to ensure I attend to every connection within
1193 * (n+1)/n times its timeout intervals. */
1196 unsigned long min_timeout;
1199 /* careful with the jiffy wrap... */
1200 timeout = (long)(next_check_time - jiffies);
1202 set_current_state(TASK_INTERRUPTIBLE);
1203 add_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1205 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1207 schedule_timeout(timeout);
1209 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1211 set_current_state(TASK_RUNNING);
1212 remove_wait_queue(&kranal_data.kra_reaper_waitq, &wait);
1216 if (kranal_data.kra_new_min_timeout != MAX_SCHEDULE_TIMEOUT) {
1217 /* new min timeout set: restart min timeout scan */
1218 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1219 base_index = conn_index - 1;
1221 base_index = conn_entries - 1;
1223 if (kranal_data.kra_new_min_timeout < current_min_timeout) {
1224 current_min_timeout = kranal_data.kra_new_min_timeout;
1225 CDEBUG(D_NET, "Set new min timeout %ld\n",
1226 current_min_timeout);
1229 kranal_data.kra_new_min_timeout = MAX_SCHEDULE_TIMEOUT;
1231 min_timeout = current_min_timeout;
1233 spin_unlock_irqrestore(&kranal_data.kra_reaper_lock, flags);
1235 LASSERT (min_timeout > 0);
1237 /* Compute how many table entries to check now so I get round
1238 * the whole table fast enough given that I do this at fixed
1239 * intervals of 'p' seconds) */
1240 chunk = conn_entries;
1241 if (min_timeout > n * p)
1242 chunk = (chunk * n * p) / min_timeout;
1246 for (i = 0; i < chunk; i++) {
1247 kranal_reaper_check(conn_index,
1249 conn_index = (conn_index + 1) % conn_entries;
1252 next_check_time += p * HZ;
1254 spin_lock_irqsave(&kranal_data.kra_reaper_lock, flags);
1256 if (((conn_index - chunk <= base_index &&
1257 base_index < conn_index) ||
1258 (conn_index - conn_entries - chunk <= base_index &&
1259 base_index < conn_index - conn_entries))) {
1261 /* Scanned all conns: set current_min_timeout... */
1262 if (current_min_timeout != next_min_timeout) {
1263 current_min_timeout = next_min_timeout;
1264 CDEBUG(D_NET, "Set new min timeout %ld\n",
1265 current_min_timeout);
1268 /* ...and restart min timeout scan */
1269 next_min_timeout = MAX_SCHEDULE_TIMEOUT;
1270 base_index = conn_index - 1;
1272 base_index = conn_entries - 1;
1276 kranal_thread_fini();
/* Drain the device's RDMA completion queue: for each completed CQ
 * entry, look up the conn by cqid, pop the oldest tx off rac_rdmaq
 * (completions arrive in posting order — asserted via desc->AppPtr),
 * move it to the fmaq so its GET_DONE/PUT_DONE is sent, and schedule
 * the conn.  A failed cqid lookup means the conn was destroyed.
 * NOTE(review): the outer polling loop and returns are elided here. */
1281 kranal_check_rdma_cq (kra_device_t *dev)
1286 unsigned long flags;
1287 RAP_RDMA_DESCRIPTOR *desc;
1292 rrc = RapkCQDone(dev->rad_rdma_cqh, &cqid, &event_type);
1293 if (rrc == RAP_NOT_DONE) {
1294 CDEBUG(D_NET, "RDMA CQ %d empty\n", dev->rad_id);
1298 LASSERT (rrc == RAP_SUCCESS);
1299 LASSERT ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0);
1301 read_lock(&kranal_data.kra_global_lock);
1303 conn = kranal_cqid2conn_locked(cqid);
1305 /* Conn was destroyed? */
1306 CDEBUG(D_NET, "RDMA CQID lookup %d failed\n", cqid);
1307 read_unlock(&kranal_data.kra_global_lock);
1311 rrc = RapkRdmaDone(conn->rac_rihandle, &desc);
1312 LASSERT (rrc == RAP_SUCCESS);
1314 CDEBUG(D_NET, "Completed %p\n",
1315 list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list));
1317 spin_lock_irqsave(&conn->rac_lock, flags);
1319 LASSERT (!list_empty(&conn->rac_rdmaq));
1320 tx = list_entry(conn->rac_rdmaq.next, kra_tx_t, tx_list);
1321 list_del(&tx->tx_list);
1323 LASSERT(desc->AppPtr == (void *)tx);
1324 LASSERT(tx->tx_msg.ram_type == RANAL_MSG_PUT_DONE ||
1325 tx->tx_msg.ram_type == RANAL_MSG_GET_DONE);
1327 list_add_tail(&tx->tx_list, &conn->rac_fmaq);
1328 tx->tx_qtime = jiffies;
1330 spin_unlock_irqrestore(&conn->rac_lock, flags);
1332 /* Get conn's fmaq processed, now I've just put something
1334 kranal_schedule_conn(conn);
1336 read_unlock(&kranal_data.kra_global_lock);
/* Drain the device's FMA completion queue.  Normal case: schedule the
 * conn identified by the completed cqid.  On CQ overrun, conservatively
 * schedule every conn on this device (walking the whole conn hash,
 * dropping the read lock between chains to avoid starving writers).
 * NOTE(review): the outer polling loop and returns are elided here. */
1341 kranal_check_fma_cq (kra_device_t *dev)
1347 struct list_head *conns;
1348 struct list_head *tmp;
1352 rrc = RapkCQDone(dev->rad_fma_cqh, &cqid, &event_type);
1353 if (rrc == RAP_NOT_DONE) {
1354 CDEBUG(D_NET, "FMA CQ %d empty\n", dev->rad_id);
1358 LASSERT (rrc == RAP_SUCCESS);
1360 if ((event_type & RAPK_CQ_EVENT_OVERRUN) == 0) {
1362 read_lock(&kranal_data.kra_global_lock);
1364 conn = kranal_cqid2conn_locked(cqid);
1366 CDEBUG(D_NET, "FMA CQID lookup %d failed\n",
1369 CDEBUG(D_NET, "FMA completed: %p CQID %d\n",
1371 kranal_schedule_conn(conn);
1374 read_unlock(&kranal_data.kra_global_lock);
1378 /* FMA CQ has overflowed: check ALL conns */
1379 CWARN("Scheduling ALL conns on device %d\n", dev->rad_id);
1381 for (i = 0; i < kranal_data.kra_conn_hash_size; i++) {
1383 read_lock(&kranal_data.kra_global_lock);
1385 conns = &kranal_data.kra_conns[i];
1387 list_for_each (tmp, conns) {
1388 conn = list_entry(tmp, kra_conn_t,
1391 if (conn->rac_device == dev)
1392 kranal_schedule_conn(conn);
1395 /* don't block write lockers for too long... */
1396 read_unlock(&kranal_data.kra_global_lock);
/* Push one FMA message (plus optional immediate payload) to the peer.
 * Messages whose type has RANAL_MSG_FENCE set use the synchronous send
 * variant.  Stamps the message with the conn stamp and tx sequence
 * number.  On success updates rac_last_tx; warns if EAGAIN persists
 * past the keepalive interval.
 * NOTE(review): return-value plumbing (success / -EAGAIN) and the
 * rrc dispatch are partly elided in this excerpt. */
1402 kranal_sendmsg(kra_conn_t *conn, kra_msg_t *msg,
1403 void *immediate, int immediatenob)
1405 int sync = (msg->ram_type & RANAL_MSG_FENCE) != 0;
1408 CDEBUG(D_NET,"%p sending msg %p %02x%s [%p for %d]\n",
1409 conn, msg, msg->ram_type, sync ? "(sync)" : "",
1410 immediate, immediatenob);
1412 LASSERT (sizeof(*msg) <= RANAL_FMA_MAX_PREFIX);
1413 LASSERT ((msg->ram_type == RANAL_MSG_IMMEDIATE) ?
1414 immediatenob <= RANAL_FMA_MAX_DATA :
1417 msg->ram_connstamp = conn->rac_my_connstamp;
1418 msg->ram_seq = conn->rac_tx_seq;
1421 rrc = RapkFmaSyncSend(conn->rac_rihandle,
1422 immediate, immediatenob,
1425 rrc = RapkFmaSend(conn->rac_rihandle,
1426 immediate, immediatenob,
1434 conn->rac_last_tx = jiffies;
1439 if (time_after_eq(jiffies,
1440 conn->rac_last_tx + conn->rac_keepalive*HZ))
1441 CWARN("EAGAIN sending %02x (idle %lu secs)\n",
1442 msg->ram_type, (jiffies - conn->rac_last_tx)/HZ);
/*
 * Drain send-side work for one connection.
 * Runs only in the device scheduler thread (asserted below), so it never
 * races with itself or with reply matching.  Depending on connection
 * state it sends keepalive NOOPs, a CLOSE during teardown, or the next
 * tx queued on rac_fmaq; on -EAGAIN (no FMA credits) the tx is put back
 * at the head of the queue and we wait to be rescheduled by an FMA
 * completion event.
 */
1448 kranal_process_fmaq (kra_conn_t *conn)
1450 unsigned long flags;
1456 /* NB 1. kranal_sendmsg() may fail if I'm out of credits right now.
1457 * However I will be rescheduled by an FMA completion event
1458 * when I eventually get some.
1459 * NB 2. Sampling rac_state here races with setting it elsewhere.
1460 * But it doesn't matter if I try to send a "real" message just
1461 * as I start closing because I'll get scheduled to send the
1464 /* Not racing with incoming message processing! */
1465 LASSERT (current == conn->rac_device->rad_scheduler);
1467 if (conn->rac_state != RANAL_CONN_ESTABLISHED) {
1468 if (!list_empty(&conn->rac_rdmaq)) {
1469 /* RDMAs in progress */
1470 LASSERT (!conn->rac_close_sent);
1472 if (time_after_eq(jiffies,
1474 conn->rac_keepalive * HZ)) {
1475 CDEBUG(D_NET, "sending NOOP (rdma in progress)\n");
1476 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1477 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
/* rac_close_sent guards against sending CLOSE more than once */
1482 if (conn->rac_close_sent)
1485 CWARN("sending CLOSE to "LPX64"\n", conn->rac_peer->rap_nid);
1486 kranal_init_msg(&conn->rac_msg, RANAL_MSG_CLOSE);
1487 rc = kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
1491 conn->rac_close_sent = 1;
1492 if (!conn->rac_close_recvd)
1495 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1497 if (conn->rac_state == RANAL_CONN_CLOSING)
1498 kranal_terminate_conn_locked(conn);
1500 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1504 spin_lock_irqsave(&conn->rac_lock, flags);
1506 if (list_empty(&conn->rac_fmaq)) {
1508 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Nothing queued: send a keepalive NOOP if the link has been idle
 * for rac_keepalive seconds */
1510 if (time_after_eq(jiffies,
1511 conn->rac_last_tx + conn->rac_keepalive * HZ)) {
1512 CDEBUG(D_NET, "sending NOOP -> "LPX64" (%p idle %lu(%ld))\n",
1513 conn->rac_peer->rap_nid, conn,
1514 (jiffies - conn->rac_last_tx)/HZ, conn->rac_keepalive);
1515 kranal_init_msg(&conn->rac_msg, RANAL_MSG_NOOP);
1516 kranal_sendmsg(conn, &conn->rac_msg, NULL, 0);
/* Dequeue the next tx; note whether more remain so we can reschedule */
1521 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1522 list_del(&tx->tx_list);
1523 more_to_do = !list_empty(&conn->rac_fmaq);
1525 spin_unlock_irqrestore(&conn->rac_lock, flags);
1528 CDEBUG(D_NET, "sending regular msg: %p, type %02x, cookie "LPX64"\n",
1529 tx, tx->tx_msg.ram_type, tx->tx_cookie);
1530 switch (tx->tx_msg.ram_type) {
1534 case RANAL_MSG_IMMEDIATE:
1535 rc = kranal_sendmsg(conn, &tx->tx_msg,
1536 tx->tx_buffer, tx->tx_nob);
1539 case RANAL_MSG_PUT_NAK:
1540 case RANAL_MSG_PUT_DONE:
1541 case RANAL_MSG_GET_NAK:
1542 case RANAL_MSG_GET_DONE:
1543 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1546 case RANAL_MSG_PUT_REQ:
1547 rc = kranal_map_buffer(tx);
1548 LASSERT (rc != -EAGAIN);
/* the cookie lets the peer's reply be matched back to this tx
 * (see kranal_match_reply) */
1552 tx->tx_msg.ram_u.putreq.raprm_cookie = tx->tx_cookie;
1553 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1557 case RANAL_MSG_PUT_ACK:
1558 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1562 case RANAL_MSG_GET_REQ:
1563 rc = kranal_map_buffer(tx);
1564 LASSERT (rc != -EAGAIN);
/* describe the locally mapped buffer (key/address/size) to the peer */
1568 tx->tx_msg.ram_u.get.ragm_cookie = tx->tx_cookie;
1569 tx->tx_msg.ram_u.get.ragm_desc.rard_key = tx->tx_map_key;
1570 tx->tx_msg.ram_u.get.ragm_desc.rard_addr.AddressBits =
1571 (__u64)((unsigned long)tx->tx_buffer);
1572 tx->tx_msg.ram_u.get.ragm_desc.rard_nob = tx->tx_nob;
1573 rc = kranal_sendmsg(conn, &tx->tx_msg, NULL, 0);
1578 if (rc == -EAGAIN) {
1579 /* I need credits to send this. Replace tx at the head of the
1580 * fmaq and I'll get rescheduled when credits appear */
1581 CDEBUG(D_NET, "EAGAIN on %p\n", conn);
1582 spin_lock_irqsave(&conn->rac_lock, flags);
1583 list_add(&tx->tx_list, &conn->rac_fmaq);
1584 spin_unlock_irqrestore(&conn->rac_lock, flags);
/* Sent (or failed): either finish the tx now, or park it on the
 * replyq until the peer's answer arrives */
1588 if (!expect_reply || rc != 0) {
1589 kranal_tx_done(tx, rc);
1591 /* LASSERT(current) above ensures this doesn't race with reply
1593 spin_lock_irqsave(&conn->rac_lock, flags);
1594 list_add_tail(&tx->tx_list, &conn->rac_replyq);
1595 tx->tx_qtime = jiffies;
1596 spin_unlock_irqrestore(&conn->rac_lock, flags);
1600 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
1601 kranal_schedule_conn(conn);
/*
 * Byte-swap an RDMA buffer descriptor in place.  Called from the
 * RX path when the sender's endianness differs from ours (detected
 * via the ram_magic check in kranal_check_fma_rx()).
 */
1606 kranal_swab_rdma_desc (kra_rdma_desc_t *d)
1608 __swab64s(&d->rard_key.Key);
1609 __swab16s(&d->rard_key.Cookie);
1610 __swab16s(&d->rard_key.MdHandle);
1611 __swab32s(&d->rard_key.Flags);
1612 __swab64s(&d->rard_addr.AddressBits);
1613 __swab32s(&d->rard_nob);
/*
 * Find the tx on conn->rac_replyq that is waiting for a reply carrying
 * 'cookie'.  A cookie match with the wrong message type is a peer
 * protocol error and is warned about; a full match is unlinked from
 * the replyq and handed back to the caller (see the RX dispatch in
 * kranal_check_fma_rx).  An unmatched cookie is also warned about.
 * NOTE(review): return statements are not visible in this listing;
 * presumably NULL on the mismatch/unmatched paths - verify.
 */
1617 kranal_match_reply(kra_conn_t *conn, int type, __u64 cookie)
1619 struct list_head *ttmp;
1621 unsigned long flags;
1623 spin_lock_irqsave(&conn->rac_lock, flags);
1625 list_for_each(ttmp, &conn->rac_replyq) {
1626 tx = list_entry(ttmp, kra_tx_t, tx_list);
1628 CDEBUG(D_NET,"Checking %p %02x/"LPX64"\n",
1629 tx, tx->tx_msg.ram_type, tx->tx_cookie);
1631 if (tx->tx_cookie != cookie)
1634 if (tx->tx_msg.ram_type != type) {
1635 spin_unlock_irqrestore(&conn->rac_lock, flags);
1636 CWARN("Unexpected type %x (%x expected) "
1637 "matched reply from "LPX64"\n",
1638 tx->tx_msg.ram_type, type,
1639 conn->rac_peer->rap_nid);
/* matched: unlink under rac_lock before returning it */
1643 list_del(&tx->tx_list);
1644 spin_unlock_irqrestore(&conn->rac_lock, flags);
1648 spin_unlock_irqrestore(&conn->rac_lock, flags);
1649 CWARN("Unmatched reply %02x/"LPX64" from "LPX64"\n",
1650 type, cookie, conn->rac_peer->rap_nid);
/*
 * Poll 'conn' for one incoming FMA message.  If one is pending:
 * byte-swap the header (and any embedded RDMA descriptor) when the
 * sender's endianness differs - detected via ram_magic - then validate
 * protocol version, source NID, connection stamp and sequence number
 * before dispatching on message type.  CLOSE handling transitions the
 * connection state under the global lock.
 */
1655 kranal_check_fma_rx (kra_conn_t *conn)
1657 unsigned long flags;
1662 RAP_RETURN rrc = RapkFmaGetPrefix(conn->rac_rihandle, &prefix);
1663 kra_peer_t *peer = conn->rac_peer;
/* nothing waiting on this connection */
1665 if (rrc == RAP_NOT_DONE)
1668 CDEBUG(D_NET, "RX on %p\n", conn);
1670 LASSERT (rrc == RAP_SUCCESS);
1671 conn->rac_last_rx = jiffies;
1672 seq = conn->rac_rx_seq++;
1673 msg = (kra_msg_t *)prefix;
1675 /* stash message for portals callbacks they'll NULL
1676 * rac_rxmsg if they consume it */
1677 LASSERT (conn->rac_rxmsg == NULL);
1678 conn->rac_rxmsg = msg;
1680 if (msg->ram_magic != RANAL_MSG_MAGIC) {
1681 if (__swab32(msg->ram_magic) != RANAL_MSG_MAGIC) {
1682 CERROR("Unexpected magic %08x from "LPX64"\n",
1683 msg->ram_magic, peer->rap_nid);
/* opposite-endian peer: swab the common header fields ... */
1687 __swab32s(&msg->ram_magic);
1688 __swab16s(&msg->ram_version);
1689 __swab16s(&msg->ram_type);
1690 __swab64s(&msg->ram_srcnid);
1691 __swab64s(&msg->ram_connstamp);
1692 __swab32s(&msg->ram_seq);
1694 /* NB message type checked below; NOT here... */
1695 switch (msg->ram_type) {
1696 case RANAL_MSG_PUT_ACK:
1697 kranal_swab_rdma_desc(&msg->ram_u.putack.rapam_desc);
1700 case RANAL_MSG_GET_REQ:
1701 kranal_swab_rdma_desc(&msg->ram_u.get.ragm_desc);
/* Validate the (now host-endian) header before dispatching */
1709 if (msg->ram_version != RANAL_MSG_VERSION) {
1710 CERROR("Unexpected protocol version %d from "LPX64"\n",
1711 msg->ram_version, peer->rap_nid);
1715 if (msg->ram_srcnid != peer->rap_nid) {
1716 CERROR("Unexpected peer "LPX64" from "LPX64"\n",
1717 msg->ram_srcnid, peer->rap_nid);
1721 if (msg->ram_connstamp != conn->rac_peer_connstamp) {
1722 CERROR("Unexpected connstamp "LPX64"("LPX64
1723 " expected) from "LPX64"\n",
1724 msg->ram_connstamp, conn->rac_peer_connstamp,
1729 if (msg->ram_seq != seq) {
1730 CERROR("Unexpected sequence number %d(%d expected) from "
1731 LPX64"\n", msg->ram_seq, seq, peer->rap_nid);
1735 if ((msg->ram_type & RANAL_MSG_FENCE) != 0) {
1736 /* This message signals RDMA completion... */
1737 rrc = RapkFmaSyncWait(conn->rac_rihandle);
1738 LASSERT (rrc == RAP_SUCCESS);
/* only CLOSE may legitimately follow a CLOSE */
1741 if (conn->rac_close_recvd) {
1742 CERROR("Unexpected message %d after CLOSE from "LPX64"\n",
1743 msg->ram_type, conn->rac_peer->rap_nid);
1747 if (msg->ram_type == RANAL_MSG_CLOSE) {
1748 CWARN("RX CLOSE from "LPX64"\n", conn->rac_peer->rap_nid);
1749 conn->rac_close_recvd = 1;
1750 write_lock_irqsave(&kranal_data.kra_global_lock, flags);
1752 if (conn->rac_state == RANAL_CONN_ESTABLISHED)
1753 kranal_close_conn_locked(conn, 0);
1754 else if (conn->rac_state == RANAL_CONN_CLOSING &&
1755 conn->rac_close_sent)
1756 kranal_terminate_conn_locked(conn);
1758 write_unlock_irqrestore(&kranal_data.kra_global_lock, flags);
1762 if (conn->rac_state != RANAL_CONN_ESTABLISHED)
1765 switch (msg->ram_type) {
1766 case RANAL_MSG_NOOP:
1767 /* Nothing to do; just a keepalive */
1768 CDEBUG(D_NET, "RX NOOP on %p\n", conn);
1771 case RANAL_MSG_IMMEDIATE:
1772 CDEBUG(D_NET, "RX IMMEDIATE on %p\n", conn);
1773 lib_parse(&kranal_lib, &msg->ram_u.immediate.raim_hdr, conn);
1776 case RANAL_MSG_PUT_REQ:
1777 CDEBUG(D_NET, "RX PUT_REQ on %p\n", conn);
1778 lib_parse(&kranal_lib, &msg->ram_u.putreq.raprm_hdr, conn);
1780 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
/* no local match: NAK the PUT so the sender can complete */
1783 tx = kranal_new_tx_msg(0, RANAL_MSG_PUT_NAK);
1787 tx->tx_msg.ram_u.completion.racm_cookie =
1788 msg->ram_u.putreq.raprm_cookie;
1789 kranal_post_fma(conn, tx);
1792 case RANAL_MSG_PUT_NAK:
1793 CDEBUG(D_NET, "RX PUT_NAK on %p\n", conn);
1794 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1795 msg->ram_u.completion.racm_cookie);
1799 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1800 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1801 kranal_tx_done(tx, -ENOENT); /* no match */
1804 case RANAL_MSG_PUT_ACK:
1805 CDEBUG(D_NET, "RX PUT_ACK on %p\n", conn);
1806 tx = kranal_match_reply(conn, RANAL_MSG_PUT_REQ,
1807 msg->ram_u.putack.rapam_src_cookie);
/* peer accepted: RDMA the payload into its advertised buffer */
1811 kranal_rdma(tx, RANAL_MSG_PUT_DONE,
1812 &msg->ram_u.putack.rapam_desc,
1813 msg->ram_u.putack.rapam_desc.rard_nob,
1814 msg->ram_u.putack.rapam_dst_cookie);
1817 case RANAL_MSG_PUT_DONE:
1818 CDEBUG(D_NET, "RX PUT_DONE on %p\n", conn);
1819 tx = kranal_match_reply(conn, RANAL_MSG_PUT_ACK,
1820 msg->ram_u.completion.racm_cookie);
1824 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1825 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1826 kranal_tx_done(tx, 0);
1829 case RANAL_MSG_GET_REQ:
1830 CDEBUG(D_NET, "RX GET_REQ on %p\n", conn);
1831 lib_parse(&kranal_lib, &msg->ram_u.get.ragm_hdr, conn);
1833 if (conn->rac_rxmsg == NULL) /* lib_parse matched something */
/* no local match: NAK the GET */
1836 tx = kranal_new_tx_msg(0, RANAL_MSG_GET_NAK);
1840 tx->tx_msg.ram_u.completion.racm_cookie = msg->ram_u.get.ragm_cookie;
1841 kranal_post_fma(conn, tx);
1844 case RANAL_MSG_GET_NAK:
1845 CDEBUG(D_NET, "RX GET_NAK on %p\n", conn);
1846 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1847 msg->ram_u.completion.racm_cookie);
1851 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1852 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1853 kranal_tx_done(tx, -ENOENT); /* no match */
1856 case RANAL_MSG_GET_DONE:
1857 CDEBUG(D_NET, "RX GET_DONE on %p\n", conn);
1858 tx = kranal_match_reply(conn, RANAL_MSG_GET_REQ,
1859 msg->ram_u.completion.racm_cookie);
1863 LASSERT (tx->tx_buftype == RANAL_BUF_PHYS_MAPPED ||
1864 tx->tx_buftype == RANAL_BUF_VIRT_MAPPED);
1865 kranal_tx_done(tx, 0);
/* discard the message if no portals callback consumed it */
1870 if (conn->rac_rxmsg != NULL)
1871 kranal_consume_rxmsg(conn, NULL, 0);
1873 /* check again later */
1874 kranal_schedule_conn(conn);
/*
 * Final cleanup for a conn that has reached RANAL_CONN_CLOSED and has
 * already been unlinked from the global conn/hash lists (asserted
 * below): complete every tx still queued for send (rac_fmaq) or
 * awaiting a reply (rac_replyq) with -ECONNABORTED, counting how many
 * of each were abandoned.
 */
1878 kranal_complete_closed_conn (kra_conn_t *conn)
1884 LASSERT (conn->rac_state == RANAL_CONN_CLOSED);
1885 LASSERT (list_empty(&conn->rac_list));
1886 LASSERT (list_empty(&conn->rac_hashlist));
1888 for (nfma = 0; !list_empty(&conn->rac_fmaq); nfma++) {
1889 tx = list_entry(conn->rac_fmaq.next, kra_tx_t, tx_list);
1891 list_del(&tx->tx_list);
1892 kranal_tx_done(tx, -ECONNABORTED);
/* any RDMA in flight must have completed before we got here */
1895 LASSERT (list_empty(&conn->rac_rdmaq));
1897 for (nreplies = 0; !list_empty(&conn->rac_replyq); nreplies++) {
1898 tx = list_entry(conn->rac_replyq.next, kra_tx_t, tx_list);
1900 list_del(&tx->tx_list);
1901 kranal_tx_done(tx, -ECONNABORTED);
1904 CWARN("Closed conn %p -> "LPX64": nmsg %d nreplies %d\n",
1905 conn, conn->rac_peer->rap_nid, nfma, nreplies);
/*
 * Progress the RapidArray completion-sync handshake for a new conn.
 * While RapkCompleteSync(.., 1) reports RAP_NOT_DONE and the conn has
 * not yet exceeded rac_timeout since its last tx, the caller
 * (kranal_scheduler) retries with backoff; after the timeout the sync
 * is abandoned with RapkCompleteSync(.., 0).
 * NOTE(review): the return statements are not visible in this listing;
 * the caller treats -EAGAIN as "retry later" - verify the success path.
 */
1909 kranal_process_new_conn (kra_conn_t *conn)
1913 rrc = RapkCompleteSync(conn->rac_rihandle, 1);
1914 if (rrc == RAP_SUCCESS)
1917 LASSERT (rrc == RAP_NOT_DONE);
/* not timed out yet: let the caller retry */
1918 if (!time_after_eq(jiffies, conn->rac_last_tx +
1919 conn->rac_timeout * HZ))
/* timed out: abort the sync */
1923 rrc = RapkCompleteSync(conn->rac_rihandle, 0);
1924 LASSERT (rrc == RAP_SUCCESS);
/*
 * Per-device scheduler thread ("kranal_sd_NN").  Main loop:
 *  - when the device callback has fired (rad_ready), drain the RDMA
 *    and FMA completion queues;
 *  - for each conn on rad_ready_conns, run the RX check and the FMA
 *    send queue, completing conns that have reached CLOSED;
 *  - drive conns on rad_new_conns through their sync handshake,
 *    retrying with backoff (see rac_keepalive manipulation below) and
 *    tracking the soonest retry deadline;
 *  - if a full pass dropped no locks (no work found), sleep on
 *    rad_waitq, waking at the soonest new-conn deadline.
 * Exits when kra_shutdown is set; kranal_thread_fini() on the way out.
 */
1929 kranal_scheduler (void *arg)
1931 kra_device_t *dev = (kra_device_t *)arg;
1935 unsigned long flags;
1936 unsigned long deadline;
1937 unsigned long soonest;
1940 struct list_head *tmp;
1941 struct list_head *nxt;
1946 snprintf(name, sizeof(name), "kranal_sd_%02d", dev->rad_idx);
1947 kportal_daemonize(name);
1948 kportal_blockallsigs();
1950 dev->rad_scheduler = current;
1951 init_waitqueue_entry(&wait, current);
1953 spin_lock_irqsave(&dev->rad_lock, flags);
1955 while (!kranal_data.kra_shutdown) {
1956 /* Safe: kra_shutdown only set when quiescent */
/* drop rad_lock periodically so it is never held too long */
1958 if (busy_loops++ >= RANAL_RESCHED) {
1959 spin_unlock_irqrestore(&dev->rad_lock, flags);
1964 spin_lock_irqsave(&dev->rad_lock, flags);
1969 if (dev->rad_ready) {
1970 /* Device callback fired since I last checked it */
1972 spin_unlock_irqrestore(&dev->rad_lock, flags);
1975 kranal_check_rdma_cq(dev);
1976 kranal_check_fma_cq(dev);
1978 spin_lock_irqsave(&dev->rad_lock, flags);
/* service every conn scheduled for attention */
1981 list_for_each_safe(tmp, nxt, &dev->rad_ready_conns) {
1982 conn = list_entry(tmp, kra_conn_t, rac_schedlist);
1984 list_del_init(&conn->rac_schedlist);
1985 LASSERT (conn->rac_scheduled);
1986 conn->rac_scheduled = 0;
1987 spin_unlock_irqrestore(&dev->rad_lock, flags);
1990 kranal_check_fma_rx(conn);
1991 kranal_process_fmaq(conn);
1993 if (conn->rac_state == RANAL_CONN_CLOSED)
1994 kranal_complete_closed_conn(conn);
/* drop the ref taken by kranal_schedule_conn() */
1996 kranal_conn_decref(conn);
1997 spin_lock_irqsave(&dev->rad_lock, flags);
/* progress conns still completing their initial sync */
2003 list_for_each_safe(tmp, nxt, &dev->rad_new_conns) {
2004 conn = list_entry(tmp, kra_conn_t, rac_schedlist);
2006 deadline = conn->rac_last_tx + conn->rac_keepalive;
2007 if (time_after_eq(jiffies, deadline)) {
2008 /* Time to process this new conn */
2009 spin_unlock_irqrestore(&dev->rad_lock, flags);
2012 rc = kranal_process_new_conn(conn);
2013 if (rc != -EAGAIN) {
2014 /* All done with this conn */
2015 spin_lock_irqsave(&dev->rad_lock, flags);
2016 list_del_init(&conn->rac_schedlist);
2017 spin_unlock_irqrestore(&dev->rad_lock, flags);
2019 kranal_conn_decref(conn);
2020 spin_lock_irqsave(&dev->rad_lock, flags);
2024 /* retry with exponential backoff until HZ */
2025 if (conn->rac_keepalive == 0)
2026 conn->rac_keepalive = 1;
2027 else if (conn->rac_keepalive <= HZ)
2028 conn->rac_keepalive *= 2;
2030 conn->rac_keepalive += HZ;
2032 deadline = conn->rac_last_tx + conn->rac_keepalive;
2033 spin_lock_irqsave(&dev->rad_lock, flags);
2036 /* Does this conn need attention soonest? */
2037 if (nsoonest++ == 0 ||
2038 !time_after_eq(deadline, soonest))
2042 if (dropped_lock) /* may sleep iff I didn't drop the lock */
/* idle: wait for a wakeup or the soonest new-conn deadline */
2045 set_current_state(TASK_INTERRUPTIBLE);
2046 add_wait_queue(&dev->rad_waitq, &wait);
2047 spin_unlock_irqrestore(&dev->rad_lock, flags);
2049 if (nsoonest == 0) {
2053 timeout = (long)(soonest - jiffies);
2056 schedule_timeout(timeout);
2060 remove_wait_queue(&dev->rad_waitq, &wait);
2061 set_current_state(TASK_RUNNING);
2062 spin_lock_irqsave(&dev->rad_lock, flags);
2065 spin_unlock_irqrestore(&dev->rad_lock, flags);
2067 dev->rad_scheduler = NULL;
2068 kranal_thread_fini();
2073 lib_nal_t kranal_lib = {
2074 libnal_data: &kranal_data, /* NAL private data */
2075 libnal_send: kranal_send,
2076 libnal_send_pages: kranal_send_pages,
2077 libnal_recv: kranal_recv,
2078 libnal_recv_pages: kranal_recv_pages,
2079 libnal_dist: kranal_dist