 * Copyright (C) 2004 Cluster File Systems, Inc.
 *
 * Copyright (C) 2009-2012 Cray, Inc.
 *
 * Derived from work by Eric Barton <eric@bartonsoftware.com>
 * Author: Nic Henke <nic@cray.com>
 *
 * This file is part of Lustre, http://www.lustre.org.
 *
 * Lustre is free software; you can redistribute it and/or
 * modify it under the terms of version 2 of the GNU General Public
 * License as published by the Free Software Foundation.
 *
 * Lustre is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with Lustre; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
#include <linux/nmi.h>

/* this is useful when debugging wire corruption. */
kgnilnd_dump_blob(int level, char *prefix, void *buf, int len) {
                       "%s 0x%p: 0x%16.16llx 0x%16.16llx 0x%16.16llx 0x%16.16llx\n",
                       prefix, ptr, *(ptr), *(ptr + 1), *(ptr + 2), *(ptr + 3));
        } else if (len >= 16) {
                        "%s 0x%p: 0x%16.16llx 0x%16.16llx\n",
                        prefix, ptr, *(ptr), *(ptr + 1));
                CDEBUG(level, "%s 0x%p: 0x%16.16llx\n",
kgnilnd_dump_msg(int mask, kgn_msg_t *msg)
        CDEBUG(mask, "0x%8.8x 0x%4.4x 0x%4.4x 0x%16.16llx"
                     " 0x%16.16llx 0x%8.8x 0x%4.4x 0x%4.4x 0x%8.8x\n",
               msg->gnm_magic, msg->gnm_version,
               msg->gnm_type, msg->gnm_srcnid,
               msg->gnm_connstamp, msg->gnm_seq,
               msg->gnm_cksum, msg->gnm_payload_cksum,
               msg->gnm_payload_len);
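
/* NB: the dump above prints the kgn_msg_t fields - magic, version, type,
 * source NID, connstamp, sequence, header cksum, payload cksum, payload
 * length - seemingly in wire-header order, so it can be matched
 * field-for-field against the raw bytes from kgnilnd_dump_blob when chasing
 * wire corruption. */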
kgnilnd_schedule_device(kgn_device_t *dev)
        short already_live = 0;

        /* we'll only want to wake if the scheduler thread
         * has come around and set ready to zero */
        already_live = cmpxchg(&dev->gnd_ready, GNILND_DEV_IDLE, GNILND_DEV_IRQ);

                wake_up_all(&dev->gnd_waitq);
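
/* NB: cmpxchg returns the value gnd_ready held before the swap, so only the
 * caller that performed the actual IDLE -> IRQ transition sees
 * GNILND_DEV_IDLE in already_live; gating the wake_up_all above on that (as
 * the comment suggests) collapses a burst of device callbacks into a single
 * scheduler wakeup. */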
void kgnilnd_schedule_device_timer(unsigned long arg)
        kgn_device_t *dev = (kgn_device_t *) arg;

        kgnilnd_schedule_device(dev);

kgnilnd_device_callback(__u32 devid, __u64 arg)
        int index = (int) arg;

        if (index >= kgnilnd_data.kgn_ndevs) {
                /* use _EMERG instead of an LBUG to prevent LBUG'ing in
                 * interrupt context. */
                LCONSOLE_EMERG("callback for unknown device %d->%d\n",

        dev = &kgnilnd_data.kgn_devices[index];
        /* just basic sanity */
        if (dev->gnd_id == devid) {
                kgnilnd_schedule_device(dev);
                LCONSOLE_EMERG("callback for bad device %d devid %d\n",
/* sched_intent values:
 * < 0 : do not reschedule under any circumstances
 * == 0: reschedule if someone marked it WANTS_SCHED
 * > 0 : force a reschedule */
kgnilnd_schedule_process_conn(kgn_conn_t *conn, int sched_intent)
        /* move back to IDLE but save previous state.
         * if we see WANTS_SCHED, we'll call kgnilnd_schedule_conn and
         * let the xchg there handle any racing callers to get it
         * onto gnd_ready_conns */
        conn_sched = xchg(&conn->gnc_scheduled, GNILND_CONN_IDLE);
        LASSERTF(conn_sched == GNILND_CONN_WANTS_SCHED ||
                 conn_sched == GNILND_CONN_PROCESS,
                 "conn %p after process in bad state: %d\n",

        if (sched_intent >= 0) {
                if ((sched_intent > 0 || (conn_sched == GNILND_CONN_WANTS_SCHED))) {
                        kgnilnd_schedule_conn(conn);
kgnilnd_schedule_conn(kgn_conn_t *conn)
        kgn_device_t *dev = conn->gnc_device;

        sched = xchg(&conn->gnc_scheduled, GNILND_CONN_WANTS_SCHED);

        /* if we are IDLE, add to list - only one guy sees IDLE and "wins"
         * the chance to put it onto gnd_ready_conns.
         * otherwise, leave marked as WANTS_SCHED and the thread that "owns"
         * the conn in process_conns will take care of moving it back to
         * SCHED when it is done processing */
        if (sched == GNILND_CONN_IDLE) {
                /* if the conn is already scheduled, we've already requested
                 * the scheduler thread wakeup */
                kgnilnd_conn_addref(conn); /* +1 ref for scheduler */

                LASSERTF(list_empty(&conn->gnc_schedlist), "conn %p already sched state %d\n",

                CDEBUG(D_INFO, "scheduling conn 0x%p\n", conn);

                spin_lock(&dev->gnd_lock);
                list_add_tail(&conn->gnc_schedlist, &dev->gnd_ready_conns);
                spin_unlock(&dev->gnd_lock);
                set_mb(conn->gnc_last_sched_ask, jiffies);
                CDEBUG(D_INFO, "not scheduling conn 0x%p: %d\n", conn, sched);

        /* make sure a thread is going to process the conns - but let it make
         * that decision separately from the conn scheduling */
        kgnilnd_schedule_device(dev);
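
/* a summary of the gnc_scheduled state machine as read from the two functions
 * above (an interpretation, not verbatim from the original): IDLE means no one
 * owns the conn, so the xchg winner takes the +1 ref and queues it on
 * gnd_ready_conns; WANTS_SCHED means a scheduler thread currently owns it and
 * kgnilnd_schedule_process_conn will requeue it when processing finishes;
 * PROCESS is presumably set by the scheduler thread while it works the conn.
 * The xchg makes IDLE -> WANTS_SCHED the single admission point, so a conn can
 * never land on the ready list twice. */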
kgnilnd_schedule_dgram(kgn_device_t *dev)

        wake = xchg(&dev->gnd_dgram_ready, GNILND_DGRAM_SCHED);
        if (wake != GNILND_DGRAM_SCHED) {
                wake_up(&dev->gnd_dgram_waitq);
                CDEBUG(D_NETTRACE, "not waking: %d\n", wake);
kgnilnd_free_tx(kgn_tx_t *tx)
        /* taken from kgnilnd_tx_add_state_locked */

        LASSERTF((tx->tx_list_p == NULL &&
                  tx->tx_list_state == GNILND_TX_ALLOCD) &&
                 list_empty(&tx->tx_list),
                 "tx %p with bad state %s (list_p %p) tx_list %s\n",
                 tx, kgnilnd_tx_state2str(tx->tx_list_state), tx->tx_list_p,
                 list_empty(&tx->tx_list) ? "empty" : "not empty");

        atomic_dec(&kgnilnd_data.kgn_ntx);

        /* we only allocate this if we need to */
        if (tx->tx_phys != NULL) {
                cfs_mem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys);
                CDEBUG(D_MALLOC, "slab-freed 'tx_phys': %lu at %p.\n",
                       LNET_MAX_IOV * sizeof(gni_mem_segment_t), tx->tx_phys);

        KGNILND_POISON(tx, 0x5a, sizeof(kgn_tx_t));

        cfs_mem_cache_free(kgnilnd_data.kgn_tx_cache, tx);
        CDEBUG(D_MALLOC, "slab-freed 'tx': %lu at %p.\n",
kgnilnd_alloc_tx(void)

        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_ALLOC_TX))

        tx = cfs_mem_cache_alloc(kgnilnd_data.kgn_tx_cache, CFS_ALLOC_ATOMIC);
                CERROR("failed to allocate tx\n");

        CDEBUG(D_MALLOC, "slab-alloced 'tx': %lu at %p.\n",

        /* need this memset, cache alloc'd memory is not cleared */
        memset(tx, 0, sizeof(*tx));

        /* setup everything here to minimize time under the lock */
        tx->tx_buftype = GNILND_BUF_NONE;
        tx->tx_msg.gnm_type = GNILND_MSG_NONE;
        INIT_LIST_HEAD(&tx->tx_list);
        INIT_LIST_HEAD(&tx->tx_map_list);
        tx->tx_list_state = GNILND_TX_ALLOCD;

        atomic_inc(&kgnilnd_data.kgn_ntx);
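
/* the alloc/free pair above is symmetric: kgn_ntx is bumped here and dropped
 * in kgnilnd_free_tx, tx_phys is allocated lazily (see
 * kgnilnd_setup_phys_buffer) and freed only if non-NULL, and the 0x5a poison
 * in free helps catch use-after-free of a tx. A typical caller pattern in
 * this file (illustrative only, the 'failed' label is hypothetical):
 *
 *      tx = kgnilnd_new_tx_msg(GNILND_MSG_IMMEDIATE, ni->ni_nid);
 *      if (tx == NULL)
 *              goto failed;
 *      ...
 *      kgnilnd_tx_done(tx, rc);    (unmaps, finalizes lnet msgs, frees the tx)
 */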
/* csum_fold needs to be run on the return value before shipping over the wire */
#define _kgnilnd_cksum(seed, ptr, nob)  csum_partial(ptr, nob, seed)

/* we don't use offset as everyone is passing a buffer reference that already
 * includes the offset into the base address -
 * see kgnilnd_setup_virt_buffer and kgnilnd_setup_immediate_buffer */
kgnilnd_cksum(void *ptr, size_t nob)

        sum = csum_fold(_kgnilnd_cksum(0, ptr, nob));

        /* don't use magic 'no checksum' value */

        CDEBUG(D_INFO, "cksum 0x%x for ptr 0x%p sz %zu\n",
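
/* NB: _kgnilnd_cksum is just csum_partial, which yields a 32-bit partial sum;
 * csum_fold collapses it to the final 16-bit checksum, which is why the macro
 * comment insists on folding before the value ships over the wire. The elided
 * body presumably remaps a folded result of 0 to a nonzero value, since 0 is
 * reserved as the 'no checksum' indicator (a gnm_payload_cksum of 0 means
 * checksumming was disabled, see kgnilnd_setup_immediate_buffer). */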
kgnilnd_cksum_kiov(unsigned int nkiov, lnet_kiov_t *kiov,
                   unsigned int offset, unsigned int nob, int dump_blob)

        unsigned int fraglen;

        CDEBUG(D_BUFFS, "calc cksum for kiov 0x%p nkiov %u offset %u nob %u, dump %d\n",
               kiov, nkiov, offset, nob, dump_blob);

        /* if this loop changes, please change kgnilnd_setup_phys_buffer */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;

        /* ignore nob here, if nob < (kiov_len - offset), nkiov == 1 */
        odd = (unsigned long) (kiov[0].kiov_len - offset) & 1;

        if ((odd || *kgnilnd_tunables.kgn_vmap_cksum) && nkiov > 1) {
                struct page **pages = kgnilnd_data.kgn_cksum_map_pages[get_cpu()];

                LASSERTF(pages != NULL, "NULL pages for cpu %d map_pages 0x%p\n",
                         get_cpu(), kgnilnd_data.kgn_cksum_map_pages);

                CDEBUG(D_BUFFS, "odd %d len %u offset %u nob %u\n",
                       odd, kiov[0].kiov_len, offset, nob);

                for (i = 0; i < nkiov; i++) {
                        pages[i] = kiov[i].kiov_page;

                addr = vmap(pages, nkiov, VM_MAP, PAGE_KERNEL);
                        CNETERR("Couldn't vmap %d frags on %d bytes to avoid odd length fragment in cksum\n",
                        /* return zero to avoid killing the tx - we'll just get a warning
                         * on the console when the remote end sees a zero checksum */

                atomic_inc(&kgnilnd_data.kgn_nvmap_cksum);

                tmpck = _kgnilnd_cksum(0, (void *) addr + kiov[0].kiov_offset + offset, nob);

                        kgnilnd_dump_blob(D_BUFFS, "flat kiov RDMA payload",
                                          (void *)addr + kiov[0].kiov_offset + offset, nob);

                CDEBUG(D_BUFFS, "cksum 0x%x (+0x%x) for addr 0x%p+%u len %u offset %u\n",
                       cksum, tmpck, addr, kiov[0].kiov_offset, nob, offset);

                fraglen = min(kiov->kiov_len - offset, nob);

                /* make dang sure we don't send a bogus checksum if somehow we get
                 * an odd length fragment on anything but the last entry in a kiov -
                 * we know from kgnilnd_setup_rdma_buffer that we can't have non
                 * PAGE_SIZE pages in the middle, so if nob < PAGE_SIZE, it is the last one */
                LASSERTF(!(fraglen & 1) || (nob < PAGE_SIZE),
                         "odd fraglen %u on nkiov %d, nob %u kiov_len %u offset %u kiov 0x%p\n",
                         fraglen, nkiov, nob, kiov->kiov_len, offset, kiov);

                addr = (void *)kmap(kiov->kiov_page) + kiov->kiov_offset + offset;
                tmpck = _kgnilnd_cksum(cksum, addr, fraglen);

                        "cksum 0x%x (+0x%x) for page 0x%p+%u (0x%p) len %u offset %u\n",
                        cksum, tmpck, kiov->kiov_page, kiov->kiov_offset, addr,

                        kgnilnd_dump_blob(D_BUFFS, "kiov cksum", addr, fraglen);

                kunmap(kiov->kiov_page);

                /* iov must not run out before end of data */
                LASSERTF(nob == 0 || nkiov > 0, "nob %u nkiov %u\n", nob, nkiov);

        retsum = csum_fold(cksum);

        /* don't use magic 'no checksum' value */

        CDEBUG(D_BUFFS, "retsum 0x%x from cksum 0x%x\n", retsum, cksum);
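
/* two strategies are used above: if any non-terminal fragment would have an
 * odd length (which would misalign the 16-bit pairing of the ones-complement
 * sum across fragment boundaries) or kgn_vmap_cksum forces it, the whole kiov
 * is vmap'd into one contiguous range and checksummed in a single pass;
 * otherwise each page is kmap'd and folded into the running sum a fragment at
 * a time, avoiding the costlier vmap for the common aligned case. */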
kgnilnd_init_msg(kgn_msg_t *msg, int type, lnet_nid_t source)
        msg->gnm_magic = GNILND_MSG_MAGIC;
        msg->gnm_version = GNILND_MSG_VERSION;
        msg->gnm_type = type;
        msg->gnm_payload_len = 0;
        msg->gnm_srcnid = source;
        /* gnm_connstamp gets set when FMA is sent */
        /* gnm_srcnid is set on creation via function argument.
         * The right interface/net and nid is passed in when the message

kgnilnd_new_tx_msg(int type, lnet_nid_t source)
        kgn_tx_t *tx = kgnilnd_alloc_tx();

                kgnilnd_init_msg(&tx->tx_msg, type, source);
                CERROR("couldn't allocate new tx type %s!\n",
                       kgnilnd_msgtype2str(type));
kgnilnd_nak_rdma(kgn_conn_t *conn, int type, int error, __u64 cookie, lnet_nid_t source) {

        /* only allow NAK on error and truncate to zero */
        LASSERTF(error <= 0, "error %d conn 0x%p, cookie "LPU64"\n",
                 error, conn, cookie);

        tx = kgnilnd_new_tx_msg(type, source);
                CNETERR("can't get TX to NAK RDMA to %s\n",
                        libcfs_nid2str(conn->gnc_peer->gnp_nid));

        tx->tx_msg.gnm_u.completion.gncm_retval = error;
        tx->tx_msg.gnm_u.completion.gncm_cookie = cookie;
        kgnilnd_queue_tx(conn, tx);
kgnilnd_setup_immediate_buffer(kgn_tx_t *tx, unsigned int niov, struct iovec *iov,
                               lnet_kiov_t *kiov, unsigned int offset, unsigned int nob)
        kgn_msg_t *msg = &tx->tx_msg;

        /* To help save on MDDs for short messages, we'll vmap a kiov to allow
         * gni_smsg_send to send that as the payload */

        LASSERT(tx->tx_buftype == GNILND_BUF_NONE);

                tx->tx_buffer = NULL;
        } else if (kiov != NULL) {
                LASSERTF(niov > 0 && niov < GNILND_MAX_IMMEDIATE/PAGE_SIZE,
                         "bad niov %d\n", niov);

                while (offset >= kiov->kiov_len) {
                        offset -= kiov->kiov_len;

                for (i = 0; i < niov; i++) {
                        /* We can't have a kiov_offset on anything but the first entry,
                         * otherwise we'll have a hole at the end of the mapping as we only map
                         * Also, if we have a kiov_len < PAGE_SIZE but we need to map more
                         * than kiov_len, we will also have a hole at the end of that page
                         * which isn't allowed */
                        if ((kiov[i].kiov_offset != 0 && i > 0) ||
                            (kiov[i].kiov_offset + kiov[i].kiov_len != CFS_PAGE_SIZE && i < niov - 1)) {
                                CNETERR("Can't make payload contiguous in I/O VM:"
                                        "page %d, offset %u, nob %u, kiov_offset %u kiov_len %u\n",
                                        i, offset, nob, kiov->kiov_offset, kiov->kiov_len);
                        tx->tx_imm_pages[i] = kiov[i].kiov_page;

                /* hijack tx_phys for the later unmap */
                        /* tx->tx_phys being equal to NULL is the signal for unmap to discern between kmap and vmap */
                        tx->tx_buffer = (void *)kmap(tx->tx_imm_pages[0]) + kiov[0].kiov_offset + offset;
                        atomic_inc(&kgnilnd_data.kgn_nkmap_short);
                        GNIDBG_TX(D_NET, tx, "kmapped page for %d bytes for kiov 0x%p, buffer 0x%p",
                                  nob, kiov, tx->tx_buffer);
                        tx->tx_phys = vmap(tx->tx_imm_pages, niov, VM_MAP, PAGE_KERNEL);
                        if (tx->tx_phys == NULL) {
                                CNETERR("Couldn't vmap %d frags on %d bytes\n", niov, nob);

                        atomic_inc(&kgnilnd_data.kgn_nvmap_short);
                        /* make sure we take into account the kiov offset as the start of the buffer */
                        tx->tx_buffer = (void *)tx->tx_phys + kiov[0].kiov_offset + offset;
                        GNIDBG_TX(D_NET, tx, "mapped %d pages for %d bytes from kiov 0x%p to 0x%p, buffer 0x%p",
                                  niov, nob, kiov, tx->tx_phys, tx->tx_buffer);
                tx->tx_buftype = GNILND_BUF_IMMEDIATE_KIOV;

                /* For now this is almost identical to kgnilnd_setup_virt_buffer, but we
                 * could "flatten" the payload into a single contiguous buffer ready
                 * for sending direct over an FMA if we ever needed to. */

                while (offset >= iov->iov_len) {
                        offset -= iov->iov_len;

                if (nob > iov->iov_len - offset) {
                        CERROR("Can't handle multiple vaddr fragments\n");

                tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);

                tx->tx_buftype = GNILND_BUF_IMMEDIATE;

        /* checksum the payload early - it shouldn't be changing after lnd_send */
        if (*kgnilnd_tunables.kgn_checksum >= 2) {
                msg->gnm_payload_cksum = kgnilnd_cksum(tx->tx_buffer, nob);
                if (CFS_FAIL_CHECK(CFS_FAIL_GNI_SMSG_CKSUM2)) {
                        msg->gnm_payload_cksum += 0xe00e;
                if (*kgnilnd_tunables.kgn_checksum_dump > 1) {
                        kgnilnd_dump_blob(D_BUFFS, "payload checksum",
                msg->gnm_payload_cksum = 0;
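
/* as used in this file, kgn_checksum acts as a cumulative level: any nonzero
 * value checksums the kgn_msg_t header in kgnilnd_sendmsg_nolock, >= 2 also
 * checksums SMSG immediate payloads (above), and >= 3 additionally checksums
 * RDMA payloads in kgnilnd_compute_rdma_cksum. The CFS_FAIL_GNI_SMSG_CKSUM1/2/3
 * fail locs deliberately corrupt the matching flavor for testing. */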
kgnilnd_setup_virt_buffer(kgn_tx_t *tx,
                          unsigned int niov, struct iovec *iov,
                          unsigned int offset, unsigned int nob)

        LASSERT(tx->tx_buftype == GNILND_BUF_NONE);

        while (offset >= iov->iov_len) {
                offset -= iov->iov_len;

        if (nob > iov->iov_len - offset) {
                CERROR("Can't handle multiple vaddr fragments\n");

        tx->tx_buftype = GNILND_BUF_VIRT_UNMAPPED;

        tx->tx_buffer = (void *)(((unsigned long)iov->iov_base) + offset);
kgnilnd_setup_phys_buffer(kgn_tx_t *tx, int nkiov, lnet_kiov_t *kiov,
                          unsigned int offset, unsigned int nob)
        gni_mem_segment_t *phys;
        unsigned int fraglen;

        GNIDBG_TX(D_NET, tx, "niov %d kiov 0x%p offset %u nob %u", nkiov, kiov, offset, nob);

        LASSERT(tx->tx_buftype == GNILND_BUF_NONE);

        /* only allocate this if we are going to use it */
        tx->tx_phys = cfs_mem_cache_alloc(kgnilnd_data.kgn_tx_phys_cache,
        if (tx->tx_phys == NULL) {
                CERROR("failed to allocate tx_phys\n");

        CDEBUG(D_MALLOC, "slab-alloced 'tx->tx_phys': %lu at %p.\n",
               LNET_MAX_IOV * sizeof(gni_mem_segment_t), tx->tx_phys);

        /* if this loop changes, please change kgnilnd_cksum_kiov
         * and kgnilnd_setup_immediate_buffer */
        while (offset >= kiov->kiov_len) {
                offset -= kiov->kiov_len;

        /* at this point, kiov points to the first page that we'll actually map
         * now that we've walked into the kiov for offset and dropped any
         * leading pages that fall entirely within the offset */
        tx->tx_buftype = GNILND_BUF_PHYS_UNMAPPED;

        /* kiov_offset is the start of the 'valid' buffer, so index offset past that */
        tx->tx_buffer = (void *)((unsigned long)(kiov->kiov_offset + offset));

        CDEBUG(D_NET, "tx 0x%p buffer 0x%p map start kiov 0x%p+%u niov %d offset %u\n",
               tx, tx->tx_buffer, kiov, kiov->kiov_offset, nkiov, offset);

                fraglen = min(kiov->kiov_len - offset, nob);

                /* We can't have a kiov_offset on anything but the first entry,
                 * otherwise we'll have a hole at the end of the mapping as we only map
                 * whole pages. Only the first page is allowed to have an offset -
                 * we'll add that into tx->tx_buffer and that will get used when we
                 * map in the segments (see kgnilnd_map_buffer).
                 * Also, if we have a kiov_len < PAGE_SIZE but we need to map more
                 * than kiov_len, we will also have a hole at the end of that page
                 * which isn't allowed */
                if ((phys != tx->tx_phys) &&
                    ((kiov->kiov_offset != 0) ||
                     ((kiov->kiov_len < PAGE_SIZE) && (nob > kiov->kiov_len)))) {
                        CERROR("Can't make payload contiguous in I/O VM:"
                               "page %d, offset %u, nob %u, kiov_offset %u kiov_len %u\n",
                               (int)(phys - tx->tx_phys),
                               offset, nob, kiov->kiov_offset, kiov->kiov_len);

                if ((phys - tx->tx_phys) == LNET_MAX_IOV) {
                        CERROR("payload too big (%d)\n", (int)(phys - tx->tx_phys));

                if (CFS_FAIL_CHECK(CFS_FAIL_GNI_PHYS_SETUP)) {

                CDEBUG(D_BUFFS, "page 0x%p kiov_offset %u kiov_len %u nob %u "
                       "nkiov %u offset %u\n",
                       kiov->kiov_page, kiov->kiov_offset, kiov->kiov_len, nob, nkiov, offset);

                phys->address = lnet_page2phys(kiov->kiov_page);

                /* iov must not run out before end of data */
                LASSERTF(nob == 0 || nkiov > 0, "nob %u nkiov %u\n", nob, nkiov);

        tx->tx_phys_npages = phys - tx->tx_phys;

        if (tx->tx_phys != NULL) {
                cfs_mem_cache_free(kgnilnd_data.kgn_tx_phys_cache, tx->tx_phys);
                CDEBUG(D_MALLOC, "slab-freed 'tx_phys': %lu at %p.\n",
                       sizeof(*tx->tx_phys), tx->tx_phys);
kgnilnd_setup_rdma_buffer(kgn_tx_t *tx, unsigned int niov,
                          struct iovec *iov, lnet_kiov_t *kiov,
                          unsigned int offset, unsigned int nob)

        LASSERT((iov == NULL) != (kiov == NULL));

                rc = kgnilnd_setup_phys_buffer(tx, niov, kiov, offset, nob);
                rc = kgnilnd_setup_virt_buffer(tx, niov, iov, offset, nob);
kgnilnd_parse_lnet_rdma(lnet_msg_t *lntmsg, unsigned int *niov, unsigned int *offset,
                        unsigned int *nob, lnet_kiov_t **kiov)
        /* GETs are weird, see kgnilnd_send */
        if (lntmsg->msg_type == LNET_MSG_GET) {
                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0) {
                        *kiov = lntmsg->msg_md->md_iov.kiov;
                *niov = lntmsg->msg_md->md_niov;
                *nob = lntmsg->msg_md->md_length;
                *kiov = lntmsg->msg_kiov;
                *niov = lntmsg->msg_niov;
                *nob = lntmsg->msg_len;
                *offset = lntmsg->msg_offset;
kgnilnd_compute_rdma_cksum(kgn_tx_t *tx)
        unsigned int niov, offset, nob;
        lnet_msg_t *lntmsg = tx->tx_lntmsg[0];
        int dump_cksum = (*kgnilnd_tunables.kgn_checksum_dump > 1);

        GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) ||
                           (tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE)),
                      "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type));

        if (*kgnilnd_tunables.kgn_checksum < 3) {
                tx->tx_msg.gnm_payload_cksum = 0;

        GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL);

        kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov);

                tx->tx_msg.gnm_payload_cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, dump_cksum);
                tx->tx_msg.gnm_payload_cksum = kgnilnd_cksum(tx->tx_buffer, nob);

                        kgnilnd_dump_blob(D_BUFFS, "peer RDMA payload", tx->tx_buffer, nob);

        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_SMSG_CKSUM3)) {
                tx->tx_msg.gnm_payload_cksum += 0xd00d;
kgnilnd_verify_rdma_cksum(kgn_tx_t *tx, __u16 rx_cksum)
        unsigned int niov, offset, nob;
        lnet_msg_t *lntmsg = tx->tx_lntmsg[0];
        int dump_on_err = *kgnilnd_tunables.kgn_checksum_dump;

        /* we can only match certain requests */
        GNITX_ASSERTF(tx, ((tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) ||
                           (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK)),
                      "bad type %s", kgnilnd_msgtype2str(tx->tx_msg.gnm_type));

        if (*kgnilnd_tunables.kgn_checksum >= 3) {
                GNIDBG_MSG(D_WARNING, &tx->tx_msg,
                           "no RDMA payload checksum when enabled");

        GNITX_ASSERTF(tx, lntmsg, "no LNet message!", NULL);

        kgnilnd_parse_lnet_rdma(lntmsg, &niov, &offset, &nob, &kiov);

                cksum = kgnilnd_cksum_kiov(niov, kiov, offset, nob, 0);
                cksum = kgnilnd_cksum(tx->tx_buffer, nob);

        if (cksum != rx_cksum) {
                GNIDBG_MSG(D_NETERROR, &tx->tx_msg,
                           "Bad RDMA payload checksum (%x expected %x); "
                           "kiov 0x%p niov %d nob %u offset %u",
                           cksum, rx_cksum, kiov, niov, nob, offset);
                switch (dump_on_err) {
                        kgnilnd_cksum_kiov(niov, kiov, offset, nob, 1);
                        kgnilnd_dump_blob(D_BUFFS, "RDMA payload",
                        /* fall through to dump log */
                        libcfs_debug_dumplog();

                /* kgnilnd_check_fma_rx will close conn, kill tx with error */
kgnilnd_mem_add_map_list(kgn_device_t *dev, kgn_tx_t *tx)

        GNITX_ASSERTF(tx, list_empty(&tx->tx_map_list),
                      "already mapped!", NULL);

        spin_lock(&dev->gnd_map_lock);
        switch (tx->tx_buftype) {
                GNIDBG_TX(D_EMERG, tx,
                          "SOFTWARE BUG: invalid mapping %d", tx->tx_buftype);
                spin_unlock(&dev->gnd_map_lock);

        case GNILND_BUF_PHYS_MAPPED:
                bytes = tx->tx_phys_npages * PAGE_SIZE;
                dev->gnd_map_nphys++;
                dev->gnd_map_physnop += tx->tx_phys_npages;

        case GNILND_BUF_VIRT_MAPPED:
                dev->gnd_map_nvirt++;
                dev->gnd_map_virtnob += tx->tx_nob;

        if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK ||
            tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
                atomic64_add(bytes, &dev->gnd_rdmaq_bytes_out);
                GNIDBG_TX(D_NETTRACE, tx, "rdma ++ %d to "LPD64"",
                          bytes, atomic64_read(&dev->gnd_rdmaq_bytes_out));

        atomic_inc(&dev->gnd_n_mdd);
        atomic64_add(bytes, &dev->gnd_nbytes_map);

        /* clear retrans to prevent any SMSG goofiness as that code uses the same counter */

        /* we only get here in the valid cases */
        list_add_tail(&tx->tx_map_list, &dev->gnd_map_list);
        dev->gnd_map_version++;
        spin_unlock(&dev->gnd_map_lock);
kgnilnd_mem_del_map_list(kgn_device_t *dev, kgn_tx_t *tx)

        GNITX_ASSERTF(tx, !list_empty(&tx->tx_map_list),
                      "not mapped!", NULL);
        spin_lock(&dev->gnd_map_lock);

        switch (tx->tx_buftype) {
                GNIDBG_TX(D_EMERG, tx,
                          "SOFTWARE BUG: invalid mapping %d", tx->tx_buftype);
                spin_unlock(&dev->gnd_map_lock);

        case GNILND_BUF_PHYS_UNMAPPED:
                bytes = tx->tx_phys_npages * PAGE_SIZE;
                dev->gnd_map_nphys--;
                dev->gnd_map_physnop -= tx->tx_phys_npages;

        case GNILND_BUF_VIRT_UNMAPPED:
                dev->gnd_map_nvirt--;
                dev->gnd_map_virtnob -= tx->tx_nob;

        if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK ||
            tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
                atomic64_sub(bytes, &dev->gnd_rdmaq_bytes_out);
                LASSERTF(atomic64_read(&dev->gnd_rdmaq_bytes_out) >= 0,
                         "bytes_out negative! %ld\n", atomic64_read(&dev->gnd_rdmaq_bytes_out));
                GNIDBG_TX(D_NETTRACE, tx, "rdma -- %d to "LPD64"",
                          bytes, atomic64_read(&dev->gnd_rdmaq_bytes_out));

        atomic_dec(&dev->gnd_n_mdd);
        atomic64_sub(bytes, &dev->gnd_nbytes_map);

        /* we only get here in the valid cases */
        list_del_init(&tx->tx_map_list);
        dev->gnd_map_version++;
        spin_unlock(&dev->gnd_map_lock);
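
/* NB: kgnilnd_mem_add_map_list and kgnilnd_mem_del_map_list are strict
 * mirrors - every counter bumped on add (gnd_map_nphys/physnop or
 * gnd_map_nvirt/virtnob, gnd_n_mdd, gnd_nbytes_map, plus rdmaq_bytes_out for
 * PUT_ACK/GET_REQ) is dropped on del, and both bump gnd_map_version under
 * gnd_map_lock, presumably so walkers of gnd_map_list can detect the list
 * changing underneath them. The buftype cases differ on purpose: add matches
 * *_MAPPED, del matches *_UNMAPPED, as the buftype has apparently been stepped
 * back by the time the del runs (see the +1 CLASSERTs in
 * kgnilnd_unmap_buffer below). */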
kgnilnd_map_buffer(kgn_tx_t *tx)
        kgn_conn_t *conn = tx->tx_conn;
        kgn_device_t *dev = conn->gnc_device;
        __u32 flags = GNI_MEM_READWRITE;

        /* The kgnilnd_mem_register(_segments) Gemini Driver functions can
         * be called concurrently as there are internal locks that protect
         * any data structures or HW resources. We just need to ensure
         * that our concurrency doesn't result in the kgn_device_t
         * getting nuked while we are in here */

        LASSERTF(conn != NULL, "tx %p with NULL conn, someone forgot"
                 " to set tx_conn before calling %s\n", tx, __FUNCTION__);

        if (unlikely(CFS_FAIL_CHECK(CFS_FAIL_GNI_MAP_TX)))

        if (*kgnilnd_tunables.kgn_bte_relaxed_ordering) {
                flags |= GNI_MEM_RELAXED_PI_ORDERING;

        switch (tx->tx_buftype) {

        case GNILND_BUF_NONE:
        case GNILND_BUF_IMMEDIATE:
        case GNILND_BUF_IMMEDIATE_KIOV:
        case GNILND_BUF_PHYS_MAPPED:
        case GNILND_BUF_VIRT_MAPPED:

        case GNILND_BUF_PHYS_UNMAPPED:
                GNITX_ASSERTF(tx, tx->tx_phys != NULL, "physical buffer not there!", NULL);
                rrc = kgnilnd_mem_register_segments(dev->gnd_handle,
                                                    tx->tx_phys, tx->tx_phys_npages, NULL,
                                                    GNI_MEM_PHYS_SEGMENTS | flags,
                /* could race with other uses of the map counts, but this is ok
                 * - this needs to turn into a non-fatal error soon to allow
                 * handling of GART resource starvation, etc. */
                if (rrc != GNI_RC_SUCCESS) {
                        GNIDBG_TX(D_NET, tx, "Can't map %d pages: dev %d "
                                  "phys %u pp %u, virt %u nob "LPU64"",
                                  tx->tx_phys_npages, dev->gnd_id,
                                  dev->gnd_map_nphys, dev->gnd_map_physnop,
                                  dev->gnd_map_nvirt, dev->gnd_map_virtnob);
                        RETURN(rrc == GNI_RC_ERROR_RESOURCE ? -ENOMEM : -EINVAL);

                tx->tx_buftype = GNILND_BUF_PHYS_MAPPED;
                kgnilnd_mem_add_map_list(dev, tx);

        case GNILND_BUF_VIRT_UNMAPPED:
                rrc = kgnilnd_mem_register(dev->gnd_handle,
                                           (__u64)tx->tx_buffer, tx->tx_nob,
                                           NULL, flags, &tx->tx_map_key);
                if (rrc != GNI_RC_SUCCESS) {
                        GNIDBG_TX(D_NET, tx, "Can't map %u bytes: dev %d "
                                  "phys %u pp %u, virt %u nob "LPU64"",
                                  tx->tx_nob, dev->gnd_id,
                                  dev->gnd_map_nphys, dev->gnd_map_physnop,
                                  dev->gnd_map_nvirt, dev->gnd_map_virtnob);
                        RETURN(rrc == GNI_RC_ERROR_RESOURCE ? -ENOMEM : -EINVAL);

                tx->tx_buftype = GNILND_BUF_VIRT_MAPPED;
                kgnilnd_mem_add_map_list(dev, tx);
                if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_ACK ||
                    tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
                        atomic64_add(tx->tx_nob, &dev->gnd_rdmaq_bytes_out);
                        GNIDBG_TX(D_NETTRACE, tx, "rdma ++ %d to %ld\n",
                                  tx->tx_nob, atomic64_read(&dev->gnd_rdmaq_bytes_out));
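
/* mapping is a state transition on tx_buftype: *_UNMAPPED buffers get
 * registered with kgni (physical page lists via kgnilnd_mem_register_segments,
 * virtual ranges via kgnilnd_mem_register), become *_MAPPED, and join the
 * device map list for accounting; already-mapped and immediate buffers appear
 * to fall through the early cases untouched. GNI_RC_ERROR_RESOURCE maps to
 * -ENOMEM so callers can treat GART/MDD exhaustion as a retryable allocation
 * failure rather than a hard -EINVAL error. */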
kgnilnd_add_purgatory_tx(kgn_tx_t *tx)
        kgn_conn_t *conn = tx->tx_conn;
        kgn_mdd_purgatory_t *gmp;

        LIBCFS_ALLOC(gmp, sizeof(*gmp));
        LASSERTF(gmp != NULL, "couldn't allocate MDD purgatory member;"
                 " asserting to avoid data corruption\n");

        gmp->gmp_map_key = tx->tx_map_key;
        atomic_inc(&conn->gnc_device->gnd_n_mdd_held);

        /* ensure that we don't have a blank purgatory - indicating the
         * conn is not already on purgatory lists - we'd never recover these
         * MDDs if that were the case */
        GNITX_ASSERTF(tx, conn->gnc_in_purgatory,
                      "conn 0x%p->%s with NULL purgatory",
                      conn, libcfs_nid2str(conn->gnc_peer->gnp_nid));

        /* link 'er up! - only place we really need to lock for
         * concurrent access */
        spin_lock(&conn->gnc_list_lock);
        list_add_tail(&gmp->gmp_list, &conn->gnc_mdd_list);
        spin_unlock(&conn->gnc_list_lock);
kgnilnd_unmap_buffer(kgn_tx_t *tx, int error)
        int hold_timeout = 0;

        /* the code below relies on the +1 relationship ... */
        CLASSERT(GNILND_BUF_PHYS_MAPPED == (GNILND_BUF_PHYS_UNMAPPED + 1));
        CLASSERT(GNILND_BUF_VIRT_MAPPED == (GNILND_BUF_VIRT_UNMAPPED + 1));

        switch (tx->tx_buftype) {

        case GNILND_BUF_NONE:
        case GNILND_BUF_IMMEDIATE:
        case GNILND_BUF_PHYS_UNMAPPED:
        case GNILND_BUF_VIRT_UNMAPPED:

        case GNILND_BUF_IMMEDIATE_KIOV:
                if (tx->tx_phys != NULL) {
                        vunmap(tx->tx_phys);
                } else if (tx->tx_phys == NULL && tx->tx_buffer != NULL) {
                        kunmap(tx->tx_imm_pages[0]);
                /* clear to prevent kgnilnd_free_tx from thinking
                 * this is an RDMA descriptor */

        case GNILND_BUF_PHYS_MAPPED:
        case GNILND_BUF_VIRT_MAPPED:
                LASSERT(tx->tx_conn != NULL);

                dev = tx->tx_conn->gnc_device;

                /* only want to hold if we are closing the conn without
                 * verified peer notification - the theory is that
                 * a TX error can be communicated in all other cases */
                if (tx->tx_conn->gnc_state != GNILND_CONN_ESTABLISHED &&
                    kgnilnd_check_purgatory_conn(tx->tx_conn)) {
                        kgnilnd_add_purgatory_tx(tx);

                        /* The timeout we give to kgni is a deadman stop only.
                         * We are setting it high to ensure we don't have the kgni timer
                         * fire before ours fires _and_ is handled */
                        hold_timeout = GNILND_TIMEOUT2DEADMAN;

                        GNIDBG_TX(D_NET, tx,
                                  "dev %p delaying MDD release for %dms key "LPX64"."LPX64"",
                                  tx->tx_conn->gnc_device, hold_timeout,
                                  tx->tx_map_key.qword1, tx->tx_map_key.qword2);

                rrc = kgnilnd_mem_deregister(dev->gnd_handle, &tx->tx_map_key, hold_timeout);

                LASSERTF(rrc == GNI_RC_SUCCESS, "rrc %d\n", rrc);

                kgnilnd_mem_del_map_list(dev, tx);
kgnilnd_tx_done(kgn_tx_t *tx, int completion)
        lnet_msg_t *lntmsg0, *lntmsg1;
        int status0, status1;
        lnet_ni_t *ni = NULL;
        kgn_conn_t *conn = tx->tx_conn;

        LASSERT(!in_interrupt());

        lntmsg0 = tx->tx_lntmsg[0]; tx->tx_lntmsg[0] = NULL;
        lntmsg1 = tx->tx_lntmsg[1]; tx->tx_lntmsg[1] = NULL;

            !(tx->tx_state & GNILND_TX_QUIET_ERROR) &&
            !kgnilnd_conn_clean_errno(completion)) {
                GNIDBG_TOMSG(D_NETERROR, &tx->tx_msg,
                             "error %d on tx 0x%p->%s id %u/%d state %s age %ds",
                             completion, tx, conn ?
                             libcfs_nid2str(conn->gnc_peer->gnp_nid) : "<?>",
                             tx->tx_id.txe_smsg_id, tx->tx_id.txe_idx,
                             kgnilnd_tx_state2str(tx->tx_list_state),
                             cfs_duration_sec((long)jiffies - tx->tx_qtime));

        /* The error codes determine if we hold onto the MDD */
        kgnilnd_unmap_buffer(tx, completion);

        /* we have to deliver a reply on lntmsg[1] for the GET, so make sure
         * we play nice with the error codes to avoid delivering a failed
         * REQUEST and then a REPLY event as well */

        /* return -EIO to lnet - it is the magic value for failed sends */
        if (tx->tx_msg.gnm_type == GNILND_MSG_GET_REQ) {
                status1 = completion;
                status0 = status1 = completion;

        tx->tx_buftype = GNILND_BUF_NONE;
        tx->tx_msg.gnm_type = GNILND_MSG_NONE;

        /* lnet_finalize doesn't do anything with the *ni, so it is ok for us to
         * leave it NULL when we are a tx without a conn */
                ni = conn->gnc_peer->gnp_net->gnn_ni;

                spin_lock(&conn->gnc_tx_lock);

                LASSERTF(test_and_clear_bit(tx->tx_id.txe_idx,
                         (volatile unsigned long *)&conn->gnc_tx_bits),
                         "conn %p tx %p bit %d already cleared\n",
                         conn, tx, tx->tx_id.txe_idx);

                LASSERTF(conn->gnc_tx_ref_table[tx->tx_id.txe_idx] != NULL,
                         "msg_id %d already NULL\n", tx->tx_id.txe_idx);

                conn->gnc_tx_ref_table[tx->tx_id.txe_idx] = NULL;
                spin_unlock(&conn->gnc_tx_lock);

        kgnilnd_free_tx(tx);

        /* finalize AFTER freeing lnet msgs */

        /* warning - we should hold no locks here - calling lnet_finalize
         * could free up lnet credits, resulting in a call chain back into
         * the LND via kgnilnd_send and friends */
        lnet_finalize(ni, lntmsg0, status0);

        if (lntmsg1 != NULL) {
                lnet_finalize(ni, lntmsg1, status1);
kgnilnd_txlist_done(struct list_head *txlist, int error)
        int err_printed = 0;

        if (list_empty(txlist))

        list_for_each_entry_safe(tx, txn, txlist, tx_list) {
                /* only print the first error */
                        tx->tx_state |= GNILND_TX_QUIET_ERROR;
                list_del_init(&tx->tx_list);
                kgnilnd_tx_done(tx, error);
kgnilnd_set_tx_id(kgn_tx_t *tx, kgn_conn_t *conn)

        spin_lock(&conn->gnc_tx_lock);

        /* ID zero is NOT ALLOWED!!! */

        id = find_next_zero_bit((unsigned long *)&conn->gnc_tx_bits,
                                GNILND_MAX_MSG_ID, conn->gnc_next_tx);
        if (id == GNILND_MAX_MSG_ID) {
                if (conn->gnc_next_tx != 1) {
                        /* we only searched from next_tx to the end and didn't find
                         * one, so search again from the start */
                        conn->gnc_next_tx = 1;
                        /* couldn't find one! */
                        spin_unlock(&conn->gnc_tx_lock);

        /* bump next_tx to prevent immediate reuse */
        conn->gnc_next_tx = id + 1;

        set_bit(id, (volatile unsigned long *)&conn->gnc_tx_bits);
        LASSERTF(conn->gnc_tx_ref_table[id] == NULL,
                 "tx 0x%p already at id %d\n",
                 conn->gnc_tx_ref_table[id], id);

        /* delay these until we have a valid ID - prevents a bad clear of the bit
         * in kgnilnd_tx_done */
        tx->tx_id.txe_cqid = conn->gnc_cqid;

        tx->tx_id.txe_idx = id;
        conn->gnc_tx_ref_table[id] = tx;

        /* Using jiffies to help differentiate against TX reuse - with
         * the usual minimum of a 250HZ clock, we wrap jiffies on the same TX
         * if we are sending to the same node faster than 256000/sec.
         * To help guard against this, we OR in the tx_seq - that is 32 bits */
        tx->tx_id.txe_chips = (__u32)(jiffies | conn->gnc_tx_seq);

        GNIDBG_TX(D_NET, tx, "set cookie/id/bits", NULL);

        spin_unlock(&conn->gnc_tx_lock);
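
/* message IDs use a classic bitmap allocator: find_next_zero_bit from
 * gnc_next_tx gives round-robin allocation, one wrap back to 1 (id 0 is
 * reserved) is attempted before giving up, and gnc_tx_ref_table[] maps an id
 * back to its tx for completion handling. txe_chips mixes jiffies with
 * gnc_tx_seq so a quickly reused id on the same conn stays distinguishable.
 * A lookup on the completion path would take this form (illustrative only,
 * not a function from this fragment):
 *
 *      spin_lock(&conn->gnc_tx_lock);
 *      tx = conn->gnc_tx_ref_table[id];
 *      spin_unlock(&conn->gnc_tx_lock);
 */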
kgnilnd_tx_should_retry(kgn_conn_t *conn, kgn_tx_t *tx)
        int max_retrans = *kgnilnd_tunables.kgn_max_retransmits;
        int log_retrans_level;

        /* I need kgni credits to send this. Replace tx at the head of the
         * fmaq and I'll get rescheduled when credits appear */

        conn->gnc_tx_retrans++;
        log_retrans = ((tx->tx_retrans < 25) || ((tx->tx_retrans % 25) == 0) ||
                       (tx->tx_retrans > (max_retrans / 2)));
        log_retrans_level = tx->tx_retrans < (max_retrans / 2) ? D_NET : D_NETERROR;

        /* Decision time - either error, warn or just retransmit */

        /* we don't care about TX timeout - it could be that the network is slower
         * or throttled. We'll keep retransmitting - so if the network is so slow
         * that we fill up our mailbox, we'll keep trying to resend that msg
         * until we exceed the max_retrans _or_ gnc_last_rx expires, indicating
         * that the peer hasn't sent us any traffic in return */

        if (tx->tx_retrans > max_retrans) {
                /* this means we are not backing off the retransmits
                 * in a healthy manner and are likely chewing up the
                 * CPU cycles quite badly */
                GNIDBG_TOMSG(D_ERROR, &tx->tx_msg,
                             "SOFTWARE BUG: too many retransmits (%d) for tx id %x "
                             tx->tx_retrans, tx->tx_id, conn,
                             libcfs_nid2str(conn->gnc_peer->gnp_nid));

                /* yes - double errors to help debug this condition */
                GNIDBG_TOMSG(D_NETERROR, &tx->tx_msg, "connection dead. "
                             "unable to send to %s for %lu secs (%d tries)",
                             libcfs_nid2str(tx->tx_conn->gnc_peer->gnp_nid),
                             cfs_duration_sec(jiffies - tx->tx_cred_wait),

                kgnilnd_close_conn(conn, -ETIMEDOUT);

                /* caller should terminate */

                /* some reasonable throttling of the debug message */
                unsigned long now = jiffies;
                /* XXX Nic: Mystical TX debug here... */
                GNIDBG_SMSG_CREDS(log_retrans_level, conn);
                GNIDBG_TOMSG(log_retrans_level, &tx->tx_msg,
                             "NOT_DONE on conn 0x%p->%s id %x retrans %d wait %dus"
                             " last_msg %uus/%uus last_cq %uus/%uus",
                             conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
                             tx->tx_id, tx->tx_retrans,
                             jiffies_to_usecs(now - tx->tx_cred_wait),
                             jiffies_to_usecs(now - conn->gnc_last_tx),
                             jiffies_to_usecs(now - conn->gnc_last_rx),
                             jiffies_to_usecs(now - conn->gnc_last_tx_cq),
                             jiffies_to_usecs(now - conn->gnc_last_rx_cq));

                /* caller should retry */
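
/* the two commented exits above are the whole contract: the 'caller should
 * terminate' path has already called kgnilnd_close_conn(-ETIMEDOUT) because
 * the retransmit budget is spent, while the 'caller should retry' path only
 * logged (throttled via log_retrans) and expects the caller to requeue the tx.
 * The GNI_RC_NOT_DONE handling in kgnilnd_sendmsg_nolock below is the main
 * consumer of this decision. */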
/* the caller must be holding gnd_cq_mutex and must not unlock it afterwards, as we need to drop it
 * to avoid bad ordering with state_lock */
kgnilnd_sendmsg_nolock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
                       spinlock_t *state_lock, kgn_tx_list_state_t state)
        kgn_conn_t *conn = tx->tx_conn;
        kgn_msg_t *msg = &tx->tx_msg;

        unsigned long newest_last_rx, timeout;

        LASSERTF((msg->gnm_type == GNILND_MSG_IMMEDIATE) ?
                 immediatenob <= *kgnilnd_tunables.kgn_max_immediate :
                 "msg 0x%p type %d wrong payload size %d\n",
                 msg, msg->gnm_type, immediatenob);

        /* make sure we catch all the cases where we'd send on a dirty old mbox
         * but allow the case of sending CLOSE. Since this check is within the CQ
         * mutex barrier and the close message is only sent through
         * kgnilnd_send_conn_close, the last message out the door will be the
        if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) != 0 && msg->gnm_type != GNILND_MSG_CLOSE) {
                mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                /* Return -ETIME, we are closing the connection already so we don't want to
                 * have this tx hit the wire. The tx will be killed by the calling function.
                 * Once the EP is marked dirty the close message will be the last
                 * thing to hit the wire */

        timeout = cfs_time_seconds(conn->gnc_timeout);

        newest_last_rx = GNILND_LASTRX(conn);

        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_SEND_TIMEOUT)) {
                now = now + (GNILND_TIMEOUTRX(timeout) * 2);

        if (time_after_eq(now, newest_last_rx + GNILND_TIMEOUTRX(timeout))) {
                GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn, "Can't send to %s after timeout lapse of %lu; TO %lu",
                            libcfs_nid2str(conn->gnc_peer->gnp_nid),
                            cfs_duration_sec(now - newest_last_rx),
                            cfs_duration_sec(GNILND_TIMEOUTRX(timeout)));
                mutex_unlock(&conn->gnc_device->gnd_cq_mutex);

        GNITX_ASSERTF(tx, (conn != NULL) && (tx->tx_id.txe_idx != 0), "tx id unset!", NULL);
        /* msg->gnm_srcnid is set when the message is initialized by whatever function is
         * creating the message. This allows the message to contain the correct LNET NID/NET needed
         * instead of the one that the peer/conn uses for sending the data.
        msg->gnm_connstamp = conn->gnc_my_connstamp;
        msg->gnm_payload_len = immediatenob;
        msg->gnm_seq = conn->gnc_tx_seq;

        /* always init here - kgn_checksum is a /sys module tunable
         * and can be flipped at any point, even between msg init and sending */
        if (*kgnilnd_tunables.kgn_checksum) {
                /* We must set this here and not in kgnilnd_init_msg,
                 * as we could resend this msg many times
                 * (NOT_DONE from gni_smsg_send below) and it wouldn't pass
                 * through init_msg again */
                msg->gnm_cksum = kgnilnd_cksum(msg, sizeof(kgn_msg_t));
                if (CFS_FAIL_CHECK(CFS_FAIL_GNI_SMSG_CKSUM1)) {
                        msg->gnm_cksum += 0xf00f;

        GNIDBG_TOMSG(D_NET, msg, "tx 0x%p conn 0x%p->%s sending SMSG sz %u id %x/%d [%p for %u]",
                     tx, conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
                     sizeof(kgn_msg_t), tx->tx_id.txe_smsg_id,
                     tx->tx_id.txe_idx, immediate, immediatenob);

        if (unlikely(tx->tx_state & GNILND_TX_FAIL_SMSG)) {
                rrc = cfs_fail_val ? cfs_fail_val : GNI_RC_NOT_DONE;
                rrc = kgnilnd_smsg_send(conn->gnc_ephandle,
                                        msg, sizeof(*msg), immediate, immediatenob,
                                        tx->tx_id.txe_smsg_id);

        case GNI_RC_SUCCESS:
                conn->gnc_last_tx = jiffies;
                /* no locking here as LIVE isn't a list */
                kgnilnd_tx_add_state_locked(tx, NULL, conn, GNILND_TX_LIVE_FMAQ, 1);

                /* this needs to be checked under lock as it might be freed from a completion
                if (msg->gnm_type == GNILND_MSG_NOOP) {
                        set_mb(conn->gnc_last_noop_sent, jiffies);

                /* serialize with seeing CQ events for completion on this, as well as
                mutex_unlock(&conn->gnc_device->gnd_cq_mutex);

                atomic_inc(&conn->gnc_device->gnd_short_ntx);
                atomic64_add(immediatenob, &conn->gnc_device->gnd_short_txbytes);
                kgnilnd_peer_alive(conn->gnc_peer);
                GNIDBG_SMSG_CREDS(D_NET, conn);

        case GNI_RC_NOT_DONE:
                /* XXX Nic: We need to figure out how to track this
                 * - there are bound to be good reasons for it,
                 * but we want to know when it happens */
                mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
                /* We'll handle this error inline - makes the calling logic much more

                /* If there is no lock, the caller doesn't want us to retry */
                if (state_lock == NULL) {

                retry_send = kgnilnd_tx_should_retry(conn, tx);
                        /* add to the head of the list for the state and retries */
                        spin_lock(state_lock);
                        kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, state, 0);
                        spin_unlock(state_lock);

                        /* We only reschedule for a certain number of retries, then
                         * we will wait for the CQ events indicating a release of SMSG
                        if (tx->tx_retrans < (*kgnilnd_tunables.kgn_max_retransmits/4)) {
                                kgnilnd_schedule_conn(conn);
                                /* A CQ event coming in signifies either TX completed or
                                 * RX receive. Either of these *could* free up credits
                                 * in the SMSG mbox and we should try sending again */
                                GNIDBG_TX(D_NET, tx, "waiting for CQID %u event to resend",
                                          tx->tx_conn->gnc_cqid);
                        /* use a +ve return code to let the upper layers know they
                         * should stop looping on sends */

                /* handle a bad retcode gracefully */
                mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
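
/* return codes out of kgnilnd_sendmsg_nolock, as far as this function shows:
 * success on GNI_RC_SUCCESS (the tx moves to GNILND_TX_LIVE_FMAQ and
 * short-message stats are charged), -ETIME when the peer's EPs are dirty or
 * gnc_last_rx has lapsed past the timeout (the conn is on its way down and the
 * caller kills the tx), and on GNI_RC_NOT_DONE either a retry is arranged by
 * requeueing at the head of the caller-supplied state list or a positive code
 * tells the upper layers to stop looping until a CQ event frees SMSG credits.
 * Every path drops gnd_cq_mutex before returning, per the comment at the top
 * of the function. */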
/* kgnilnd_sendmsg has a hard wait on gnd_cq_mutex */
kgnilnd_sendmsg(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
                spinlock_t *state_lock, kgn_tx_list_state_t state)
        kgn_device_t *dev = tx->tx_conn->gnc_device;
        unsigned long timestamp;

        timestamp = jiffies;
        mutex_lock(&dev->gnd_cq_mutex);
        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for a long time.
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
        dev->gnd_mutex_delay += (long) jiffies - timestamp;

        rc = kgnilnd_sendmsg_nolock(tx, immediate, immediatenob, state_lock, state);
/* returns -EAGAIN for a lock miss, anything else < 0 is a hard error, >= 0 for success */
kgnilnd_sendmsg_trylock(kgn_tx_t *tx, void *immediate, unsigned int immediatenob,
                        spinlock_t *state_lock, kgn_tx_list_state_t state)
        kgn_conn_t *conn = tx->tx_conn;
        kgn_device_t *dev = conn->gnc_device;
        unsigned long timestamp;

        timestamp = jiffies;

        /* technically we are doing bad things with the read_lock on the peer_conn
         * table, but we shouldn't be sleeping inside here - and we don't sleep/block
         * for the mutex. I bet lockdep is gonna flag this one though... */

        /* there are a few cases where we don't want the immediate send - like
         * when we are in the scheduler thread and it'd harm the latency of
         * getting messages up to LNet */

        /* rmb for gnd_ready */
        if (conn->gnc_device->gnd_ready == GNILND_DEV_LOOP) {
                atomic_inc(&conn->gnc_device->gnd_fast_block);
        } else if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
                /* don't hit HW during quiesce */
        } else if (unlikely(atomic_read(&conn->gnc_peer->gnp_dirty_eps))) {
                /* don't hit HW if stale EPs and conns are left to close */
                atomic_inc(&conn->gnc_device->gnd_fast_try);
                rc = mutex_trylock(&conn->gnc_device->gnd_cq_mutex);

        /* we got the mutex and weren't blocked */

        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for a long time.
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
        dev->gnd_mutex_delay += (long) jiffies - timestamp;

        atomic_inc(&conn->gnc_device->gnd_fast_ok);
        tx->tx_qtime = jiffies;
        tx->tx_state = GNILND_TX_WAITING_COMPLETION;
        rc = kgnilnd_sendmsg_nolock(tx, tx->tx_buffer, tx->tx_nob, &conn->gnc_list_lock, GNILND_TX_FMAQ);
        /* _nolock unlocks the mutex for us */
/* lets us know if we can push this RDMA through now */
kgnilnd_auth_rdma_bytes(kgn_device_t *dev, kgn_tx_t *tx)

        bytes_left = atomic64_sub_return(tx->tx_nob, &dev->gnd_rdmaq_bytes_ok);

        if (bytes_left < 0) {
                atomic64_add(tx->tx_nob, &dev->gnd_rdmaq_bytes_ok);
                atomic_inc(&dev->gnd_rdmaq_nstalls);

                CDEBUG(D_NET, "no bytes to send, turning on timer for %lu\n",
                       dev->gnd_rdmaq_deadline);
                mod_timer(&dev->gnd_rdmaq_timer, dev->gnd_rdmaq_deadline);
                /* we never del this timer - at worst it schedules us.. */

/* this adds a TX to the queue pending throttling authorization before
 * we allow our remote peer to launch a PUT at us */
kgnilnd_queue_rdma(kgn_conn_t *conn, kgn_tx_t *tx)

        /* we cannot go into send_mapped_tx from here as we are holding locks
         * and mem registration might end up allocating memory in kgni.
         * That said, we'll push this as far as we can into the queue process */
        rc = kgnilnd_auth_rdma_bytes(conn->gnc_device, tx);

                spin_lock(&conn->gnc_device->gnd_rdmaq_lock);
                kgnilnd_tx_add_state_locked(tx, NULL, conn, GNILND_TX_RDMAQ, 0);
                /* lets us know how delayed RDMA is */
                tx->tx_qtime = jiffies;
                spin_unlock(&conn->gnc_device->gnd_rdmaq_lock);

                /* we have RDMA authorized, now it just needs a MDD and to hit the wire */
                spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
                kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 0);
                /* lets us know how delayed mapping is */
                tx->tx_qtime = jiffies;
                spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);

        /* make sure we wake up sched to run this */
        kgnilnd_schedule_device(tx->tx_conn->gnc_device);
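
/* flow for TXs that authorize peer RDMA (PUT_ACK/GET_REQ):
 * kgnilnd_auth_rdma_bytes debits tx_nob from the per-device
 * gnd_rdmaq_bytes_ok budget; on overdraft it credits the bytes back, counts a
 * stall, and arms gnd_rdmaq_timer, so the tx parks on GNILND_TX_RDMAQ until
 * the budget refills. Otherwise the tx goes straight to GNILND_TX_MAPQ to
 * pick up its MDD. Either way the device scheduler is kicked, so progress
 * never waits on an unrelated send. */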
/* push TX through the state machine */
kgnilnd_queue_tx(kgn_conn_t *conn, kgn_tx_t *tx)

        /* set the tx_id here, we delay it until we have an actual conn
         * in some cases, the tx_id is already set to provide for things
         * like RDMA completion cookies, etc */
        if (tx->tx_id.txe_idx == 0) {
                rc = kgnilnd_set_tx_id(tx, conn);
                        kgnilnd_tx_done(tx, rc);

        CDEBUG(D_NET, "%s to conn %p for %s\n", kgnilnd_msgtype2str(tx->tx_msg.gnm_type),
               conn, libcfs_nid2str(conn->gnc_peer->gnp_nid));

        /* Only let NOOPs be sent while the fail loc is set, otherwise kill the tx.
        if (CFS_FAIL_CHECK(CFS_FAIL_GNI_ONLY_NOOP) && (tx->tx_msg.gnm_type != GNILND_MSG_NOOP)) {
                kgnilnd_tx_done(tx, rc);

        switch (tx->tx_msg.gnm_type) {
        case GNILND_MSG_PUT_ACK:
        case GNILND_MSG_GET_REQ:
                /* hijacking time! If this message will authorize our peer to
                 * send its dirty little bytes in an RDMA, we need to get permission */
                kgnilnd_queue_rdma(conn, tx);
        case GNILND_MSG_IMMEDIATE:
                /* try to send right now, can help reduce latency */
                rc = kgnilnd_sendmsg_trylock(tx, tx->tx_buffer, tx->tx_nob, &conn->gnc_list_lock, GNILND_TX_FMAQ);

                        /* it was sent, break out of switch to avoid default case of queueing */
                } else if (rc == -EAGAIN) {
                        /* needs to queue to try again, so fall through to default case */
                        /* bail: it wasn't sent and we didn't get EAGAIN indicating
                         * we should retrans - We do not close the conn due to locking;
                         * we let the reaper thread take care of it. There are no hard
                         * errors from send_msg that would require close to be called
                        kgnilnd_tx_done(tx, rc);
        case GNILND_MSG_NOOP:
                /* Just make sure this goes out first for this conn */
                /* fall through... */
                spin_lock(&conn->gnc_list_lock);
                kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, GNILND_TX_FMAQ, add_tail);
                tx->tx_qtime = jiffies;
                spin_unlock(&conn->gnc_list_lock);
                kgnilnd_schedule_conn(conn);
kgnilnd_launch_tx(kgn_tx_t *tx, kgn_net_t *net, lnet_process_id_t *target)
        kgn_peer_t *new_peer = NULL;
        kgn_conn_t *conn = NULL;

        /* If I get here, I've committed to send, so I complete the tx with
         * failure on any problems */

        GNITX_ASSERTF(tx, tx->tx_conn == NULL,
                      "tx already has connection %p", tx->tx_conn);

        /* do all of the peer & conn searching in one swoop - this avoids
         * nastiness when dropping locks and needing to maintain a sane state
         * in the face of stack reset or something else nuking peers & conns */

        /* I expect to find the peer, so only take a read lock */
        read_lock(&kgnilnd_data.kgn_peer_conn_lock);

        peer = kgnilnd_find_peer_locked(target->nid);
                conn = kgnilnd_find_conn_locked(peer);
                /* this could be NULL during quiesce */
                        /* Connection exists; queue message on it */
                        kgnilnd_queue_tx(conn, tx);
                        read_unlock(&kgnilnd_data.kgn_peer_conn_lock);

        /* creating peer or conn; I'll need a write lock... */
        read_unlock(&kgnilnd_data.kgn_peer_conn_lock);

        CFS_RACE(CFS_FAIL_GNI_FIND_TARGET);

        /* NB - this will not block during normal operations -
         * the only writer of this is in the startup/shutdown path. */
        rc = down_read_trylock(&kgnilnd_data.kgn_net_rw_sem);

        /* ignore the previous peer entirely - we cycled the lock, so we
         * will create a new peer and at worst drop it if the peer is still
        rc = kgnilnd_create_peer_safe(&new_peer, target->nid, net);
                up_read(&kgnilnd_data.kgn_net_rw_sem);

        write_lock(&kgnilnd_data.kgn_peer_conn_lock);
        up_read(&kgnilnd_data.kgn_net_rw_sem);

        /* search for the peer again now that we have the lock
         * if we don't find it, add our new one to the list */
        kgnilnd_add_peer_locked(target->nid, new_peer, &peer);

        conn = kgnilnd_find_or_create_conn_locked(peer);
                /* oh hey, found a conn now... magical */
                kgnilnd_queue_tx(conn, tx);
                /* no conn, must be trying to connect - so we queue for now */
                tx->tx_qtime = jiffies;
                kgnilnd_tx_add_state_locked(tx, peer, NULL, GNILND_TX_PEERQ, 1);

        write_unlock(&kgnilnd_data.kgn_peer_conn_lock);

        kgnilnd_tx_done(tx, rc);
kgnilnd_rdma(kgn_tx_t *tx, int type,
             kgn_rdma_desc_t *sink, unsigned int nob, __u64 cookie)
        kgn_conn_t *conn = tx->tx_conn;
        unsigned long timestamp;

        LASSERTF(kgnilnd_tx_mapped(tx),
                 "unmapped tx %p\n", tx);
        LASSERTF(conn != NULL,
                 "NULL conn on tx %p, naughty, naughty\n", tx);
        LASSERTF(nob <= sink->gnrd_nob,
                 "nob %u > sink->gnrd_nob %d (%p)\n",
                 nob, sink->gnrd_nob, sink);
        LASSERTF(nob <= tx->tx_nob,
                 "nob %d > tx(%p)->tx_nob %d\n",
                 nob, tx, tx->tx_nob);

        memset(&tx->tx_rdma_desc, 0, sizeof(tx->tx_rdma_desc));
        tx->tx_rdma_desc.post_id = tx->tx_id.txe_cookie;
        tx->tx_rdma_desc.type = GNI_POST_RDMA_PUT;
        tx->tx_rdma_desc.cq_mode = GNI_CQMODE_GLOBAL_EVENT;
        tx->tx_rdma_desc.local_addr = (__u64)((unsigned long)tx->tx_buffer);
        tx->tx_rdma_desc.local_mem_hndl = tx->tx_map_key;
        tx->tx_rdma_desc.remote_addr = sink->gnrd_addr;
        tx->tx_rdma_desc.remote_mem_hndl = sink->gnrd_key;
        tx->tx_rdma_desc.length = nob;
        if (!*kgnilnd_tunables.kgn_bte_hash)
                tx->tx_rdma_desc.dlvr_mode |= GNI_DLVMODE_NO_HASH;
        if (!*kgnilnd_tunables.kgn_bte_adapt)
                tx->tx_rdma_desc.dlvr_mode |= (GNI_DLVMODE_NO_ADAPT | GNI_DLVMODE_NO_RADAPT);

        /* prep the final completion message */
        kgnilnd_init_msg(&tx->tx_msg, type, tx->tx_msg.gnm_srcnid);
        tx->tx_msg.gnm_u.completion.gncm_cookie = cookie;
        /* send the actual size RDMA'd in retval */
        tx->tx_msg.gnm_u.completion.gncm_retval = nob;

        kgnilnd_compute_rdma_cksum(tx);

                kgnilnd_queue_tx(conn, tx);

        /* Don't lie (CLOSE == RDMA idle) */
        LASSERTF(!conn->gnc_close_sent, "tx %p on conn %p after close sent %d\n",
                 tx, conn, conn->gnc_close_sent);

        GNIDBG_TX(D_NET, tx, "Post RDMA type 0x%02x dlvr_mode 0x%x",
                  type, tx->tx_rdma_desc.dlvr_mode);

        /* set the CQ dedicated for RDMA */
        tx->tx_rdma_desc.src_cq_hndl = conn->gnc_device->gnd_snd_rdma_cqh;

        timestamp = jiffies;
        mutex_lock(&conn->gnc_device->gnd_cq_mutex);
        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for a long time.
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
        conn->gnc_device->gnd_mutex_delay += (long) jiffies - timestamp;

        rrc = kgnilnd_post_rdma(conn->gnc_ephandle, &tx->tx_rdma_desc);

        spin_lock(&conn->gnc_list_lock);
        kgnilnd_tx_add_state_locked(tx, conn->gnc_peer, conn, GNILND_TX_LIVE_RDMAQ, 1);
        tx->tx_qtime = jiffies;
        spin_unlock(&conn->gnc_list_lock);

        mutex_unlock(&conn->gnc_device->gnd_cq_mutex);

        /* XXX Nic: is this a place we should handle more errors for
         * robustness' sake? */
        LASSERT(rrc == GNI_RC_SUCCESS);
kgnilnd_alloc_rx(void)

        rx = cfs_mem_cache_alloc(kgnilnd_data.kgn_rx_cache, CFS_ALLOC_ATOMIC);
                CERROR("failed to allocate rx\n");

        CDEBUG(D_MALLOC, "slab-alloced 'rx': %lu at %p.\n",

        /* no memset to zero, we'll always fill all members */

/* release is to just free connection resources
 * we use this for the eager path after copying */
kgnilnd_release_msg(kgn_conn_t *conn)
        unsigned long timestamp;

        CDEBUG(D_NET, "consuming %p\n", conn);

        timestamp = jiffies;
        mutex_lock(&conn->gnc_device->gnd_cq_mutex);
        /* delay in jiffies - we are really concerned only with things that
         * result in a schedule() or really holding this off for a long time.
         * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
        conn->gnc_device->gnd_mutex_delay += (long) jiffies - timestamp;

        rrc = kgnilnd_smsg_release(conn->gnc_ephandle);
        mutex_unlock(&conn->gnc_device->gnd_cq_mutex);

        LASSERTF(rrc == GNI_RC_SUCCESS, "bad rrc %d\n", rrc);
        GNIDBG_SMSG_CREDS(D_NET, conn);

kgnilnd_consume_rx(kgn_rx_t *rx)
        kgn_conn_t *conn = rx->grx_conn;
        kgn_msg_t *rxmsg = rx->grx_msg;

        /* if we are eager, free the cache alloc'd msg */
        if (unlikely(rx->grx_eager)) {
                LIBCFS_FREE(rxmsg, sizeof(*rxmsg) + *kgnilnd_tunables.kgn_max_immediate);

                /* release the ref from eager_recv */
                kgnilnd_conn_decref(conn);
                GNIDBG_MSG(D_NET, rxmsg, "rx %p processed", rx);
                kgnilnd_release_msg(conn);

        cfs_mem_cache_free(kgnilnd_data.kgn_rx_cache, rx);
        CDEBUG(D_MALLOC, "slab-freed 'rx': %lu at %p.\n",
kgnilnd_send(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg)
        lnet_hdr_t *hdr = &lntmsg->msg_hdr;
        int type = lntmsg->msg_type;
        lnet_process_id_t target = lntmsg->msg_target;
        int target_is_router = lntmsg->msg_target_is_router;
        int routing = lntmsg->msg_routing;
        unsigned int niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t *kiov = lntmsg->msg_kiov;
        unsigned int offset = lntmsg->msg_offset;
        unsigned int nob = lntmsg->msg_len;
        unsigned int msg_vmflush = lntmsg->msg_vmflush;
        kgn_net_t *net = ni->ni_data;

        /* NB 'private' is different depending on what we're sending.... */
        LASSERT(!in_interrupt());

        CDEBUG(D_NET, "sending msg type %d with %d bytes in %d frags to %s\n",
               type, nob, niov, libcfs_id2str(target));

        LASSERTF(nob == 0 || niov > 0,
                 "lntmsg %p nob %d niov %d\n", lntmsg, nob, niov);
        LASSERTF(niov <= LNET_MAX_IOV,
                 "lntmsg %p niov %d\n", lntmsg, niov);

        /* payload is either all vaddrs or all pages */
        LASSERTF(!(kiov != NULL && iov != NULL),
                 "lntmsg %p kiov %p iov %p\n", lntmsg, kiov, iov);

                mpflag = cfs_memory_pressure_get_and_set();

                CERROR("lntmsg %p with unexpected type %d\n",

                LASSERTF(nob == 0, "lntmsg %p nob %d\n",

                if (routing || target_is_router)
                        break;                  /* send IMMEDIATE */

                /* it is safe to do the direct GET without mapping the buffer for RDMA as we
                 * check the eventual sink buffer here - if small enough, the remote
                 * end is perfectly capable of returning data in a short message -
                 * The magic is that we call lnet_parse in kgnilnd_recv with rdma_req=0
                 * for IMMEDIATE messages which will have it send a real reply instead
                 * of doing kgnilnd_recv to have the RDMA continued */
                if (lntmsg->msg_md->md_length <= *kgnilnd_tunables.kgn_max_immediate)

                tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_REQ, ni->ni_nid);

                /* slightly different options as we might actually have a GET with a
                 * MD_KIOV set but a non-NULL md_iov.iov */
                if ((lntmsg->msg_md->md_options & LNET_MD_KIOV) == 0)
                        rc = kgnilnd_setup_rdma_buffer(tx, lntmsg->msg_md->md_niov,
                                                       lntmsg->msg_md->md_iov.iov, NULL,
                                                       0, lntmsg->msg_md->md_length);
                        rc = kgnilnd_setup_rdma_buffer(tx, lntmsg->msg_md->md_niov,
                                                       NULL, lntmsg->msg_md->md_iov.kiov,
                                                       0, lntmsg->msg_md->md_length);
                        CERROR("unable to setup buffer: %d\n", rc);
                        kgnilnd_tx_done(tx, rc);

                tx->tx_lntmsg[1] = lnet_create_reply_msg(ni, lntmsg);
                if (tx->tx_lntmsg[1] == NULL) {
                        CERROR("Can't create reply for GET to %s\n",
                               libcfs_nid2str(target.nid));
                        kgnilnd_tx_done(tx, rc);

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.gnm_u.get.gngm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kgnilnd_launch_tx(tx, net, &target);

        case LNET_MSG_REPLY:
                /* to save on MDDs, we'll handle a short kiov by vmap'ing
                 * it and sending via SMSG */
                if (nob <= *kgnilnd_tunables.kgn_max_immediate)

                tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_REQ, ni->ni_nid);

                rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);
                        kgnilnd_tx_done(tx, rc);

                tx->tx_lntmsg[0] = lntmsg;
                tx->tx_msg.gnm_u.putreq.gnprm_hdr = *hdr;
                /* rest of tx_msg is setup just before it is sent */
                kgnilnd_launch_tx(tx, net, &target);

        /* send IMMEDIATE */

        LASSERTF(nob <= *kgnilnd_tunables.kgn_max_immediate,
                 "lntmsg 0x%p too large %d\n", lntmsg, nob);

        tx = kgnilnd_new_tx_msg(GNILND_MSG_IMMEDIATE, ni->ni_nid);

        rc = kgnilnd_setup_immediate_buffer(tx, niov, iov, kiov, offset, nob);
                kgnilnd_tx_done(tx, rc);

        tx->tx_msg.gnm_u.immediate.gnim_hdr = *hdr;
        tx->tx_lntmsg[0] = lntmsg;
        kgnilnd_launch_tx(tx, net, &target);

        /* use the stored value as we could have already finalized lntmsg here from a failed launch */
                cfs_memory_pressure_restore(mpflag);
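
/* dispatch summary for kgnilnd_send, as visible above: an LNET_MSG_GET whose
 * sink exceeds kgn_max_immediate becomes GNILND_MSG_GET_REQ (with
 * tx_lntmsg[1] pre-created via lnet_create_reply_msg for the eventual
 * GET_DONE), a PUT or REPLY above the immediate limit becomes
 * GNILND_MSG_PUT_REQ, and anything small enough falls through to
 * GNILND_MSG_IMMEDIATE, where the payload rides the SMSG itself and no MDD is
 * consumed. */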
kgnilnd_reply(lnet_ni_t *ni, kgn_rx_t *rx, lnet_msg_t *lntmsg)
        kgn_conn_t *conn = rx->grx_conn;
        kgn_msg_t *rxmsg = rx->grx_msg;
        unsigned int niov = lntmsg->msg_niov;
        struct iovec *iov = lntmsg->msg_iov;
        lnet_kiov_t *kiov = lntmsg->msg_kiov;
        unsigned int offset = lntmsg->msg_offset;
        unsigned int nob = lntmsg->msg_len;

        tx = kgnilnd_new_tx_msg(GNILND_MSG_GET_DONE, ni->ni_nid);

        rc = kgnilnd_set_tx_id(tx, conn);

        rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, nob);

        tx->tx_lntmsg[0] = lntmsg;
        tx->tx_getinfo = rxmsg->gnm_u.get;

        /* we only queue from kgnilnd_recv - we might get called from other contexts
         * and we don't want to block the mutex in those cases */

        spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
        kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1);
        spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
        kgnilnd_schedule_device(tx->tx_conn->gnc_device);

        kgnilnd_tx_done(tx, rc);
        kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
        lnet_finalize(ni, lntmsg, rc);
2093 kgnilnd_eager_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
2096 kgn_rx_t *rx = private;
2097 kgn_conn_t *conn = rx->grx_conn;
2098 kgn_msg_t *rxmsg = rx->grx_msg;
2099 kgn_msg_t *eagermsg = NULL;
2101 GNIDBG_MSG(D_NET, rxmsg, "eager recv for conn %p, rxmsg %p, lntmsg %p",
2102 conn, rxmsg, lntmsg);
2104 if (rxmsg->gnm_payload_len > *kgnilnd_tunables.kgn_max_immediate) {
2105 GNIDBG_MSG(D_ERROR, rxmsg, "payload too large %d",
2106 rxmsg->gnm_payload_len);
2110 /* we have no credits or buffers for this message, so copy it
2111 * somewhere for a later kgnilnd_recv */
2112 LIBCFS_ALLOC(eagermsg, sizeof(*eagermsg) + *kgnilnd_tunables.kgn_max_immediate);
2113 if (eagermsg == NULL) {
2114 CERROR("couldn't allocate eager rx message for conn %p to %s\n",
2115 conn, libcfs_nid2str(conn->gnc_peer->gnp_nid));
2119 /* copy msg and payload */
2120 memcpy(eagermsg, rxmsg, sizeof(*rxmsg) + rxmsg->gnm_payload_len);
2121 rx->grx_msg = eagermsg;
2124 /* stash this for lnet_finalize on cancel-on-conn-close */
2125 rx->grx_lntmsg = lntmsg;
2127 /* add conn ref to ensure it doesn't go away until all eager messages processed */
2128 kgnilnd_conn_addref(conn);
2130 /* keep the same rx_t, it just has a new grx_msg now */
2131 *new_private = private;
2133 /* release SMSG buffer */
2134 kgnilnd_release_msg(conn);
2140 kgnilnd_recv(lnet_ni_t *ni, void *private, lnet_msg_t *lntmsg,
2141 int delayed, unsigned int niov,
2142 struct iovec *iov, lnet_kiov_t *kiov,
2143 unsigned int offset, unsigned int mlen, unsigned int rlen)
2145 kgn_rx_t *rx = private;
2146 kgn_conn_t *conn = rx->grx_conn;
2147 kgn_msg_t *rxmsg = rx->grx_msg;
2153 LASSERT(!in_interrupt());
2154 LASSERTF(mlen <= rlen, "%d <= %d\n", mlen, rlen);
2155 /* Either all pages or all vaddrs */
2156 LASSERTF(!(kiov != NULL && iov != NULL), "kiov %p iov %p\n",
2159 GNIDBG_MSG(D_NET, rxmsg, "conn %p, rxmsg %p, lntmsg %p"
2160 " niov=%d kiov=%p iov=%p offset=%d mlen=%d rlen=%d",
2161 conn, rxmsg, lntmsg,
2162 niov, kiov, iov, offset, mlen, rlen);
2164 /* we need to lock here as recv can be called from any context */
2165 read_lock(&kgnilnd_data.kgn_peer_conn_lock);
2166 if (rx->grx_eager && conn->gnc_state != GNILND_CONN_ESTABLISHED) {
2167 read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
2169 /* someone closed the conn after we copied this out, nuke it */
2170 kgnilnd_consume_rx(rx);
2171 lnet_finalize(ni, lntmsg, conn->gnc_error);
2174 read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
2176 switch (rxmsg->gnm_type) {
2180 case GNILND_MSG_IMMEDIATE:
2181 if (mlen > rxmsg->gnm_payload_len) {
2182 GNIDBG_MSG(D_ERROR, rxmsg,
2183 "Immediate message from %s too big: %d > %d",
2184 libcfs_nid2str(conn->gnc_peer->gnp_nid), mlen,
2185 rxmsg->gnm_payload_len);
2187 kgnilnd_consume_rx(rx);
2191 /* rxmsg[1] is a pointer to the payload, sitting in the buffer
2192 * right after the kgn_msg_t header - so it's just a 'cute' way of saying
2193 * rxmsg + sizeof(kgn_msg_t) */
2195 /* check payload checksum if sent */
2197 if (*kgnilnd_tunables.kgn_checksum >= 2 &&
2198 !rxmsg->gnm_payload_cksum &&
2199 rxmsg->gnm_payload_len != 0)
2200 GNIDBG_MSG(D_WARNING, rxmsg, "no msg payload checksum when enabled");
2202 if (rxmsg->gnm_payload_cksum != 0) {
2203 /* gnm_payload_len set in kgnilnd_sendmsg from tx->tx_nob,
2204 * which is what is used to calculate the cksum on the TX side */
2205 pload_cksum = kgnilnd_cksum(&rxmsg[1], rxmsg->gnm_payload_len);
2207 if (rxmsg->gnm_payload_cksum != pload_cksum) {
2208 GNIDBG_MSG(D_NETERROR, rxmsg,
2209 "Bad payload checksum (%x expected %x)",
2210 pload_cksum, rxmsg->gnm_payload_cksum);
2211 switch (*kgnilnd_tunables.kgn_checksum_dump) {
2213 kgnilnd_dump_blob(D_BUFFS, "bad payload checksum",
2214 &rxmsg[1], rxmsg->gnm_payload_len);
2215 /* fall through to dump */
2217 libcfs_debug_dumplog();
2223 /* checksum problems are fatal, kill the conn */
2224 kgnilnd_consume_rx(rx);
2225 kgnilnd_close_conn(conn, rc);
2231 lnet_copy_flat2kiov(
2233 *kgnilnd_tunables.kgn_max_immediate,
2234 &rxmsg[1], 0, mlen);
2238 *kgnilnd_tunables.kgn_max_immediate,
2239 &rxmsg[1], 0, mlen);
2241 kgnilnd_consume_rx(rx);
2242 lnet_finalize(ni, lntmsg, 0);
2245 case GNILND_MSG_PUT_REQ:
2246 /* LNET wants to truncate or drop transaction, sending NAK */
2248 kgnilnd_consume_rx(rx);
2249 lnet_finalize(ni, lntmsg, 0);
2251 /* only error if lntmsg == NULL, otherwise we are just
2252 * short-circuiting the RDMA of 0 bytes */
2253 kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK,
2254 lntmsg == NULL ? -ENOENT : 0,
2255 rxmsg->gnm_u.get.gngm_cookie,
2259 /* sending ACK with sink buffer info */
2260 tx = kgnilnd_new_tx_msg(GNILND_MSG_PUT_ACK, ni->ni_nid);
2262 kgnilnd_consume_rx(rx);
2266 rc = kgnilnd_set_tx_id(tx, conn);
2268 GOTO(nak_put_req, rc);
2271 rc = kgnilnd_setup_rdma_buffer(tx, niov, iov, kiov, offset, mlen);
2273 GOTO(nak_put_req, rc);
2276 tx->tx_msg.gnm_u.putack.gnpam_src_cookie =
2277 rxmsg->gnm_u.putreq.gnprm_cookie;
2278 tx->tx_msg.gnm_u.putack.gnpam_dst_cookie = tx->tx_id.txe_cookie;
2279 tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_addr =
2280 (__u64)((unsigned long)tx->tx_buffer);
2281 tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_nob = mlen;
2283 tx->tx_lntmsg[0] = lntmsg; /* finalize this on RDMA_DONE */
2285 /* we only queue from kgnilnd_recv - we might get called from other contexts
2286 * and we don't want to block the mutex in those cases */
2288 spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
2289 kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1);
2290 spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
2291 kgnilnd_schedule_device(tx->tx_conn->gnc_device);
2293 kgnilnd_consume_rx(rx);
2297 /* make sure we send an error back when the PUT fails */
2298 kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK, rc, rxmsg->gnm_u.get.gngm_cookie, ni->ni_nid);
2299 kgnilnd_tx_done(tx, rc);
2300 kgnilnd_consume_rx(rx);
2302 /* return magic LNet network error */
2305 case GNILND_MSG_GET_REQ:
2306 if (lntmsg != NULL) {
2308 kgnilnd_reply(ni, rx, lntmsg);
2311 kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK,
2313 rxmsg->gnm_u.get.gngm_cookie,
2316 kgnilnd_consume_rx(rx);
2322 /* needs write_lock on kgn_peer_conn_lock held */
2324 kgnilnd_check_conn_timeouts_locked(kgn_conn_t *conn)
2326 unsigned long timeout, keepalive;
2327 unsigned long now = jiffies;
2328 unsigned long newest_last_rx;
2331 /* given that we found this conn hanging off a peer, it better damned
2332 * well be connected */
2333 LASSERTF(conn->gnc_state == GNILND_CONN_ESTABLISHED,
2334 "conn 0x%p->%s with bad state%s\n", conn,
2335 conn->gnc_peer ? libcfs_nid2str(conn->gnc_peer->gnp_nid)
2337 kgnilnd_conn_state2str(conn));
2339 CDEBUG(D_NET, "checking conn %p->%s timeout %d keepalive %d "
2340 "rx_diff %lu tx_diff %lu\n",
2341 conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
2342 conn->gnc_timeout, GNILND_TO2KA(conn->gnc_timeout),
2343 cfs_duration_sec(now - conn->gnc_last_rx_cq),
2344 cfs_duration_sec(now - conn->gnc_last_tx));
2346 timeout = cfs_time_seconds(conn->gnc_timeout);
2347 keepalive = cfs_time_seconds(GNILND_TO2KA(conn->gnc_timeout));
2349 /* just in case our lack of RX msg processing is gumming up the works - give the
2350 * remote an extra chance */
2352 newest_last_rx = GNILND_LASTRX(conn);
2354 if (time_after_eq(now, newest_last_rx + timeout)) {
2355 GNIDBG_CONN(D_CONSOLE|D_NETERROR, conn, "No gnilnd traffic received from %s for %lu "
2356 "seconds, terminating connection. Is node down? ",
2357 libcfs_nid2str(conn->gnc_peer->gnp_nid),
2358 cfs_duration_sec(now - newest_last_rx));
2362 /* we don't timeout on last_tx stalls - we are going to trust the
2363 * underlying network to let us know when sends are failing.
2364 * At worst, the peer will timeout our RX stamp and drop the connection
2365 * at that point. We'll then see his CLOSE or at worst his RX
2366 * stamp stop and drop the connection on our end */
2368 if (time_after_eq(now, conn->gnc_last_tx + keepalive)) {
2369 CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%lu)) "
2370 "last %lu/%lu/%lu %lus/%lus/%lus\n",
2371 libcfs_nid2str(conn->gnc_peer->gnp_nid), conn,
2372 cfs_duration_sec(jiffies - conn->gnc_last_tx),
2374 conn->gnc_last_noop_want, conn->gnc_last_noop_sent,
2375 conn->gnc_last_noop_cq,
2376 cfs_duration_sec(jiffies - conn->gnc_last_noop_want),
2377 cfs_duration_sec(jiffies - conn->gnc_last_noop_sent),
2378 cfs_duration_sec(jiffies - conn->gnc_last_noop_cq));
2379 set_mb(conn->gnc_last_noop_want, jiffies);
2380 atomic_inc(&conn->gnc_reaper_noop);
2381 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_NOOP_SEND))
2384 tx = kgnilnd_new_tx_msg(GNILND_MSG_NOOP, conn->gnc_peer->gnp_net->gnn_ni->ni_nid);
2387 kgnilnd_queue_tx(conn, tx);
2393 /* needs write_lock on kgn_peer_conn_lock held */
2395 kgnilnd_check_peer_timeouts_locked(kgn_peer_t *peer, struct list_head *todie,
2396 struct list_head *souls)
2398 unsigned long timeout;
2399 kgn_conn_t *conn, *connN = NULL;
2404 short releaseconn = 0;
2405 unsigned long first_rx = 0;
2407 CDEBUG(D_NET, "checking peer 0x%p->%s for timeouts; interval %lus\n",
2408 peer, libcfs_nid2str(peer->gnp_nid),
2409 peer->gnp_reconnect_interval);
2411 timeout = cfs_time_seconds(MAX(*kgnilnd_tunables.kgn_timeout,
2412 GNILND_MIN_TIMEOUT));
2414 conn = kgnilnd_find_conn_locked(peer);
2416 /* if there is a valid conn, check the queues for timeouts */
2417 rc = kgnilnd_check_conn_timeouts_locked(conn);
2419 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_RX_CLOSE_CLOSING)) {
2420 /* simulate a RX CLOSE after the timeout but before
2421 * the scheduler thread gets it */
2422 conn->gnc_close_recvd = GNILND_CLOSE_INJECT1;
2423 conn->gnc_peer_error = -ETIMEDOUT;
2425 /* Once we mark closed, any of the scheduler threads could
2426 * get it and move through before we hit the fail loc code */
2427 kgnilnd_close_conn_locked(conn, rc);
2429 /* first_rx is used to decide when to release a conn from purgatory.
2431 first_rx = conn->gnc_first_rx;
2435 /* now regardless of starting new conn, find tx on peer queue that
2436 * are old and smell bad - do this first so we don't trigger
2437 * reconnect on empty queue if we timeout all */
2438 list_for_each_entry_safe(tx, txN, &peer->gnp_tx_queue, tx_list) {
2439 if (time_after_eq(jiffies, tx->tx_qtime + timeout)) {
2441 LCONSOLE_INFO("could not send to %s due to connection"
2442 " setup failure after %lu seconds\n",
2443 libcfs_nid2str(peer->gnp_nid),
2444 cfs_duration_sec(jiffies - tx->tx_qtime));
2446 kgnilnd_tx_del_state_locked(tx, peer, NULL,
2448 list_add_tail(&tx->tx_list, todie);
2453 if (count || peer->gnp_connecting == GNILND_PEER_KILL) {
2454 CDEBUG(D_NET, "canceling %d tx for peer 0x%p->%s\n",
2455 count, peer, libcfs_nid2str(peer->gnp_nid));
2456 /* if we nuked all the TX, stop peer connection attempt (if there is one..) */
2457 if (list_empty(&peer->gnp_tx_queue) ||
2458 peer->gnp_connecting == GNILND_PEER_KILL) {
2459 /* we pass down todie to use a common function - but we know there are
2461 kgnilnd_cancel_peer_connect_locked(peer, todie);
2465 /* Don't reconnect if we are still trying to clear out old conns.
2466 * This prevents us sending traffic on the new mbox before ensuring we are done
2467 * with the old one */
2468 reconnect = (atomic_read(&peer->gnp_dirty_eps) == 0);
2470 /* if we are not connected and there are tx on the gnp_tx_queue waiting
2471 * to be sent, we'll check the reconnect interval and fire up a new
2472 * connection request */
2474 if ((peer->gnp_connecting == GNILND_PEER_IDLE) &&
2475 (time_after_eq(jiffies, peer->gnp_reconnect_time)) &&
2476 !list_empty(&peer->gnp_tx_queue) && reconnect) {
2478 CDEBUG(D_NET, "starting connect to %s\n",
2479 libcfs_nid2str(peer->gnp_nid));
2480 LASSERTF(peer->gnp_connecting == GNILND_PEER_IDLE, "Peer was idle and we "
2481 "have a write_lock, state issue %d\n", peer->gnp_connecting);
2483 peer->gnp_connecting = GNILND_PEER_CONNECT;
2484 kgnilnd_peer_addref(peer); /* extra ref for connd */
2486 spin_lock(&peer->gnp_net->gnn_dev->gnd_connd_lock);
2487 list_add_tail(&peer->gnp_connd_list,
2488 &peer->gnp_net->gnn_dev->gnd_connd_peers);
2489 spin_unlock(&peer->gnp_net->gnn_dev->gnd_connd_lock);
2491 kgnilnd_schedule_dgram(peer->gnp_net->gnn_dev);
2494 /* fail_loc to allow us to delay release of purgatory */
2495 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_PURG_REL_DELAY))
2498 /* This check allows us to verify that the new conn is actually being used, letting us
2499 * pull the old conns out of purgatory if they have actually seen traffic.
2500 * We only release a conn from purgatory during stack reset, admin command, or when a peer reconnects
2503 time_after(jiffies, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout))) {
2504 CDEBUG(D_NET,"We can release conn %p from purgatory %lu\n",
2505 conn, first_rx + cfs_time_seconds(*kgnilnd_tunables.kgn_hardware_timeout));
2509 list_for_each_entry_safe (conn, connN, &peer->gnp_conns, gnc_list) {
2510 /* check for purgatory timeouts */
2511 if (conn->gnc_in_purgatory) {
2512 /* We cannot detach this conn from purgatory if it has not been closed, so we reschedule it;
2513 * that way, the next time we check it, we can detach it from purgatory
2516 if (conn->gnc_state != GNILND_CONN_DONE) {
2517 /* Skip over conns that are currently not DONE. If they aren't already scheduled
2518 * for completion, something in the state machine is broken.
2523 /* We only detach a conn that is in purgatory if we have received a close message,
2524 * we have a new valid connection that has successfully received data, or an admin
2525 * command tells us we need to detach.
2528 if (conn->gnc_close_recvd || releaseconn || conn->gnc_needs_detach) {
2529 unsigned long waiting;
2531 waiting = (long) jiffies - conn->gnc_last_rx_cq;
2533 /* C.E: The remote peer is expected to close the
2534 * connection (see kgnilnd_check_conn_timeouts)
2535 * via the reaper thread and nuke out the MDD and
2536 * FMA resources after conn->gnc_timeout has expired
2537 * without an FMA RX */
2538 CDEBUG(D_NET, "Reconnected to %s in %lds or admin forced detach, dropping "
2539 " held resources\n",
2540 libcfs_nid2str(conn->gnc_peer->gnp_nid),
2541 cfs_duration_sec(waiting));
2543 kgnilnd_detach_purgatory_locked(conn, souls);
2552 kgnilnd_reaper_check(int idx)
2554 struct list_head *peers = &kgnilnd_data.kgn_peers[idx];
2555 struct list_head *ctmp, *ctmpN;
2556 struct list_head geriatrics;
2557 struct list_head souls;
2559 INIT_LIST_HEAD(&geriatrics);
2560 INIT_LIST_HEAD(&souls);
2562 write_lock(&kgnilnd_data.kgn_peer_conn_lock);
2564 list_for_each_safe(ctmp, ctmpN, peers) {
2565 kgn_peer_t *peer = NULL;
2567 /* don't timeout stuff if the network is mucked or shutting down */
2568 if (kgnilnd_check_hw_quiesce()) {
2571 peer = list_entry(ctmp, kgn_peer_t, gnp_list);
2573 kgnilnd_check_peer_timeouts_locked(peer, &geriatrics, &souls);
2576 write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
2578 kgnilnd_txlist_done(&geriatrics, -EHOSTUNREACH);
2579 kgnilnd_release_purgatory_list(&souls);
2583 kgnilnd_update_reaper_timeout(long timeout)
2585 LASSERT(timeout > 0);
2587 spin_lock(&kgnilnd_data.kgn_reaper_lock);
2589 if (timeout < kgnilnd_data.kgn_new_min_timeout)
2590 kgnilnd_data.kgn_new_min_timeout = timeout;
2592 spin_unlock(&kgnilnd_data.kgn_reaper_lock);
2596 kgnilnd_reaper_poke_with_stick(unsigned long arg)
2598 wake_up(&kgnilnd_data.kgn_reaper_waitq);
2602 kgnilnd_reaper(void *arg)
2607 unsigned long next_check_time = jiffies;
2608 long current_min_timeout = MAX_SCHEDULE_TIMEOUT;
2609 struct timer_list timer;
2612 cfs_daemonize("kgnilnd_rpr");
2613 cfs_block_allsigs();
2615 /* all gnilnd threads need to run fairly urgently */
2616 set_user_nice(current, *kgnilnd_tunables.kgn_nice);
2617 spin_lock(&kgnilnd_data.kgn_reaper_lock);
2619 while (!kgnilnd_data.kgn_shutdown) {
2620 /* I wake up every 'p' seconds to check for timeouts on some
2621 * more peers. I try to check every connection 'n' times
2622 * within the global minimum of all keepalive and timeout
2623 * intervals, to ensure I attend to every connection within
2624 * (n+1)/n times its timeout intervals. */
2625 const int p = GNILND_REAPER_THREAD_WAKE;
2626 const int n = GNILND_REAPER_NCHECKS;
2628 /* to quiesce or to not quiesce, that is the question */
2629 if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
2630 spin_unlock(&kgnilnd_data.kgn_reaper_lock);
2631 KGNILND_SPIN_QUIESCE;
2632 spin_lock(&kgnilnd_data.kgn_reaper_lock);
2635 /* careful with the jiffy wrap... */
2636 timeout = (long)(next_check_time - jiffies);
2639 prepare_to_wait(&kgnilnd_data.kgn_reaper_waitq, &wait,
2640 TASK_INTERRUPTIBLE);
2641 spin_unlock(&kgnilnd_data.kgn_reaper_lock);
2642 setup_timer(&timer, kgnilnd_reaper_poke_with_stick,
2644 mod_timer(&timer, (long) jiffies + timeout);
2646 /* check flag variables before committing */
2647 if (!kgnilnd_data.kgn_shutdown &&
2648 !kgnilnd_data.kgn_quiesce_trigger) {
2649 CDEBUG(D_INFO, "schedule timeout %ld (%lu sec)\n",
2650 timeout, cfs_duration_sec(timeout));
2652 CDEBUG(D_INFO, "awake after schedule\n");
2655 del_singleshot_timer_sync(&timer);
2656 spin_lock(&kgnilnd_data.kgn_reaper_lock);
2657 finish_wait(&kgnilnd_data.kgn_reaper_waitq, &wait);
2661 /* new_min_timeout is set from the conn timeouts and keepalive
2662 * this should end up with a min timeout of
2663 * GNILND_TIMEOUT2KEEPALIVE(t) or roughly LND_TIMEOUT/2 */
2664 if (kgnilnd_data.kgn_new_min_timeout < current_min_timeout) {
2665 current_min_timeout = kgnilnd_data.kgn_new_min_timeout;
2666 CDEBUG(D_NET, "Set new min timeout %ld\n",
2667 current_min_timeout);
2670 spin_unlock(&kgnilnd_data.kgn_reaper_lock);
2672 /* Compute how many table entries to check now so I get round
2673 * the whole table fast enough given that I do this at fixed
2674 * intervals of 'p' seconds */
2675 chunk = *kgnilnd_tunables.kgn_peer_hash_size;
2676 if (kgnilnd_data.kgn_new_min_timeout > n * p)
2677 chunk = (chunk * n * p) /
2678 kgnilnd_data.kgn_new_min_timeout;
2681 for (i = 0; i < chunk; i++) {
2682 kgnilnd_reaper_check(hash_index);
2683 hash_index = (hash_index + 1) %
2684 *kgnilnd_tunables.kgn_peer_hash_size;
2686 next_check_time = (long) jiffies + cfs_time_seconds(p);
2687 CDEBUG(D_INFO, "next check at %lu or in %d sec\n", next_check_time, p);
2689 spin_lock(&kgnilnd_data.kgn_reaper_lock);
2692 spin_unlock(&kgnilnd_data.kgn_reaper_lock);
2694 kgnilnd_thread_fini();
2699 kgnilnd_check_rdma_cq(kgn_device_t *dev)
2702 gni_post_descriptor_t *desc;
2704 kgn_tx_ev_id_t ev_id;
2706 int should_retry, rc;
2707 long num_processed = 0;
2708 kgn_conn_t *conn = NULL;
2709 kgn_tx_t *tx = NULL;
2712 /* make sure we don't keep looping if we need to reset */
2713 if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
2714 return num_processed;
2716 rc = kgnilnd_mutex_trylock(&dev->gnd_cq_mutex);
2718 /* we didn't get the mutex, so return that there is still work
2722 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_DELAY_RDMA)) {
2723 /* a bit gross - but we need a good way to test for
2724 * delayed RDMA completions and the easiest way to do
2725 * that is to delay the RDMA CQ events */
2726 rrc = GNI_RC_NOT_DONE;
2728 rrc = kgnilnd_cq_get_event(dev->gnd_snd_rdma_cqh, &event_data);
2731 if (rrc == GNI_RC_NOT_DONE) {
2732 mutex_unlock(&dev->gnd_cq_mutex);
2733 CDEBUG(D_INFO, "SEND RDMA CQ %d empty processed %ld\n",
2734 dev->gnd_id, num_processed);
2735 return num_processed;
2737 dev->gnd_sched_alive = jiffies;
2740 LASSERTF(!GNI_CQ_OVERRUN(event_data),
2741 "this is bad, somehow our credits didn't protect us"
2742 " from CQ overrun\n");
2743 LASSERTF(GNI_CQ_GET_TYPE(event_data) == GNI_CQ_EVENT_TYPE_POST,
2744 "rrc %d, GNI_CQ_GET_TYPE("LPX64") = "LPX64"\n", rrc,
2745 event_data, GNI_CQ_GET_TYPE(event_data));
2747 rrc = kgnilnd_get_completed(dev->gnd_snd_rdma_cqh, event_data,
2749 mutex_unlock(&dev->gnd_cq_mutex);
2751 /* XXX Nic: Need better error handling here... */
2752 LASSERTF((rrc == GNI_RC_SUCCESS) ||
2753 (rrc == GNI_RC_TRANSACTION_ERROR),
2756 ev_id.txe_cookie = desc->post_id;
2758 kgnilnd_validate_tx_ev_id(&ev_id, &tx, &conn);
2760 if (conn == NULL || tx == NULL) {
2761 /* either conn or tx was already nuked and this is a "late"
2762 * completion, so drop it */
2766 GNITX_ASSERTF(tx, tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE ||
2767 tx->tx_msg.gnm_type == GNILND_MSG_GET_DONE,
2768 "tx %p with type %d\n", tx, tx->tx_msg.gnm_type);
2770 GNIDBG_TX(D_NET, tx, "RDMA completion for %d bytes", tx->tx_nob);
2772 /* remove from rdmaq */
2773 spin_lock(&conn->gnc_list_lock);
2774 kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
2775 spin_unlock(&conn->gnc_list_lock);
2777 if (likely(desc->status == GNI_RC_SUCCESS)) {
2778 atomic_inc(&dev->gnd_rdma_ntx);
2779 atomic64_add(tx->tx_nob, &dev->gnd_rdma_txbytes);
2780 /* transaction succeeded, add into fmaq */
2781 kgnilnd_queue_tx(conn, tx);
2782 kgnilnd_peer_alive(conn->gnc_peer);
2784 /* drop ref from kgnilnd_validate_tx_ev_id */
2785 kgnilnd_conn_decref(conn);
2789 /* fall through to the TRANSACTION_ERROR case */
2792 /* get stringified version for log messages */
2793 kgnilnd_cq_error_str(event_data, &err_str, 256);
2794 kgnilnd_cq_error_recoverable(event_data, &should_retry);
2796 /* make sure we are not off in the weeds with this tx */
2797 if (tx->tx_retrans >
2798 *kgnilnd_tunables.kgn_max_retransmits) {
2799 GNIDBG_TX(D_NETERROR, tx,
2800 "giving up on TX, too many retries", NULL);
2804 GNIDBG_TX(D_NETERROR, tx, "RDMA %s error (%s)",
2805 should_retry ? "transient" : "unrecoverable", err_str);
2807 if (tx->tx_msg.gnm_type == GNILND_MSG_PUT_DONE) {
2809 kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
2810 &tx->tx_putinfo.gnpam_desc,
2811 tx->tx_putinfo.gnpam_desc.gnrd_nob,
2812 tx->tx_putinfo.gnpam_dst_cookie);
2814 kgnilnd_nak_rdma(conn, GNILND_MSG_PUT_NAK,
2816 tx->tx_putinfo.gnpam_dst_cookie,
2817 tx->tx_msg.gnm_srcnid);
2818 kgnilnd_tx_done(tx, -EFAULT);
2822 kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
2823 &tx->tx_getinfo.gngm_desc,
2824 tx->tx_lntmsg[0]->msg_len,
2825 tx->tx_getinfo.gngm_cookie);
2827 kgnilnd_nak_rdma(conn, GNILND_MSG_GET_NAK,
2829 tx->tx_getinfo.gngm_cookie,
2830 tx->tx_msg.gnm_srcnid);
2831 kgnilnd_tx_done(tx, -EFAULT);
2835 /* drop ref from kgnilnd_validate_tx_ev_id */
2836 kgnilnd_conn_decref(conn);
2841 kgnilnd_check_fma_send_cq(kgn_device_t *dev)
2845 kgn_tx_ev_id_t ev_id;
2846 kgn_tx_t *tx = NULL;
2847 kgn_conn_t *conn = NULL;
2848 int queued_fma, saw_reply, rc;
2849 long num_processed = 0;
2852 /* make sure we don't keep looping if we need to reset */
2853 if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
2854 return num_processed;
2857 rc = kgnilnd_mutex_trylock(&dev->gnd_cq_mutex);
2859 /* we didn't get the mutex, so return that there is still work
2864 rrc = kgnilnd_cq_get_event(dev->gnd_snd_fma_cqh, &event_data);
2865 mutex_unlock(&dev->gnd_cq_mutex);
2867 if (rrc == GNI_RC_NOT_DONE) {
2869 "SMSG send CQ %d not ready (data "LPX64") "
2870 "processed %ld\n", dev->gnd_id, event_data,
2872 return num_processed;
2875 dev->gnd_sched_alive = jiffies;
2878 LASSERTF(!GNI_CQ_OVERRUN(event_data),
2879 "this is bad, somehow our credits didn't "
2880 "protect us from CQ overrun\n");
2881 LASSERTF(GNI_CQ_GET_TYPE(event_data) == GNI_CQ_EVENT_TYPE_SMSG,
2882 "rrc %d, GNI_CQ_GET_TYPE("LPX64") = "LPX64"\n", rrc,
2883 event_data, GNI_CQ_GET_TYPE(event_data));
2885 /* if SMSG couldn't handle an error, time for conn to die */
2886 if (unlikely(rrc == GNI_RC_TRANSACTION_ERROR)) {
2889 /* need to take the write_lock to ensure atomicity
2890 * on the conn state if we need to close it */
2891 write_lock(&kgnilnd_data.kgn_peer_conn_lock);
2892 conn = kgnilnd_cqid2conn_locked(GNI_CQ_GET_INST_ID(event_data));
2894 /* Conn was destroyed? */
2896 "SMSG CQID lookup "LPX64" failed\n",
2897 GNI_CQ_GET_INST_ID(event_data));
2898 write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
2902 kgnilnd_cq_error_str(event_data, &err_str, 256);
2903 CNETERR("SMSG send error to %s: rc %d (%s)\n",
2904 libcfs_nid2str(conn->gnc_peer->gnp_nid),
2906 kgnilnd_close_conn_locked(conn, -ECOMM);
2908 write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
2910 /* no need to process rest of this tx -
2911 * it is getting canceled */
2915 /* fall through to GNI_RC_SUCCESS case */
2916 ev_id.txe_smsg_id = GNI_CQ_GET_MSG_ID(event_data);
2918 kgnilnd_validate_tx_ev_id(&ev_id, &tx, &conn);
2919 if (conn == NULL || tx == NULL) {
2920 /* either conn or tx was already nuked and this is a "late"
2921 * completion, so drop it */
2925 tx->tx_conn->gnc_last_tx_cq = jiffies;
2926 if (tx->tx_msg.gnm_type == GNILND_MSG_NOOP) {
2927 set_mb(conn->gnc_last_noop_cq, jiffies);
2930 /* lock tx_list_state and tx_state */
2931 spin_lock(&tx->tx_conn->gnc_list_lock);
2933 GNITX_ASSERTF(tx, tx->tx_list_state == GNILND_TX_LIVE_FMAQ,
2934 "state not GNILND_TX_LIVE_FMAQ", NULL);
2935 GNITX_ASSERTF(tx, tx->tx_state & GNILND_TX_WAITING_COMPLETION,
2936 "not waiting for completion", NULL);
2938 GNIDBG_TX(D_NET, tx, "SMSG complete tx_state %x rc %d",
2941 tx->tx_state &= ~GNILND_TX_WAITING_COMPLETION;
2943 /* This will trigger other FMA sends that were
2944 * pending this completion */
2945 queued_fma = !list_empty(&tx->tx_conn->gnc_fmaq);
2947 /* we either did not expect a reply or we already got it */
2948 saw_reply = !(tx->tx_state & GNILND_TX_WAITING_REPLY);
2950 spin_unlock(&tx->tx_conn->gnc_list_lock);
2953 CDEBUG(D_NET, "scheduling conn 0x%p->%s for fmaq\n",
2955 libcfs_nid2str(conn->gnc_peer->gnp_nid));
2956 kgnilnd_schedule_conn(conn);
2959 /* If saw_reply is false, the tx could be nuked as soon as gnc_list_lock is dropped.
2960 * If saw_reply is true we know that the tx is safe to use as the other thread
2961 * is already finished with it.
2965 /* no longer need to track on the live_fmaq */
2966 kgnilnd_tx_del_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_ALLOCD);
2968 if (tx->tx_state & GNILND_TX_PENDING_RDMA) {
2969 /* we already got the reply & were waiting for completion
2970 * of the initial send before initiating the RDMA transaction */
2972 GNIDBG_TX(D_NET, tx,
2973 "Pending RDMA 0x%p type 0x%02x",
2974 tx->tx_msg.gnm_type);
2975 tx->tx_state &= ~GNILND_TX_PENDING_RDMA;
2976 rc = kgnilnd_send_mapped_tx(tx, 0);
2977 GNITX_ASSERTF(tx, rc == 0, "RDMA send failed: %d\n", rc);
2979 /* we are done with this tx */
2980 GNIDBG_TX(D_NET, tx,
2981 "Done with tx type 0x%02x",
2982 tx->tx_msg.gnm_type);
2983 kgnilnd_tx_done(tx, tx->tx_rc);
2987 /* drop ref from kgnilnd_validate_tx_ev_id */
2988 kgnilnd_conn_decref(conn);
2990 /* if we are waiting for a REPLY, we'll handle the tx then */
2991 } /* end for loop */
2995 kgnilnd_check_fma_rcv_cq(kgn_device_t *dev)
3000 long num_processed = 0;
3001 struct list_head *conns;
3002 struct list_head *tmp;
3006 /* make sure we don't keep looping if we need to reset */
3007 if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
3008 return num_processed;
3011 rc = kgnilnd_mutex_trylock(&dev->gnd_cq_mutex);
3013 /* we didn't get the mutex, so return that there is still work
3017 rrc = kgnilnd_cq_get_event(dev->gnd_rcv_fma_cqh, &event_data);
3018 mutex_unlock(&dev->gnd_cq_mutex);
3020 if (rrc == GNI_RC_NOT_DONE) {
3021 CDEBUG(D_INFO, "SMSG RX CQ %d empty data "LPX64" "
3023 dev->gnd_id, event_data, num_processed);
3024 return num_processed;
3026 dev->gnd_sched_alive = jiffies;
3029 /* this is the only CQ that can really handle transient
3031 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_CQ_GET_EVENT)) {
3032 rrc = cfs_fail_val ? cfs_fail_val
3033 : GNI_RC_ERROR_RESOURCE;
3034 if (rrc == GNI_RC_ERROR_RESOURCE) {
3035 /* set overrun too */
3036 event_data |= (1UL << 63);
3037 LASSERTF(GNI_CQ_OVERRUN(event_data),
3038 "(1UL << 63) is no longer the bit to"
3039 "set to indicate CQ_OVERRUN\n");
3042 /* sender should get error event too and take care
3043 * of failed transaction by re-transmitting */
3044 if (rrc == GNI_RC_TRANSACTION_ERROR) {
3045 CDEBUG(D_NET, "SMSG RX CQ error "LPX64"\n", event_data);
3049 if (likely(!GNI_CQ_OVERRUN(event_data))) {
3050 read_lock(&kgnilnd_data.kgn_peer_conn_lock);
3051 conn = kgnilnd_cqid2conn_locked(
3052 GNI_CQ_GET_INST_ID(event_data));
3054 CDEBUG(D_NET, "SMSG RX CQID lookup "LPU64" "
3055 "failed, dropping event "LPX64"\n",
3056 GNI_CQ_GET_INST_ID(event_data),
3059 CDEBUG(D_NET, "SMSG RX: CQID "LPU64" "
3061 GNI_CQ_GET_INST_ID(event_data),
3062 conn, conn->gnc_peer ?
3063 libcfs_nid2str(conn->gnc_peer->gnp_nid) :
3066 conn->gnc_last_rx_cq = jiffies;
3068 /* stash first rx so we can clear out purgatory.
3070 if (conn->gnc_first_rx == 0) {
3071 conn->gnc_first_rx = jiffies;
3073 kgnilnd_peer_alive(conn->gnc_peer);
3074 kgnilnd_schedule_conn(conn);
3076 read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
3080 /* FMA CQ has overflowed: check ALL conns */
3081 CNETERR("SMSG RX CQ overflow: scheduling ALL "
3082 "conns on device %d\n", dev->gnd_id);
3084 for (rc = 0; rc < *kgnilnd_tunables.kgn_peer_hash_size; rc++) {
3086 read_lock(&kgnilnd_data.kgn_peer_conn_lock);
3087 conns = &kgnilnd_data.kgn_conns[rc];
3089 list_for_each(tmp, conns) {
3090 conn = list_entry(tmp, kgn_conn_t,
3093 if (conn->gnc_device == dev) {
3094 kgnilnd_schedule_conn(conn);
3095 conn->gnc_last_rx_cq = jiffies;
3099 /* don't block write lockers for too long... */
3100 read_unlock(&kgnilnd_data.kgn_peer_conn_lock);
3105 /* try_map_if_full should only be used when processing TX from list of
3106 * backlog TX waiting on mappings to free up
3109 * try_map_if_full = 0: 0 (sent or queued), (-|+)errno failure of kgnilnd_sendmsg
3110 * try_map_if_full = 1: 0 (sent), -ENOMEM for caller to requeue, (-|+)errno failure of kgnilnd_sendmsg */
3113 kgnilnd_send_mapped_tx(kgn_tx_t *tx, int try_map_if_full)
3115 /* slight bit of race if multiple people are calling, but at worst we'll have
3116 * order altered just a bit... which would not be deterministic anyway */
3117 int rc = atomic_read(&tx->tx_conn->gnc_device->gnd_nq_map);
3119 GNIDBG_TX(D_NET, tx, "try %d nq_map %d", try_map_if_full, rc);
3121 /* We know that we have a GART reservation that should guarantee forward progress.
3122 * This means we don't need to take any extraordinary efforts if we are failing
3123 * mappings here - even if we are holding a very small number of these. */
3125 if (try_map_if_full || (rc == 0)) {
3126 rc = kgnilnd_map_buffer(tx);
3129 /* rc should be 0 if we mapped successfully here; if non-zero we are queueing */
3131 /* if try_map_if_full set, they handle requeuing */
3132 if (unlikely(try_map_if_full)) {
3135 spin_lock(&tx->tx_conn->gnc_device->gnd_lock);
3136 kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 1);
3137 spin_unlock(&tx->tx_conn->gnc_device->gnd_lock);
3138 /* make sure we wake up sched to run this */
3139 kgnilnd_schedule_device(tx->tx_conn->gnc_device);
3140 /* return 0 as this is now queued for later sending */
3145 switch (tx->tx_msg.gnm_type) {
3149 /* GET_REQ and PUT_ACK are outbound messages sending our mapping key to
3150 * the remote node where the RDMA will be started.
3151 * Special case -EAGAIN logic - this should just be queued as if the mapping couldn't
3152 * be satisfied. The rest of the errors are "hard" errors that require
3153 * upper layers to handle themselves */
3154 case GNILND_MSG_GET_REQ:
3155 tx->tx_msg.gnm_u.get.gngm_desc.gnrd_key = tx->tx_map_key;
3156 tx->tx_msg.gnm_u.get.gngm_cookie = tx->tx_id.txe_cookie;
3157 tx->tx_msg.gnm_u.get.gngm_desc.gnrd_addr = (__u64)((unsigned long)tx->tx_buffer);
3158 tx->tx_msg.gnm_u.get.gngm_desc.gnrd_nob = tx->tx_nob;
3159 tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY;
3160 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_GET_REQ_AGAIN)) {
3161 tx->tx_state |= GNILND_TX_FAIL_SMSG;
3163 /* redirect to FMAQ on failure, no need to infinite loop here in MAPQ */
3164 rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
3166 case GNILND_MSG_PUT_ACK:
3167 tx->tx_msg.gnm_u.putack.gnpam_desc.gnrd_key = tx->tx_map_key;
3168 tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY;
3169 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_PUT_ACK_AGAIN)) {
3170 tx->tx_state |= GNILND_TX_FAIL_SMSG;
3172 /* redirect to FMAQ on failure, no need to infinite loop here in MAPQ */
3173 rc = kgnilnd_sendmsg(tx, NULL, 0, &tx->tx_conn->gnc_list_lock, GNILND_TX_FMAQ);
3176 /* PUT_REQ and GET_DONE are where we do the actual RDMA */
3177 case GNILND_MSG_PUT_REQ:
3178 kgnilnd_rdma(tx, GNILND_MSG_PUT_DONE,
3179 &tx->tx_putinfo.gnpam_desc,
3180 tx->tx_putinfo.gnpam_desc.gnrd_nob,
3181 tx->tx_putinfo.gnpam_dst_cookie);
3183 case GNILND_MSG_GET_DONE:
3184 kgnilnd_rdma(tx, GNILND_MSG_GET_DONE,
3185 &tx->tx_getinfo.gngm_desc,
3186 tx->tx_lntmsg[0]->msg_len,
3187 tx->tx_getinfo.gngm_cookie);
3196 kgnilnd_process_fmaq(kgn_conn_t *conn)
3199 kgn_tx_t *tx = NULL;
3200 void *buffer = NULL;
3201 unsigned int nob = 0;
3204 /* NB 1. kgnilnd_sendmsg() may fail if I'm out of credits right now.
3205 * However I will be rescheduled by an FMA completion event
3206 * when I eventually get some.
3207 * NB 2. Sampling gnc_state here races with setting it elsewhere.
3208 * But it doesn't matter if I try to send a "real" message just
3209 * as I start closing because I'll get scheduled to send the
3212 /* Short circuit if the ep_handle is null; we can't send anyway. */
3213 if (conn->gnc_ephandle == NULL)
3216 LASSERTF(!conn->gnc_close_sent, "Conn %p close was sent\n", conn);
3218 spin_lock(&conn->gnc_list_lock);
3220 if (list_empty(&conn->gnc_fmaq)) {
3221 int keepalive = GNILND_TO2KA(conn->gnc_timeout);
3223 spin_unlock(&conn->gnc_list_lock);
3225 if (time_after_eq(jiffies, conn->gnc_last_tx + cfs_time_seconds(keepalive))) {
3226 CDEBUG(D_NET, "sending NOOP -> %s (%p idle %lu(%d)) "
3227 "last %lu/%lu/%lu %lus/%lus/%lus\n",
3228 libcfs_nid2str(conn->gnc_peer->gnp_nid), conn,
3229 cfs_duration_sec(jiffies - conn->gnc_last_tx),
3231 conn->gnc_last_noop_want, conn->gnc_last_noop_sent,
3232 conn->gnc_last_noop_cq,
3233 cfs_duration_sec(jiffies - conn->gnc_last_noop_want),
3234 cfs_duration_sec(jiffies - conn->gnc_last_noop_sent),
3235 cfs_duration_sec(jiffies - conn->gnc_last_noop_cq));
3236 atomic_inc(&conn->gnc_sched_noop);
3237 set_mb(conn->gnc_last_noop_want, jiffies);
3239 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_NOOP_SEND))
3242 tx = kgnilnd_new_tx_msg(GNILND_MSG_NOOP, conn->gnc_peer->gnp_net->gnn_ni->ni_nid);
3246 rc = kgnilnd_set_tx_id(tx, conn);
3248 kgnilnd_tx_done(tx, rc);
3254 tx = list_first_entry(&conn->gnc_fmaq, kgn_tx_t, tx_list);
3255 /* move from fmaq to allocd, kgnilnd_sendmsg will move to live_fmaq */
3256 kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
3257 more_to_do = !list_empty(&conn->gnc_fmaq);
3258 spin_unlock(&conn->gnc_list_lock);
3261 /* if there is no real TX or no NOOP to send, bail */
3266 if (!tx->tx_retrans)
3267 tx->tx_cred_wait = jiffies;
3269 GNITX_ASSERTF(tx, tx->tx_id.txe_smsg_id != 0,
3270 "tx with zero id", NULL);
3272 CDEBUG(D_NET, "sending regular msg: %p, type %s(0x%02x), cookie "LPX64"\n",
3273 tx, kgnilnd_msgtype2str(tx->tx_msg.gnm_type),
3274 tx->tx_msg.gnm_type, tx->tx_id.txe_cookie);
3278 switch (tx->tx_msg.gnm_type) {
3282 case GNILND_MSG_NOOP:
3283 case GNILND_MSG_CLOSE:
3284 case GNILND_MSG_IMMEDIATE:
3285 tx->tx_state = GNILND_TX_WAITING_COMPLETION;
3286 buffer = tx->tx_buffer;
3290 case GNILND_MSG_GET_DONE:
3291 case GNILND_MSG_PUT_DONE:
3292 case GNILND_MSG_PUT_NAK:
3293 case GNILND_MSG_GET_NAK:
3294 tx->tx_state = GNILND_TX_WAITING_COMPLETION;
3297 case GNILND_MSG_PUT_REQ:
3298 tx->tx_msg.gnm_u.putreq.gnprm_cookie = tx->tx_id.txe_cookie;
3300 case GNILND_MSG_PUT_ACK:
3301 case GNILND_MSG_GET_REQ:
3302 /* This is really only to handle the retransmit of SMSG once these
3303 * two messages are set up in send_mapped_tx */
3304 tx->tx_state = GNILND_TX_WAITING_COMPLETION | GNILND_TX_WAITING_REPLY;
3308 if (likely(rc == 0)) {
3309 rc = kgnilnd_sendmsg(tx, buffer, nob, &conn->gnc_list_lock, GNILND_TX_FMAQ);
3313 /* don't explicitly reschedule here - we are short on credits and will rely on
3314 * kgnilnd_sendmsg to resched the conn if need be */
3316 } else if (rc < 0) {
3317 /* bail: it wasn't sent and we didn't get EAGAIN indicating we should retrans -
3318 * almost certainly a software bug, but let's play nice with the other kids */
3319 kgnilnd_tx_done(tx, rc);
3320 /* just for fun, kick peer in arse - resetting conn might help to correct
3321 * this return code that was almost certainly caused by buggy software */
3322 kgnilnd_close_conn(conn, rc);
3326 CDEBUG(D_NET, "Rescheduling %p (more to do)\n", conn);
3327 kgnilnd_schedule_conn(conn);
3332 kgnilnd_process_rdmaq(kgn_device_t *dev)
3337 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_DELAY_RDMAQ)) {
3341 if (time_after_eq(jiffies, dev->gnd_rdmaq_deadline)) {
3342 unsigned long dead_bump;
3345 /* if we think we need to adjust, take lock to serialize and recheck */
3346 spin_lock(&dev->gnd_rdmaq_lock);
3347 if (time_after_eq(jiffies, dev->gnd_rdmaq_deadline)) {
3348 del_singleshot_timer_sync(&dev->gnd_rdmaq_timer);
3350 dead_bump = cfs_time_seconds(1) / *kgnilnd_tunables.kgn_rdmaq_intervals;
3352 /* roll the bucket forward */
3353 dev->gnd_rdmaq_deadline = jiffies + dead_bump;
3355 if (kgnilnd_data.kgn_rdmaq_override &&
3356 (*kgnilnd_tunables.kgn_rdmaq_intervals != 0)) {
3357 new_ok = kgnilnd_data.kgn_rdmaq_override / *kgnilnd_tunables.kgn_rdmaq_intervals;
3362 /* roll current outstanding forward to make sure we carry outstanding
3363 * commitment forward
3364 * new_ok starts out as the whole interval value
3365 * - first subtract bytes_out from last interval, as that would push us over
3366 * strict limits for this interval
3367 * - second, set bytes_ok to new_ok to ensure it doesn't exceed the current auth
3369 * there is a small race here if someone is actively processing mappings and
3370 * adding to rdmaq_bytes_out, but it should be small as the mappings are triggered
3371 * quite quickly after kgnilnd_auth_rdma_bytes gives us the go-ahead
3372 * - if this gives us problems in the future, we could use a read/write lock
3373 * to protect the resetting of these values */
3374 new_ok -= atomic64_read(&dev->gnd_rdmaq_bytes_out);
3375 atomic64_set(&dev->gnd_rdmaq_bytes_ok, new_ok);
3377 CDEBUG(D_NET, "resetting rdmaq bytes to %ld, deadline +%lu -> %lu, "
3378 "current out %ld\n",
3379 atomic64_read(&dev->gnd_rdmaq_bytes_ok), dead_bump, dev->gnd_rdmaq_deadline,
3380 atomic64_read(&dev->gnd_rdmaq_bytes_out));
3382 spin_unlock(&dev->gnd_rdmaq_lock);
3385 spin_lock(&dev->gnd_rdmaq_lock);
3386 while (!list_empty(&dev->gnd_rdmaq)) {
3389 /* make sure we break out early on quiesce */
3390 if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
3391 /* always break with lock held - we unlock outside loop */
3395 tx = list_first_entry(&dev->gnd_rdmaq, kgn_tx_t, tx_list);
3396 kgnilnd_tx_del_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_ALLOCD);
3399 /* sample with lock held, serializing with kgnilnd_complete_closed_conn */
3400 if (tx->tx_conn->gnc_state != GNILND_CONN_ESTABLISHED) {
3401 /* if conn is dying, mark tx in tx_ref_table for
3402 * kgnilnd_complete_closed_conn to finish up */
3403 kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_DYING, 1);
3405 /* tx was moved to DYING, get next */
3408 spin_unlock(&dev->gnd_rdmaq_lock);
3410 rc = kgnilnd_auth_rdma_bytes(dev, tx);
3411 spin_lock(&dev->gnd_rdmaq_lock);
3414 /* no ticket! add back to head */
3415 kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_RDMAQ, 0);
3416 /* clear found_work so scheduler threads wait for timer */
3420 /* TX is GO for launch */
3421 tx->tx_qtime = jiffies;
3422 kgnilnd_send_mapped_tx(tx, 0);
3426 spin_unlock(&dev->gnd_rdmaq_lock);
3432 kgnilnd_swab_rdma_desc(kgn_rdma_desc_t *d)
3434 __swab64s(&d->gnrd_key.qword1);
3435 __swab64s(&d->gnrd_key.qword2);
3436 __swab64s(&d->gnrd_addr);
3437 __swab32s(&d->gnrd_nob);
3440 #define kgnilnd_match_reply_either(w, x, y, z) _kgnilnd_match_reply(w, x, y, z)
3441 #define kgnilnd_match_reply(x, y, z) _kgnilnd_match_reply(x, y, GNILND_MSG_NONE, z)
3444 _kgnilnd_match_reply(kgn_conn_t *conn, int type1, int type2, __u64 cookie)
3446 kgn_tx_ev_id_t ev_id;
3449 /* we use the cookie from the original TX, so we can find the match
3450 * by parsing that and using the txe_idx */
3451 ev_id.txe_cookie = cookie;
3453 tx = conn->gnc_tx_ref_table[ev_id.txe_idx];
3456 /* check tx to make sure kgni didn't eat it */
3457 GNITX_ASSERTF(tx, tx->tx_msg.gnm_magic == GNILND_MSG_MAGIC,
3458 "came back from kgni with bad magic %x\n", tx->tx_msg.gnm_magic);
3460 GNITX_ASSERTF(tx, ((tx->tx_id.txe_idx == ev_id.txe_idx) &&
3461 (tx->tx_id.txe_cookie == cookie)),
3462 "conn 0x%p->%s tx_ref_table hosed: wanted "
3463 "txe_cookie "LPX64" txe_idx %d "
3464 "found tx %p cookie "LPX64" txe_idx %d\n",
3465 conn, libcfs_nid2str(conn->gnc_peer->gnp_nid),
3466 cookie, ev_id.txe_idx,
3467 tx, tx->tx_id.txe_cookie, tx->tx_id.txe_idx);
3469 LASSERTF((((tx->tx_msg.gnm_type == type1) || (tx->tx_msg.gnm_type == type2)) &&
3470 (tx->tx_state & GNILND_TX_WAITING_REPLY)),
3471 "Unexpected TX type (%x, %x or %x) "
3472 "or state (%x, expected +%x) "
3473 "matched reply from %s\n",
3474 tx->tx_msg.gnm_type, type1, type2,
3475 tx->tx_state, GNILND_TX_WAITING_REPLY,
3476 libcfs_nid2str(conn->gnc_peer->gnp_nid));
3478 CWARN("Unmatched reply %02x, or %02x/"LPX64" from %s\n",
3479 type1, type2, cookie, libcfs_nid2str(conn->gnc_peer->gnp_nid));
3485 kgnilnd_complete_tx(kgn_tx_t *tx, int rc)
3488 kgn_conn_t *conn = tx->tx_conn;
3490 spin_lock(&conn->gnc_list_lock);
3492 GNITX_ASSERTF(tx, tx->tx_state & GNILND_TX_WAITING_REPLY,
3493 "not waiting for reply", NULL);
3496 tx->tx_state &= ~GNILND_TX_WAITING_REPLY;
3498 if (!(tx->tx_state & GNILND_TX_WAITING_COMPLETION)) {
3499 kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
3500 /* sample under lock as follow on steps require gnc_list_lock
3501 * - or call kgnilnd_tx_done which requires no locks held over
3502 * call to lnet_finalize */
3505 spin_unlock(&conn->gnc_list_lock);
3508 kgnilnd_tx_done(tx, tx->tx_rc);
3513 kgnilnd_finalize_rx_done(kgn_tx_t *tx, kgn_msg_t *msg)
3516 kgn_conn_t *conn = tx->tx_conn;
3518 atomic_inc(&conn->gnc_device->gnd_rdma_nrx);
3519 atomic64_add(tx->tx_nob, &conn->gnc_device->gnd_rdma_rxbytes);
3521 rc = kgnilnd_verify_rdma_cksum(tx, msg->gnm_payload_cksum);
3523 kgnilnd_complete_tx(tx, rc);
3527 kgnilnd_check_fma_rx(kgn_conn_t *conn)
3535 kgn_peer_t *peer = conn->gnc_peer;
3538 __u16 tmp_cksum = 0, msg_cksum = 0;
3539 int repost = 1, saw_complete;
3540 unsigned long timestamp, newest_last_rx, timeout;
3542 void *memory = NULL;
3545 /* Short circuit if the ep_handle is null.
3546 * It's likely that it's about to be closed as stale.
3548 if (conn->gnc_ephandle == NULL)
3551 timestamp = jiffies;
3552 mutex_lock(&conn->gnc_device->gnd_cq_mutex);
3553 /* delay in jiffies - we are really concerned only with things that
3554 * result in a schedule() or really holding this off for long times.
3555 * NB - mutex_lock could spin for 2 jiffies before going to sleep to wait */
3556 conn->gnc_device->gnd_mutex_delay += (long) jiffies - timestamp;
3558 /* Resample current time as we have no idea how long it took to get the mutex */
3559 timestamp = jiffies;
3561 /* We check here when the last time we received an rx, we do this before
3562 * we call getnext in case the thread has been blocked for a while. If we
3563 * haven't received an rx since our timeout value, we close the connection
3564 * as we should assume the other side has closed the connection. This will
3565 * stop us from sending replies to a mailbox that is already in purgatory.
3568 timeout = cfs_time_seconds(conn->gnc_timeout);
3569 newest_last_rx = GNILND_LASTRX(conn);
3571 /* Error injection to validate that timestamp checking works and closing the conn */
3572 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_RECV_TIMEOUT)) {
3573 timestamp = timestamp + (GNILND_TIMEOUTRX(timeout) * 2);
3576 if (time_after_eq(timestamp, newest_last_rx + (GNILND_TIMEOUTRX(timeout)))) {
3577 GNIDBG_CONN(D_NETERROR|D_CONSOLE, conn, "Can't receive from %s after timeout lapse of %lu; TO %lu",
3578 libcfs_nid2str(conn->gnc_peer->gnp_nid),
3579 cfs_duration_sec(timestamp - newest_last_rx),
3580 cfs_duration_sec(GNILND_TIMEOUTRX(timeout)));
3581 mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
3583 kgnilnd_close_conn(conn, rc);
3587 rrc = kgnilnd_smsg_getnext(conn->gnc_ephandle, &prefix);
3589 if (rrc == GNI_RC_NOT_DONE) {
3590 mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
3591 CDEBUG(D_INFO, "SMSG RX empty\n");
3595 if (rrc == GNI_RC_INVALID_STATE) {
3596 LIBCFS_ALLOC(memory, conn->gnpr_smsg_attr.buff_size);
3597 if (memory == NULL) {
3598 memory = (void *)0xdeadbeef;
3600 memcpy(memory, conn->gnpr_smsg_attr.msg_buffer + conn->gnpr_smsg_attr.mbox_offset, conn->gnpr_smsg_attr.buff_size);
3604 LASSERTF(rrc == GNI_RC_SUCCESS,
3605 "bad rc %d on conn %p from peer %s mailbox copy %p\n",
3606 rrc, conn, libcfs_nid2str(peer->gnp_nid), memory);
3608 msg = (kgn_msg_t *)prefix;
3610 rx = kgnilnd_alloc_rx();
3612 mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
3613 kgnilnd_release_msg(conn);
3614 GNIDBG_MSG(D_NETERROR, msg, "Dropping SMSG RX from 0x%p->%s, no RX memory",
3615 conn, libcfs_nid2str(peer->gnp_nid));
3619 GNIDBG_MSG(D_INFO, msg, "SMSG RX on %p from %s",
3620 conn, libcfs_nid2str(peer->gnp_nid));
3622 timestamp = conn->gnc_last_rx;
3623 last_seq = conn->gnc_rx_seq;
3625 conn->gnc_last_rx = jiffies;
3626 /* stash first rx so we can clear out purgatory
3628 if (conn->gnc_first_rx == 0)
3629 conn->gnc_first_rx = jiffies;
3631 seq = conn->gnc_rx_seq++;
3633 /* needs to linger to protect gnc_rx_seq like we do with gnc_tx_seq */
3634 mutex_unlock(&conn->gnc_device->gnd_cq_mutex);
3635 kgnilnd_peer_alive(conn->gnc_peer);
3638 rx->grx_conn = conn;
3640 rx->grx_received = current_kernel_time();
3642 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_NET_LOOKUP)) {
3645 rc = kgnilnd_find_net(msg->gnm_srcnid, &net);
3651 kgnilnd_net_decref(net);
3654 if (*kgnilnd_tunables.kgn_checksum && !msg->gnm_cksum)
3655 GNIDBG_MSG(D_WARNING, msg, "no msg header checksum when enabled");
3657 /* XXX Nic: Do we need to swab cksum */
3658 if (msg->gnm_cksum != 0) {
3659 msg_cksum = msg->gnm_cksum;
3661 tmp_cksum = kgnilnd_cksum(msg, sizeof(kgn_msg_t));
3663 if (tmp_cksum != msg_cksum) {
3664 GNIDBG_MSG(D_NETERROR, msg, "Bad hdr checksum (%x expected %x)",
3665 tmp_cksum, msg_cksum);
3666 kgnilnd_dump_msg(D_BUFFS, msg);
3671 /* restore checksum for future debug messages */
3672 msg->gnm_cksum = tmp_cksum;
3674 if (msg->gnm_magic != GNILND_MSG_MAGIC) {
3675 if (__swab32(msg->gnm_magic) != GNILND_MSG_MAGIC) {
3676 GNIDBG_MSG(D_NETERROR, msg, "Unexpected magic %08x from %s",
3677 msg->gnm_magic, libcfs_nid2str(peer->gnp_nid));
3682 __swab32s(&msg->gnm_magic);
3683 __swab16s(&msg->gnm_version);
3684 __swab16s(&msg->gnm_type);
3685 __swab64s(&msg->gnm_srcnid);
3686 __swab64s(&msg->gnm_connstamp);
3687 __swab32s(&msg->gnm_seq);
3689 /* NB message type checked below; NOT here... */
3690 switch (msg->gnm_type) {
3691 case GNILND_MSG_PUT_ACK:
3692 kgnilnd_swab_rdma_desc(&msg->gnm_u.putack.gnpam_desc);
3695 case GNILND_MSG_GET_REQ:
3696 kgnilnd_swab_rdma_desc(&msg->gnm_u.get.gngm_desc);
3704 if (msg->gnm_version != GNILND_MSG_VERSION) {
3705 GNIDBG_MSG(D_NETERROR, msg, "Unexpected protocol version %d from %s",
3706 msg->gnm_version, libcfs_nid2str(peer->gnp_nid));
3711 if (LNET_NIDADDR(msg->gnm_srcnid) != LNET_NIDADDR(peer->gnp_nid)) {
3712 GNIDBG_MSG(D_NETERROR, msg, "Unexpected peer %s from %s",
3713 libcfs_nid2str(msg->gnm_srcnid),
3714 libcfs_nid2str(peer->gnp_nid));
3719 if (msg->gnm_connstamp != conn->gnc_peer_connstamp) {
3720 GNIDBG_MSG(D_NETERROR, msg, "Unexpected connstamp "LPX64"("LPX64
3721 " expected) from %s",
3722 msg->gnm_connstamp, conn->gnc_peer_connstamp,
3723 libcfs_nid2str(peer->gnp_nid));
3728 if (msg->gnm_seq != seq) {
3729 GNIDBG_MSG(D_NETERROR, msg, "Unexpected sequence number %d(%d expected) from %s",
3730 msg->gnm_seq, seq, libcfs_nid2str(peer->gnp_nid));
3735 atomic_inc(&conn->gnc_device->gnd_short_nrx);
3737 if (msg->gnm_type == GNILND_MSG_CLOSE) {
3738 CDEBUG(D_NETTRACE, "%s sent us CLOSE msg\n",
3739 libcfs_nid2str(conn->gnc_peer->gnp_nid));
3740 write_lock(&kgnilnd_data.kgn_peer_conn_lock);
3741 conn->gnc_close_recvd = GNILND_CLOSE_RX;
3742 conn->gnc_peer_error = msg->gnm_u.completion.gncm_retval;
3743 /* double check state with lock held */
3744 if (conn->gnc_state == GNILND_CONN_ESTABLISHED) {
3745 /* only error if we are not already closing */
3746 if (conn->gnc_peer_error == -ETIMEDOUT) {
3747 unsigned long now = jiffies;
3748 CNETERR("peer 0x%p->%s closed connection 0x%p due to timeout. "
3750 "RX %d @ %lus/%lus; TX %d @ %lus/%lus; "
3751 "NOOP %lus/%lus/%lus; sched %lus/%lus/%lus ago\n",
3752 conn->gnc_peer, libcfs_nid2str(conn->gnc_peer->gnp_nid),
3754 cfs_duration_sec(now - timestamp),
3755 cfs_duration_sec(now - conn->gnc_last_rx_cq),
3757 cfs_duration_sec(now - conn->gnc_last_tx),
3758 cfs_duration_sec(now - conn->gnc_last_tx_cq),
3759 cfs_duration_sec(now - conn->gnc_last_noop_want),
3760 cfs_duration_sec(now - conn->gnc_last_noop_sent),
3761 cfs_duration_sec(now - conn->gnc_last_noop_cq),
3762 cfs_duration_sec(now - conn->gnc_last_sched_ask),
3763 cfs_duration_sec(now - conn->gnc_last_sched_do),
3764 cfs_duration_sec(now - conn->gnc_device->gnd_sched_alive));
3766 kgnilnd_close_conn_locked(conn, -ECONNRESET);
3768 write_unlock(&kgnilnd_data.kgn_peer_conn_lock);
3772 if (conn->gnc_close_recvd) {
3773 GNIDBG_MSG(D_NETERROR, msg, "Unexpected message %s(%d/%d) after CLOSE from %s",
3774 kgnilnd_msgtype2str(msg->gnm_type),
3775 msg->gnm_type, conn->gnc_close_recvd,
3776 libcfs_nid2str(conn->gnc_peer->gnp_nid));
3781 if (conn->gnc_state != GNILND_CONN_ESTABLISHED) {
3782 /* XXX Nic: log message received on bad connection state */
3786 switch (msg->gnm_type) {
3787 case GNILND_MSG_NOOP:
3788 /* Nothing to do; just a keepalive */
3791 case GNILND_MSG_IMMEDIATE:
3792 /* only get SMSG payload for IMMEDIATE */
3793 atomic64_add(msg->gnm_payload_len, &conn->gnc_device->gnd_short_rxbytes);
3794 rc = lnet_parse(net->gnn_ni, &msg->gnm_u.immediate.gnim_hdr,
3795 msg->gnm_srcnid, rx, 0);
3799 case GNILND_MSG_PUT_REQ:
3800 rc = lnet_parse(net->gnn_ni, &msg->gnm_u.putreq.gnprm_hdr,
3801 msg->gnm_srcnid, rx, 1);
3805 case GNILND_MSG_PUT_NAK:
3806 tx = kgnilnd_match_reply_either(conn, GNILND_MSG_PUT_REQ, GNILND_MSG_PUT_ACK,
3807 msg->gnm_u.completion.gncm_cookie);
3811 kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
3814 case GNILND_MSG_PUT_ACK:
3815 tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_REQ,
3816 msg->gnm_u.putack.gnpam_src_cookie);
3820 /* store putack data for later: deferred rdma or re-try */
3821 tx->tx_putinfo = msg->gnm_u.putack;
3824 spin_lock(&tx->tx_conn->gnc_list_lock);
3826 GNITX_ASSERTF(tx, tx->tx_state & GNILND_TX_WAITING_REPLY,
3827 "not waiting for reply", NULL);
3829 tx->tx_state &= ~GNILND_TX_WAITING_REPLY;
3831 if (likely(!(tx->tx_state & GNILND_TX_WAITING_COMPLETION))) {
3832 kgnilnd_tx_del_state_locked(tx, NULL, conn, GNILND_TX_ALLOCD);
3833 /* sample under lock as follow on steps require gnc_list_lock
3834 * - or call kgnilnd_tx_done which requires no locks held over
3835 * call to lnet_finalize */
3838 /* cannot launch rdma if still waiting for fma-msg completion */
3839 CDEBUG(D_NET, "tx 0x%p type 0x%02x will need to "
3840 "wait for SMSG completion\n", tx, tx->tx_msg.gnm_type);
3841 tx->tx_state |= GNILND_TX_PENDING_RDMA;
3843 spin_unlock(&tx->tx_conn->gnc_list_lock);
3846 rc = kgnilnd_send_mapped_tx(tx, 0);
3848 kgnilnd_tx_done(tx, rc);
3852 case GNILND_MSG_PUT_DONE:
3853 tx = kgnilnd_match_reply(conn, GNILND_MSG_PUT_ACK,
3854 msg->gnm_u.completion.gncm_cookie);
3858 GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
3859 tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
3860 "bad tx buftype %d", tx->tx_buftype);
3862 kgnilnd_finalize_rx_done(tx, msg);
3865 case GNILND_MSG_GET_REQ:
3866 rc = lnet_parse(net->gnn_ni, &msg->gnm_u.get.gngm_hdr,
3867 msg->gnm_srcnid, rx, 1);
3871 case GNILND_MSG_GET_NAK:
3872 tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_REQ,
3873 msg->gnm_u.completion.gncm_cookie);
3877 GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
3878 tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
3879 "bad tx buftype %d", tx->tx_buftype);
3881 kgnilnd_complete_tx(tx, msg->gnm_u.completion.gncm_retval);
3884 case GNILND_MSG_GET_DONE:
3885 tx = kgnilnd_match_reply(conn, GNILND_MSG_GET_REQ,
3886 msg->gnm_u.completion.gncm_cookie);
3890 GNITX_ASSERTF(tx, tx->tx_buftype == GNILND_BUF_PHYS_MAPPED ||
3891 tx->tx_buftype == GNILND_BUF_VIRT_MAPPED,
3892 "bad tx buftype %d", tx->tx_buftype);
3894 lnet_set_reply_msg_len(net->gnn_ni, tx->tx_lntmsg[1],
3895 msg->gnm_u.completion.gncm_retval);
3897 kgnilnd_finalize_rx_done(tx, msg);
3902 if (rc < 0) /* protocol/comms error */
3903 kgnilnd_close_conn(conn, rc);
3905 if (repost && rx != NULL) {
3906 kgnilnd_consume_rx(rx);
3909 /* we got an event, so assume there are more and call for a reschedule */
3911 kgnilnd_schedule_conn(conn);
3915 /* Do the failure injections that we need to affect conn processing in the following function.
3916 * When writing tests that use this function make sure to use a fail_loc with a fail mask.
3917 * If you don't, you can cause the scheduler threads to spin on the conn without it leaving
3920 * intent is used to signal the calling function whether or not the conn needs to be rescheduled.
3924 kgnilnd_check_conn_fail_loc(kgn_device_t *dev, kgn_conn_t *conn, int *intent)
3928 /* short circuit out when not set */
3929 if (likely(!cfs_fail_loc)) {
3933 /* failure injection to test for stack reset clean ups */
3934 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_DROP_CLOSING)) {
3935 /* we can't rely on busy loops being nice enough to get the
3936 * stack reset triggered - it'd just spin on this conn */
3937 CFS_RACE(CFS_FAIL_GNI_DROP_CLOSING);
3940 GOTO(did_fail_loc, rc);
3943 if (conn->gnc_state == GNILND_CONN_DESTROY_EP) {
3944 /* DESTROY_EP set in kgnilnd_conn_decref on gnc_refcount = 1 */
3946 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_DROP_DESTROY_EP)) {
3947 CFS_RACE(CFS_FAIL_GNI_DROP_DESTROY_EP);
3950 GOTO(did_fail_loc, rc);
3954 /* CFS_FAIL_GNI_FINISH_PURG2 is used to stop a connection from fully closing. This scheduler
3955 * will spin on the CFS_FAIL_TIMEOUT until the fail_loc is cleared at which time the connection
3956 * will be closed by kgnilnd_complete_closed_conn.
3958 if ((conn->gnc_state == GNILND_CONN_CLOSED) && CFS_FAIL_CHECK(CFS_FAIL_GNI_FINISH_PURG2)) {
3959 while (CFS_FAIL_TIMEOUT(CFS_FAIL_GNI_FINISH_PURG2, 1)) {};
3962 GOTO(did_fail_loc, rc);
3965 /* this one is a bit gross - we can't hold the mutex from process_conns
3966 * across a CFS_RACE here - it'd block the conn threads from doing an ep_bind
3967 * and moving onto finish_connect
3968 * so, we'll just set the rc - kgnilnd_process_conns will clear
3969 * found_work on a fail_loc, getting the scheduler thread to call schedule()
3970 * and effectively getting this thread to sleep */
3971 if ((conn->gnc_state == GNILND_CONN_CLOSED) && CFS_FAIL_CHECK(CFS_FAIL_GNI_FINISH_PURG)) {
3974 GOTO(did_fail_loc, rc);
3982 kgnilnd_send_conn_close(kgn_conn_t *conn)
3986 /* we are closing the conn - we will try to send the CLOSE msg
3987 * but will not wait for anything else to flush */
3989 /* send the close if not already done so or received one */
3990 if (!conn->gnc_close_sent && !conn->gnc_close_recvd) {
3991 /* set close_sent regardless of the success of the
3992 * CLOSE message. We are going to try once and then
3993 * kick him out of the sandbox */
3994 conn->gnc_close_sent = 1;
3997 /* EP might be null already if remote side initiated a new connection.
3998 * kgnilnd_finish_connect destroys existing ep_handles before wiring up the new connection,
3999 * so this check is here to make sure we don't attempt to send with a null ep_handle.
4001 if (conn->gnc_ephandle != NULL) {
4004 tx = kgnilnd_new_tx_msg(GNILND_MSG_CLOSE, conn->gnc_peer->gnp_net->gnn_ni->ni_nid);
4006 tx->tx_msg.gnm_u.completion.gncm_retval = conn->gnc_error;
4007 tx->tx_state = GNILND_TX_WAITING_COMPLETION;
4008 tx->tx_qtime = jiffies;
4010 if (tx->tx_id.txe_idx == 0) {
4011 rc = kgnilnd_set_tx_id(tx, conn);
4013 kgnilnd_tx_done(tx, rc);
4017 CDEBUG(D_NETTRACE, "sending close with errno %d\n",
4020 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_CLOSE_SEND)) {
4021 kgnilnd_tx_done(tx, -EAGAIN);
4023 rc = kgnilnd_sendmsg(tx, NULL, 0, NULL, GNILND_TX_FMAQ);
4025 /* It wasn't sent and we don't care. */
4026 kgnilnd_tx_done(tx, rc);
4034 conn->gnc_state = GNILND_CONN_CLOSED;
4035 /* mark this conn as CLOSED now that we processed it
4036 * do after TX, so we can use CLOSING in asserts */
4040 if (CFS_FAIL_CHECK(CFS_FAIL_GNI_RX_CLOSE_CLOSED)) {
4041 /* simulate a RX CLOSE after the timeout but before
4042 * the scheduler thread gets it */
4043 conn->gnc_close_recvd = GNILND_CLOSE_INJECT2;
4044 conn->gnc_peer_error = -ETIMEDOUT;
4046 /* schedule to allow potential CLOSE and get the complete phase run */
4047 kgnilnd_schedule_conn(conn);
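/* Retry TXs that previously failed to map their DMA resources, starting from
 * the head of dev->gnd_map_tx. A TX is retried on every pass until it has
 * burned 25% of kgn_max_retransmits; after that it is held back until
 * gnd_map_version changes, i.e. until something was unmapped and a retry has
 * a chance of succeeding (for example, if kgn_max_retransmits were 1024, the
 * fast-retry phase for a given TX would end after 256 attempts). Returns the
 * number of TXs handled so the scheduler knows whether it found work. */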
int
kgnilnd_process_mapped_tx(kgn_device_t *dev)
{
        kgn_tx_t        *tx;
        int              rc;
        int              found_work = 0;
        int              max_retrans = *kgnilnd_tunables.kgn_max_retransmits;
        int              log_retrans, log_retrans_level;
        static int       last_map_version;

        spin_lock(&dev->gnd_lock);
        if (list_empty(&dev->gnd_map_tx)) {
                spin_unlock(&dev->gnd_lock);
                RETURN(0);
        }

        dev->gnd_sched_alive = jiffies;

        /* we'll retry as fast as possible up to 25% of the limit, then we
         * start backing off until our map version changes - indicating we
         * unmapped something */
        tx = list_first_entry(&dev->gnd_map_tx, kgn_tx_t, tx_list);
        if ((tx->tx_retrans > (max_retrans / 4)) &&
            (last_map_version == dev->gnd_map_version)) {
                GNIDBG_TX(D_NET, tx, "waiting for mapping event to retry", NULL);
                spin_unlock(&dev->gnd_lock);
                RETURN(0);
        }

        /* stash the last map version to let us know when a good one was seen */
        last_map_version = dev->gnd_map_version;

        /* we need to take the lock and continually refresh the head of the
         * list as kgnilnd_complete_closed_conn might be nuking stuff and we
         * are cycling the lock, allowing them to squeeze in */
        while (!list_empty(&dev->gnd_map_tx)) {
                /* make sure we break out early on quiesce */
                if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
                        /* always break with lock held - we unlock outside the loop */
                        break;
                }

                tx = list_first_entry(&dev->gnd_map_tx, kgn_tx_t, tx_list);

                kgnilnd_tx_del_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_ALLOCD);
                found_work++;

                /* sample with lock held, serializing with kgnilnd_complete_closed_conn */
                if (tx->tx_conn->gnc_state != GNILND_CONN_ESTABLISHED) {
                        /* if conn is dying, mark tx in tx_ref_table for
                         * kgnilnd_complete_closed_conn to finish up */
                        kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_DYING, 1);

                        /* tx was moved to DYING, get next */
                        continue;
                }

                spin_unlock(&dev->gnd_lock);
                rc = kgnilnd_send_mapped_tx(tx, 1);

                /* We made it! skip error handling.. */
                if (rc >= 0) {
                        /* OK to continue on +ve errors as it won't get seen until
                         * this function is called again - we operate on a copy of
                         * the original list and not the live list */
                        spin_lock(&dev->gnd_lock);
                        continue;
                } else if (rc != -ENOMEM) {
                        /* carp, failure we can't handle */
                        kgnilnd_tx_done(tx, rc);
                        spin_lock(&dev->gnd_lock);
                        continue;
                }

                /* time to handle the retry cases.. */
                tx->tx_retrans++;
                if (tx->tx_retrans == 1)
                        tx->tx_qtime = jiffies;

                /* only log occasionally once we've retried max / 2 */
                log_retrans = (tx->tx_retrans >= (max_retrans / 2)) &&
                              ((tx->tx_retrans % 32) == 0);
                log_retrans_level = log_retrans ? D_NETERROR : D_NET;

                /* make sure we are not off in the weeds with this tx */
                if (tx->tx_retrans > max_retrans) {
                        GNIDBG_TX(D_NETERROR, tx,
                                  "giving up on TX, too many retries", NULL);
                        kgnilnd_tx_done(tx, -ENOMEM);
                        GOTO(get_out_mapped, rc);
                }

                GNIDBG_TX(log_retrans_level, tx,
                          "transient map failure #%d %d pages/%d bytes phys %u@%u "
                          "virt %u@%llu nq_map %d mdd# %d/%d GART %ld",
                          tx->tx_retrans, tx->tx_phys_npages, tx->tx_nob,
                          dev->gnd_map_nphys, dev->gnd_map_physnop * PAGE_SIZE,
                          dev->gnd_map_nvirt, dev->gnd_map_virtnob,
                          atomic_read(&dev->gnd_nq_map),
                          atomic_read(&dev->gnd_n_mdd), atomic_read(&dev->gnd_n_mdd_held),
                          atomic64_read(&dev->gnd_nbytes_map));

                /* we need to stop processing the rest of the list, so add it back in */
                spin_lock(&dev->gnd_lock);
                kgnilnd_tx_add_state_locked(tx, NULL, tx->tx_conn, GNILND_TX_MAPQ, 0);
                spin_unlock(&dev->gnd_lock);
                GOTO(get_out_mapped, rc);
        }
        spin_unlock(&dev->gnd_lock);
get_out_mapped:
        RETURN(found_work);
}

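/* Drain dev->gnd_ready_conns: take each conn off the list, flip it from
 * WANTS_SCHED to PROCESS, and dispatch on its state - CLOSED conns get
 * completed, DESTROY_EP conns have their endpoint torn down, CLOSING conns
 * get a CLOSE sent, and established conns run their FMA RX/TX work. Each
 * conn is then passed to kgnilnd_schedule_process_conn to decide whether it
 * goes back on the ready list. Returns found_work for the scheduler. */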
int
kgnilnd_process_conns(kgn_device_t *dev)
{
        int              found_work = 0;
        int              conn_sched;
        int              intent = 0;
        kgn_conn_t      *conn;

        spin_lock(&dev->gnd_lock);
        while (!list_empty(&dev->gnd_ready_conns)) {
                dev->gnd_sched_alive = jiffies;

                if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
                        /* break with lock held */
                        break;
                }

                conn = list_first_entry(&dev->gnd_ready_conns, kgn_conn_t, gnc_schedlist);
                list_del_init(&conn->gnc_schedlist);
                spin_unlock(&dev->gnd_lock);

                conn_sched = xchg(&conn->gnc_scheduled, GNILND_CONN_PROCESS);

                LASSERTF(conn_sched != GNILND_CONN_IDLE &&
                         conn_sched != GNILND_CONN_PROCESS,
                         "conn %p on ready list but in bad state: %d\n",
                         conn, conn_sched);

                CDEBUG(D_INFO, "conn %p@%s for processing\n",
                       conn, kgnilnd_conn_state2str(conn));

                found_work++;
                set_mb(conn->gnc_last_sched_do, jiffies);

                if (kgnilnd_check_conn_fail_loc(dev, conn, &intent)) {
                        /* based on intent see if we should run again */
                        kgnilnd_schedule_process_conn(conn, intent);

                        /* drop ref from gnd_ready_conns */
                        kgnilnd_conn_decref(conn);

                        /* clear this so that the scheduler thread doesn't spin */
                        found_work = 0;

                        /* break with lock held... */
                        spin_lock(&dev->gnd_lock);
                        break;
                }

                if (unlikely(conn->gnc_state == GNILND_CONN_CLOSED)) {
                        /* CONN_CLOSED set in process_fmaq when CLOSE is sent */
                        kgnilnd_complete_closed_conn(conn);
                } else if (unlikely(conn->gnc_state == GNILND_CONN_DESTROY_EP)) {
                        /* DESTROY_EP set in kgnilnd_conn_decref on gnc_refcount = 1 */
                        /* serialize SMSG CQs with ep_bind and smsg_release */
                        kgnilnd_destroy_conn_ep(conn);
                } else if (unlikely(conn->gnc_state == GNILND_CONN_CLOSING)) {
                        /* if we need to do some CLOSE sending, etc., it is done here */
                        kgnilnd_send_conn_close(conn);
                        kgnilnd_check_fma_rx(conn);
                } else if (atomic_read(&conn->gnc_peer->gnp_dirty_eps) == 0) {
                        /* start moving traffic if the old conns are cleared out */
                        kgnilnd_check_fma_rx(conn);
                        kgnilnd_process_fmaq(conn);
                }

                kgnilnd_schedule_process_conn(conn, 0);

                /* drop ref from gnd_ready_conns */
                kgnilnd_conn_decref(conn);

                /* check list again with lock held */
                spin_lock(&dev->gnd_lock);
        }
        spin_unlock(&dev->gnd_lock);

        RETURN(found_work);
}

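/* Main body of the scheduler threads: each thread binds to a device and
 * loops pulling work from the FMA/RDMA CQs, the RDMA and mapping queues, and
 * the ready-conn list, busy-polling for up to kgn_loops passes before easing
 * off with yield() or sleeping on gnd_waitq until kgnilnd_schedule_device
 * pokes it awake. */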
int
kgnilnd_scheduler(void *arg)
{
        int              threadno = (long)arg;
        kgn_device_t    *dev;
        char             name[16];
        int              busy_loops = 0;
        DEFINE_WAIT(wait);

        dev = &kgnilnd_data.kgn_devices[(threadno + 1) % kgnilnd_data.kgn_ndevs];

        snprintf(name, sizeof(name), "kgnilnd_sd_%02d", threadno);
        cfs_daemonize(name);
        cfs_block_allsigs();

        /* all gnilnd threads need to run fairly urgently */
        set_user_nice(current, *kgnilnd_tunables.kgn_nice);

        while (!kgnilnd_data.kgn_shutdown) {
                int     found_work = 0;
                /* Safe: kgn_shutdown only set when quiescent */

                /* to quiesce or not to quiesce, that is the question */
                if (unlikely(kgnilnd_data.kgn_quiesce_trigger)) {
                        KGNILND_SPIN_QUIESCE;
                }

                /* tracking for when a thread goes AWOL */
                dev->gnd_sched_alive = jiffies;

                /* let folks know we are up and kicking
                 * - they can use this for latency savings, etc.
                 * - only change if IRQ; if IDLE leave it alone so that
                 *   schedule_device calls can put us back to IRQ */
                (void)cmpxchg(&dev->gnd_ready, GNILND_DEV_IRQ, GNILND_DEV_LOOP);

                /* always check these - they are super low cost */
                found_work += kgnilnd_check_fma_send_cq(dev);
                found_work += kgnilnd_check_fma_rcv_cq(dev);

                /* rdma CQ doesn't care about eps */
                found_work += kgnilnd_check_rdma_cq(dev);

                /* move some RDMA? */
                found_work += kgnilnd_process_rdmaq(dev);

                /* map some pending RDMA requests? */
                found_work += kgnilnd_process_mapped_tx(dev);

                /* the EP for a conn is not destroyed until all the references
                 * to it are gone, so these checks should be safe
                 * even if run in parallel with the CQ checking functions
                 * _AND_ a thread that processes the CLOSED->DONE transition */

                /* process all conns ready now */
                found_work += kgnilnd_process_conns(dev);

                /* do an eager check to avoid the IRQ disabling in
                 * prepare_to_wait and friends */
                if (found_work && busy_loops++ < *kgnilnd_tunables.kgn_loops) {
                        if ((busy_loops % 10) == 0) {
                                /* tickle the heartbeat and watchdog to ensure our
                                 * piggishness doesn't turn into heartbeat failure */
                                touch_nmi_watchdog();
                                if (kgnilnd_hssops.hb_to_l0 != NULL) {
                                        kgnilnd_hssops.hb_to_l0();
                                }
                        }
                        continue;
                }

                /* if we got here, either found_work was zero or busy_loops
                 * says we need to take a break. We'll clear gnd_ready, but
                 * we'll check one last time if there is an IRQ that needs
                 * processing */
                prepare_to_wait(&dev->gnd_waitq, &wait, TASK_INTERRUPTIBLE);

                /* the first time this will go LOOP -> IDLE and let us do one
                 * final check, during which we might get an IRQ, then
                 * IDLE -> IDLE and schedule()
                 * - this might allow other threads to block us for a bit if
                 *   they try to get the mutex, but that is good as we'd need
                 *   to wake up soon to handle the CQ or other processing
                 *   anyway */
                found_work += xchg(&dev->gnd_ready, GNILND_DEV_IDLE);

                if (busy_loops >= *kgnilnd_tunables.kgn_loops) {
                        CDEBUG(D_INFO,
                               "yielding: found_work %d busy_loops %d\n",
                               found_work, busy_loops);
                        busy_loops = 0;

                        /* use yield if we are bailing due to busy_loops
                         * - this will ensure we wake up soonish. This closes
                         * a race with kgnilnd_device_callback - where it'd
                         * not call wake_up() because gnd_ready == 1, but then
                         * we come down and schedule() because of busy_loops.
                         * We'd not be woken up until something poked our waitq
                         * again. yield() ensures we wake up without another
                         * waitq poke in that case */
                        atomic_inc(&dev->gnd_n_yield);
                        yield();
                        CDEBUG(D_INFO, "awake after yield\n");
                } else if (found_work == GNILND_DEV_IDLE) {
                        /* busy_loops is low and there is nothing to do,
                         * go to sleep and wait for a waitq poke */
                        CDEBUG(D_INFO,
                               "scheduling: found_work %d busy_loops %d\n",
                               found_work, busy_loops);
                        atomic_inc(&dev->gnd_n_schedule);
                        schedule();
                        CDEBUG(D_INFO, "awake after schedule\n");
                }
                finish_wait(&dev->gnd_waitq, &wait);
        }

        kgnilnd_thread_fini();
        return 0;
}