1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2 * vim:expandtab:shiftwidth=8:tabstop=8:
4 * Copyright (C) 2004 Cluster File Systems, Inc.
5 * Author: Eric Barton <eric@bartonsoftware.com>
7 * This file is part of Lustre, http://www.lustre.org.
9 * Lustre is free software; you can redistribute it and/or
10 * modify it under the terms of version 2 of the GNU General Public
11 * License as published by the Free Software Foundation.
13 * Lustre is distributed in the hope that it will be useful,
14 * but WITHOUT ANY WARRANTY; without even the implied warranty of
15 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16 * GNU General Public License for more details.
18 * You should have received a copy of the GNU General Public License
19 * along with Lustre; if not, write to the Free Software
20 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
27 * LIB functions follow
/* Hand a tx off to a scheduler thread for completion.  Queues the tx on
 * the scheduler's txq under kib_sched_lock and wakes a scheduler.  Used
 * when the completion work can't run in the current context (e.g. memory
 * deregistration from IRQ context — see kibnal_tx_done()).
 * NOTE(review): interior lines ('flags' declaration, braces) are elided
 * in this view. */
31 kibnal_schedule_tx_done (kib_tx_t *tx)
35 spin_lock_irqsave (&kibnal_data.kib_sched_lock, flags);
37 list_add_tail(&tx->tx_list, &kibnal_data.kib_sched_txq);
/* wake one scheduler thread to process the queued tx */
38 wake_up (&kibnal_data.kib_sched_waitq);
40 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
/* Finalise a completed tx descriptor: unmap any registered memory,
 * finalise its portals lib messages (with PTL_OK/PTL_FAIL depending on
 * tx_status), drop its connection ref, then return it to an idle pool
 * and wake any waiters.  May defer itself to thread context via
 * kibnal_schedule_tx_done() when unmapping can't be done here. */
44 kibnal_tx_done (kib_tx_t *tx)
46 ptl_err_t ptlrc = (tx->tx_status == 0) ? PTL_OK : PTL_FAIL;
51 LASSERT (tx->tx_sending == 0); /* mustn't be awaiting callback */
52 LASSERT (!tx->tx_passive_rdma_wait); /* mustn't be awaiting RDMA */
54 switch (tx->tx_mapped) {
/* KIB_TX_MAPPED case: deregister plain memory registration.
 * NOTE(review): the in_interrupt() test guarding this deferral is in an
 * elided line above. */
63 /* can't deregister memory in IRQ context... */
64 kibnal_schedule_tx_done(tx);
67 frc = iibt_deregister_memory(tx->tx_md.md_handle);
68 LASSERT (frc == FSUCCESS);
69 tx->tx_mapped = KIB_TX_UNMAPPED;
73 case KIB_TX_MAPPED_FMR:
74 if (in_interrupt() && tx->tx_status != 0) {
75 /* can't flush FMRs in IRQ context... */
76 kibnal_schedule_tx_done(tx);
80 rc = ib_fmr_deregister(tx->tx_md.md_handle.fmr);
/* on error force a pool flush so a failed mapping can't be reused */
83 if (tx->tx_status != 0)
84 ib_fmr_pool_force_flush(kibnal_data.kib_fmr_pool);
85 tx->tx_mapped = KIB_TX_UNMAPPED;
90 for (i = 0; i < 2; i++) {
91 /* tx may have up to 2 libmsgs to finalise */
92 if (tx->tx_libmsg[i] == NULL)
95 lib_finalize (&kibnal_lib, NULL, tx->tx_libmsg[i], ptlrc);
96 tx->tx_libmsg[i] = NULL;
99 if (tx->tx_conn != NULL) {
/* drop the conn ref this tx held; conn freed on last put */
100 kibnal_put_conn (tx->tx_conn);
105 tx->tx_passive_rdma = 0;
/* return tx to the appropriate idle pool (reserved non-blocking pool
 * vs normal pool — the selecting condition is in an elided line) */
108 spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
111 list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_nblk_txs);
113 list_add_tail (&tx->tx_list, &kibnal_data.kib_idle_txs);
114 wake_up (&kibnal_data.kib_idle_tx_waitq);
117 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
/* Allocate an idle tx descriptor.  Takes from the normal pool first; if
 * that is empty and 'may_block' is 0 it dips into the reserved
 * non-blocking pool (erroring if exhausted), otherwise it blocks waiting
 * for a descriptor or shutdown.  The returned tx is asserted to be in a
 * pristine state and is stamped with a fresh passive-RDMA cookie. */
121 kibnal_get_idle_tx (int may_block)
128 spin_lock_irqsave (&kibnal_data.kib_tx_lock, flags);
130 /* "normal" descriptor is free */
131 if (!list_empty (&kibnal_data.kib_idle_txs)) {
132 tx = list_entry (kibnal_data.kib_idle_txs.next,
138 /* may dip into reserve pool */
139 if (list_empty (&kibnal_data.kib_idle_nblk_txs)) {
140 CERROR ("reserved tx desc pool exhausted\n");
144 tx = list_entry (kibnal_data.kib_idle_nblk_txs.next,
149 /* block for idle tx */
150 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
/* sleep until a tx is freed (kibnal_tx_done wakes this queue) or the
 * NAL is shutting down; the lock is retaken in an elided line */
152 wait_event (kibnal_data.kib_idle_tx_waitq,
153 !list_empty (&kibnal_data.kib_idle_txs) ||
154 kibnal_data.kib_shutdown);
158 list_del (&tx->tx_list);
160 /* Allocate a new passive RDMA completion cookie. It might
161 * not be needed, but we've got a lock right now and we're
162 * unlikely to wrap... */
163 tx->tx_passive_rdma_cookie = kibnal_data.kib_next_tx_cookie++;
/* sanity: an idle tx must carry no residual state */
165 LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
166 LASSERT (tx->tx_nsp == 0);
167 LASSERT (tx->tx_sending == 0);
168 LASSERT (tx->tx_status == 0);
169 LASSERT (tx->tx_conn == NULL);
170 LASSERT (!tx->tx_passive_rdma);
171 LASSERT (!tx->tx_passive_rdma_wait);
172 LASSERT (tx->tx_libmsg[0] == NULL);
173 LASSERT (tx->tx_libmsg[1] == NULL);
176 spin_unlock_irqrestore (&kibnal_data.kib_tx_lock, flags);
/* lib_nal 'distance' callback: report network distance to 'nid'.
 * NOTE(review): the body is mostly elided here; only the self-nid
 * comparison is visible. */
182 kibnal_dist(lib_nal_t *nal, ptl_nid_t nid, unsigned long *dist)
184 /* I would guess that if kibnal_get_peer (nid) == NULL,
185 and we're not routing, then 'nid' is very distant :) */
186 if ( nal->libnal_ni.ni_pid.nid == nid ) {
/* A peer has told us (via a PUT_DONE/GET_DONE message) that the passive
 * RDMA identified by 'cookie' has completed with 'status'.  Find the
 * matching tx on the connection's active list, record the status, clear
 * its wait flag, and (if no sends are still outstanding) make it idle —
 * whoever makes a tx idle is responsible for freeing it.  If no tx
 * matches, log an unmatched/late completion. */
196 kibnal_complete_passive_rdma(kib_conn_t *conn, __u64 cookie, int status)
198 struct list_head *ttmp;
202 spin_lock_irqsave (&conn->ibc_lock, flags);
204 list_for_each (ttmp, &conn->ibc_active_txs) {
205 kib_tx_t *tx = list_entry(ttmp, kib_tx_t, tx_list);
/* invariants for txs on the active list: only passive-rdma txs wait,
 * and a tx not waiting must still have sends in flight */
207 LASSERT (tx->tx_passive_rdma ||
208 !tx->tx_passive_rdma_wait);
210 LASSERT (tx->tx_passive_rdma_wait ||
211 tx->tx_sending != 0);
/* skip txs not waiting on this particular cookie */
213 if (!tx->tx_passive_rdma_wait ||
214 tx->tx_passive_rdma_cookie != cookie)
217 CDEBUG(D_NET, "Complete %p "LPD64": %d\n", tx, cookie, status);
219 tx->tx_status = status;
220 tx->tx_passive_rdma_wait = 0;
221 idle = (tx->tx_sending == 0);
224 list_del (&tx->tx_list);
226 spin_unlock_irqrestore (&conn->ibc_lock, flags);
228 /* I could be racing with tx callbacks. It's whoever
229 * _makes_ tx idle that frees it */
/* fall-through path: no matching tx was found on the active list */
235 spin_unlock_irqrestore (&conn->ibc_lock, flags);
237 CERROR ("Unmatched (late?) RDMA completion "LPX64" from "LPX64"\n",
238 cookie, conn->ibc_peer->ibp_nid);
/* Return the local key (lkey) to use for a DMA segment: the single
 * whole-memory registration's lkey when whole-mem mapping is enabled,
 * otherwise the per-page-array registration's lkey. */
242 kibnal_lkey(kib_pages_t *ibp)
244 if (kibnal_whole_mem())
245 return kibnal_data.kib_md.md_lkey;
247 return ibp->ibp_lkey;
/* (Re)post a receive buffer on its connection's QP.  Builds the local
 * data segment and work request, posts it, and — when 'do_credits' is
 * set — returns a flow-control credit to the peer by bumping
 * ibc_outstanding_credits and kicking kibnal_check_sends().  On a post
 * failure against an established connection the connection is closed;
 * the error path also drops the rx's ref on the conn. */
251 kibnal_post_rx (kib_rx_t *rx, int do_credits)
253 kib_conn_t *conn = rx->rx_conn;
259 rx->rx_gl = (IB_LOCAL_DATASEGMENT) {
260 .Address = rx->rx_vaddr,
261 .Length = IBNAL_MSG_SIZE,
262 .Lkey = kibnal_lkey(conn->ibc_rx_pages),
265 rx->rx_wrq = (IB_WORK_REQ) {
266 .Operation = WROpRecv,
268 .MessageLen = IBNAL_MSG_SIZE,
/* wreqid tags the pointer as an rx (second arg 1) for the CQ demux */
269 .WorkReqId = kibnal_ptr2wreqid(rx, 1),
270 .DSList = &rx->rx_gl,
273 KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED,
275 LASSERT (!rx->rx_posted);
/* don't post on a connection that is already going down */
279 if (conn->ibc_state != IBNAL_CONN_ESTABLISHED)
282 frc = iibt_postrecv(conn->ibc_qp, &rx->rx_wrq);
283 if (frc != FSUCCESS) {
284 CDEBUG(D_NET, "post failed %d\n", frc);
287 CDEBUG(D_NET, "posted rx %p\n", &rx->rx_wrq);
/* credit return: tell the peer it may send one more message */
292 spin_lock_irqsave(&conn->ibc_lock, flags);
293 conn->ibc_outstanding_credits++;
294 spin_unlock_irqrestore(&conn->ibc_lock, flags);
296 kibnal_check_sends(conn);
/* error path (reached via elided control flow above) */
302 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
303 CERROR ("Error posting receive -> "LPX64": %d\n",
304 conn->ibc_peer->ibp_nid, rc);
305 kibnal_close_conn (rx->rx_conn, rc);
307 CDEBUG (D_NET, "Error posting receive -> "LPX64": %d\n",
308 conn->ibc_peer->ibp_nid, rc);
/* drop the conn ref held on behalf of this rx */
312 kibnal_put_conn (conn);
/* Simple 32-bit rotate-and-add checksum over 'nob' bytes at 'ptr'.
 * NOTE(review): loop header and locals are elided in this view. */
317 static inline __u32 kibnal_cksum (void *ptr, int nob)
323 sum = ((sum << 1) | (sum >> 31)) + *c++;
/* Debug helper: hex-dump 'len' bytes at 'ptr' to the console, labelled
 * with 'string'.  Refuses obviously-bogus lengths (<0 or >2048). */
329 static void hexdump(char *string, void *ptr, int len)
331 unsigned char *c = ptr;
336 if (len < 0 || len > 2048) {
337 printk("XXX what the hell? %d\n",len);
341 printk("%d bytes of '%s' from 0x%p\n", len, string, ptr);
343 for (i = 0; i < len;) {
344 printk("%02x",*(c++));
/* Completion handler for a receive work request (runs from the CQ
 * callback, i.e. interrupt/softirq context).  Validates the incoming
 * message — completion status, minimum length, magic (byte-flipping the
 * header if the peer has opposite endianness), version, ibm_nob, and
 * checksum — then absorbs returned credits, and dispatches by message
 * type: IMMEDIATE and RDMA requests are handed to thread context via
 * the scheduler rxq; DONE messages complete the matching passive RDMA
 * here and repost the rx.  Any validation failure closes the
 * connection (error path at the bottom). */
359 kibnal_rx_callback (IB_WORK_COMPLETION *wc)
361 kib_rx_t *rx = (kib_rx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
362 kib_msg_t *msg = rx->rx_msg;
363 kib_conn_t *conn = rx->rx_conn;
364 int nob = wc->Length;
365 const int base_nob = offsetof(kib_msg_t, ibm_u);
372 __u32 computed_cksum;
375 /* we set the QP to erroring after we've finished disconnecting,
376 * maybe we should do so sooner. */
377 KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_ESTABLISHED,
378 IBNAL_CONN_DISCONNECTED);
380 CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
381 LASSERT (rx->rx_posted);
385 /* receives complete with error in any case after we've started
387 if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
390 if (wc->Status != WRStatusSuccess) {
391 CERROR("Rx from "LPX64" failed: %d\n",
392 conn->ibc_peer->ibp_nid, wc->Status);
/* must at least contain the fixed message header */
396 if (nob < base_nob) {
397 CERROR ("Short rx from "LPX64": %d < expected %d\n",
398 conn->ibc_peer->ibp_nid, nob, base_nob);
402 hexdump("rx", rx->rx_msg, sizeof(kib_msg_t));
404 /* Receiver does any byte flipping if necessary... */
406 if (msg->ibm_magic == IBNAL_MSG_MAGIC) {
/* not our magic as-is: accept only the byte-swapped magic, which
 * means the peer is opposite-endian and fields must be swabbed */
409 if (msg->ibm_magic != __swab32(IBNAL_MSG_MAGIC)) {
410 CERROR ("Unrecognised magic: %08x from "LPX64"\n",
411 msg->ibm_magic, conn->ibc_peer->ibp_nid);
415 __swab16s (&msg->ibm_version);
416 LASSERT (sizeof(msg->ibm_type) == 1);
417 LASSERT (sizeof(msg->ibm_credits) == 1);
420 if (msg->ibm_version != IBNAL_MSG_VERSION) {
421 CERROR ("Incompatible msg version %d (%d expected)\n",
422 msg->ibm_version, IBNAL_MSG_VERSION);
/* wire length must match what the header claims */
427 if (nob != msg->ibm_nob) {
428 CERROR ("Unexpected # bytes %d (%d expected)\n", nob, msg->ibm_nob);
432 msg_cksum = le32_to_cpu(msg->ibm_cksum);
434 computed_cksum = kibnal_cksum (msg, nob);
436 if (msg_cksum != computed_cksum) {
437 CERROR ("Checksum failure %d: (%d expected)\n",
438 computed_cksum, msg_cksum);
441 CDEBUG(D_NET, "cksum %x, nob %d\n", computed_cksum, nob);
444 /* Have I received credits that will let me send? */
445 credits = msg->ibm_credits;
447 spin_lock_irqsave(&conn->ibc_lock, flags);
448 conn->ibc_credits += credits;
449 spin_unlock_irqrestore(&conn->ibc_lock, flags);
451 kibnal_check_sends(conn);
454 switch (msg->ibm_type) {
/* NOOP case (label elided): just repost the buffer with a credit */
456 kibnal_post_rx (rx, 1);
459 case IBNAL_MSG_IMMEDIATE:
460 if (nob < base_nob + sizeof (kib_immediate_msg_t)) {
461 CERROR ("Short IMMEDIATE from "LPX64": %d\n",
462 conn->ibc_peer->ibp_nid, nob);
467 case IBNAL_MSG_PUT_RDMA:
468 case IBNAL_MSG_GET_RDMA:
469 if (nob < base_nob + sizeof (kib_rdma_msg_t)) {
470 CERROR ("Short RDMA msg from "LPX64": %d\n",
471 conn->ibc_peer->ibp_nid, nob);
/* NOTE(review): __swab32() returns the swapped value and does not
 * modify in place (cf. __swab32s above) — elided context may make
 * this a no-op bug in the flip path; verify against full source */
475 __swab32(msg->ibm_u.rdma.ibrm_num_descs);
477 CDEBUG(D_NET, "%d RDMA: cookie "LPX64":\n",
478 msg->ibm_type, msg->ibm_u.rdma.ibrm_cookie);
/* bound num_descs so the desc array stays inside the message */
480 if ((msg->ibm_u.rdma.ibrm_num_descs > PTL_MD_MAX_IOV) ||
481 (kib_rdma_msg_len(msg->ibm_u.rdma.ibrm_num_descs) >
482 min(nob, IBNAL_MSG_SIZE))) {
483 CERROR ("num_descs %d too large\n",
484 msg->ibm_u.rdma.ibrm_num_descs);
488 for(i = 0; i < msg->ibm_u.rdma.ibrm_num_descs; i++) {
489 kib_rdma_desc_t *desc = &msg->ibm_u.rdma.ibrm_desc[i];
/* same in-place-swab caveat as ibrm_num_descs above */
492 __swab32(desc->rd_key);
493 __swab32(desc->rd_nob);
494 __swab64(desc->rd_addr);
497 CDEBUG(D_NET, " key %x, " "addr "LPX64", nob %u\n",
498 desc->rd_key, desc->rd_addr, desc->rd_nob);
502 case IBNAL_MSG_PUT_DONE:
503 case IBNAL_MSG_GET_DONE:
504 if (nob < base_nob + sizeof (kib_completion_msg_t)) {
505 CERROR ("Short COMPLETION msg from "LPX64": %d\n",
506 conn->ibc_peer->ibp_nid, nob);
510 __swab32s(&msg->ibm_u.completion.ibcm_status);
512 CDEBUG(D_NET, "%d DONE: cookie "LPX64", status %d\n",
513 msg->ibm_type, msg->ibm_u.completion.ibcm_cookie,
514 msg->ibm_u.completion.ibcm_status);
516 kibnal_complete_passive_rdma (conn,
517 msg->ibm_u.completion.ibcm_cookie,
518 msg->ibm_u.completion.ibcm_status);
/* DONE messages are fully handled here; repost with a credit */
519 kibnal_post_rx (rx, 1);
523 CERROR ("Can't parse type from "LPX64": %d\n",
524 conn->ibc_peer->ibp_nid, msg->ibm_type);
528 /* schedule for kibnal_rx() in thread context */
529 spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
531 list_add_tail (&rx->rx_list, &kibnal_data.kib_sched_rxq);
532 wake_up (&kibnal_data.kib_sched_waitq);
534 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
/* failure path: abort the connection and retire this rx */
538 CDEBUG(D_NET, "rx %p conn %p\n", rx, conn);
539 kibnal_close_conn(conn, -ECONNABORTED);
541 /* Don't re-post rx & drop its ref on conn */
542 kibnal_put_conn(conn);
/* Thread-context half of receive processing (scheduled from
 * kibnal_rx_callback).  Parses the portals header out of the message:
 * GET_RDMA/PUT_RDMA are handed to lib_parse(), which normally triggers
 * the active RDMA + completion; unmatched GETs get an explicit failed
 * completion so the peer doesn't block for the full timeout.  Finally
 * the rx buffer is reposted with a credit. */
546 kibnal_rx (kib_rx_t *rx)
548 kib_msg_t *msg = rx->rx_msg;
550 /* Clear flag so I can detect if I've sent an RDMA completion */
553 switch (msg->ibm_type) {
554 case IBNAL_MSG_GET_RDMA:
555 lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
556 /* If the incoming get was matched, I'll have initiated the
557 * RDMA and the completion message... */
561 /* Otherwise, I'll send a failed completion now to prevent
562 * the peer's GET blocking for the full timeout. */
563 CERROR ("Completing unmatched RDMA GET from "LPX64"\n",
564 rx->rx_conn->ibc_peer->ibp_nid);
565 kibnal_start_active_rdma (IBNAL_MSG_GET_DONE, -EIO,
566 rx, NULL, 0, NULL, NULL, 0, 0);
569 case IBNAL_MSG_PUT_RDMA:
570 lib_parse(&kibnal_lib, &msg->ibm_u.rdma.ibrm_hdr, rx);
573 /* This is most unusual, since even if lib_parse() didn't
574 * match anything, it should have asked us to read (and
575 * discard) the payload. The portals header must be
576 * inconsistent with this message type, so it's the
577 * sender's fault for sending garbage and she can time
579 CERROR ("Uncompleted RMDA PUT from "LPX64"\n",
580 rx->rx_conn->ibc_peer->ibp_nid);
583 case IBNAL_MSG_IMMEDIATE:
584 lib_parse(&kibnal_lib, &msg->ibm_u.immediate.ibim_hdr, rx);
/* immediate messages never involve an RDMA completion */
585 LASSERT (!rx->rx_rdma);
/* return the buffer to the QP, crediting the peer */
593 kibnal_post_rx (rx, 1);
/* Translate a kernel virtual address to its struct page, handling
 * vmalloc space, the pkmap (highmem) window, and the direct map.
 * Returns the page, or (per the VALID_PAGE check) rejects addresses
 * that don't map to a valid page — the return on failure is elided. */
597 kibnal_kvaddr_to_page (unsigned long vaddr)
601 if (vaddr >= VMALLOC_START &&
603 page = vmalloc_to_page ((void *)vaddr);
605 else if (vaddr >= PKMAP_BASE &&
606 vaddr < (PKMAP_BASE + LAST_PKMAP * PAGE_SIZE))
607 page = vmalloc_to_page ((void *)vaddr);
608 /* in 2.4 ^ just walks the page tables */
611 page = virt_to_page (vaddr);
613 if (!VALID_PAGE (page))
/* Append one RDMA descriptor (key/addr/len for a page fragment) to the
 * tx message's rdma descriptor array.  'active' selects the local key
 * (we initiate the RDMA) vs the remote key (peer will RDMA to/from us).
 * Whole-mem mode: the address is the physical page address offset by
 * the global registration's base (md_addr). */
620 kibnal_fill_ibrm(kib_tx_t *tx, struct page *page, unsigned long page_offset,
621 unsigned long len, int active)
623 kib_rdma_msg_t *ibrm = &tx->tx_msg->ibm_u.rdma;
624 kib_rdma_desc_t *desc;
626 LASSERTF(ibrm->ibrm_num_descs < PTL_MD_MAX_IOV, "%u\n",
627 ibrm->ibrm_num_descs);
629 desc = &ibrm->ibrm_desc[ibrm->ibrm_num_descs];
/* active path uses lkey; the passive branch (elided 'else') uses rkey */
631 desc->rd_key = kibnal_data.kib_md.md_lkey;
633 desc->rd_key = kibnal_data.kib_md.md_rkey;
634 desc->rd_nob = len; /*PAGE_SIZE - kiov->kiov_offset; */
635 desc->rd_addr = kibnal_page2phys(page) + page_offset +
636 kibnal_data.kib_md.md_addr;
638 ibrm->ibrm_num_descs++;
/* Walk a contiguous kernel-virtual range and emit one RDMA descriptor
 * per underlying physical page via kibnal_fill_ibrm() (whole-mem mode).
 * NOTE(review): the loop structure and advance of vaddr/nob are in
 * elided lines. */
642 kibnal_map_rdma_iov(kib_tx_t *tx, unsigned long vaddr, int nob, int active)
645 int page_offset, len;
648 page = kibnal_kvaddr_to_page(vaddr);
652 page_offset = vaddr & (PAGE_SIZE - 1);
/* clamp each fragment to the remainder of its page */
653 len = min(nob, (int)PAGE_SIZE - page_offset);
655 kibnal_fill_ibrm(tx, page, page_offset, len, active);
/* Map a (single-fragment) iovec region for RDMA.  Skips 'offset' bytes
 * of leading iov entries, requires the remaining 'nob' to fit in one
 * vaddr fragment, then either fills per-page rdma descs (whole-mem
 * mode, no registration needed) or registers the virtual range with
 * the HCA and marks the tx mapped. */
663 kibnal_map_iov (kib_tx_t *tx, IB_ACCESS_CONTROL access,
664 int niov, struct iovec *iov, int offset, int nob, int active)
672 LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
/* consume whole iov entries covered by 'offset' */
674 while (offset >= iov->iov_len) {
675 offset -= iov->iov_len;
681 if (nob > iov->iov_len - offset) {
682 CERROR ("Can't map multiple vaddr fragments\n");
686 /* our large contiguous iov could be backed by multiple physical
688 if (kibnal_whole_mem()) {
690 tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
691 rc = kibnal_map_rdma_iov(tx, (unsigned long)iov->iov_base +
692 offset, nob, active);
694 CERROR ("Can't map iov: %d\n", rc);
/* non-whole-mem path: register the single contiguous range */
700 vaddr = (void *)(((unsigned long)iov->iov_base) + offset);
701 tx->tx_md.md_addr = (__u64)((unsigned long)vaddr);
703 frc = iibt_register_memory(kibnal_data.kib_hca, vaddr, nob,
704 kibnal_data.kib_pd, access,
705 &tx->tx_md.md_handle, &tx->tx_md.md_lkey,
708 CERROR ("Can't map vaddr %p: %d\n", vaddr, frc);
/* remember to deregister in kibnal_tx_done() */
712 tx->tx_mapped = KIB_TX_MAPPED;
/* Map a page-based (kiov) payload for RDMA.  Skips 'offset' into the
 * kiov array, then walks the fragments: in whole-mem mode each page is
 * emitted as an rdma desc via kibnal_fill_ibrm(); otherwise physical
 * page addresses are collected into a temporary 'phys' array which is
 * registered with the HCA (FMR variant is #error'd out — not ported).
 * Fragments after the first must be page-aligned and full pages, else
 * the payload can't be made contiguous and mapping fails. */
717 kibnal_map_kiov (kib_tx_t *tx, IB_ACCESS_CONTROL access,
718 int nkiov, ptl_kiov_t *kiov,
719 int offset, int nob, int active)
729 CDEBUG(D_NET, "niov %d offset %d nob %d\n", nkiov, offset, nob);
733 LASSERT (tx->tx_mapped == KIB_TX_UNMAPPED);
/* consume whole kiov fragments covered by 'offset' */
735 while (offset >= kiov->kiov_len) {
736 offset -= kiov->kiov_len;
742 page_offset = kiov->kiov_offset + offset;
745 if (!kibnal_whole_mem()) {
746 phys_size = nkiov * sizeof (*phys);
747 PORTAL_ALLOC(phys, phys_size);
749 CERROR ("Can't allocate tmp phys\n");
753 phys[0] = kibnal_page2phys(kiov->kiov_page);
/* whole-mem branch: first fragment becomes an rdma desc directly */
755 tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 0;
756 kibnal_fill_ibrm(tx, kiov->kiov_page, kiov->kiov_offset,
757 kiov->kiov_len, active);
760 resid = nob - (kiov->kiov_len - offset);
/* subsequent fragments must tile contiguously: start at offset 0
 * and (unless last) be a full page */
767 if (kiov->kiov_offset != 0 ||
768 ((resid > PAGE_SIZE) &&
769 kiov->kiov_len < PAGE_SIZE)) {
770 /* Can't have gaps */
771 CERROR ("Can't make payload contiguous in I/O VM:"
772 "page %d, offset %d, len %d \n", nphys,
773 kiov->kiov_offset, kiov->kiov_len);
775 for (i = -nphys; i < nkiov; i++)
777 CERROR("kiov[%d] %p +%d for %d\n",
778 i, kiov[i].kiov_page, kiov[i].kiov_offset, kiov[i].kiov_len);
785 if (nphys == PTL_MD_MAX_IOV) {
786 CERROR ("payload too big (%d)\n", nphys);
791 if (!kibnal_whole_mem()) {
792 LASSERT (nphys * sizeof (*phys) < phys_size);
793 phys[nphys] = kibnal_page2phys(kiov->kiov_page);
/* whole-mem: the descs must still fit inside one wire message */
795 if (kib_rdma_msg_len(nphys) > IBNAL_MSG_SIZE) {
796 CERROR ("payload too big (%d)\n", nphys);
800 kibnal_fill_ibrm(tx, kiov->kiov_page,
801 kiov->kiov_offset, kiov->kiov_len,
/* whole-mem mode needs no registration; we're done */
809 if (kibnal_whole_mem())
813 CWARN ("nphys %d, nob %d, page_offset %d\n", nphys, nob, page_offset);
814 for (i = 0; i < nphys; i++)
815 CWARN (" [%d] "LPX64"\n", i, phys[i]);
/* FMR support was never ported to iibnal */
819 #error "iibnal hasn't learned about FMR yet"
820 rc = ib_fmr_register_physical (kibnal_data.kib_fmr_pool,
824 &tx->tx_md.md_handle.fmr,
828 frc = iibt_register_physical_memory(kibnal_data.kib_hca,
834 &tx->tx_md.md_handle,
839 if (frc == FSUCCESS) {
840 CDEBUG(D_NET, "Mapped %d pages %d bytes @ offset %d: lkey %x, rkey %x\n",
841 nphys, nob, page_offset, tx->tx_md.md_lkey, tx->tx_md.md_rkey);
843 tx->tx_mapped = KIB_TX_MAPPED_FMR;
845 tx->tx_mapped = KIB_TX_MAPPED;
848 CERROR ("Can't map phys: %d\n", rc);
/* the temporary phys array is only needed during registration */
854 PORTAL_FREE(phys, phys_size);
/* Return a connection for 'peer' (any one will do — just the first on
 * its conn list), or fall through to the elided NULL return if the
 * peer has no connections.  Caller must hold the global lock. */
859 kibnal_find_conn_locked (kib_peer_t *peer)
861 struct list_head *tmp;
863 /* just return the first connection */
864 list_for_each (tmp, &peer->ibp_conns) {
865 return (list_entry(tmp, kib_conn_t, ibc_list));
/* Drain a connection's tx queue as far as flow control allows.  If the
 * queue is empty but we owe the peer many credits (>= HIGHWATER), a
 * NOOP is queued just to return them.  Each tx consumes a send credit
 * and piggybacks all outstanding credits; the last credit is reserved
 * for credit return.  Posted work requests go to the QP one at a time;
 * on post failure the tx is unwound and the connection closed. */
872 kibnal_check_sends (kib_conn_t *conn)
882 spin_lock_irqsave (&conn->ibc_lock, flags);
884 LASSERT (conn->ibc_nsends_posted <= IBNAL_MSG_QUEUE_SIZE);
/* nothing queued but lots of credits owed: send a NOOP to return them */
886 if (list_empty(&conn->ibc_tx_queue) &&
887 conn->ibc_outstanding_credits >= IBNAL_CREDIT_HIGHWATER) {
888 spin_unlock_irqrestore(&conn->ibc_lock, flags);
890 tx = kibnal_get_idle_tx(0); /* don't block */
892 kibnal_init_tx_msg(tx, IBNAL_MSG_NOOP, 0);
894 spin_lock_irqsave(&conn->ibc_lock, flags);
897 atomic_inc(&conn->ibc_refcount);
898 kibnal_queue_tx_locked(tx, conn);
902 while (!list_empty (&conn->ibc_tx_queue)) {
903 tx = list_entry (conn->ibc_tx_queue.next, kib_tx_t, tx_list);
905 /* We rely on this for QP sizing */
906 LASSERT (tx->tx_nsp > 0 && tx->tx_nsp <= IBNAL_TX_MAX_SG);
908 LASSERT (conn->ibc_outstanding_credits >= 0);
909 LASSERT (conn->ibc_outstanding_credits <= IBNAL_MSG_QUEUE_SIZE);
910 LASSERT (conn->ibc_credits >= 0);
911 LASSERT (conn->ibc_credits <= IBNAL_MSG_QUEUE_SIZE);
913 /* Not on ibc_rdma_queue */
914 LASSERT (!tx->tx_passive_rdma_wait);
/* stop when the QP send queue is full or we're out of credits */
916 if (conn->ibc_nsends_posted == IBNAL_MSG_QUEUE_SIZE)
919 if (conn->ibc_credits == 0) /* no credits */
922 if (conn->ibc_credits == 1 && /* last credit reserved for */
923 conn->ibc_outstanding_credits == 0) /* giving back credits */
926 list_del (&tx->tx_list);
/* a queued NOOP is redundant if real traffic or few credits remain */
928 if (tx->tx_msg->ibm_type == IBNAL_MSG_NOOP &&
929 (!list_empty(&conn->ibc_tx_queue) ||
930 conn->ibc_outstanding_credits < IBNAL_CREDIT_HIGHWATER)) {
932 spin_unlock_irqrestore(&conn->ibc_lock, flags);
934 spin_lock_irqsave(&conn->ibc_lock, flags);
/* piggyback every credit we owe on this message */
938 tx->tx_msg->ibm_credits = conn->ibc_outstanding_credits;
939 conn->ibc_outstanding_credits = 0;
941 conn->ibc_nsends_posted++;
944 /* we only get a tx completion for the final rdma op */
945 tx->tx_sending = min(tx->tx_nsp, 2);
946 tx->tx_passive_rdma_wait = tx->tx_passive_rdma;
947 list_add (&tx->tx_list, &conn->ibc_active_txs);
/* checksum computed with the cksum field zeroed */
949 tx->tx_msg->ibm_cksum = 0;
950 tx->tx_msg->ibm_cksum = kibnal_cksum(tx->tx_msg, tx->tx_msg->ibm_nob);
951 CDEBUG(D_NET, "cksum %x, nob %d\n", tx->tx_msg->ibm_cksum, tx->tx_msg->ibm_nob);
953 spin_unlock_irqrestore (&conn->ibc_lock, flags);
955 /* NB the gap between removing tx from the queue and sending it
956 * allows message re-ordering to occur */
958 LASSERT (tx->tx_nsp > 0);
962 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
964 /* Driver only accepts 1 item at a time */
965 for (i = 0; i < tx->tx_nsp; i++) {
966 hexdump("tx", tx->tx_msg, sizeof(kib_msg_t));
967 rc = iibt_postsend(conn->ibc_qp,
971 if (wrq_signals_completion(&tx->tx_wrq[i]))
973 CDEBUG(D_NET, "posted tx wrq %p\n",
/* post-failure unwind: undo the accounting done above */
978 spin_lock_irqsave (&conn->ibc_lock, flags);
980 /* NB credits are transferred in the actual
981 * message, which can only be the last work item */
982 conn->ibc_outstanding_credits += tx->tx_msg->ibm_credits;
984 conn->ibc_nsends_posted--;
987 tx->tx_passive_rdma_wait = 0;
988 tx->tx_sending -= tx->tx_nsp - nwork;
990 done = (tx->tx_sending == 0);
992 list_del (&tx->tx_list);
994 spin_unlock_irqrestore (&conn->ibc_lock, flags);
996 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
997 CERROR ("Error %d posting transmit to "LPX64"\n",
998 rc, conn->ibc_peer->ibp_nid);
1000 CDEBUG (D_NET, "Error %d posting transmit to "
1001 LPX64"\n", rc, conn->ibc_peer->ibp_nid);
1003 kibnal_close_conn (conn, rc);
/* if nothing is in flight we own the tx; finish it here */
1006 kibnal_tx_done (tx);
1014 spin_unlock_irqrestore (&conn->ibc_lock, flags);
/* Completion handler for a send/RDMA work request.  Decrements the
 * tx's in-flight count; the tx becomes idle only when no sends remain
 * AND it isn't waiting on a passive RDMA completion — whoever makes it
 * idle frees it via kibnal_tx_done().  An extra conn ref is taken so
 * the conn can't vanish while we may still touch it; a failed
 * completion marks the tx -ECONNABORTED and closes the connection. */
1018 kibnal_tx_callback (IB_WORK_COMPLETION *wc)
1020 kib_tx_t *tx = (kib_tx_t *)kibnal_wreqid2ptr(wc->WorkReqId);
1022 unsigned long flags;
1026 LASSERT (conn != NULL);
1027 LASSERT (tx->tx_sending != 0);
1029 spin_lock_irqsave(&conn->ibc_lock, flags);
1031 CDEBUG(D_NET, "conn %p tx %p [%d/%d]: %d\n", conn, tx,
1032 tx->tx_sending, tx->tx_nsp, wc->Status);
1034 /* I could be racing with rdma completion. Whoever makes 'tx' idle
1035 * gets to free it, which also drops its ref on 'conn'. If it's
1036 * not me, then I take an extra ref on conn so it can't disappear
/* NOTE(review): the tx_sending decrement is in an elided line above */
1040 idle = (tx->tx_sending == 0) && /* This is the final callback */
1041 (!tx->tx_passive_rdma_wait); /* Not waiting for RDMA completion */
1043 list_del(&tx->tx_list);
1045 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1046 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1047 atomic_read (&conn->ibc_refcount));
1048 atomic_inc (&conn->ibc_refcount);
1050 if (tx->tx_sending == 0)
1051 conn->ibc_nsends_posted--;
1053 if (wc->Status != WRStatusSuccess &&
1055 tx->tx_status = -ECONNABORTED;
1057 spin_unlock_irqrestore(&conn->ibc_lock, flags);
1060 kibnal_tx_done (tx);
1062 if (wc->Status != WRStatusSuccess) {
1063 CERROR ("Tx completion to "LPX64" failed: %d\n",
1064 conn->ibc_peer->ibp_nid, wc->Status);
1065 kibnal_close_conn (conn, -ENETDOWN);
1067 /* can I shovel some more sends out the door? */
1068 kibnal_check_sends(conn);
/* drop the extra ref taken above */
1071 kibnal_put_conn (conn);
/* Async event handler for the channel adapter — currently just logs
 * the event; no recovery is attempted. */
1075 kibnal_ca_async_callback (void *ca_arg, IB_EVENT_RECORD *ev)
1077 /* XXX flesh out. this seems largely for async errors */
1078 CERROR("type: %d code: %u\n", ev->EventType, ev->EventCode);
/* Completion-queue callback: drain all available work completions,
 * demuxing rx vs tx by the tag encoded in the work request id, then
 * re-arm the CQ for the next completion event.
 * NOTE(review): the re-poll after rearm (to close the race with
 * completions arriving between poll and rearm) is in elided lines. */
1082 kibnal_ca_callback (void *ca_arg, void *cq_arg)
1084 IB_HANDLE cq = *(IB_HANDLE *)cq_arg;
1085 IB_HANDLE ca = *(IB_HANDLE *)ca_arg;
1086 IB_WORK_COMPLETION wc;
1089 CDEBUG(D_NET, "ca %p cq %p\n", ca, cq);
1092 while (iibt_cq_poll(cq, &wc) == FSUCCESS) {
1093 if (kibnal_wreqid_is_rx(wc.WorkReqId))
1094 kibnal_rx_callback(&wc);
1096 kibnal_tx_callback(&wc);
1100 if (iibt_cq_rearm(cq, CQEventSelNextWC) != FSUCCESS) {
1101 CERROR("rearm failed?\n");
/* Initialise the next scatter/gather entry and work request on 'tx'
 * for sending a message of 'type' with 'body_nob' bytes of payload
 * beyond the fixed header.  Fills the wire header (magic, version,
 * type, nob), fences PUT_DONE when it follows an RDMA read, and
 * requests a signalled completion.  tx_nsp is bumped in an elided
 * trailing line. */
1109 kibnal_init_tx_msg (kib_tx_t *tx, int type, int body_nob)
1111 IB_LOCAL_DATASEGMENT *gl = &tx->tx_gl[tx->tx_nsp];
1112 IB_WORK_REQ *wrq = &tx->tx_wrq[tx->tx_nsp];
1114 int nob = offsetof (kib_msg_t, ibm_u) + body_nob;
1116 LASSERT (tx->tx_nsp >= 0 &&
1117 tx->tx_nsp < sizeof(tx->tx_wrq)/sizeof(tx->tx_wrq[0]));
1118 LASSERT (nob <= IBNAL_MSG_SIZE);
1120 tx->tx_msg->ibm_magic = IBNAL_MSG_MAGIC;
1121 tx->tx_msg->ibm_version = IBNAL_MSG_VERSION;
1122 tx->tx_msg->ibm_type = type;
1124 tx->tx_msg->ibm_nob = nob;
1126 /* Fence the message if it's bundled with an RDMA read */
1127 fence = (tx->tx_nsp > 0) &&
1128 (type == IBNAL_MSG_PUT_DONE);
1130 *gl = (IB_LOCAL_DATASEGMENT) {
1131 .Address = tx->tx_vaddr,
1132 .Length = IBNAL_MSG_SIZE,
1133 .Lkey = kibnal_lkey(kibnal_data.kib_tx_pages),
/* wreqid tags the pointer as a tx (second arg 0) for the CQ demux */
1136 wrq->WorkReqId = kibnal_ptr2wreqid(tx, 0);
1137 wrq->Operation = WROpSend;
1139 wrq->DSListDepth = 1;
1140 wrq->MessageLen = nob;
1141 wrq->Req.SendRC.ImmediateData = 0;
1142 wrq->Req.SendRC.Options.s.SolicitedEvent = 1;
1143 wrq->Req.SendRC.Options.s.SignaledCompletion = 1;
1144 wrq->Req.SendRC.Options.s.ImmediateData = 0;
1145 wrq->Req.SendRC.Options.s.Fence = fence;
/* Queue a tx on a connection (taking ibc_lock) and then try to push
 * sends out immediately. */
1151 kibnal_queue_tx (kib_tx_t *tx, kib_conn_t *conn)
1153 unsigned long flags;
1155 spin_lock_irqsave(&conn->ibc_lock, flags);
1157 kibnal_queue_tx_locked (tx, conn);
1159 spin_unlock_irqrestore(&conn->ibc_lock, flags);
1161 kibnal_check_sends(conn);
/* Route a prepared tx towards 'nid'.  Under the read lock, an existing
 * connection gets the tx queued directly.  Otherwise we upgrade to the
 * write lock, re-check (the world may have changed), and either queue
 * on a now-existing conn, kick off a connection attempt via the connd,
 * or fail the tx (-EHOSTUNREACH) if the peer is unknown or inside its
 * reconnect backoff window.  Once here we are committed: any failure
 * completes the tx with an error rather than dropping it. */
1165 kibnal_launch_tx (kib_tx_t *tx, ptl_nid_t nid)
1167 unsigned long flags;
1170 rwlock_t *g_lock = &kibnal_data.kib_global_lock;
1172 /* If I get here, I've committed to send, so I complete the tx with
1173 * failure on any problems */
1175 LASSERT (tx->tx_conn == NULL); /* only set when assigned a conn */
1176 LASSERT (tx->tx_nsp > 0); /* work items have been set up */
/* fast path: peer + conn already exist (read lock, elided above) */
1180 peer = kibnal_find_peer_locked (nid);
1182 read_unlock (g_lock);
1183 tx->tx_status = -EHOSTUNREACH;
1184 kibnal_tx_done (tx);
1188 conn = kibnal_find_conn_locked (peer);
1190 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1191 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1192 atomic_read (&conn->ibc_refcount));
1193 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
1194 read_unlock (g_lock);
1196 kibnal_queue_tx (tx, conn);
1200 /* Making one or more connections; I'll need a write lock... */
1201 read_unlock (g_lock);
1202 write_lock_irqsave (g_lock, flags);
/* re-lookup: peer may have been removed while the lock was dropped */
1204 peer = kibnal_find_peer_locked (nid);
1206 write_unlock_irqrestore (g_lock, flags);
1207 tx->tx_status = -EHOSTUNREACH;
1208 kibnal_tx_done (tx);
1212 conn = kibnal_find_conn_locked (peer);
1214 /* Connection exists; queue message on it */
1215 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1216 conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1217 atomic_read (&conn->ibc_refcount));
1218 atomic_inc (&conn->ibc_refcount); /* 1 ref for the tx */
1219 write_unlock_irqrestore (g_lock, flags);
1221 kibnal_queue_tx (tx, conn);
1225 if (peer->ibp_connecting == 0) {
/* inside the reconnect backoff window: fail fast */
1226 if (!time_after_eq(jiffies, peer->ibp_reconnect_time)) {
1227 write_unlock_irqrestore (g_lock, flags);
1228 tx->tx_status = -EHOSTUNREACH;
1229 kibnal_tx_done (tx);
1233 peer->ibp_connecting = 1;
1234 kib_peer_addref(peer); /* extra ref for connd */
1236 spin_lock (&kibnal_data.kib_connd_lock);
1238 list_add_tail (&peer->ibp_connd_list,
1239 &kibnal_data.kib_connd_peers);
1240 wake_up (&kibnal_data.kib_connd_waitq);
1242 spin_unlock (&kibnal_data.kib_connd_lock);
1245 /* A connection is being established; queue the message... */
1246 list_add_tail (&tx->tx_list, &peer->ibp_tx_queue);
1248 write_unlock_irqrestore (g_lock, flags);
/* Initiate a passive RDMA (PUT_RDMA or GET_RDMA): map the local MD's
 * buffer for remote access, build the rdma request message (portals
 * header + cookie + descriptor(s)), and launch it.  The peer performs
 * the actual RDMA and sends back a *_DONE carrying our cookie, which
 * kibnal_complete_passive_rdma() matches.  For GETs a reply libmsg is
 * created up front; libmsgs finalise when the tx completes.  The
 * error path at the bottom (reached via elided gotos) completes the
 * tx with failure. */
1252 kibnal_start_passive_rdma (int type, ptl_nid_t nid,
1253 lib_msg_t *libmsg, ptl_hdr_t *hdr)
1255 int nob = libmsg->md->length;
1259 IB_ACCESS_CONTROL access = {0,};
1261 LASSERT (type == IBNAL_MSG_PUT_RDMA || type == IBNAL_MSG_GET_RDMA);
1263 LASSERT (!in_interrupt()); /* Mapping could block */
/* peer needs full RDMA access to the mapped region */
1265 access.s.MWBindable = 1;
1266 access.s.LocalWrite = 1;
1267 access.s.RdmaRead = 1;
1268 access.s.RdmaWrite = 1;
1270 tx = kibnal_get_idle_tx (1); /* May block; caller is an app thread */
1271 LASSERT (tx != NULL);
/* map as passive (active == 0 in elided trailing args): peer drives
 * the transfer, so the rkey is advertised */
1273 if ((libmsg->md->options & PTL_MD_KIOV) == 0)
1274 rc = kibnal_map_iov (tx, access,
1275 libmsg->md->md_niov,
1276 libmsg->md->md_iov.iov,
1279 rc = kibnal_map_kiov (tx, access,
1280 libmsg->md->md_niov,
1281 libmsg->md->md_iov.kiov,
1285 CERROR ("Can't map RDMA for "LPX64": %d\n", nid, rc);
1289 if (type == IBNAL_MSG_GET_RDMA) {
1290 /* reply gets finalized when tx completes */
1291 tx->tx_libmsg[1] = lib_create_reply_msg(&kibnal_lib,
1293 if (tx->tx_libmsg[1] == NULL) {
1294 CERROR ("Can't create reply for GET -> "LPX64"\n",
/* mark so completion waits for the peer's *_DONE message */
1301 tx->tx_passive_rdma = 1;
1305 ibmsg->ibm_u.rdma.ibrm_hdr = *hdr;
1306 ibmsg->ibm_u.rdma.ibrm_cookie = tx->tx_passive_rdma_cookie;
1307 /* map_kiov alrady filled the rdma descs for the whole_mem case */
1308 if (!kibnal_whole_mem()) {
1309 ibmsg->ibm_u.rdma.ibrm_desc[0].rd_key = tx->tx_md.md_rkey;
1310 ibmsg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
1311 ibmsg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
1312 ibmsg->ibm_u.rdma.ibrm_num_descs = 1;
1315 kibnal_init_tx_msg (tx, type,
1316 kib_rdma_msg_len(ibmsg->ibm_u.rdma.ibrm_num_descs));
1318 CDEBUG(D_NET, "Passive: %p cookie "LPX64", key %x, addr "
1320 tx, tx->tx_passive_rdma_cookie, tx->tx_md.md_rkey,
1321 tx->tx_md.md_addr, nob);
1323 /* libmsg gets finalized when tx completes. */
1324 tx->tx_libmsg[0] = libmsg;
1326 kibnal_launch_tx(tx, nid);
/* failure path: tx_status was set in elided lines above */
1331 kibnal_tx_done (tx);
/* Perform the active side of an RDMA and send the matching completion
 * (GET_DONE => RDMA write to the peer; PUT_DONE => RDMA read from the
 * peer).  Maps the local buffer (active side), builds one RDMA work
 * request per descriptor the peer sent, appends the completion message
 * (with the peer's cookie and 'status'), and queues the whole bundle
 * on the rx's connection.  If mapping fails or desc counts mismatch,
 * the RDMA is skipped and the completion is sent with failure.  When
 * there is no data the libmsg is finalised immediately instead of at
 * tx completion. */
1336 kibnal_start_active_rdma (int type, int status,
1337 kib_rx_t *rx, lib_msg_t *libmsg,
1339 struct iovec *iov, ptl_kiov_t *kiov,
1340 size_t offset, size_t nob)
1342 kib_msg_t *rxmsg = rx->rx_msg;
1345 IB_ACCESS_CONTROL access = {0,};
1350 CDEBUG(D_NET, "type %d, status %d, niov %d, offset %d, nob %d\n",
1351 type, status, niov, offset, nob);
1353 /* Called by scheduler */
1354 LASSERT (!in_interrupt ());
1356 /* Either all pages or all vaddrs */
1357 LASSERT (!(kiov != NULL && iov != NULL));
1359 /* No data if we're completing with failure */
1360 LASSERT (status == 0 || nob == 0);
1362 LASSERT (type == IBNAL_MSG_GET_DONE ||
1363 type == IBNAL_MSG_PUT_DONE);
1365 /* Flag I'm completing the RDMA. Even if I fail to send the
1366 * completion message, I will have tried my best so further
1367 * attempts shouldn't be tried. */
1368 LASSERT (!rx->rx_rdma);
/* direction: GET_DONE writes our data to the peer; PUT_DONE reads
 * the peer's data into our (locally-writable) buffer */
1371 if (type == IBNAL_MSG_GET_DONE) {
1372 rdma_op = WROpRdmaWrite;
1373 LASSERT (rxmsg->ibm_type == IBNAL_MSG_GET_RDMA);
1375 access.s.LocalWrite = 1;
1376 rdma_op = WROpRdmaRead;
1377 LASSERT (rxmsg->ibm_type == IBNAL_MSG_PUT_RDMA);
1380 tx = kibnal_get_idle_tx (0); /* Mustn't block */
1382 CERROR ("tx descs exhausted on RDMA from "LPX64
1383 " completing locally with failure\n",
1384 rx->rx_conn->ibc_peer->ibp_nid);
1385 lib_finalize (&kibnal_lib, NULL, libmsg, PTL_NO_SPACE);
1388 LASSERT (tx->tx_nsp == 0);
1393 /* We actually need to transfer some data (the transfer
1394 * size could get truncated to zero when the incoming
1395 * message is matched) */
/* map as active (last arg 1): we drive the transfer with lkeys */
1397 rc = kibnal_map_kiov (tx, access, niov, kiov, offset, nob, 1);
1399 rc = kibnal_map_iov (tx, access, niov, iov, offset, nob, 1);
1402 CERROR ("Can't map RDMA -> "LPX64": %d\n",
1403 rx->rx_conn->ibc_peer->ibp_nid, rc);
1404 /* We'll skip the RDMA and complete with failure. */
1410 if (!kibnal_whole_mem()) {
1411 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_key = tx->tx_md.md_lkey;
1412 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_addr = tx->tx_md.md_addr;
1413 tx->tx_msg->ibm_u.rdma.ibrm_desc[0].rd_nob = nob;
1414 tx->tx_msg->ibm_u.rdma.ibrm_num_descs = 1;
1417 /* XXX ugh. different page-sized hosts. */
1418 if (tx->tx_msg->ibm_u.rdma.ibrm_num_descs !=
1419 rxmsg->ibm_u.rdma.ibrm_num_descs) {
1420 CERROR("tx descs (%u) != rx descs (%u)\n",
1421 tx->tx_msg->ibm_u.rdma.ibrm_num_descs,
1422 rxmsg->ibm_u.rdma.ibrm_num_descs);
1423 /* We'll skip the RDMA and complete with failure. */
1429 /* map_kiov filled in the rdma descs which describe our side of the
1431 /* ibrm_num_descs was verified in rx_callback */
1432 for(i = 0; i < rxmsg->ibm_u.rdma.ibrm_num_descs; i++) {
1433 kib_rdma_desc_t *ldesc, *rdesc; /* local, remote */
1434 IB_LOCAL_DATASEGMENT *ds = &tx->tx_gl[i];
1435 IB_WORK_REQ *wrq = &tx->tx_wrq[i];
1437 ldesc = &tx->tx_msg->ibm_u.rdma.ibrm_desc[i];
1438 rdesc = &rxmsg->ibm_u.rdma.ibrm_desc[i];
1440 ds->Address = ldesc->rd_addr;
1441 ds->Length = ldesc->rd_nob;
1442 ds->Lkey = ldesc->rd_key;
1444 memset(wrq, 0, sizeof(*wrq));
1445 wrq->WorkReqId = kibnal_ptr2wreqid(tx, 0);
1446 wrq->Operation = rdma_op;
1448 wrq->DSListDepth = 1;
1449 wrq->MessageLen = ds->Length;
1450 wrq->Req.SendRC.ImmediateData = 0;
1451 wrq->Req.SendRC.Options.s.SolicitedEvent = 0;
1452 wrq->Req.SendRC.Options.s.SignaledCompletion = 0;
1453 wrq->Req.SendRC.Options.s.ImmediateData = 0;
1454 wrq->Req.SendRC.Options.s.Fence = 0;
1455 wrq->Req.SendRC.RemoteDS.Address = rdesc->rd_addr;
1456 wrq->Req.SendRC.RemoteDS.Rkey = rdesc->rd_key;
1458 /* only the last rdma post triggers tx completion */
1459 if (i == rxmsg->ibm_u.rdma.ibrm_num_descs - 1)
1460 wrq->Req.SendRC.Options.s.SignaledCompletion = 1;
/* completion message echoes the peer's cookie with our status */
1468 txmsg->ibm_u.completion.ibcm_cookie = rxmsg->ibm_u.rdma.ibrm_cookie;
1469 txmsg->ibm_u.completion.ibcm_status = status;
1471 kibnal_init_tx_msg(tx, type, sizeof (kib_completion_msg_t));
1473 if (status == 0 && nob != 0) {
1474 LASSERT (tx->tx_nsp > 1);
1475 /* RDMA: libmsg gets finalized when the tx completes. This
1476 * is after the completion message has been sent, which in
1477 * turn is after the RDMA has finished. */
1478 tx->tx_libmsg[0] = libmsg;
1480 LASSERT (tx->tx_nsp == 1);
1481 /* No RDMA: local completion happens now! */
1482 CDEBUG(D_WARNING,"No data: immediate completion\n");
1483 lib_finalize (&kibnal_lib, NULL, libmsg,
1484 status == 0 ? PTL_OK : PTL_FAIL);
1487 /* +1 ref for this tx... */
1488 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1489 rx->rx_conn, rx->rx_conn->ibc_state,
1490 rx->rx_conn->ibc_peer->ibp_nid,
1491 atomic_read (&rx->rx_conn->ibc_refcount));
1492 atomic_inc (&rx->rx_conn->ibc_refcount);
1493 /* ...and queue it up */
1494 kibnal_queue_tx(tx, rx->rx_conn);
/* Common send path for both iovec and kiov payloads (exactly one of
 * payload_iov/payload_kiov may be non-NULL).  Small payloads are copied
 * inline into an IBNAL_MSG_IMMEDIATE message; payloads too big for
 * IBNAL_MSG_SIZE are handed off to the passive-RDMA path, and REPLYs to
 * GET_RDMA requests go through the active-RDMA path. */
1498         kibnal_sendmsg(lib_nal_t *nal,
1505                         unsigned int payload_niov,
1506                         struct iovec *payload_iov,
1507                         ptl_kiov_t *payload_kiov,
1508                         size_t payload_offset,
1515         /* NB 'private' is different depending on what we're sending.... */
1517         CDEBUG(D_NET, "sending "LPSZ" bytes in %d frags to nid:"LPX64
1518                " pid %d\n", payload_nob, payload_niov, nid , pid);
1520         LASSERT (payload_nob == 0 || payload_niov > 0);
1521         LASSERT (payload_niov <= PTL_MD_MAX_IOV);
1523         /* Thread context if we're sending payload */
1524         LASSERT (!in_interrupt() || payload_niov == 0);
1525         /* payload is either all vaddrs or all pages */
1526         LASSERT (!(payload_kiov != NULL && payload_iov != NULL));
1533         case PTL_MSG_REPLY: {
1534                 /* reply's 'private' is the incoming receive */
1535                 kib_rx_t *rx = private;
1537                 /* RDMA reply expected? */
1538                 if (rx->rx_msg->ibm_type == IBNAL_MSG_GET_RDMA) {
1539                         kibnal_start_active_rdma(IBNAL_MSG_GET_DONE, 0,
1540                                                  rx, libmsg, payload_niov,
1541                                                  payload_iov, payload_kiov,
1542                                                  payload_offset, payload_nob);
1546                 /* Incoming message consistent with immediate reply? */
                /* NOTE(review): "opbm" in the error string below looks like a
                 * typo for "ibm"; left untouched here since it is runtime
                 * output text. */
1547                 if (rx->rx_msg->ibm_type != IBNAL_MSG_IMMEDIATE) {
1548                         CERROR ("REPLY to "LPX64" bad opbm type %d!!!\n",
1549                                 nid, rx->rx_msg->ibm_type);
1553                 /* Will it fit in a message? */
1554                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1555                 if (nob >= IBNAL_MSG_SIZE) {
1556                         CERROR("REPLY for "LPX64" too big (RDMA not requested): %d\n",
1564                 /* might the REPLY message be big enough to need RDMA? */
1565                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[libmsg->md->length]);
1566                 if (nob > IBNAL_MSG_SIZE)
1567                         return (kibnal_start_passive_rdma(IBNAL_MSG_GET_RDMA,
1572                 LASSERT (payload_nob == 0);
1576                 /* Is the payload big enough to need RDMA? */
1577                 nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[payload_nob]);
1578                 if (nob > IBNAL_MSG_SIZE)
1579                         return (kibnal_start_passive_rdma(IBNAL_MSG_PUT_RDMA,
        /* ACK/REPLY may run in interrupt context, so only then may the tx
         * allocation block (the negated flag below) be disallowed. */
1585         tx = kibnal_get_idle_tx(!(type == PTL_MSG_ACK ||
1586                                   type == PTL_MSG_REPLY ||
1589                 CERROR ("Can't send %d to "LPX64": tx descs exhausted%s\n",
1590                         type, nid, in_interrupt() ? " (intr)" : "");
1591                 return (PTL_NO_SPACE);
1595         ibmsg->ibm_u.immediate.ibim_hdr = *hdr;
        /* copy the payload inline into the immediate message */
1597         if (payload_nob > 0) {
1598                 if (payload_kiov != NULL)
1599                         lib_copy_kiov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1600                                           payload_niov, payload_kiov,
1601                                           payload_offset, payload_nob);
1603                         lib_copy_iov2buf(ibmsg->ibm_u.immediate.ibim_payload,
1604                                          payload_niov, payload_iov,
1605                                          payload_offset, payload_nob);
1608         kibnal_init_tx_msg (tx, IBNAL_MSG_IMMEDIATE,
1609                             offsetof(kib_immediate_msg_t,
1610                                      ibim_payload[payload_nob]));
1612         /* libmsg gets finalized when tx completes */
1613         tx->tx_libmsg[0] = libmsg;
1615         kibnal_launch_tx(tx, nid);
/* lib_nal 'send' entry point for virtual-address (iovec) payloads: thin
 * wrapper forwarding to kibnal_sendmsg() with a NULL kiov. */
1620 kibnal_send (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1621                ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1622                unsigned int payload_niov, struct iovec *payload_iov,
1623                size_t payload_offset, size_t payload_len)
1625         return (kibnal_sendmsg(nal, private, cookie,
1626                                hdr, type, nid, pid,
1627                                payload_niov, payload_iov, NULL,
1628                                payload_offset, payload_len));
/* lib_nal 'send_pages' entry point for page-based (kiov) payloads: thin
 * wrapper forwarding to kibnal_sendmsg() with a NULL iovec. */
1632 kibnal_send_pages (lib_nal_t *nal, void *private, lib_msg_t *cookie,
1633                      ptl_hdr_t *hdr, int type, ptl_nid_t nid, ptl_pid_t pid,
1634                      unsigned int payload_niov, ptl_kiov_t *payload_kiov,
1635                      size_t payload_offset, size_t payload_len)
1637         return (kibnal_sendmsg(nal, private, cookie,
1638                                hdr, type, nid, pid,
1639                                payload_niov, NULL, payload_kiov,
1640                                payload_offset, payload_len));
/* Common receive path: 'private' is the kib_rx_t that carried the incoming
 * message.  IMMEDIATE payloads are copied out of the rx buffer into the
 * caller's iov/kiov; PUT_RDMA requests kick off the active-RDMA pull;
 * GET_RDMA arrives here only to discard trailing junk. */
1644 kibnal_recvmsg (lib_nal_t *nal, void *private, lib_msg_t *libmsg,
1645                 unsigned int niov, struct iovec *iov, ptl_kiov_t *kiov,
1646                 size_t offset, size_t mlen, size_t rlen)
1648         kib_rx_t *rx = private;
1649         kib_msg_t *rxmsg = rx->rx_msg;
1652         LASSERT (mlen <= rlen);
1653         LASSERT (!in_interrupt ());
1654         /* Either all pages or all vaddrs */
1655         LASSERT (!(kiov != NULL && iov != NULL));
1657         switch (rxmsg->ibm_type) {
1662         case IBNAL_MSG_IMMEDIATE:
                /* sanity: the sender could not legally have sent more than
                 * fits in one message buffer */
1663                 msg_nob = offsetof(kib_msg_t, ibm_u.immediate.ibim_payload[rlen]);
1664                 if (msg_nob > IBNAL_MSG_SIZE) {
                        /* NOTE(review): rlen is size_t but printed with %d —
                         * confirm format specifier on 64-bit builds */
1665                         CERROR ("Immediate message from "LPX64" too big: %d\n",
1666                                 rxmsg->ibm_u.immediate.ibim_hdr.src_nid, rlen);
1671                         lib_copy_buf2kiov(niov, kiov, offset,
1672                                           rxmsg->ibm_u.immediate.ibim_payload,
1675                         lib_copy_buf2iov(niov, iov, offset,
1676                                          rxmsg->ibm_u.immediate.ibim_payload,
1679                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1682         case IBNAL_MSG_GET_RDMA:
1683                 /* We get called here just to discard any junk after the
                /* NOTE(review): libmsg is asserted NULL here, so the
                 * lib_finalize() below presumably tolerates a NULL msg —
                 * confirm against lib_finalize()'s contract */
1685                 LASSERT (libmsg == NULL);
1686                 lib_finalize (nal, NULL, libmsg, PTL_OK);
1689         case IBNAL_MSG_PUT_RDMA:
1690                 kibnal_start_active_rdma (IBNAL_MSG_PUT_DONE, 0,
1692                                           niov, iov, kiov, offset, mlen);
/* lib_nal 'recv' entry point for iovec buffers: thin wrapper forwarding to
 * kibnal_recvmsg() with a NULL kiov. */
1698 kibnal_recv (lib_nal_t *nal, void *private, lib_msg_t *msg,
1699               unsigned int niov, struct iovec *iov,
1700               size_t offset, size_t mlen, size_t rlen)
1702         return (kibnal_recvmsg (nal, private, msg, niov, iov, NULL,
1703                                 offset, mlen, rlen));
/* lib_nal 'recv_pages' entry point for page-based buffers: thin wrapper
 * forwarding to kibnal_recvmsg() with a NULL iovec. */
1707 kibnal_recv_pages (lib_nal_t *nal, void *private, lib_msg_t *msg,
1708                      unsigned int niov, ptl_kiov_t *kiov,
1709                      size_t offset, size_t mlen, size_t rlen)
1711         return (kibnal_recvmsg (nal, private, msg, niov, NULL, kiov,
1712                                 offset, mlen, rlen));
1715 /*****************************************************************************
1716  * the rest of this file concerns connection management. active connections
1717  * start with connect_peer, passive connections start with passive_callback.
1718  * active disconnects start with conn_close, cm_callback starts passive
1719  * disconnects and contains the guts of how the disconnect state machine
1721  *****************************************************************************/
/* Spawn a kernel thread running fn(arg) and bump the NAL's thread count so
 * shutdown can wait for all threads to exit. */
1724 kibnal_thread_start (int (*fn)(void *arg), void *arg)
1726         long pid = kernel_thread (fn, arg, 0);
1731         atomic_inc (&kibnal_data.kib_nthreads);
/* Called by each NAL thread on exit: drop the thread count taken in
 * kibnal_thread_start(). */
1736 kibnal_thread_fini (void)
1738         atomic_dec (&kibnal_data.kib_nthreads);
1741 /* this can be called by anyone at any time to close a connection. if
1742 * the connection is still established it heads to the connd to start
1743 * the disconnection in a safe context. It has no effect if called
1744 * on a connection that is already disconnecting */
1746 kibnal_close_conn_locked (kib_conn_t *conn, int error)
1748         /* This just does the immediate housekeeping, and schedules the
1749          * connection for the connd to finish off.
1750          * Caller holds kib_global_lock exclusively in irq context */
1751         kib_peer_t *peer = conn->ibc_peer;
1753         KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_CONNECTING,
1754                                     IBNAL_CONN_DISCONNECTED);
        /* states beyond ESTABLISHED mean a disconnect is already in flight */
1756         if (conn->ibc_state > IBNAL_CONN_ESTABLISHED)
1757                 return; /* already disconnecting */
1759         CDEBUG (error == 0 ? D_NET : D_ERROR,
1760                 "closing conn to "LPX64": error %d\n", peer->ibp_nid, error);
1762         if (conn->ibc_state == IBNAL_CONN_ESTABLISHED) {
1763                 /* kib_connd_conns takes ibc_list's ref */
1764                 list_del (&conn->ibc_list);
1766                 /* new ref for kib_connd_conns */
1767                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1768                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1769                        atomic_read (&conn->ibc_refcount));
1770                 atomic_inc (&conn->ibc_refcount);
        /* last conn gone on a non-persistent peer: remove it from the table */
1773                 if (list_empty (&peer->ibp_conns) &&
1774                     peer->ibp_persistence == 0) {
1775                         /* Non-persistent peer with no more conns... */
1776                         kibnal_unlink_peer_locked (peer);
1779         conn->ibc_state = IBNAL_CONN_SEND_DREQ;
        /* hand the conn to the connd thread to run the disconnect protocol */
1781         spin_lock (&kibnal_data.kib_connd_lock);
1783         list_add_tail (&conn->ibc_list, &kibnal_data.kib_connd_conns);
1784         wake_up (&kibnal_data.kib_connd_waitq);
1786         spin_unlock (&kibnal_data.kib_connd_lock);
/* Unlocked wrapper: take kib_global_lock (write, irq-safe) and close the
 * connection via kibnal_close_conn_locked(). */
1790 kibnal_close_conn (kib_conn_t *conn, int error)
1792         unsigned long flags;
1794         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1796         kibnal_close_conn_locked (conn, error);
1798         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
/* Account for a failed connection attempt on 'peer': back off the reconnect
 * interval, unlink a non-persistent peer with no remaining conns, and
 * complete any transmits that were queued waiting for the connection with
 * -EHOSTUNREACH (done outside the lock, via the local 'zombies' list). */
1802 kibnal_peer_connect_failed (kib_peer_t *peer, int active, int rc)
1804         LIST_HEAD (zombies);
1806         unsigned long flags;
1809         LASSERT (peer->ibp_reconnect_interval >= IBNAL_MIN_RECONNECT_INTERVAL);
1811         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1813         LASSERT (peer->ibp_connecting != 0);
1814         peer->ibp_connecting--;
1816         if (peer->ibp_connecting != 0) {
1817                 /* another connection attempt under way (loopback?)... */
1818                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1822         if (list_empty(&peer->ibp_conns)) {
1823                 /* Say when active connection can be re-attempted */
1824                 peer->ibp_reconnect_time = jiffies + peer->ibp_reconnect_interval;
1825                 /* Increase reconnection interval */
1826                 peer->ibp_reconnect_interval = MIN (peer->ibp_reconnect_interval * 2,
1827                                                     IBNAL_MAX_RECONNECT_INTERVAL);
1829                 /* Take peer's blocked transmits; I'll complete
1830                  * them with error */
1831                 while (!list_empty (&peer->ibp_tx_queue)) {
1832                         tx = list_entry (peer->ibp_tx_queue.next,
1835                         list_del (&tx->tx_list);
1836                         list_add_tail (&tx->tx_list, &zombies);
1839                 if (kibnal_peer_active(peer) &&
1840                     (peer->ibp_persistence == 0)) {
1841                         /* failed connection attempt on non-persistent peer */
1842                         kibnal_unlink_peer_locked (peer);
1845                 /* Can't have blocked transmits if there are connections */
1846                 LASSERT (list_empty(&peer->ibp_tx_queue));
1849         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
        /* complete the dead transmits now that the lock is dropped */
1851         if (!list_empty (&zombies))
1852                 CERROR ("Deleting messages for "LPX64": connection failed\n",
1855         while (!list_empty (&zombies)) {
1856                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
1858                 list_del (&tx->tx_list);
1860                 tx->tx_status = -EHOSTUNREACH;
1861                 kibnal_tx_done (tx);
/* Finish a connection attempt (active or passive) with 'status'.
 * On success: mark the conn ESTABLISHED, add it to the peer's conn list,
 * flush the peer's blocked tx queue onto it, close stale conns from older
 * peer incarnations, post all receive buffers and kick the send path.
 * On failure: schedule/force the disconnect and fail the peer's pending
 * transmits via kibnal_peer_connect_failed(). */
1866 kibnal_connreq_done (kib_conn_t *conn, int active, int status)
1868         int               state = conn->ibc_state;
1869         kib_peer_t       *peer = conn->ibc_peer;
1871         unsigned long     flags;
1874         /* passive connection has no connreq & vice versa */
1875         LASSERTF(!active == !(conn->ibc_connreq != NULL),
1876                  "%d %p\n", active, conn->ibc_connreq);
1878                 PORTAL_FREE (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
1879                 conn->ibc_connreq = NULL;
1882         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
1884         LASSERT (peer->ibp_connecting != 0);
1887                 /* connection established... */
1888                 KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_CONNECTING);
1889                 conn->ibc_state = IBNAL_CONN_ESTABLISHED;
1891                 if (!kibnal_peer_active(peer)) {
1892                         /* ...but peer deleted meantime */
1893                         status = -ECONNABORTED;
1896                 KIB_ASSERT_CONN_STATE_RANGE(conn, IBNAL_CONN_INIT_QP,
1897                                             IBNAL_CONN_CONNECTING);
1901                 /* Everything worked! */
1903                 peer->ibp_connecting--;
1905                 /* +1 ref for ibc_list; caller(== CM)'s ref remains until
1906                  * the IB_CM_IDLE callback */
1907                 CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1908                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1909                        atomic_read (&conn->ibc_refcount));
1910                 atomic_inc (&conn->ibc_refcount);
1911                 list_add (&conn->ibc_list, &peer->ibp_conns);
1913                 /* reset reconnect interval for next attempt */
1914                 peer->ibp_reconnect_interval = IBNAL_MIN_RECONNECT_INTERVAL;
1916                 /* post blocked sends to the new connection */
1917                 spin_lock (&conn->ibc_lock);
1919                 while (!list_empty (&peer->ibp_tx_queue)) {
1920                         tx = list_entry (peer->ibp_tx_queue.next,
1923                         list_del (&tx->tx_list);
1925                         /* +1 ref for each tx */
1926                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1927                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1928                                atomic_read (&conn->ibc_refcount));
1929                         atomic_inc (&conn->ibc_refcount);
1930                         kibnal_queue_tx_locked (tx, conn);
1933                 spin_unlock (&conn->ibc_lock);
1935                 /* Nuke any dangling conns from a different peer instance... */
1936                 kibnal_close_stale_conns_locked (conn->ibc_peer,
1937                                                  conn->ibc_incarnation);
1939                 write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1941                 /* queue up all the receives */
1942                 for (i = 0; i < IBNAL_RX_MSGS; i++) {
1943                         /* +1 ref for rx desc */
1944                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
1945                                conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
1946                                atomic_read (&conn->ibc_refcount));
1947                         atomic_inc (&conn->ibc_refcount);
1949                         CDEBUG(D_NET, "RX[%d] %p->%p - "LPX64"\n",
1950                                i, &conn->ibc_rxs[i], conn->ibc_rxs[i].rx_msg,
1951                                conn->ibc_rxs[i].rx_vaddr);
1953                         kibnal_post_rx (&conn->ibc_rxs[i], 0);
1956                 kibnal_check_sends (conn);
1960         /* connection failed */
1961         if (state == IBNAL_CONN_CONNECTING) {
1962                 /* schedule for connd to close */
1963                 kibnal_close_conn_locked (conn, status);
1965                 /* Don't have a CM comm_id; just wait for refs to drain */
1966                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
1969         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
1971         kibnal_peer_connect_failed (conn->ibc_peer, active, status);
1973         /* If we didn't establish the connection we don't have to pass
1974          * through the disconnect protocol before dropping the CM ref */
1975         if (state < IBNAL_CONN_CONNECTING)
1976                 kibnal_put_conn (conn);
/* Passive-side accept: create a conn for an incoming request from 'nid',
 * validate the requested queue depth, find-or-create the peer, and
 * initialise the conn (state CONNECTING; ibc_cep is filled in later by
 * cm_accept).  On success *connp holds the new conn. */
1980 kibnal_accept (kib_conn_t **connp, IB_HANDLE *cep,
1981                 ptl_nid_t nid, __u64 incarnation, int queue_depth)
1983         kib_conn_t    *conn = kibnal_create_conn();
1986         unsigned long  flags;
        /* both sides must agree on the message queue depth */
1991         if (queue_depth != IBNAL_MSG_QUEUE_SIZE) {
1992                 CERROR("Can't accept "LPX64": bad queue depth %d (%d expected)\n",
1993                        nid, queue_depth, IBNAL_MSG_QUEUE_SIZE);
1994                 atomic_dec (&conn->ibc_refcount);
1995                 kibnal_destroy_conn(conn);
1999         /* assume 'nid' is a new peer */
2000         peer = kibnal_create_peer (nid);
2002                 CDEBUG(D_NET, "--conn[%p] state %d -> "LPX64" (%d)\n",
2003                        conn, conn->ibc_state, conn->ibc_peer->ibp_nid,
2004                        atomic_read (&conn->ibc_refcount));
2005                 atomic_dec (&conn->ibc_refcount);
2006                 kibnal_destroy_conn(conn);
2010         write_lock_irqsave (&kibnal_data.kib_global_lock, flags);
        /* check whether the peer raced into existence; keep the winner */
2012         peer2 = kibnal_find_peer_locked(nid);
2013         if (peer2 == NULL) {
2014                 /* peer table takes my ref on peer */
2015                 list_add_tail (&peer->ibp_list, kibnal_nid2peerlist(nid));
2017                 kib_peer_decref (peer);
2021         kib_peer_addref(peer); /* +1 ref for conn */
2022         peer->ibp_connecting++;
2024         write_unlock_irqrestore (&kibnal_data.kib_global_lock, flags);
2026         conn->ibc_peer = peer;
2027         conn->ibc_state = IBNAL_CONN_CONNECTING;
2028         /* conn->ibc_cep is set when cm_accept is called */
2029         conn->ibc_incarnation = incarnation;
2030         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
/* Request a QP state transition via iibt_qp_modify(); failures are only
 * logged (best-effort — used e.g. to force the error state on teardown). */
2036 static void kibnal_set_qp_state(IB_HANDLE *qp, IB_QP_STATE state)
2038         IB_QP_ATTRIBUTES_MODIFY modify_attr = {0,};
2041         modify_attr.RequestState = state;
2043         frc = iibt_qp_modify(qp, &modify_attr, NULL);
2044         if (frc != FSUCCESS)
2045                 CERROR("couldn't set qp state to %d, error %d\n", state, frc);
/* After a connection has fully disconnected, fail every transmit still
 * attached to it: passive RDMAs waiting only on the peer, plus everything
 * still blocked on the tx queue.  Collected under ibc_lock onto a local
 * 'zombies' list, then completed with kibnal_tx_done() outside the lock. */
2048 static void kibnal_flush_pending(kib_conn_t *conn)
2050         LIST_HEAD        (zombies);
2051         struct list_head *tmp;
2052         struct list_head *nxt;
2054         unsigned long     flags;
2057         /* NB we wait until the connection has closed before completing
2058          * outstanding passive RDMAs so we can be sure the network can't
2059          * touch the mapped memory any more. */
2060         KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_DISCONNECTED);
2062         /* set the QP to the error state so that we get flush callbacks
2063          * on our posted receives which can then drop their conn refs */
2064         kibnal_set_qp_state(conn->ibc_qp, QPStateError);
2066         spin_lock_irqsave (&conn->ibc_lock, flags);
2068         /* grab passive RDMAs not waiting for the tx callback */
2069         list_for_each_safe (tmp, nxt, &conn->ibc_active_txs) {
2070                 tx = list_entry (tmp, kib_tx_t, tx_list);
2072                 LASSERT (tx->tx_passive_rdma ||
2073                          !tx->tx_passive_rdma_wait);
2075                 LASSERT (tx->tx_passive_rdma_wait ||
2076                          tx->tx_sending != 0);
2078                 /* still waiting for tx callback? */
2079                 if (!tx->tx_passive_rdma_wait)
2082                 tx->tx_status = -ECONNABORTED;
2083                 tx->tx_passive_rdma_wait = 0;
2084                 done = (tx->tx_sending == 0);
2089                 list_del (&tx->tx_list);
2090                 list_add (&tx->tx_list, &zombies);
2093         /* grab all blocked transmits */
2094         list_for_each_safe (tmp, nxt, &conn->ibc_tx_queue) {
2095                 tx = list_entry (tmp, kib_tx_t, tx_list);
2097                 list_del (&tx->tx_list);
2098                 list_add (&tx->tx_list, &zombies);
2101         spin_unlock_irqrestore (&conn->ibc_lock, flags);
        /* complete the collected zombies outside the spinlock */
2103         while (!list_empty(&zombies)) {
2104                 tx = list_entry (zombies.next, kib_tx_t, tx_list);
2106                 list_del(&tx->tx_list);
2107                 kibnal_tx_done (tx);
/* Send a CM REJECT with the given reason code on 'cep'.  Best-effort: if
 * the CM_REJECT_INFO allocation fails we simply give up (PORTAL_ALLOC
 * already logged the failure). */
2112 kibnal_reject (IB_HANDLE cep, uint16_t reason)
2114         CM_REJECT_INFO *rej;
2116         PORTAL_ALLOC(rej, sizeof(*rej));
2117         if (rej == NULL) /* PORTAL_ALLOC() will CERROR on failure */
2120         rej->Reason = reason;
2121         iibt_cm_reject(cep, rej);
2122         PORTAL_FREE(rej, sizeof(*rej));
/* Drive the QP through RTR (ready-to-receive) and then RTS (ready-to-send)
 * with the negotiated remote QPN, PSNs, path and RDMA depths.  Two
 * iibt_qp_modify() calls: the first sets up the receive side (incl. address
 * vector derived from 'path'), the second enables sending. */
2126 kibnal_qp_rts(IB_HANDLE qp_handle, __u32 qpn, __u8 resp_res,
2127                IB_PATH_RECORD *path, __u8 init_depth, __u32 send_psn)
2129         IB_QP_ATTRIBUTES_MODIFY modify_attr;
        /* step 1: transition to ReadyToRecv with the remote QP's identity */
2133         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
2134                 .RequestState           = QPStateReadyToRecv,
2135                 .RecvPSN                = IBNAL_STARTING_PSN,
2136                 .DestQPNumber           = qpn,
2137                 .ResponderResources     = resp_res,
2138                 .MinRnrTimer            = UsecToRnrNakTimer(2000), /* 20 ms */
2139                 .Attrs                  = (IB_QP_ATTR_RECVPSN |
2140                                            IB_QP_ATTR_DESTQPNUMBER |
2141                                            IB_QP_ATTR_RESPONDERRESOURCES |
2143                                            IB_QP_ATTR_PATHMTU |
2144                                            IB_QP_ATTR_MINRNRTIMER),
        /* derive PathMTU and the destination address vector from the path */
2146         GetAVFromPath(0, path, &modify_attr.PathMTU, NULL,
2147                       &modify_attr.DestAV);
2149         frc = iibt_qp_modify(qp_handle, &modify_attr, NULL);
2150         if (frc != FSUCCESS)
        /* step 2: transition to ReadyToSend with retry/timeout parameters */
2153         modify_attr = (IB_QP_ATTRIBUTES_MODIFY) {
2154                 .RequestState           = QPStateReadyToSend,
2155                 .FlowControl            = TRUE,
2156                 .InitiatorDepth         = init_depth,
2157                 .SendPSN                = send_psn,
2158                 .LocalAckTimeout        = path->PktLifeTime + 2, /* 2 or 1? */
2159                 .RetryCount             = IBNAL_RETRY,
2160                 .RnrRetryCount          = IBNAL_RNR_RETRY,
2161                 .Attrs                  = (IB_QP_ATTR_FLOWCONTROL |
2162                                            IB_QP_ATTR_INITIATORDEPTH |
2163                                            IB_QP_ATTR_SENDPSN |
2164                                            IB_QP_ATTR_LOCALACKTIMEOUT |
2165                                            IB_QP_ATTR_RETRYCOUNT |
2166                                            IB_QP_ATTR_RNRRETRYCOUNT),
2169         frc = iibt_qp_modify(qp_handle, &modify_attr, NULL);
/* Active-side handler for a CM CONNECT_REPLY: validate the peer's wire
 * connreq (magic/version/queue depth/NID), move our QP to RTS with the
 * peer's parameters, then cm_accept to complete the handshake.  Any
 * validation failure rejects the connection and fails the connreq. */
2174 kibnal_connect_reply (IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2176         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2177         kib_conn_t *conn = arg;
2178         kib_wire_connreq_t *wcr;
2179         CM_REPLY_INFO *rep = &info->Info.Reply;
2183         wcr = (kib_wire_connreq_t *)info->Info.Reply.PrivateData;
2185         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2186                 CERROR ("Can't connect "LPX64": bad magic %08x\n",
2187                         conn->ibc_peer->ibp_nid, le32_to_cpu(wcr->wcr_magic));
2188                 GOTO(reject, reason = RC_USER_REJ);
2191         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
                /* NOTE(review): this prints wcr_magic but the check is on
                 * wcr_version — the error message reports the wrong field */
2192                 CERROR ("Can't connect "LPX64": bad version %d\n",
2193                         conn->ibc_peer->ibp_nid, le16_to_cpu(wcr->wcr_magic));
2194                 GOTO(reject, reason = RC_USER_REJ);
2197         if (wcr->wcr_queue_depth != cpu_to_le16(IBNAL_MSG_QUEUE_SIZE)) {
2198                 CERROR ("Can't connect "LPX64": bad queue depth %d\n",
2199                         conn->ibc_peer->ibp_nid,
2200                         le16_to_cpu(wcr->wcr_queue_depth));
2201                 GOTO(reject, reason = RC_USER_REJ);
2204         if (le64_to_cpu(wcr->wcr_nid) != conn->ibc_peer->ibp_nid) {
2205                 CERROR ("Unexpected NID "LPX64" from "LPX64"\n",
2206                         le64_to_cpu(wcr->wcr_nid), conn->ibc_peer->ibp_nid);
2207                 GOTO(reject, reason = RC_USER_REJ);
2210         CDEBUG(D_NET, "Connection %p -> "LPX64" REP_RECEIVED.\n",
2211                conn, conn->ibc_peer->ibp_nid);
2213         conn->ibc_incarnation = le64_to_cpu(wcr->wcr_incarnation);
2214         conn->ibc_credits = IBNAL_MSG_QUEUE_SIZE;
        /* bring the QP to RTS, clamping depths to our HCA's limits */
2216         frc = kibnal_qp_rts(conn->ibc_qp, rep->QPN,
2217                             min_t(__u8, rep->ArbInitiatorDepth,
2218                                   ca_attr->MaxQPResponderResources),
2219                             &conn->ibc_connreq->cr_path,
2220                             min_t(__u8, rep->ArbResponderResources,
2221                                   ca_attr->MaxQPInitiatorDepth),
2223         if (frc != FSUCCESS) {
2224                 CERROR("Connection %p -> "LPX64" QP RTS/RTR failed: %d\n",
2225                        conn, conn->ibc_peer->ibp_nid, frc);
2226                 GOTO(reject, reason = RC_NO_QP);
2229         /* the callback arguments are ignored for an active accept */
2230         conn->ibc_connreq->cr_discarded.Status = FSUCCESS;
2231         frc = iibt_cm_accept(cep, &conn->ibc_connreq->cr_discarded,
2232                              NULL, NULL, NULL, NULL);
2233         if (frc != FCM_CONNECT_ESTABLISHED) {
2234                 CERROR("Connection %p -> "LPX64" CMAccept failed: %d\n",
2235                        conn, conn->ibc_peer->ibp_nid, frc);
2236                 kibnal_connreq_done (conn, 1, -ECONNABORTED);
2237                 /* XXX don't call reject after accept fails? */
2241         CDEBUG(D_NET, "Connection %p -> "LPX64" Established\n",
2242                conn, conn->ibc_peer->ibp_nid);
2244         kibnal_connreq_done (conn, 1, 0);
2248         kibnal_reject(cep, reason);
2249         kibnal_connreq_done (conn, 1, -EPROTO);
2252 /* ib_cm.h has a wealth of information on the CM procedures */
/* CM event dispatcher for an established (or establishing) connection:
 * CONNECT_REPLY continues the active handshake; DISCONNECT_REQUEST starts
 * our side of the teardown; DISCONNECTED/DISCONNECT_REPLY complete it,
 * flush pending transmits and drop the CM's conn ref. */
2254 kibnal_cm_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2256         kib_conn_t *conn = arg;
2258         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2260         /* Established Connection Notifier */
2261         switch (info->Status) {
2263                 CERROR("unknown status %d on Connection %p -> "LPX64"\n",
2264                        info->Status, conn, conn->ibc_peer->ibp_nid);
2268         case FCM_CONNECT_REPLY:
2269                 kibnal_connect_reply(cep, info, arg);
2272         case FCM_DISCONNECT_REQUEST:
2273                 /* XXX lock around these state management bits? */
2274                 if (conn->ibc_state == IBNAL_CONN_ESTABLISHED)
2275                         kibnal_close_conn (conn, 0);
2276                 conn->ibc_state = IBNAL_CONN_DREP;
2277                 iibt_cm_disconnect(conn->ibc_cep, NULL, NULL);
2280         /* these both guarantee that no more cm callbacks will occur */
2281         case FCM_DISCONNECTED: /* aka FCM_DISCONNECT_TIMEOUT */
2282         case FCM_DISCONNECT_REPLY:
2283                 CDEBUG(D_NET, "Connection %p -> "LPX64" disconnect done.\n",
2284                        conn, conn->ibc_peer->ibp_nid);
2286                 conn->ibc_state = IBNAL_CONN_DISCONNECTED;
2287                 kibnal_flush_pending(conn);
2288                 kibnal_put_conn(conn); /* Lose CM's ref */
/* Enable the CM endpoint options this NAL relies on (timewait callback and
 * async accept) via iibt_cm_modify_cep(); errors are reported and returned
 * to the caller. */
2296 kibnal_set_cm_flags(IB_HANDLE cep)
2301         frc = iibt_cm_modify_cep(cep, CM_FLAG_TIMEWAIT_CALLBACK,
2302                                  (char *)&value, sizeof(value), 0);
2303         if (frc != FSUCCESS) {
2304                 CERROR("error setting timeout callback: %d\n", frc);
2309         frc = iibt_cm_modify_cep(cep, CM_FLAG_ASYNC_ACCEPT, (char *)&value,
2311         if (frc != FSUCCESS) {
2312                 CERROR("error setting async accept: %d\n", frc);
/* Passive-side CM listener: handle an incoming CONNECT_REQUEST.  Validate
 * the peer's wire connreq, create the conn/peer via kibnal_accept(), bring
 * the QP to RTS with the requester's parameters, then reply with our own
 * wire connreq via iibt_cm_accept().  Any failure rejects with a CM reason
 * code and (if a conn was created) fails the connreq. */
2321 kibnal_listen_callback(IB_HANDLE cep, CM_CONN_INFO *info, void *arg)
2323         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2324         IB_QP_ATTRIBUTES_QUERY *query;
2325         CM_REQUEST_INFO *req;
2326         CM_CONN_INFO *rep = NULL, *rcv = NULL;
2327         kib_wire_connreq_t *wcr;
2328         kib_conn_t *conn = NULL;
2329         uint16_t reason = 0;
2335         LASSERT(arg == NULL); /* no conn yet for passive */
2337         CDEBUG(D_NET, "status 0x%x\n", info->Status);
2339         req = &info->Info.Request;
2340         wcr = (kib_wire_connreq_t *)req->PrivateData;
2342         CDEBUG(D_NET, "%d from "LPX64"\n", info->Status,
2343                le64_to_cpu(wcr->wcr_nid));
2345         if (info->Status == FCM_CONNECT_CANCEL)
2348         LASSERT (info->Status == FCM_CONNECT_REQUEST);
2350         if (wcr->wcr_magic != cpu_to_le32(IBNAL_MSG_MAGIC)) {
2351                 CERROR ("Can't accept: bad magic %08x\n",
2352                         le32_to_cpu(wcr->wcr_magic));
2353                 GOTO(out, reason = RC_USER_REJ);
2356         if (wcr->wcr_version != cpu_to_le16(IBNAL_MSG_VERSION)) {
                /* NOTE(review): prints wcr_magic but the check is on
                 * wcr_version — the error message reports the wrong field */
2357                 CERROR ("Can't accept: bad version %d\n",
2358                         le16_to_cpu(wcr->wcr_magic));
2359                 GOTO(out, reason = RC_USER_REJ);
2362         rc = kibnal_accept(&conn, cep,
2363                            le64_to_cpu(wcr->wcr_nid),
2364                            le64_to_cpu(wcr->wcr_incarnation),
2365                            le16_to_cpu(wcr->wcr_queue_depth));
2367                 CERROR ("Can't accept "LPX64": %d\n",
2368                         le64_to_cpu(wcr->wcr_nid), rc);
2369                 GOTO(out, reason = RC_NO_RESOURCES);
        /* bring our QP to RTS, clamping the offered depths to HCA limits */
2372         frc = kibnal_qp_rts(conn->ibc_qp, req->CEPInfo.QPN,
2373                             min_t(__u8, req->CEPInfo.OfferedInitiatorDepth,
2374                                   ca_attr->MaxQPResponderResources),
2375                             &req->PathInfo.Path,
2376                             min_t(__u8, req->CEPInfo.OfferedResponderResources,
2377                                   ca_attr->MaxQPInitiatorDepth),
2378                             req->CEPInfo.StartingPSN);
2380         if (frc != FSUCCESS) {
2381                 CERROR ("Can't mark QP RTS/RTR "LPX64": %d\n",
2382                         le64_to_cpu(wcr->wcr_nid), frc);
2383                 GOTO(out, reason = RC_NO_QP);
2386         frc = iibt_qp_query(conn->ibc_qp, &conn->ibc_qp_attrs, NULL);
2387         if (frc != FSUCCESS) {
2388                 CERROR ("Couldn't query qp attributes "LPX64": %d\n",
2389                         le64_to_cpu(wcr->wcr_nid), frc);
2390                 GOTO(out, reason = RC_NO_QP);
2392         query = &conn->ibc_qp_attrs;
2394         PORTAL_ALLOC(rep, sizeof(*rep));
2395         PORTAL_ALLOC(rcv, sizeof(*rcv));
2396         if (rep == NULL || rcv == NULL) {
2397                 CERROR ("can't reply and receive buffers\n");
2398                 GOTO(out, reason = RC_INSUFFICIENT_RESP_RES);
2401         /* don't try to deref this into the incoming wcr :) */
2402         wcr = (kib_wire_connreq_t *)rep->Info.Reply.PrivateData;
2404         rep->Info.Reply = (CM_REPLY_INFO) {
2405                 .QPN = query->QPNumber,
2406                 .QKey = query->Qkey,
2407                 .StartingPSN = query->RecvPSN,
2408                 .EndToEndFlowControl = query->FlowControl,
2410                 .ArbInitiatorDepth = query->InitiatorDepth,
2411                 .ArbResponderResources = query->ResponderResources,
2412                 .TargetAckDelay = 0,
2413                 .FailoverAccepted = 0,
2414                 .RnRRetryCount = req->CEPInfo.RnrRetryCount,
2417         *wcr = (kib_wire_connreq_t) {
2418                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2419                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
                /* NOTE(review): wcr_queue_depth is set with cpu_to_le16
                 * elsewhere in this file — cpu_to_le32 here looks like a
                 * width mismatch on a 16-bit field; verify against the
                 * kib_wire_connreq_t declaration */
2420                 .wcr_queue_depth = cpu_to_le32(IBNAL_MSG_QUEUE_SIZE),
2421                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2422                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2425         frc = iibt_cm_accept(cep, rep, rcv, kibnal_cm_callback, conn,
2428         PORTAL_FREE(rep, sizeof(*rep));
2429         PORTAL_FREE(rcv, sizeof(*rcv));
2431         if (frc != FCM_CONNECT_ESTABLISHED) {
2432                 /* XXX it seems we don't call reject after this point? */
2433                 CERROR("iibt_cm_accept() failed: %d, aborting\n", frc);
2438         if (kibnal_set_cm_flags(conn->ibc_cep)) {
2443         CDEBUG(D_WARNING, "Connection %p -> "LPX64" ESTABLISHED.\n",
2444                conn, conn->ibc_peer->ibp_nid);
2448         kibnal_reject(cep, reason);
2452                 kibnal_connreq_done(conn, 0, rc);
/* Debug helper: log the SGID/DGID/pkey of every path record returned by a
 * subnet-administration path query. */
2458 dump_path_records(PATH_RESULTS *results)
2460         IB_PATH_RECORD *path;
2463         for(i = 0; i < results->NumPathRecords; i++) {
2464                 path = &results->PathRecords[i];
2465                 CDEBUG(D_NET, "%d: sgid "LPX64":"LPX64" dgid "
2466                        LPX64":"LPX64" pkey %x\n",
2468                        path->SGID.Type.Global.SubnetPrefix,
2469                        path->SGID.Type.Global.InterfaceID,
2470                        path->DGID.Type.Global.SubnetPrefix,
2471                        path->DGID.Type.Global.InterfaceID,
/* SD path-record query completion (active connect, step 2 of 3): take the
 * first returned path, create and configure a CM endpoint, build our wire
 * connreq + CM_REQUEST_INFO, and issue iibt_cm_connect().  Any failure
 * finishes the connreq with -EINVAL. */
2477 kibnal_pathreq_callback (void *arg, QUERY *query,
2478                           QUERY_RESULT_VALUES *query_res)
2480         IB_CA_ATTRIBUTES *ca_attr = &kibnal_data.kib_hca_attrs;
2481         kib_conn_t *conn = arg;
2485         if (query_res->Status != FSUCCESS || query_res->ResultDataSize == 0) {
2486                 CERROR ("status %d data size %d\n", query_res->Status,
2487                         query_res->ResultDataSize);
2488                 kibnal_connreq_done (conn, 1, -EINVAL);
2492         path = (PATH_RESULTS *)query_res->QueryResult;
2494         if (path->NumPathRecords < 1) {
2495                 CERROR ("expected path records: %d\n", path->NumPathRecords);
2496                 kibnal_connreq_done (conn, 1, -EINVAL);
2500         dump_path_records(path);
2502         /* just using the first. this is probably a horrible idea. */
2503         conn->ibc_connreq->cr_path = path->PathRecords[0];
2505         conn->ibc_cep = iibt_cm_create_cep(CM_RC_TYPE);
2506         if (conn->ibc_cep == NULL) {
2507                 CERROR ("Can't create CEP\n");
2508                 kibnal_connreq_done (conn, 1, -EINVAL);
2512         if (kibnal_set_cm_flags(conn->ibc_cep)) {
2513                 kibnal_connreq_done (conn, 1, -EINVAL);
        /* our half of the private-data handshake carried in the CM request */
2517         conn->ibc_connreq->cr_wcr = (kib_wire_connreq_t) {
2518                 .wcr_magic       = cpu_to_le32(IBNAL_MSG_MAGIC),
2519                 .wcr_version     = cpu_to_le16(IBNAL_MSG_VERSION),
2520                 .wcr_queue_depth = cpu_to_le16(IBNAL_MSG_QUEUE_SIZE),
2521                 .wcr_nid         = cpu_to_le64(kibnal_data.kib_nid),
2522                 .wcr_incarnation = cpu_to_le64(kibnal_data.kib_incarnation),
2525         conn->ibc_connreq->cr_cmreq = (CM_REQUEST_INFO) {
2526                 .SID = conn->ibc_connreq->cr_service.RID.ServiceID,
2527                 .CEPInfo = (CM_CEP_INFO) {
2528                         .CaGUID = kibnal_data.kib_hca_guids[0],
2529                         .EndToEndFlowControl = FALSE,
2530                         .PortGUID = conn->ibc_connreq->cr_path.SGID.Type.Global.InterfaceID,
2531                         .RetryCount = IBNAL_RETRY,
2532                         .RnrRetryCount = IBNAL_RNR_RETRY,
2533                         .AckTimeout = IBNAL_ACK_TIMEOUT,
2534                         .StartingPSN = IBNAL_STARTING_PSN,
2535                         .QPN = conn->ibc_qp_attrs.QPNumber,
2536                         .QKey = conn->ibc_qp_attrs.Qkey,
2537                         .OfferedResponderResources = ca_attr->MaxQPResponderResources,
2538                         .OfferedInitiatorDepth = ca_attr->MaxQPInitiatorDepth,
2540                 .PathInfo = (CM_CEP_PATHINFO) {
2541                         .bSubnetLocal = TRUE,
2542                         .Path = conn->ibc_connreq->cr_path,
2547         /* XXX set timeout just like SDP!!!*/
2548         conn->ibc_connreq->cr_path.packet_life = 13;
2550         /* Flag I'm getting involved with the CM... */
2551         conn->ibc_state = IBNAL_CONN_CONNECTING;
2553         CDEBUG(D_NET, "Connecting to, service id "LPX64", on "LPX64"\n",
2554                conn->ibc_connreq->cr_service.RID.ServiceID,
2555                *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
        /* embed the wire connreq in the CM request's private data */
2557         memset(conn->ibc_connreq->cr_cmreq.PrivateData, 0,
2558                CM_REQUEST_INFO_USER_LEN);
2559         memcpy(conn->ibc_connreq->cr_cmreq.PrivateData,
2560                &conn->ibc_connreq->cr_wcr, sizeof(conn->ibc_connreq->cr_wcr));
2562         /* kibnal_cm_callback gets my conn ref */
2563         frc = iibt_cm_connect(conn->ibc_cep, &conn->ibc_connreq->cr_cmreq,
2564                               kibnal_cm_callback, conn);
2565         if (frc != FPENDING && frc != FSUCCESS) {
2566                 CERROR ("Connect: %d\n", frc);
2567                 /* Back out state change as connect failed */
2568                 conn->ibc_state = IBNAL_CONN_INIT_QP;
2569                 kibnal_connreq_done (conn, 1, -EINVAL);
/* Debug helper: log the service ID, GID and pkey of every service record
 * returned by a subnet-administration service query. */
2574 dump_service_records(SERVICE_RECORD_RESULTS *results)
2576         IB_SERVICE_RECORD *svc;
2579         for(i = 0; i < results->NumServiceRecords; i++) {
2580                 svc = &results->ServiceRecords[i];
2581                 CDEBUG(D_NET, "%d: sid "LPX64" gid "LPX64":"LPX64" pkey %x\n",
2584                        svc->RID.ServiceGID.Type.Global.SubnetPrefix,
2585                        svc->RID.ServiceGID.Type.Global.InterfaceID,
2586                        svc->RID.ServiceP_Key);
/* SD service-record query completion (active connect, step 1 of 3): take
 * the first matching service record for the peer, then issue a path-record
 * query from our port GUID to the service's GID; kibnal_pathreq_callback
 * continues from there.  Any failure finishes the connreq with -EINVAL. */
2592 kibnal_service_get_callback (void *arg, QUERY *query,
2593                               QUERY_RESULT_VALUES *query_res)
2595         kib_conn_t *conn = arg;
2596         SERVICE_RECORD_RESULTS *svc;
2597         COMMAND_CONTROL_PARAMETERS sd_params;
2601         if (query_res->Status != FSUCCESS || query_res->ResultDataSize == 0) {
2602                 CERROR ("status %d data size %d\n", query_res->Status,
2603                         query_res->ResultDataSize);
2604                 kibnal_connreq_done (conn, 1, -EINVAL);
2608         svc = (SERVICE_RECORD_RESULTS *)query_res->QueryResult;
2610         if (svc->NumServiceRecords < 1) {
2611                 CERROR ("%d service records\n", svc->NumServiceRecords);
2612                 kibnal_connreq_done (conn, 1, -EINVAL);
2616         dump_service_records(svc);
2618         conn->ibc_connreq->cr_service = svc->ServiceRecords[0];
2620         CDEBUG(D_NET, "Got status %d, service id "LPX64", on "LPX64"\n",
2621                query_res->Status , conn->ibc_connreq->cr_service.RID.ServiceID,
2622                *kibnal_service_nid_field(&conn->ibc_connreq->cr_service));
        /* chain into the path-record query for the service's port */
2624         memset(&path_query, 0, sizeof(path_query));
2625         path_query.InputType = InputTypePortGuidPair;
2626         path_query.OutputType = OutputTypePathRecord;
2627         path_query.InputValue.PortGuidPair.SourcePortGuid = kibnal_data.kib_port_guid;
2628         path_query.InputValue.PortGuidPair.DestPortGuid = conn->ibc_connreq->cr_service.RID.ServiceGID.Type.Global.InterfaceID;
2630         memset(&sd_params, 0, sizeof(sd_params));
2631         sd_params.RetryCount = IBNAL_RETRY;
2632         sd_params.Timeout = 10 * 1000;  /* wait 10 seconds */
2634         /* kibnal_service_get_callback gets my conn ref */
2636         frc = iibt_sd_query_port_fabric_information(kibnal_data.kib_sd,
2637                                                     kibnal_data.kib_port_guid,
2639                                                     kibnal_pathreq_callback,
2641         if (frc == FPENDING)
2644         CERROR ("Path record request failed: %d\n", frc);
2645         kibnal_connreq_done (conn, 1, -EINVAL);
/* Start an active connection to 'peer': allocate a conn and its connreq,
 * then kick off the asynchronous SD service-record lookup for the peer's
 * NID; kibnal_service_get_callback continues the sequence.  On any failure
 * the connreq is finished with an error (failing the peer's queued tx). */
2649 kibnal_connect_peer (kib_peer_t *peer)
2651         COMMAND_CONTROL_PARAMETERS sd_params;
2654         kib_conn_t *conn = kibnal_create_conn();
2656         LASSERT (peer->ibp_connecting != 0);
2659                 CERROR ("Can't allocate conn\n");
2660                 kibnal_peer_connect_failed (peer, 1, -ENOMEM);
2664         conn->ibc_peer = peer;
2665         kib_peer_addref(peer);
2667         PORTAL_ALLOC (conn->ibc_connreq, sizeof (*conn->ibc_connreq));
2668         if (conn->ibc_connreq == NULL) {
2669                 CERROR ("Can't allocate connreq\n");
2670                 kibnal_connreq_done (conn, 1, -ENOMEM);
2674         memset(conn->ibc_connreq, 0, sizeof (*conn->ibc_connreq));
2676         kibnal_set_service_keys(&conn->ibc_connreq->cr_service, peer->ibp_nid);
        /* look up the peer's service record by the NID-derived keys */
2678         memset(&query, 0, sizeof(query));
2679         query.InputType = InputTypeServiceRecord;
2680         query.OutputType = OutputTypeServiceRecord;
2681         query.InputValue.ServiceRecordValue.ServiceRecord = conn->ibc_connreq->cr_service;
2682         query.InputValue.ServiceRecordValue.ComponentMask = KIBNAL_SERVICE_KEY_MASK;
2684         memset(&sd_params, 0, sizeof(sd_params));
2685         sd_params.RetryCount = IBNAL_RETRY;
2686         sd_params.Timeout = 10 * 1000;  /* wait 10 seconds */
2688         /* kibnal_service_get_callback gets my conn ref */
2689         frc = iibt_sd_query_port_fabric_information(kibnal_data.kib_sd,
2690                                                     kibnal_data.kib_port_guid,
2692                                                     kibnal_service_get_callback,
2694         if (frc == FPENDING)
2697         CERROR ("iibt_sd_query_port_fabric_information(): %d\n", frc);
        /* NOTE(review): frc is an FSTATUS code; other failure paths pass a
         * negative errno to kibnal_connreq_done — confirm this is intended */
2698         kibnal_connreq_done (conn, 1, frc);
/*
 * kibnal_conn_timed_out - check a connection for transmits past their
 * deadline.
 *
 * Scans both tx lists under ibc_lock (IRQ-safe): ibc_tx_queue holds
 * txs still queued (asserted neither sending nor awaiting RDMA) and
 * ibc_active_txs holds txs in flight.  If any tx_deadline has expired
 * the lock is dropped immediately and the scan stops.
 *
 * NOTE(review): the 'return' statements and the declaration of 'tx'
 * fall in gaps of this extraction; presumably returns non-zero when a
 * timed-out tx is found and 0 otherwise -- confirm against the full
 * source.
 */
2702 kibnal_conn_timed_out (kib_conn_t *conn)
2705         struct list_head  *ttmp;
2706         unsigned long      flags;
2708         spin_lock_irqsave (&conn->ibc_lock, flags);
             /* queued txs: must not yet be sending or awaiting passive RDMA */
2710         list_for_each (ttmp, &conn->ibc_tx_queue) {
2711                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2713                 LASSERT (!tx->tx_passive_rdma_wait);
2714                 LASSERT (tx->tx_sending == 0);
2716                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2717                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
             /* active txs: each is either a passive-RDMA tx, or not waiting;
              * and is either waiting for RDMA completion or still sending */
2722         list_for_each (ttmp, &conn->ibc_active_txs) {
2723                 tx = list_entry (ttmp, kib_tx_t, tx_list);
2725                 LASSERT (tx->tx_passive_rdma ||
2726                          !tx->tx_passive_rdma_wait);
2728                 LASSERT (tx->tx_passive_rdma_wait ||
2729                          tx->tx_sending != 0);
2731                 if (time_after_eq (jiffies, tx->tx_deadline)) {
2732                         spin_unlock_irqrestore (&conn->ibc_lock, flags);
2737         spin_unlock_irqrestore (&conn->ibc_lock, flags);
/*
 * kibnal_check_conns - sweep one peer hash bucket for stuck connections.
 *
 * Under the shared (read) global lock, visits every established conn of
 * every peer in bucket 'idx': kicks the send path (in case credits came
 * back with no tx descs free last time) and closes any conn that
 * kibnal_conn_timed_out() flags.  Closing requires dropping the read
 * lock, so a conn ref is taken first and the bucket scan restarts
 * afterwards.
 *
 * NOTE(review): declarations of 'peer'/'conn' and the loop-restart
 * control flow fall in gaps of this fragmentary extraction.
 */
2743 kibnal_check_conns (int idx)
2745         struct list_head  *peers = &kibnal_data.kib_peers[idx];
2746         struct list_head  *ptmp;
2749         struct list_head  *ctmp;
2752         /* NB. We expect to have a look at all the peers and not find any
2753          * rdmas to time out, so we just use a shared lock while we
2755         read_lock (&kibnal_data.kib_global_lock);
2757         list_for_each (ptmp, peers) {
2758                 peer = list_entry (ptmp, kib_peer_t, ibp_list);
2760                 list_for_each (ctmp, &peer->ibp_conns) {
2761                         conn = list_entry (ctmp, kib_conn_t, ibc_list);
2763                         KIB_ASSERT_CONN_STATE(conn, IBNAL_CONN_ESTABLISHED);
2765                         /* In case we have enough credits to return via a
2766                          * NOOP, but there were no non-blocking tx descs
2767                          * free to do it last time... */
2768                         kibnal_check_sends(conn);
2770                         if (!kibnal_conn_timed_out(conn))
2773                         CDEBUG(D_NET, "++conn[%p] state %d -> "LPX64" (%d)\n",
2774                                conn, conn->ibc_state, peer->ibp_nid,
2775                                atomic_read (&conn->ibc_refcount));
                             /* hold a ref so the conn survives dropping the
                              * read lock for the close below */
2777                         atomic_inc (&conn->ibc_refcount);
2778                         read_unlock (&kibnal_data.kib_global_lock);
2780                         CERROR("Timed out RDMA with "LPX64"\n",
2783                         kibnal_close_conn (conn, -ETIMEDOUT);
2784                         kibnal_put_conn (conn);
2786                         /* start again now I've dropped the lock */
2791         read_unlock (&kibnal_data.kib_global_lock);
/*
 * kib_connd_handle_state - connd-side state dispatch for a conn taken
 * off the connd queue.
 *
 * DISCONNECTED: all refs gone -> free the conn and return early,
 * deliberately skipping the trailing put_conn.
 * SEND_DREQ: issue the CM disconnect request and advance to DREQ.
 * DREP: a callback already progressed the conn before connd got to it
 * (body falls in an extraction gap).
 * Any other state is a bug (CERROR).  The common exit drops the ref
 * that kibnal_close_conn took.
 */
2795 kib_connd_handle_state(kib_conn_t *conn)
2799         switch (conn->ibc_state) {
2800                 /* all refs have gone, free and be done with it */
2801                 case IBNAL_CONN_DISCONNECTED:
2802                         kibnal_destroy_conn (conn);
2803                         return; /* avoid put_conn */
2805                 case IBNAL_CONN_SEND_DREQ:
2806                         frc = iibt_cm_disconnect(conn->ibc_cep, NULL, NULL);
2807                         if (frc != FSUCCESS) /* XXX do real things */
2808                                 CERROR("disconnect failed: %d\n", frc);
2809                         conn->ibc_state = IBNAL_CONN_DREQ;
2812                 /* a callback got to the conn before we did */
2813                 case IBNAL_CONN_DREP:
2817                         CERROR ("Bad conn %p state: %d\n", conn,
2823         /* drop ref from close_conn */
2824         kibnal_put_conn(conn);
/*
 * kibnal_connd - connection daemon thread.
 *
 * Loops under kib_connd_lock doing three jobs:
 *  1. drain kib_connd_conns, handing each conn to
 *     kib_connd_handle_state() (lock dropped around the call);
 *  2. drain kib_connd_peers, starting a connection attempt per peer
 *     via kibnal_connect_peer() (again with the lock dropped);
 *  3. when the timeout 'deadline' has passed, sweep a proportion of
 *     the peer hash table with kibnal_check_conns() so every conn is
 *     checked 'n' times per timeout interval (see comment below).
 * Sleeps interruptibly on kib_connd_waitq until new work or the next
 * deadline; exits once shutdown is flagged and no conns remain.
 *
 * NOTE(review): declarations of conn/peer/timeout/p/n/peer_index and
 * the outer loop construct fall in gaps of this fragmentary extraction.
 */
2828 kibnal_connd (void *arg)
2831         unsigned long      flags;
2837         unsigned long      deadline = jiffies;
2839         kportal_daemonize ("kibnal_connd");
2840         kportal_blockallsigs ();
2842         init_waitqueue_entry (&wait, current);
2844         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
             /* job 1: conns queued for state handling */
2847         if (!list_empty (&kibnal_data.kib_connd_conns)) {
2848                 conn = list_entry (kibnal_data.kib_connd_conns.next,
2849                                    kib_conn_t, ibc_list);
2850                 list_del (&conn->ibc_list);
2852                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2853                 kib_connd_handle_state(conn);
2855                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
             /* job 2: peers queued for an active connect */
2859         if (!list_empty (&kibnal_data.kib_connd_peers)) {
2860                 peer = list_entry (kibnal_data.kib_connd_peers.next,
2861                                    kib_peer_t, ibp_connd_list);
2863                 list_del_init (&peer->ibp_connd_list);
2864                 spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2866                 kibnal_connect_peer (peer);
                     /* drop the ref taken when the peer was queued */
2867                 kib_peer_decref (peer);
2869                 spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2872         /* shut down and nobody left to reap... */
2873         if (kibnal_data.kib_shutdown &&
2874             atomic_read(&kibnal_data.kib_nconns) == 0)
2877         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2879         /* careful with the jiffy wrap... */
2880         while ((timeout = (int)(deadline - jiffies)) <= 0) {
2883                 int chunk = kibnal_data.kib_peer_hash_size;
2885                 /* Time to check for RDMA timeouts on a few more
2886                  * peers: I do checks every 'p' seconds on a
2887                  * proportion of the peer table and I need to check
2888                  * every connection 'n' times within a timeout
2889                  * interval, to ensure I detect a timeout on any
2890                  * connection within (n+1)/n times the timeout
2893                 if (kibnal_tunables.kib_io_timeout > n * p)
2894                         chunk = (chunk * n * p) /
2895                                 kibnal_tunables.kib_io_timeout;
2899                 for (i = 0; i < chunk; i++) {
2900                         kibnal_check_conns (peer_index);
2901                         peer_index = (peer_index + 1) %
2902                                      kibnal_data.kib_peer_hash_size;
             /* record when the next sweep is due, then sleep until work
              * arrives or the timeout expires */
2908         kibnal_data.kib_connd_waketime = jiffies + timeout;
2910         set_current_state (TASK_INTERRUPTIBLE);
2911         add_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
             /* only sleep if there is genuinely nothing queued */
2913         if (!kibnal_data.kib_shutdown &&
2914             list_empty (&kibnal_data.kib_connd_conns) &&
2915             list_empty (&kibnal_data.kib_connd_peers))
2916                 schedule_timeout (timeout);
2918         set_current_state (TASK_RUNNING);
2919         remove_wait_queue (&kibnal_data.kib_connd_waitq, &wait);
2921         spin_lock_irqsave (&kibnal_data.kib_connd_lock, flags);
2924         spin_unlock_irqrestore (&kibnal_data.kib_connd_lock, flags);
2926         kibnal_thread_fini ();
/*
 * kibnal_scheduler - scheduler thread (one of several, numbered by
 * 'id'): completes deferred tx and rx work queued by interrupt-context
 * code (e.g. kibnal_schedule_tx_done).
 *
 * Under kib_sched_lock, pops one tx from kib_sched_txq and one rx from
 * kib_sched_rxq per pass, dropping the lock around each handler (the
 * handler calls fall in gaps of this fragmentary extraction).  After
 * IBNAL_RESCHED consecutive busy passes, or when idle, it releases the
 * lock and waits interruptibly for more work; exits once shutdown is
 * flagged and no conns remain.
 *
 * NOTE(review): declarations of tx/rx/did_something/counter/rc and the
 * outer loop construct are not visible in this extraction.
 */
2931 kibnal_scheduler(void *arg)
2933         long            id = (long)arg;
2937         unsigned long   flags;
2942         snprintf(name, sizeof(name), "kibnal_sd_%02ld", id);
2943         kportal_daemonize(name);
2944         kportal_blockallsigs();
2946         spin_lock_irqsave(&kibnal_data.kib_sched_lock, flags);
             /* tx completions deferred from IRQ context */
2951         while (!list_empty(&kibnal_data.kib_sched_txq)) {
2952                 tx = list_entry(kibnal_data.kib_sched_txq.next,
2954                 list_del(&tx->tx_list);
2955                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2959                 spin_lock_irqsave(&kibnal_data.kib_sched_lock,
             /* one rx completion per pass */
2963         if (!list_empty(&kibnal_data.kib_sched_rxq)) {
2964                 rx = list_entry(kibnal_data.kib_sched_rxq.next,
2966                 list_del(&rx->rx_list);
2967                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2973                 spin_lock_irqsave(&kibnal_data.kib_sched_lock,
2977         /* shut down and no receives to complete... */
2978         if (kibnal_data.kib_shutdown &&
2979             atomic_read(&kibnal_data.kib_nconns) == 0)
2982         /* nothing to do or hogging CPU */
2983         if (!did_something || counter++ == IBNAL_RESCHED) {
2984                 spin_unlock_irqrestore(&kibnal_data.kib_sched_lock,
2988                 if (!did_something) {
                             /* block until a queue is non-empty or shutdown
                              * completes with no conns left */
2989                         rc = wait_event_interruptible(
2990                                 kibnal_data.kib_sched_waitq,
2991                                 !list_empty(&kibnal_data.kib_sched_txq) ||
2992                                 !list_empty(&kibnal_data.kib_sched_rxq) ||
2993                                 (kibnal_data.kib_shutdown &&
2994                                  atomic_read (&kibnal_data.kib_nconns) == 0));
2999                 spin_lock_irqsave(&kibnal_data.kib_sched_lock,
3004         spin_unlock_irqrestore(&kibnal_data.kib_sched_lock, flags);
3006         kibnal_thread_fini();
3011 lib_nal_t kibnal_lib = {
3012 libnal_data: &kibnal_data, /* NAL private data */
3013 libnal_send: kibnal_send,
3014 libnal_send_pages: kibnal_send_pages,
3015 libnal_recv: kibnal_recv,
3016 libnal_recv_pages: kibnal_recv_pages,
3017 libnal_dist: kibnal_dist