LU-16035 kfilnd: Initial kfilnd implementation 09/48009/8
author    Doug Oucharek <doucharek@cray.com>
          Tue, 16 Oct 2018 22:51:21 +0000 (15:51 -0700)
committer Oleg Drokin <green@whamcloud.com>
          Thu, 1 Sep 2022 05:54:25 +0000 (05:54 +0000)
Initial implementation of the kfabric Lustre Network Driver.

Test-Parameters: trivial
HPE-bug-id: LUS-6565
Signed-off-by: Doug Oucharek <dougso@me.com>
Signed-off-by: Ian Ziemba <ian.ziemba@hpe.com>
Signed-off-by: Chris Horn <chris.horn@hpe.com>
Change-Id: I48a070ca0ba37e4923cd6dcb3327676ae6ddaae1
Reviewed-on: https://review.whamcloud.com/48009
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Ron Gredvig <ron.gredvig@hpe.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
27 files changed:
lnet/autoconf/lustre-lnet.m4
lnet/include/uapi/linux/lnet/lnet-dlc.h
lnet/include/uapi/linux/lnet/lnet-idl.h
lnet/klnds/Makefile.in
lnet/klnds/autoMakefile.am
lnet/klnds/kfilnd/Makefile.in [new file with mode: 0644]
lnet/klnds/kfilnd/autoMakefile.am [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd.h [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_cq.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_cq.h [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_debugfs.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_dev.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_dev.h [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_dom.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_dom.h [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_ep.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_ep.h [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_modparams.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_peer.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_peer.h [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_tn.c [new file with mode: 0644]
lnet/klnds/kfilnd/kfilnd_tn.h [new file with mode: 0644]
lnet/utils/lnetconfig/liblnetconfig.c
lnet/utils/lnetconfig/liblnetconfig_lnd.c
lnet/utils/lnetctl.c
lustre/utils/gss/lsupport.c

index 90b7700..f76bf5a 100644
@@ -651,6 +651,44 @@ AC_SUBST(GNICPPFLAGS)
 AC_SUBST(GNILND)
 ]) # LN_CONFIG_GNILND
 
+#
+# LN_CONFIG_KFILND
+#
+# check whether to use the kfabric Network Interface lnd
+#
+AC_DEFUN([LN_CONFIG_KFILND], [
+AC_ARG_WITH([kfi],
+       AC_HELP_STRING([--with-kfi=<path>], [Kfabric build path for kfilnd]),
+       [
+               AC_CHECK_FILE([$with_kfi/Module.symvers],
+               [
+                       # KFICPPFLAGS was set in spec file
+                       KFICPPFLAGS="-I$with_kfi/include"
+                       EXTRA_KCFLAGS_save="$EXTRA_KCFLAGS"
+                       EXTRA_KCFLAGS="$EXTRA_KCFLAGS $KFICPPFLAGS"
+                       KBUILD_EXTRA_SYMBOLS="$KBUILD_EXTRA_SYMBOLS $with_kfi/Module.symvers"
+                       LB_CHECK_COMPILE([if kfabric headers are present], KFI_header,
+                       [
+                               #include <kfi_endpoint.h>
+                       ],[
+                               struct kfi_info *hints;
+                               hints = kfi_allocinfo();
+                       ],[
+                               KFILND="kfilnd"
+                               AC_MSG_NOTICE([adding $with_kfi/Module.symvers to Symbol Path])
+                               EXTRA_SYMBOLS="$EXTRA_SYMBOLS $with_kfi/Module.symvers"
+                       ],[
+                               AC_MSG_ERROR([can't compile kfilnd with given KFICPPFLAGS: $KFICPPFLAGS])
+                       ])
+               ],[
+                       AC_MSG_ERROR(["$with_kfi/Module.symvers does not exist"])
+               ])
+       ],[])
+AC_SUBST(KFICPPFLAGS)
+AC_SUBST(KFILND)
+AC_SUBST(EXTRA_SYMBOLS)
+]) # LN_CONFIG_KFILND
+
 # LN_CONFIG_STRSCPY_EXISTS
 #
 # If strscpy exists, prefer it over strlcpy
@@ -897,6 +935,7 @@ LN_CONFIG_BACKOFF
 LN_CONFIG_O2IB
 LN_CONFIG_GNILND
 LN_CONFIG_STRSCPY_EXISTS
+LN_CONFIG_KFILND
 # 3.10
 LN_EXPORT_KMAP_TO_PAGE
 # 3.15
@@ -1038,6 +1077,7 @@ LN_USR_NLMSGERR
 AC_DEFUN([LN_CONDITIONALS], [
 AM_CONDITIONAL(BUILD_O2IBLND,    test x$O2IBLND = "xo2iblnd")
 AM_CONDITIONAL(BUILD_GNILND,     test x$GNILND  = "xgnilnd")
+AM_CONDITIONAL(BUILD_KFILND,     test x$KFILND  = "xkfilnd")
 ]) # LN_CONDITIONALS
 
 #
@@ -1062,6 +1102,8 @@ lnet/klnds/gnilnd/Makefile
 lnet/klnds/gnilnd/autoMakefile
 lnet/klnds/socklnd/Makefile
 lnet/klnds/socklnd/autoMakefile
+lnet/klnds/kfilnd/Makefile
+lnet/klnds/kfilnd/autoMakefile
 lnet/lnet/Makefile
 lnet/lnet/autoMakefile
 lnet/selftest/Makefile
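
Note on the configure check: LN_CONFIG_KFILND enables kfilnd only when --with-kfi=<path> is supplied, $with_kfi/Module.symvers exists, and a trivial kfabric program compiles against $with_kfi/include. Roughly, the KFI_header probe amounts to the following sketch (the real LB_CHECK_COMPILE wraps this body in a kernel conftest; nothing is linked or run at configure time):

	/* Illustration of the KFI_header compile probe: it passes as long
	 * as kfi_endpoint.h is reachable via -I$with_kfi/include and
	 * declares kfi_allocinfo().
	 */
	#include <kfi_endpoint.h>

	static int kfi_header_probe(void)
	{
		struct kfi_info *hints;

		hints = kfi_allocinfo();
		return hints ? 0 : -1;
	}
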
index 2b2c05f..9b6cbe3 100644
@@ -81,6 +81,13 @@ struct lnet_ioctl_config_o2iblnd_tunables {
        __u16 lnd_ntx;
 };
 
+struct lnet_ioctl_config_kfilnd_tunables {
+       __u32 lnd_version;
+       __u32 lnd_prov_major_version;
+       __u32 lnd_prov_minor_version;
+       __u32 lnd_auth_key;
+};
+
 struct lnet_ioctl_config_socklnd_tunables {
        __u32 lnd_version;
        __u16 lnd_conns_per_peer;
@@ -91,6 +98,7 @@ struct lnet_lnd_tunables {
        union {
                struct lnet_ioctl_config_o2iblnd_tunables lnd_o2ib;
                struct lnet_ioctl_config_socklnd_tunables lnd_sock;
+               struct lnet_ioctl_config_kfilnd_tunables lnd_kfi;
        } lnd_tun_u;
 };
 
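
With lnd_kfi added to lnd_tun_u, kfilnd's per-NI tunables travel through the same ioctl plumbing as the socklnd and o2iblnd tunables. A hypothetical accessor, purely for illustration (the helper name is invented; the field names come from the struct above):

	/* Hypothetical helper (not part of the patch): read the kfilnd
	 * authorization key from an NI's LND tunables, the same way the
	 * lnd_sock/lnd_o2ib members of this union are reached.
	 */
	static inline __u32 kfilnd_tun_auth_key(const struct lnet_lnd_tunables *tun)
	{
		return tun->lnd_tun_u.lnd_kfi.lnd_auth_key;
	}
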
index 7ed58a3..d10b588 100644
@@ -203,6 +203,7 @@ struct lnet_magicversion {
 #define LNET_PROTO_IB_MAGIC            0x0be91b91
 #define LNET_PROTO_GNI_MAGIC           0xb00fbabe /* ask Kim */
 #define LNET_PROTO_TCP_MAGIC           0xeebc0ded
+#define LNET_PROTO_KFI_MAGIC           0xdeadbeef
 #define LNET_PROTO_ACCEPTOR_MAGIC      0xacce7100
 #define LNET_PROTO_PING_MAGIC          0x70696E67 /* 'ping' */
 
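
LNET_PROTO_KFI_MAGIC joins the per-LND magics above; as kfilnd.h later in this patch shows, it is reused as KFILND_MSG_MAGIC, the first 32-bit word of every kfilnd wire message, so a receiver can cheaply reject frames that are not kfilnd traffic. A minimal validation sketch (using the kfilnd_msg layout introduced below):

	/* Sketch: first sanity check on an incoming kfilnd_msg (defined in
	 * kfilnd.h below). A magic mismatch means the frame is not kfilnd
	 * protocol traffic, and no other field should be trusted.
	 */
	static bool kfilnd_msg_magic_ok(const struct kfilnd_msg *msg)
	{
		return msg->magic == KFILND_MSG_MAGIC; /* LNET_PROTO_KFI_MAGIC */
	}
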
index d968d82..b2f23ae 100644
@@ -1,5 +1,6 @@
 @BUILD_GNILND_TRUE@obj-m += gnilnd/
 @BUILD_O2IBLND_TRUE@obj-m += o2iblnd/
+@BUILD_KFILND_TRUE@obj-m += kfilnd/
 obj-m += socklnd/
 
 @INCLUDE_RULES@
index eca1c05..0ef56f0 100644
@@ -29,4 +29,4 @@
 # This file is part of Lustre, http://www.lustre.org/
 #
 
-SUBDIRS = socklnd gnilnd o2iblnd
+SUBDIRS = socklnd gnilnd o2iblnd kfilnd
diff --git a/lnet/klnds/kfilnd/Makefile.in b/lnet/klnds/kfilnd/Makefile.in
new file mode 100644
index 0000000..045f292
--- /dev/null
@@ -0,0 +1,16 @@
+MODULES := kkfilnd
+
+kkfilnd-objs := \
+       kfilnd.o \
+       kfilnd_modparams.o \
+       kfilnd_tn.o \
+       kfilnd_ep.o \
+       kfilnd_dev.o \
+       kfilnd_dom.o \
+       kfilnd_peer.o \
+       kfilnd_cq.o \
+       kfilnd_debugfs.o \
+
+EXTRA_POST_CFLAGS += @KFICPPFLAGS@
+
+@INCLUDE_RULES@
diff --git a/lnet/klnds/kfilnd/autoMakefile.am b/lnet/klnds/kfilnd/autoMakefile.am
new file mode 100644
index 0000000..3bf6bae
--- /dev/null
@@ -0,0 +1,10 @@
+if MODULES
+if BUILD_KFILND
+modulenet_DATA := kkfilnd$(KMODEXT)
+endif # BUILD_KFILND
+endif # MODULES
+
+EXTRA_DIST := $(kkfilnd-objs:%.o=%.c) kfilnd.h kfilnd_dev.h kfilnd_dom.h \
+       kfilnd_ep.h kfilnd_peer.h kfilnd_tn.h kfilnd_cq.h
+
+MOSTLYCLEANFILES = @MOSTLYCLEANFILES@
diff --git a/lnet/klnds/kfilnd/kfilnd.c b/lnet/klnds/kfilnd/kfilnd.c
new file mode 100644
index 0000000..e8547a1
--- /dev/null
@@ -0,0 +1,443 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd main interface.
+ */
+
+#include <linux/delay.h>
+#include "kfilnd.h"
+#include "kfilnd_tn.h"
+#include "kfilnd_dev.h"
+
+/* These are temp constants to get stuff to compile */
+#define KFILND_DEFAULT_DEVICE "eth0"
+
+/* Some constants which should be turned into tunables */
+#define KFILND_MAX_WORKER_THREADS 4
+#define KFILND_MAX_EVENT_QUEUE 100
+
+#define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS)
+struct workqueue_struct *kfilnd_wq;
+struct dentry *kfilnd_debug_dir;
+
+static void kfilnd_shutdown(struct lnet_ni *ni)
+{
+       struct kfilnd_dev *dev = ni->ni_data;
+
+       kfilnd_dev_free(dev);
+}
+
+static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
+{
+       int cpt;
+
+       /* If the current CPT is within the LNet NI CPTs, use that CPT. */
+       cpt = lnet_cpt_current();
+       if (dev->cpt_to_endpoint[cpt])
+               return cpt;
+
+       /* Hash to a LNet NI CPT based on target NID. */
+       return  dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;
+}
+
+int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt, lnet_nid_t nid)
+{
+       struct kfilnd_transaction *tn;
+       int rc;
+
+       tn = kfilnd_tn_alloc(dev, cpt, nid, true, true, false);
+       if (IS_ERR(tn)) {
+               rc = PTR_ERR(tn);
+               CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
+               return rc;
+       }
+
+       kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);
+
+       return 0;
+}
+
+static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
+{
+       int type = msg->msg_type;
+       struct lnet_processid *target = &msg->msg_target;
+       struct kfilnd_transaction *tn;
+       int nob;
+       struct kfilnd_dev *dev = ni->ni_data;
+       enum kfilnd_msg_type lnd_msg_type;
+       int cpt;
+       enum tn_events event = TN_EVENT_INVALID;
+       int rc;
+       bool tn_key = false;
+       lnet_nid_t tgt_nid4;
+
+       switch (type) {
+       default:
+               return -EIO;
+
+       case LNET_MSG_ACK:
+               if (msg->msg_len != 0)
+                       return -EINVAL;
+               lnd_msg_type = KFILND_MSG_IMMEDIATE;
+               break;
+
+       case LNET_MSG_GET:
+               nob = offsetof(struct kfilnd_msg,
+                              proto.immed.payload[msg->msg_md->md_length]);
+               if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
+                       lnd_msg_type = KFILND_MSG_IMMEDIATE;
+                       break;
+               }
+
+               lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
+               tn_key = true;
+               break;
+
+       case LNET_MSG_REPLY:
+       case LNET_MSG_PUT:
+               nob = offsetof(struct kfilnd_msg,
+                              proto.immed.payload[msg->msg_len]);
+               if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
+                       lnd_msg_type = KFILND_MSG_IMMEDIATE;
+                       break;
+               }
+
+               lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
+               tn_key = true;
+               break;
+       }
+
+       tgt_nid4 = lnet_nid_to_nid4(&target->nid);
+
+       cpt = kfilnd_send_cpt(dev, tgt_nid4);
+       tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
+       if (IS_ERR(tn)) {
+               rc = PTR_ERR(tn);
+               CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
+               return rc;
+       }
+
+       /* Need to fire off a special hello transaction if this is a new peer. */
+       if (kfilnd_peer_is_new_peer(tn->peer)) {
+               rc = kfilnd_send_hello_request(dev, cpt, tgt_nid4);
+               if (rc) {
+                       kfilnd_tn_free(tn);
+                       return 0;
+               }
+       }
+
+       switch (lnd_msg_type) {
+       case KFILND_MSG_IMMEDIATE:
+               rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
+                                           msg->msg_offset, msg->msg_len);
+               if (rc) {
+                       CERROR("Failed to setup immediate buffer rc %d\n", rc);
+                       kfilnd_tn_free(tn);
+                       return rc;
+               }
+
+               event = TN_EVENT_INIT_IMMEDIATE;
+               break;
+
+       case KFILND_MSG_BULK_PUT_REQ:
+               tn->sink_buffer = false;
+               rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
+                                           msg->msg_offset, msg->msg_len);
+               if (rc) {
+                       CERROR("Failed to setup PUT source buffer rc %d\n", rc);
+                       kfilnd_tn_free(tn);
+                       return rc;
+               }
+
+               event = TN_EVENT_INIT_BULK;
+               break;
+
+       case KFILND_MSG_BULK_GET_REQ:
+               /* We need to create a reply message to inform LNet our
+                * optimized GET is done.
+                */
+               tn->tn_getreply = lnet_create_reply_msg(ni, msg);
+               if (!tn->tn_getreply) {
+                       CERROR("Can't create reply for GET -> %s\n",
+                              libcfs_nidstr(&target->nid));
+                       kfilnd_tn_free(tn);
+                       return -EIO;
+               }
+
+               tn->sink_buffer = true;
+               rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov,
+                                           msg->msg_md->md_niov,
+                                           msg->msg_md->md_offset,
+                                           msg->msg_md->md_length);
+               if (rc) {
+                       CERROR("Failed to setup GET sink buffer rc %d\n", rc);
+                       kfilnd_tn_free(tn);
+                       return rc;
+               }
+               event = TN_EVENT_INIT_BULK;
+               break;
+
+       default:
+               kfilnd_tn_free(tn);
+               return -EIO;
+       }
+
+       tn->msg_type = lnd_msg_type;
+       tn->tn_lntmsg = msg;    /* finalise msg on completion */
+       tn->lnet_msg_len = tn->tn_nob;
+
+       KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
+                       msg_type_to_str(lnd_msg_type), tn->tn_nob,
+                       tn->tn_num_iovec);
+
+       /* Start the state machine processing this transaction */
+       kfilnd_tn_event_handler(tn, event, 0);
+
+       return 0;
+}
+
+static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
+                      int delayed, unsigned int niov,
+                      struct bio_vec *kiov,
+                      unsigned int offset, unsigned int mlen,
+                      unsigned int rlen)
+{
+       struct kfilnd_transaction *tn = private;
+       struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
+       int nob;
+       int rc = 0;
+       int status = 0;
+       enum tn_events event;
+
+       if (mlen > rlen)
+               return -EINVAL;
+
+       /* Transaction must be in receive state */
+       if (tn->tn_state != TN_STATE_IMM_RECV)
+               return -EINVAL;
+
+       tn->tn_lntmsg = msg;
+       tn->lnet_msg_len = rlen;
+
+       switch (rxmsg->type) {
+       case KFILND_MSG_IMMEDIATE:
+               nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
+               if (nob > tn->tn_rx_msg.length) {
+                       CERROR("Immediate message from %s too big: %d(%lu)\n",
+                              libcfs_nidstr(&msg->msg_hdr.src_nid),
+                              nob, tn->tn_rx_msg.length);
+                       return -EPROTO;
+               }
+               tn->tn_nob = nob;
+
+               lnet_copy_flat2kiov(niov, kiov, offset,
+                                   KFILND_IMMEDIATE_MSG_SIZE, rxmsg,
+                                   offsetof(struct kfilnd_msg,
+                                            proto.immed.payload),
+                                   mlen);
+
+               kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
+               return 0;
+
+       case KFILND_MSG_BULK_PUT_REQ:
+               if (mlen == 0) {
+                       event = TN_EVENT_SKIP_TAG_RMA;
+               } else {
+                       /* Post the buffer given to us as a sink */
+                       tn->sink_buffer = true;
+                       rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset,
+                                                   mlen);
+                       if (rc) {
+                               CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
+                               kfilnd_tn_free(tn);
+                               return rc;
+                       }
+                       event = TN_EVENT_INIT_TAG_RMA;
+               }
+               break;
+
+       case KFILND_MSG_BULK_GET_REQ:
+               if (!msg) {
+                       event = TN_EVENT_SKIP_TAG_RMA;
+                       status = -ENODATA;
+               } else {
+                       /* Post the buffer given to us as a source  */
+                       tn->sink_buffer = false;
+                       rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov,
+                                                   msg->msg_niov,
+                                                   msg->msg_offset,
+                                                   msg->msg_len);
+                       if (rc) {
+                               CERROR("Failed to setup GET source buffer rc %d\n", rc);
+                               kfilnd_tn_free(tn);
+                               return rc;
+                       }
+                       event = TN_EVENT_INIT_TAG_RMA;
+               }
+               break;
+
+       default:
+               /* TODO: TN leaks here. */
+               CERROR("Invalid message type = %d\n", rxmsg->type);
+               return -EINVAL;
+       }
+
+       /* Store relevant fields to generate a bulk response. */
+       tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
+       tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
+
+#if 0
+       tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
+                                                KFILND_MSG_BULK_RSP,
+                                                sizeof(struct kfilnd_bulk_rsp),
+                                                ni);
+#endif
+
+       KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
+                       msg_type_to_str(rxmsg->type), tn->tn_nob,
+                       tn->tn_num_iovec);
+
+       kfilnd_tn_event_handler(tn, event, status);
+
+       return rc;
+}
+
+static int kfilnd_startup(struct lnet_ni *ni);
+
+static const struct lnet_lnd the_kfilnd = {
+       .lnd_type       = KFILND,
+       .lnd_startup    = kfilnd_startup,
+       .lnd_shutdown   = kfilnd_shutdown,
+       .lnd_send       = kfilnd_send,
+       .lnd_recv       = kfilnd_recv,
+};
+
+static int kfilnd_startup(struct lnet_ni *ni)
+{
+       const char *node;
+       int rc;
+       struct kfilnd_dev *kfdev;
+
+       if (!ni)
+               return -EINVAL;
+
+       if (ni->ni_net->net_lnd != &the_kfilnd) {
+               CERROR("Wrong lnd type\n");
+               return -EINVAL;
+       }
+
+       kfilnd_tunables_setup(ni);
+
+       /* Only a single interface is supported. */
+       if (!ni->ni_interface) {
+               rc = -ENODEV;
+               CERROR("No LNet network interface address defined\n");
+               goto err;
+       }
+
+       node = ni->ni_interface;
+
+       kfdev = kfilnd_dev_alloc(ni, node);
+       if (IS_ERR(kfdev)) {
+               rc = PTR_ERR(kfdev);
+               CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node,
+                      rc);
+               goto err;
+       }
+
+       ni->ni_data = kfdev;
+       ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr));
+
+       /* Post a series of immediate receive buffers */
+       rc = kfilnd_dev_post_imm_buffers(kfdev);
+       if (rc) {
+               CERROR("Can't post buffers, rc = %d\n", rc);
+               goto err_free_dev;
+       }
+
+       return 0;
+
+err_free_dev:
+       kfilnd_dev_free(kfdev);
+err:
+       return rc;
+}
+
+static void __exit kfilnd_exit(void)
+{
+       destroy_workqueue(kfilnd_wq);
+
+       kfilnd_tn_cleanup();
+
+       lnet_unregister_lnd(&the_kfilnd);
+
+       debugfs_remove_recursive(kfilnd_debug_dir);
+}
+
+static int __init kfilnd_init(void)
+{
+       int rc;
+
+       kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);
+
+       rc = kfilnd_tunables_init();
+       if (rc)
+               goto err;
+
+       /* Do any initialization of the transaction system */
+       rc = kfilnd_tn_init();
+       if (rc) {
+               CERROR("Cannot initialize transaction system\n");
+               goto err;
+       }
+
+       kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS,
+                                   WQ_MAX_ACTIVE);
+       if (!kfilnd_wq) {
+               rc = -ENOMEM;
+               CERROR("Failed to allocated kfilnd work queue\n");
+               goto err_tn_cleanup;
+       }
+
+       lnet_register_lnd(&the_kfilnd);
+
+       return 0;
+
+err_tn_cleanup:
+       kfilnd_tn_cleanup();
+err:
+       return rc;
+}
+
+MODULE_AUTHOR("Cray Inc.");
+MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
+MODULE_VERSION(KFILND_VERSION);
+MODULE_LICENSE("GPL");
+
+module_init(kfilnd_init);
+module_exit(kfilnd_exit);
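
The send path above selects the wire protocol purely by size: when the LNet header plus payload fit in a kfilnd_msg immediate payload the message goes inline, otherwise a bulk PUT/GET request sets up a tagged RMA transfer. A standalone restatement of that threshold test (a sketch of the logic inside kfilnd_send, not a new function in the patch):

	/* Sketch of kfilnd_send's protocol selection: a message is sent as
	 * KFILND_MSG_IMMEDIATE when the kfilnd header plus payload fit in
	 * KFILND_IMMEDIATE_MSG_SIZE (4096 bytes), and as a bulk PUT/GET
	 * request otherwise.
	 */
	static enum kfilnd_msg_type kfilnd_pick_proto(unsigned int payload_len,
						      bool is_get)
	{
		unsigned int nob = offsetof(struct kfilnd_msg,
					    proto.immed.payload[payload_len]);

		if (nob <= KFILND_IMMEDIATE_MSG_SIZE)
			return KFILND_MSG_IMMEDIATE;

		return is_get ? KFILND_MSG_BULK_GET_REQ : KFILND_MSG_BULK_PUT_REQ;
	}
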
diff --git a/lnet/klnds/kfilnd/kfilnd.h b/lnet/klnds/kfilnd/kfilnd.h
new file mode 100644
index 0000000..a1a47a4
--- /dev/null
@@ -0,0 +1,692 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd main interface.
+ */
+
+#ifndef _KFILND_
+#define _KFILND_
+
+#include <linux/version.h>
+#include <linux/module.h>
+#include <linux/kernel.h>
+#include <linux/kthread.h>
+#include <linux/mm.h>
+#include <linux/string.h>
+#include <linux/stat.h>
+#include <linux/errno.h>
+#include <linux/unistd.h>
+#include <linux/uio.h>
+#include <linux/rwsem.h>
+#include <linux/mutex.h>
+#include <linux/rhashtable.h>
+#include <linux/workqueue.h>
+#include <linux/debugfs.h>
+#include <linux/seq_file.h>
+#include <linux/ktime.h>
+
+#include <asm/uaccess.h>
+#include <asm/io.h>
+
+#include <linux/init.h>
+#include <linux/fs.h>
+#include <linux/file.h>
+#include <linux/stat.h>
+#include <linux/list.h>
+#include <linux/kmod.h>
+#include <linux/sysctl.h>
+#include <linux/pci.h>
+
+#include <net/sock.h>
+#include <linux/in.h>
+
+#define KFILND_VERSION "0.2.0"
+
+#define DEBUG_SUBSYSTEM S_LND
+
+#include <libcfs/libcfs.h>
+#include <lnet/lib-lnet.h>
+#include "kfi_endpoint.h"
+#include "kfi_errno.h"
+#include "kfi_rma.h"
+#include "kfi_tagged.h"
+#include "kfi_cxi_ext.h"
+
+/* KFILND CFS fail range 0xF100 - 0xF1FF. */
+
+#define CFS_KFI_FAIL_SEND_EVENT 0xF100
+#define CFS_KFI_FAIL_READ_EVENT 0xF101
+#define CFS_KFI_FAIL_WRITE_EVENT 0xF102
+#define CFS_KFI_FAIL_TAGGED_SEND_EVENT 0xF103
+#define CFS_KFI_FAIL_TAGGED_RECV_EVENT 0xF104
+#define CFS_KFI_FAIL_BULK_TIMEOUT 0xF105
+#define CFS_KFI_FAIL_SEND 0xF106
+#define CFS_KFI_FAIL_READ 0xF107
+#define CFS_KFI_FAIL_WRITE 0xF108
+#define CFS_KFI_FAIL_TAGGED_SEND 0xF109
+#define CFS_KFI_FAIL_TAGGED_RECV 0xF10A
+#define CFS_KFI_FAIL_SEND_EAGAIN 0xF10B
+#define CFS_KFI_FAIL_READ_EAGAIN 0xF10C
+#define CFS_KFI_FAIL_WRITE_EAGAIN 0xF10D
+#define CFS_KFI_FAIL_TAGGED_SEND_EAGAIN 0xF10E
+#define CFS_KFI_FAIL_TAGGED_RECV_EAGAIN 0xF10F
+#define CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN 0xF110
+#define CFS_KFI_FAIL_RECV_EAGAIN 0xF111
+#define CFS_KFI_FAIL_RECV 0xF112
+#define CFS_KFI_FAIL_MSG_UNPACK 0xF113
+
+/* Maximum number of transaction keys supported. */
+#define KFILND_EP_KEY_BITS 16U
+#define KFILND_EP_KEY_MAX (BIT(KFILND_EP_KEY_BITS) - 1)
+
+/* Some constants which should be turned into tunables */
+#define KFILND_IMMEDIATE_MSG_SIZE 4096
+
+#define KFILND_MY_PROCID 49152
+
+/* 256 Rx contexts max */
+#define KFILND_FAB_RX_CTX_BITS 8
+
+/* Get the KFI base address from a KFI RX address. RX context information is
+ * stored in the MSBs of the KFI address.
+ */
+#define KFILND_BASE_ADDR(addr) \
+       ((addr) & ((1UL << (64 - KFILND_FAB_RX_CTX_BITS)) - 1))
+
+/* States used by all kfilnd structures */
+enum kfilnd_object_states {
+       KFILND_STATE_UNINITIALIZED,
+       KFILND_STATE_INITIALIZED,
+       KFILND_STATE_SHUTTING_DOWN
+};
+
+extern struct dentry *kfilnd_debug_dir;
+extern const struct file_operations kfilnd_initiator_state_stats_file_ops;
+extern const struct file_operations kfilnd_target_state_stats_file_ops;
+extern const struct file_operations kfilnd_target_stats_file_ops;
+extern const struct file_operations kfilnd_initiator_stats_file_ops;
+extern const struct file_operations kfilnd_reset_stats_file_ops;
+
+extern struct workqueue_struct *kfilnd_wq;
+
+extern unsigned int cksum;
+extern unsigned int tx_scale_factor;
+extern unsigned int rx_cq_scale_factor;
+extern unsigned int tx_cq_scale_factor;
+extern unsigned int eq_size;
+extern unsigned int immediate_rx_buf_count;
+
+int kfilnd_tunables_setup(struct lnet_ni *ni);
+int kfilnd_tunables_init(void);
+
+struct kfilnd_transaction;
+struct kfilnd_ep;
+struct kfilnd_dev;
+
+/* Multi-receive buffers for immediate receives */
+struct kfilnd_immediate_buffer {
+       void *immed_buf;
+       size_t immed_buf_size;
+       struct page *immed_buf_page;
+       atomic_t immed_ref;
+       bool immed_no_repost;
+       struct list_head replay_entry;
+       struct kfilnd_ep *immed_end;
+};
+
+extern atomic_t kfilnd_rx_count;
+
+struct kfilnd_cq;
+
+struct kfilnd_cq_work {
+       struct kfilnd_cq *cq;
+       unsigned int work_cpu;
+       struct work_struct work;
+};
+
+struct kfilnd_cq {
+       struct kfilnd_ep *ep;
+       struct kfid_cq *cq;
+       unsigned int cq_work_count;
+       struct kfilnd_cq_work cq_works[];
+};
+
+struct kfilnd_ep {
+       /* The contexts for this CPT */
+       struct kfid_ep *end_tx;
+       struct kfid_ep *end_rx;
+
+       /* Corresponding CQs */
+       struct kfilnd_cq *end_tx_cq;
+       struct kfilnd_cq *end_rx_cq;
+
+       /* Specific config values for this endpoint */
+       struct kfilnd_dev *end_dev;
+       int end_cpt;
+       int end_context_id;
+
+       /* List of transactions. */
+       struct list_head tn_list;
+       spinlock_t tn_list_lock;
+
+       /* Replay queues. */
+       struct list_head tn_replay;
+       struct list_head imm_buffer_replay;
+       spinlock_t replay_lock;
+       struct timer_list replay_timer;
+       struct work_struct replay_work;
+       atomic_t replay_count;
+
+       /* Key used to build the tag for tagged buffers. */
+       struct ida keys;
+
+       /* Pre-posted immediate buffers */
+       struct kfilnd_immediate_buffer end_immed_bufs[];
+};
+
+struct kfilnd_peer {
+       struct rhash_head node;
+       struct rcu_head rcu_head;
+       struct kfilnd_dev *dev;
+       lnet_nid_t nid;
+       kfi_addr_t addr;
+       atomic_t rx_base;
+       atomic_t remove_peer;
+       refcount_t cnt;
+       time64_t last_alive;
+       u16 version;
+       u32 local_session_key;
+       u32 remote_session_key;
+};
+
+static inline bool kfilnd_peer_is_new_peer(struct kfilnd_peer *peer)
+{
+       return peer->version == 0;
+}
+
+static inline void kfilnd_peer_set_version(struct kfilnd_peer *peer,
+                                          u16 version)
+{
+       peer->version = version;
+}
+
+static inline void kfilnd_peer_set_remote_session_key(struct kfilnd_peer *peer,
+                                                     u32 session_key)
+{
+       peer->remote_session_key = session_key;
+}
+
+struct kfilnd_fab {
+       struct list_head entry;
+       struct list_head dom_list;
+       struct mutex dom_list_lock;
+       struct kfid_fabric *fabric;
+       struct kref cnt;
+};
+
+struct kfilnd_dom {
+       struct list_head entry;
+       struct list_head dev_list;
+       spinlock_t lock;
+       struct kfilnd_fab *fab;
+       struct kfid_domain *domain;
+       struct kref cnt;
+};
+
+/* Transaction States */
+enum tn_states {
+       TN_STATE_INVALID,
+
+       /* Shared initiator and target states. */
+       TN_STATE_IDLE,
+       TN_STATE_WAIT_TAG_COMP,
+
+       /* Initiator immediate states. */
+       TN_STATE_IMM_SEND,
+
+       /* Initiator bulk states. */
+       TN_STATE_TAGGED_RECV_POSTED,
+       TN_STATE_SEND_FAILED,
+       TN_STATE_WAIT_COMP,
+       TN_STATE_WAIT_TIMEOUT_COMP,
+       TN_STATE_WAIT_SEND_COMP,
+       TN_STATE_WAIT_TIMEOUT_TAG_COMP,
+       TN_STATE_FAIL,
+
+       /* Target states. */
+       TN_STATE_IMM_RECV,
+       TN_STATE_WAIT_TAG_RMA_COMP,
+
+       /* Invalid max value. */
+       TN_STATE_MAX,
+};
+
+/* Base duration state stats. */
+struct kfilnd_tn_duration_stat {
+       atomic64_t accumulated_duration;
+       atomic_t accumulated_count;
+};
+
+/* Transaction state stats are grouped into 22 buckets. Bucket zero
+ * corresponds to an LNet message size of 0 bytes and buckets 1 through 21
+ * correspond to LNet message sizes of 1 to 1048576 bytes, increasing by
+ * powers of 2. LNet message sizes are rounded up to the nearest power of 2.
+ */
+#define KFILND_DATA_SIZE_BUCKETS 22U
+#define KFILND_DATA_SIZE_MAX_SIZE (1U << (KFILND_DATA_SIZE_BUCKETS - 2))
+struct kfilnd_tn_data_size_duration_stats {
+       struct kfilnd_tn_duration_stat data_size[KFILND_DATA_SIZE_BUCKETS];
+};
+
+static inline unsigned int kfilnd_msg_len_to_data_size_bucket(size_t size)
+{
+       u64 bit;
+
+       if (size == 0)
+               return 0;
+       if (size >= KFILND_DATA_SIZE_MAX_SIZE)
+               return KFILND_DATA_SIZE_BUCKETS - 1;
+
+       /* Round size up to the nearest power of 2. */
+       bit = fls64(size);
+       if (BIT(bit - 1) < size)
+               bit++;
+
+       return (unsigned int)bit;
+}
+
+/* One data size duration stats bucket for each transaction state. */
+struct kfilnd_tn_state_data_size_duration_stats {
+       struct kfilnd_tn_data_size_duration_stats state[TN_STATE_MAX];
+};
+
+struct kfilnd_dev {
+       struct list_head        kfd_list;       /* chain on kfid_devs */
+       struct lnet_ni          *kfd_ni;
+       enum kfilnd_object_states kfd_state;
+
+       /* KFI LND domain the device is associated with. */
+       struct kfilnd_dom       *dom;
+
+       /* Fields specific to kfabric operation */
+       spinlock_t              kfd_lock;
+       struct kfid_ep          *kfd_sep;
+       struct kfid_av          *kfd_av;
+       struct kfilnd_ep        **kfd_endpoints;
+
+       /* Map of LNet NI CPTs to endpoints. */
+       struct kfilnd_ep        **cpt_to_endpoint;
+
+       /* Hash of LNet NIDs to KFI addresses. */
+       struct rhashtable peer_cache;
+
+       /* Per LNet NI states. */
+       struct kfilnd_tn_state_data_size_duration_stats initiator_state_stats;
+       struct kfilnd_tn_state_data_size_duration_stats target_state_stats;
+       struct kfilnd_tn_data_size_duration_stats initiator_stats;
+       struct kfilnd_tn_data_size_duration_stats target_stats;
+
+       /* Per LNet NI debugfs stats. */
+       struct dentry *dev_dir;
+       struct dentry *initiator_state_stats_file;
+       struct dentry *initiator_stats_file;
+       struct dentry *target_state_stats_file;
+       struct dentry *target_stats_file;
+       struct dentry *reset_stats_file;
+
+       /* Physical NIC address. */
+       unsigned int nic_addr;
+       atomic_t session_keys;
+};
+
+/* Invalid checksum value is treated as no checksum. */
+/* TODO: Module parameter to disable checksum? */
+#define NO_CHECKSUM 0x0
+
+/* Hello message header. */
+struct kfilnd_hello_msg {
+       /* Supported kfilnd version. */
+       __u16 version;
+
+       /* Base RX context the peer should use. */
+       __u16 rx_base;
+
+       /* Session key used by peer. */
+       __u32 session_key;
+
+       /* RX context count peer can target. */
+       __u16 rx_count;
+} __packed;
+
+/* Immediate message header. */
+struct kfilnd_immed_msg {
+       /* Entire LNet header needed by the destination to match incoming
+        * message.
+        */
+       struct lnet_hdr_nid4    hdr;
+
+       /* Entire LNet message payload. */
+       char payload[0];
+} __packed;
+
+/* Bulk request message header. */
+struct kfilnd_bulk_req_msg {
+       /* Entire LNet header needed by the destination to match incoming
+        * message.
+        */
+       struct lnet_hdr_nid4    hdr;
+
+       /* Specific RX context the target must use to push/pull LNet
+        * payload.
+        */
+       __u32 response_rx;
+
+       /* Memory key needed by the target to push/pull LNet payload. */
+       __u16 key;
+} __packed;
+
+/* Kfilnd message. Includes base transport header plus embedded protocol
+ * message.
+ */
+struct kfilnd_msg {
+       /* Unique kfilnd magic. */
+       __u32 magic;
+
+       /* Version of the kfilnd protocol. */
+       __u16 version;
+
+       /* Specific kfilnd protocol type. */
+       __u8 type;
+
+       /* Unused 8 bits. */
+       __u8 reserved;
+
+       /* Number of bytes in message. */
+       __u16 nob;
+
+       /* Checksum of entire message. 0 is checksum disabled. */
+       __sum16 cksum;
+
+       /* Message LNet source NID. */
+       __u64 srcnid;
+
+       /* Message LNet target NID. */
+       __u64 dstnid;
+
+       /* Embedded protocol headers. Must remain at bottom. */
+       union {
+               struct kfilnd_immed_msg immed;
+               struct kfilnd_bulk_req_msg bulk_req;
+               struct kfilnd_hello_msg hello;
+       } __packed proto;
+} __packed;
+
+#define KFILND_MSG_MAGIC LNET_PROTO_KFI_MAGIC  /* unique magic */
+
+#define KFILND_MSG_VERSION_1   0x1
+#define KFILND_MSG_VERSION     KFILND_MSG_VERSION_1
+
+/* Get the KFI RX context from a KFI RX address. RX context information is
+ * stored in the MSBs of the KFI address.
+ */
+#define KFILND_RX_CONTEXT(addr) ((addr) >> (64 - KFILND_FAB_RX_CTX_BITS))
+
+#define KFILND_EP_DEBUG(ep, fmt, ...) \
+       CDEBUG(D_NET, "%s:%d " fmt "\n", \
+              libcfs_nidstr(&(ep)->end_dev->kfd_ni->ni_nid), \
+              (ep)->end_context_id, ##__VA_ARGS__)
+
+#define KFILND_EP_ERROR(ep, fmt, ...) \
+       CNETERR("%s:%d " fmt "\n", \
+               libcfs_nidstr(&(ep)->end_dev->kfd_ni->ni_nid), \
+               (ep)->end_context_id, ##__VA_ARGS__)
+
+#define KFILND_TN_PEER_VALID(tn) \
+       !IS_ERR_OR_NULL((tn)->peer)
+
+#define KFILND_TN_DIR_DEBUG(tn, fmt, dir, ...) \
+       CDEBUG(D_NET, "Transaction ID %p: %s:%u %s %s:%llu " fmt "\n", \
+              (tn), \
+              libcfs_nidstr(&(tn)->tn_ep->end_dev->kfd_ni->ni_nid), \
+              (tn)->tn_ep->end_context_id, dir, \
+              libcfs_nid2str((tn)->peer->nid), \
+              KFILND_TN_PEER_VALID(tn) ? \
+               KFILND_RX_CONTEXT((tn)->peer->addr) : 0, \
+              ##__VA_ARGS__)
+
+#define KFILND_TN_DEBUG(tn, fmt, ...) \
+       do { \
+               if ((tn)->is_initiator) \
+                       KFILND_TN_DIR_DEBUG(tn, fmt, "->", ##__VA_ARGS__); \
+               else \
+                       KFILND_TN_DIR_DEBUG(tn, fmt, "<-", ##__VA_ARGS__); \
+       } while (0)
+
+#define KFILND_TN_DIR_ERROR(tn, fmt, dir, ...) \
+       CNETERR("Transaction ID %p: %s:%u %s %s:%llu " fmt "\n", \
+               (tn), \
+               libcfs_nidstr(&(tn)->tn_ep->end_dev->kfd_ni->ni_nid), \
+               (tn)->tn_ep->end_context_id, dir, \
+               libcfs_nid2str((tn)->peer->nid), \
+               KFILND_TN_PEER_VALID(tn) ? \
+                       KFILND_RX_CONTEXT((tn)->peer->addr) : 0, \
+               ##__VA_ARGS__)
+
+#define KFILND_TN_ERROR(tn, fmt, ...) \
+       do { \
+               if ((tn)->is_initiator) \
+                       KFILND_TN_DIR_ERROR(tn, fmt, "->", ##__VA_ARGS__); \
+               else \
+                       KFILND_TN_DIR_ERROR(tn, fmt, "<-", ##__VA_ARGS__); \
+       } while (0)
+
+/* TODO: Support NOOPs? */
+enum kfilnd_msg_type {
+       /* Valid message types start at 1. */
+       KFILND_MSG_INVALID,
+
+       /* Valid message types. */
+       KFILND_MSG_IMMEDIATE,
+       KFILND_MSG_BULK_PUT_REQ,
+       KFILND_MSG_BULK_GET_REQ,
+       KFILND_MSG_HELLO_REQ,
+       KFILND_MSG_HELLO_RSP,
+
+       /* Invalid max value. */
+       KFILND_MSG_MAX,
+};
+
+static inline const char *msg_type_to_str(enum kfilnd_msg_type type)
+{
+       static const char *str[KFILND_MSG_MAX] = {
+               [KFILND_MSG_INVALID] = "KFILND_MSG_INVALID",
+               [KFILND_MSG_IMMEDIATE] = "KFILND_MSG_IMMEDIATE",
+               [KFILND_MSG_BULK_PUT_REQ] = "KFILND_MSG_BULK_PUT_REQ",
+               [KFILND_MSG_BULK_GET_REQ] = "KFILND_MSG_BULK_GET_REQ",
+               [KFILND_MSG_HELLO_REQ] = "KFILND_MSG_HELLO_REQ",
+               [KFILND_MSG_HELLO_RSP] = "KFILND_MSG_HELLO_RSP",
+       };
+
+       if (type >= KFILND_MSG_MAX)
+               return "KFILND_MSG_INVALID";
+
+       return str[type];
+};
+
+static inline const char *tn_state_to_str(enum tn_states type)
+{
+       static const char *str[TN_STATE_MAX] = {
+               [TN_STATE_INVALID] = "TN_STATE_INVALID",
+               [TN_STATE_IDLE] = "TN_STATE_IDLE",
+               [TN_STATE_WAIT_TAG_COMP] = "TN_STATE_WAIT_TAG_COMP",
+               [TN_STATE_IMM_SEND] = "TN_STATE_IMM_SEND",
+               [TN_STATE_TAGGED_RECV_POSTED] = "TN_STATE_TAGGED_RECV_POSTED",
+               [TN_STATE_SEND_FAILED] = "TN_STATE_SEND_FAILED",
+               [TN_STATE_WAIT_COMP] = "TN_STATE_WAIT_COMP",
+               [TN_STATE_WAIT_TIMEOUT_COMP] = "TN_STATE_WAIT_TIMEOUT_COMP",
+               [TN_STATE_WAIT_SEND_COMP] = "TN_STATE_WAIT_SEND_COMP",
+               [TN_STATE_WAIT_TIMEOUT_TAG_COMP] = "TN_STATE_WAIT_TIMEOUT_TAG_COMP",
+               [TN_STATE_FAIL] = "TN_STATE_FAIL",
+               [TN_STATE_IMM_RECV] = "TN_STATE_IMM_RECV",
+               [TN_STATE_WAIT_TAG_RMA_COMP] = "TN_STATE_WAIT_TAG_RMA_COMP",
+       };
+
+       return str[type];
+};
+
+/* Transaction Events */
+enum tn_events {
+       TN_EVENT_INVALID,
+
+       /* Initiator events. */
+       TN_EVENT_INIT_IMMEDIATE,
+       TN_EVENT_INIT_BULK,
+       TN_EVENT_TX_HELLO,
+       TN_EVENT_TX_OK,
+       TN_EVENT_TX_FAIL,
+       TN_EVENT_TAG_RX_OK,
+       TN_EVENT_TAG_RX_FAIL,
+       TN_EVENT_TAG_RX_CANCEL,
+       TN_EVENT_TIMEOUT,
+
+       /* Target events. */
+       TN_EVENT_RX_HELLO,
+       TN_EVENT_RX_OK,
+       TN_EVENT_RX_FAIL,
+       TN_EVENT_INIT_TAG_RMA,
+       TN_EVENT_SKIP_TAG_RMA,
+       TN_EVENT_TAG_TX_OK,
+       TN_EVENT_TAG_TX_FAIL,
+
+       /* Invalid max value. */
+       TN_EVENT_MAX,
+};
+
+static inline const char *tn_event_to_str(enum tn_events type)
+{
+       static const char *str[TN_EVENT_MAX] = {
+               [TN_EVENT_INVALID] = "TN_EVENT_INVALID",
+               [TN_EVENT_INIT_IMMEDIATE] = "TN_EVENT_INIT_IMMEDIATE",
+               [TN_EVENT_INIT_BULK] = "TN_EVENT_INIT_BULK",
+               [TN_EVENT_TX_HELLO] = "TN_EVENT_TX_HELLO",
+               [TN_EVENT_TX_OK] = "TN_EVENT_TX_OK",
+               [TN_EVENT_TX_FAIL] = "TN_EVENT_TX_FAIL",
+               [TN_EVENT_TAG_RX_OK] = "TN_EVENT_TAG_RX_OK",
+               [TN_EVENT_TAG_RX_FAIL] = "TN_EVENT_TAG_RX_FAIL",
+               [TN_EVENT_TAG_RX_CANCEL] = "TN_EVENT_TAG_RX_CANCEL",
+               [TN_EVENT_TIMEOUT] = "TN_EVENT_TIMEOUT",
+               [TN_EVENT_RX_HELLO] = "TN_EVENT_RX_HELLO",
+               [TN_EVENT_RX_OK] = "TN_EVENT_RX_OK",
+               [TN_EVENT_RX_FAIL] = "TN_EVENT_RX_FAIL",
+               [TN_EVENT_INIT_TAG_RMA] = "TN_EVENT_INIT_TAG_RMA",
+               [TN_EVENT_SKIP_TAG_RMA] = "TN_EVENT_SKIP_TAG_RMA",
+               [TN_EVENT_TAG_TX_OK] = "TN_EVENT_TAG_TX_OK",
+               [TN_EVENT_TAG_TX_FAIL] = "TN_EVENT_TAG_TX_FAIL",
+       };
+
+       return str[type];
+};
+
+struct kfilnd_transaction_msg {
+       struct kfilnd_msg *msg;
+       size_t length;
+};
+
+/* Initiator and target transaction structure. */
+struct kfilnd_transaction {
+       /* Endpoint list transaction lives on. */
+       struct list_head        tn_entry;
+       struct mutex            tn_lock;        /* to serialize events */
+       int                     tn_status;      /* return code from ops */
+       struct kfilnd_ep        *tn_ep;         /* endpoint we operate under */
+       enum tn_states          tn_state;       /* current state of Tn */
+       struct lnet_msg         *tn_lntmsg;     /* LNet msg to finalize */
+       struct lnet_msg         *tn_getreply;   /* GET LNet msg to finalize */
+
+       bool                    is_initiator;   /* Initiated LNet transfer. */
+
+       /* Transaction send message and target address. */
+       kfi_addr_t              tn_target_addr;
+       struct kfilnd_peer      *peer;
+       struct kfilnd_transaction_msg tn_tx_msg;
+
+       /* Transaction multi-receive buffer and associated receive message. */
+       struct kfilnd_immediate_buffer *tn_posted_buf;
+       struct kfilnd_transaction_msg tn_rx_msg;
+
+       /* LNet buffer used to register a memory region or perform a RMA
+        * operation.
+        */
+       struct bio_vec          tn_kiov[LNET_MAX_IOV];
+       unsigned int            tn_num_iovec;
+
+       /* LNet transaction payload byte count. */
+       unsigned int            tn_nob;
+
+       /* Bulk transaction buffer is sink or source buffer. */
+       bool sink_buffer;
+
+       /* Memory region and remote key used to cover initiator's buffer. */
+       u16                     tn_mr_key;
+
+       /* RX context used to perform response operations to a Put/Get
+        * request. This is required since the request initiator locks a
+        * transaction to a specific RX context.
+        */
+       u16                     tn_response_mr_key;
+       u8                      tn_response_rx;
+
+       /* Immediate data used to convey transaction state from LNet target to
+        * LNet initiator.
+        */
+       u64 tagged_data;
+
+       /* Bulk operation timeout timer. */
+       struct timer_list timeout_timer;
+       struct work_struct timeout_work;
+
+       /* Transaction health status. */
+       enum lnet_msg_hstatus hstatus;
+
+       /* Transaction deadline. */
+       ktime_t deadline;
+
+       ktime_t tn_alloc_ts;
+       ktime_t tn_state_ts;
+       size_t lnet_msg_len;
+
+       /* Fields used to replay transaction. */
+       struct list_head replay_entry;
+       enum tn_events replay_event;
+       int replay_status;
+
+       enum kfilnd_msg_type msg_type;
+};
+
+int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt, lnet_nid_t nid);
+
+#endif /* _KFILND_ */
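
A kfi_addr_t in this header carries two values at once: the peer's base fabric address in the low 56 bits and an RX context number in the top KFILND_FAB_RX_CTX_BITS (8) bits, which is what KFILND_BASE_ADDR() and KFILND_RX_CONTEXT() pick apart. A small sketch of rebuilding such an address with those macros (illustration only):

	/* Sketch: recompose a KFI address from a base address and an RX
	 * context using the header's bit layout. With 8 RX context bits,
	 * the top byte selects one of up to 256 RX contexts and the low
	 * 56 bits are the base address, so
	 * kfilnd_rx_addr(addr, KFILND_RX_CONTEXT(addr)) == addr.
	 */
	static kfi_addr_t kfilnd_rx_addr(kfi_addr_t base, unsigned int rx_ctx)
	{
		return KFILND_BASE_ADDR(base) |
		       ((kfi_addr_t)rx_ctx << (64 - KFILND_FAB_RX_CTX_BITS));
	}
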
diff --git a/lnet/klnds/kfilnd/kfilnd_cq.c b/lnet/klnds/kfilnd/kfilnd_cq.c
new file mode 100644
index 0000000..d070afe
--- /dev/null
@@ -0,0 +1,253 @@
+
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd completion queue.
+ */
+#include <linux/idr.h>
+#include <linux/mutex.h>
+#include <linux/byteorder/generic.h>
+
+#include "kfilnd_cq.h"
+#include "kfilnd_tn.h"
+#include "kfilnd_ep.h"
+
+void kfilnd_cq_process_error(struct kfilnd_ep *ep,
+                            struct kfi_cq_err_entry *error)
+{
+       struct kfilnd_immediate_buffer *buf;
+       struct kfilnd_transaction *tn;
+       enum tn_events tn_event;
+       int status;
+
+       switch (error->flags) {
+       case KFI_MSG | KFI_RECV:
+               if (error->err != ECANCELED) {
+                       KFILND_EP_ERROR(ep, "Dropping error receive event %d\n",
+                                       -error->err);
+                       return;
+               }
+               fallthrough;
+       case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
+               buf = error->op_context;
+               kfilnd_ep_imm_buffer_put(buf);
+               return;
+
+       case KFI_TAGGED | KFI_RECV:
+       case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
+       case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
+       case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
+               tn = error->op_context;
+               if (error->err == ECANCELED) {
+                       tn_event = TN_EVENT_TAG_RX_CANCEL;
+                       status = 0;
+               } else {
+                       tn_event = TN_EVENT_TAG_RX_FAIL;
+                       status = -error->err;
+               }
+               break;
+
+       case KFI_MSG | KFI_SEND:
+               tn = error->op_context;
+               tn_event = TN_EVENT_TX_FAIL;
+               status = -error->err;
+               break;
+
+       case KFI_TAGGED | KFI_SEND:
+       case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
+       case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
+               tn = error->op_context;
+               tn_event = TN_EVENT_TAG_TX_FAIL;
+               status = -error->err;
+               break;
+
+       default:
+               LBUG();
+       }
+
+       kfilnd_tn_event_handler(tn, tn_event, status);
+}
+
+static void kfilnd_cq_process_event(struct kfi_cq_data_entry *event)
+{
+       struct kfilnd_immediate_buffer *buf;
+       struct kfilnd_msg *rx_msg;
+       struct kfilnd_transaction *tn;
+       enum tn_events tn_event;
+       int64_t status = 0;
+
+       switch (event->flags) {
+       case KFI_MSG | KFI_RECV:
+       case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
+               buf = event->op_context;
+               rx_msg = event->buf;
+
+               kfilnd_tn_process_rx_event(buf, rx_msg, event->len);
+
+               /* If the KFI_MULTI_RECV flag is set, the buffer was
+                * unlinked.
+                */
+               if (event->flags & KFI_MULTI_RECV)
+                       kfilnd_ep_imm_buffer_put(buf);
+               return;
+
+       case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
+               status = -1 * (int64_t)be64_to_cpu(event->data);
+               fallthrough;
+       case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
+       case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
+               tn_event = TN_EVENT_TAG_RX_OK;
+               tn = event->op_context;
+               break;
+
+       case KFI_TAGGED | KFI_SEND:
+       case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
+       case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
+               tn = event->op_context;
+               tn_event = TN_EVENT_TAG_TX_OK;
+               break;
+
+       case KFI_MSG | KFI_SEND:
+               tn = event->op_context;
+               tn_event = TN_EVENT_TX_OK;
+               break;
+
+       default:
+               LBUG();
+       }
+
+       kfilnd_tn_event_handler(tn, tn_event, status);
+}
+
+static void kfilnd_cq_process_completion(struct work_struct *work)
+{
+       struct kfilnd_cq_work *cq_work =
+               container_of(work, struct kfilnd_cq_work, work);
+       struct kfilnd_cq *kfilnd_cq = cq_work->cq;
+       struct kfid_cq *cq = kfilnd_cq->cq;
+       struct kfi_cq_data_entry event;
+       struct kfi_cq_err_entry error;
+       ssize_t rc;
+       bool done = false;
+
+       /* Drain the KFI completion queue of all events and errors. */
+       while (!done) {
+               rc = kfi_cq_read(cq, &event, 1);
+               if (rc == -KFI_EAVAIL) {
+                       while (kfi_cq_readerr(cq, &error, 0) == 1)
+                               kfilnd_cq_process_error(kfilnd_cq->ep, &error);
+               } else if (rc == 1) {
+                       kfilnd_cq_process_event(&event);
+               } else if (rc == -EAGAIN) {
+                       done = true;
+               } else {
+                       KFILND_EP_ERROR(kfilnd_cq->ep, "Unexpected rc = %ld",
+                                       rc);
+                       done = true;
+               }
+       }
+
+       if (kfilnd_ep_replays_pending(kfilnd_cq->ep))
+               kfilnd_ep_flush_replay_queue(kfilnd_cq->ep);
+}
+
+static void kfilnd_cq_completion(struct kfid_cq *cq, void *context)
+{
+       struct kfilnd_cq *kfilnd_cq = context;
+       struct kfilnd_cq_work *cq_work;
+       unsigned int i;
+
+       for (i = 0; i < kfilnd_cq->cq_work_count; i++) {
+               cq_work = &kfilnd_cq->cq_works[i];
+               queue_work_on(cq_work->work_cpu, kfilnd_wq, &cq_work->work);
+       }
+}
+
+#define CQ_ALLOC_SIZE(cpu_count) \
+       (sizeof(struct kfilnd_cq) + \
+        (sizeof(struct kfilnd_cq_work) * (cpu_count)))
+
+struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep,
+                                 struct kfi_cq_attr *attr)
+{
+       struct kfilnd_cq *cq;
+       cpumask_var_t *cpu_mask;
+       int rc;
+       unsigned int cpu_count = 0;
+       unsigned int cpu;
+       unsigned int i;
+       size_t alloc_size;
+       struct kfilnd_cq_work *cq_work;
+
+       cpu_mask = cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt);
+       for_each_cpu(cpu, *cpu_mask)
+               cpu_count++;
+
+       alloc_size = CQ_ALLOC_SIZE(cpu_count);
+       LIBCFS_CPT_ALLOC(cq, lnet_cpt_table(), ep->end_cpt, alloc_size);
+       if (!cq) {
+               rc = -ENOMEM;
+               KFILND_EP_ERROR(ep, "Failed to allocate memory: rc=%d", rc);
+               goto err;
+       }
+
+       memset(cq, 0, alloc_size);
+
+       rc = kfi_cq_open(ep->end_dev->dom->domain, attr, &cq->cq,
+                        kfilnd_cq_completion, cq);
+       if (rc) {
+               KFILND_EP_ERROR(ep, "Failed to open KFI CQ: rc=%d", rc);
+               goto err_free_kfilnd_cq;
+       }
+
+       i = 0;
+       for_each_cpu(cpu, *cpu_mask) {
+               cq_work = &cq->cq_works[i];
+               cq_work->cq = cq;
+               cq_work->work_cpu = cpu;
+               INIT_WORK(&cq_work->work, kfilnd_cq_process_completion);
+               i++;
+       }
+
+       cq->ep = ep;
+       cq->cq_work_count = cpu_count;
+
+       return cq;
+
+err_free_kfilnd_cq:
+       LIBCFS_FREE(cq, alloc_size);
+err:
+       return ERR_PTR(rc);
+}
+
+void kfilnd_cq_free(struct kfilnd_cq *cq)
+{
+       flush_workqueue(kfilnd_wq);
+       kfi_close(&cq->cq->fid);
+       LIBCFS_FREE(cq, CQ_ALLOC_SIZE(cq->cq_work_count));
+}
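
One structural note on the CQ code above: each completion queue fans its processing out to one work item per CPU in the endpoint's CPT, which is why struct kfilnd_cq ends in a flexible array and why CQ_ALLOC_SIZE() folds the CPU count into a single allocation. A minimal sketch of the same sizing pattern (kzalloc stands in for the CPT-aware LIBCFS_CPT_ALLOC used by the real code):

	/* Sketch of kfilnd_cq_alloc()'s allocation pattern: one contiguous
	 * block holding the CQ header plus cpu_count work items, so that
	 * kfilnd_cq_completion() can queue cq_works[i] on its own CPU.
	 */
	static struct kfilnd_cq *cq_alloc_sketch(unsigned int cpu_count)
	{
		struct kfilnd_cq *cq;

		cq = kzalloc(sizeof(*cq) +
			     cpu_count * sizeof(struct kfilnd_cq_work),
			     GFP_KERNEL);
		if (cq)
			cq->cq_work_count = cpu_count;
		return cq;
	}
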
diff --git a/lnet/klnds/kfilnd/kfilnd_cq.h b/lnet/klnds/kfilnd/kfilnd_cq.h
new file mode 100644
index 0000000..2aad454
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd completion queue.
+ */
+#ifndef _KFILND_CQ_
+#define _KFILND_CQ_
+
+#include "kfilnd.h"
+
+void kfilnd_cq_process_error(struct kfilnd_ep *ep,
+                            struct kfi_cq_err_entry *error);
+struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep,
+                                 struct kfi_cq_attr *attr);
+void kfilnd_cq_free(struct kfilnd_cq *cq);
+
+#endif /*_KFILND_CQ_ */
diff --git a/lnet/klnds/kfilnd/kfilnd_debugfs.c b/lnet/klnds/kfilnd/kfilnd_debugfs.c
new file mode 100644
index 0000000..21504c5
--- /dev/null
@@ -0,0 +1,204 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd debugfs implementation.
+ */
+#include "kfilnd.h"
+#include "kfilnd_dev.h"
+
+#define TIME_MAX 0xFFFFFFFFFFFF
+static s64 print_duration(struct kfilnd_tn_duration_stat *stat)
+{
+       s64 duration;
+
+       if (!atomic_read(&stat->accumulated_count))
+               return 0;
+
+       duration = atomic64_read(&stat->accumulated_duration) /
+               atomic_read(&stat->accumulated_count);
+
+       return min_t(s64, duration, TIME_MAX);
+}
+
+static void seq_print_tn_state_stats(struct seq_file *s, struct kfilnd_dev *dev,
+                                    bool initiator)
+{
+       struct kfilnd_tn_state_data_size_duration_stats *state_stats;
+       unsigned int data_size;
+
+       if (initiator)
+               state_stats = &dev->initiator_state_stats;
+       else
+               state_stats = &dev->target_state_stats;
+
+       seq_printf(s, "%-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s\n",
+                  "MSG_SIZE", "IDLE", "WAIT_TAG_COMP", "IMM_SEND",
+                  "TAGGED_RECV_POSTED", "SEND_FAILED", "WAIT_COMP",
+                  "WAIT_TOUT_COMP", "SEND_COMP", "WAIT_TOUT_TAG_COMP", "FAIL",
+                  "IMM_RECV", "WAIT_TAG_RMA_COMP");
+
+       for (data_size = 0; data_size < KFILND_DATA_SIZE_BUCKETS; data_size++) {
+               seq_printf(s, "%-20lu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu\n",
+                          data_size == 0 ? 0 : BIT(data_size - 1),
+                          print_duration(&state_stats->state[TN_STATE_IDLE].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_WAIT_TAG_COMP].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_IMM_SEND].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_TAGGED_RECV_POSTED].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_SEND_FAILED].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_WAIT_COMP].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_WAIT_TIMEOUT_COMP].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_WAIT_SEND_COMP].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_WAIT_TIMEOUT_TAG_COMP].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_FAIL].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_IMM_RECV].data_size[data_size]),
+                          print_duration(&state_stats->state[TN_STATE_WAIT_TAG_RMA_COMP].data_size[data_size]));
+       }
+}
+
+static int kfilnd_initiator_state_stats_file_show(struct seq_file *s,
+                                                 void *unused)
+{
+       seq_print_tn_state_stats(s, s->private, true);
+
+       return 0;
+}
+
+static int kfilnd_initiator_state_stats_file_open(struct inode *inode,
+                                                 struct file *file)
+{
+       return single_open(file, kfilnd_initiator_state_stats_file_show,
+                          inode->i_private);
+}
+
+const struct file_operations kfilnd_initiator_state_stats_file_ops = {
+       .owner = THIS_MODULE,
+       .open = kfilnd_initiator_state_stats_file_open,
+       .read = seq_read,
+       .llseek  = seq_lseek,
+       .release = seq_release,
+};
+
+static int kfilnd_target_state_stats_file_show(struct seq_file *s,
+                                              void *unused)
+{
+       seq_print_tn_state_stats(s, s->private, false);
+
+       return 0;
+}
+
+static int kfilnd_target_state_stats_file_open(struct inode *inode,
+                                              struct file *file)
+{
+       return single_open(file, kfilnd_target_state_stats_file_show,
+                          inode->i_private);
+}
+
+const struct file_operations kfilnd_target_state_stats_file_ops = {
+       .owner = THIS_MODULE,
+       .open = kfilnd_target_state_stats_file_open,
+       .read = seq_read,
+       .llseek  = seq_lseek,
+       .release = seq_release,
+};
+
+static void seq_print_tn_stats(struct seq_file *s, struct kfilnd_dev *dev,
+                              bool initiator)
+{
+       struct kfilnd_tn_data_size_duration_stats *stats;
+       unsigned int data_size;
+
+       if (initiator)
+               stats = &dev->initiator_stats;
+       else
+               stats = &dev->target_stats;
+
+       seq_printf(s, "%-16s %-16s\n", "MSG_SIZE", "DURATION");
+
+       for (data_size = 0; data_size < KFILND_DATA_SIZE_BUCKETS; data_size++) {
+               seq_printf(s, "%-16lu %-16llu\n",
+                          data_size == 0 ? 0 : BIT(data_size - 1),
+                          print_duration(&stats->data_size[data_size]));
+       }
+}
+
+static int kfilnd_initiator_stats_file_show(struct seq_file *s, void *unused)
+{
+       seq_print_tn_stats(s, s->private, true);
+
+       return 0;
+}
+
+static int kfilnd_initiator_stats_file_open(struct inode *inode,
+                                           struct file *file)
+{
+       return single_open(file, kfilnd_initiator_stats_file_show,
+                          inode->i_private);
+}
+
+const struct file_operations kfilnd_initiator_stats_file_ops = {
+       .owner = THIS_MODULE,
+       .open = kfilnd_initiator_stats_file_open,
+       .read = seq_read,
+       .llseek  = seq_lseek,
+       .release = seq_release,
+};
+
+static int kfilnd_target_stats_file_show(struct seq_file *s, void *unused)
+{
+       seq_print_tn_stats(s, s->private, false);
+
+       return 0;
+}
+
+static int kfilnd_target_stats_file_open(struct inode *inode, struct file *file)
+{
+       return single_open(file, kfilnd_target_stats_file_show,
+                          inode->i_private);
+}
+
+const struct file_operations kfilnd_target_stats_file_ops = {
+       .owner = THIS_MODULE,
+       .open = kfilnd_target_stats_file_open,
+       .read = seq_read,
+       .llseek  = seq_lseek,
+       .release = seq_release,
+};
+
+static ssize_t kfilnd_reset_stats_file_write(struct file *filp,
+                                            const char __user *buf,
+                                            size_t count, loff_t *loff)
+{
+       kfilnd_dev_reset_stats(filp->f_inode->i_private);
+
+       return count;
+}
+
+const struct file_operations kfilnd_reset_stats_file_ops = {
+       .owner = THIS_MODULE,
+       .write = kfilnd_reset_stats_file_write,
+};
diff --git a/lnet/klnds/kfilnd/kfilnd_dev.c b/lnet/klnds/kfilnd/kfilnd_dev.c
new file mode 100644 (file)
index 0000000..5113551
--- /dev/null
@@ -0,0 +1,335 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd device implementation.
+ */
+#include "kfilnd_dev.h"
+#include "kfilnd_ep.h"
+#include "kfilnd_dom.h"
+#include "kfilnd_peer.h"
+
+/**
+ * kfilnd_dev_post_imm_buffers() - Post all immediate receive buffers on each
+ * KFI LND endpoint.
+ * @dev: KFI LND device to have all endpoint receive buffers posted.
+ *
+ * This function should be called only during KFI LND device initialization.
+ *
+ * Return: On success, zero. Else, negative errno.
+ */
+int kfilnd_dev_post_imm_buffers(struct kfilnd_dev *dev)
+{
+       int i;
+       int rc;
+
+       if (!dev)
+               return -EINVAL;
+
+       for (i = 0; i < dev->kfd_ni->ni_ncpts; i++) {
+               rc = kfilnd_ep_post_imm_buffers(dev->kfd_endpoints[i]);
+               if (rc)
+                       return rc;
+       }
+
+       return 0;
+}
+
+/**
+ * kfilnd_dev_free() - Free a KFI LND device.
+ * @dev: KFI LND device to be freed.
+ *
+ * This function will not complete until all underlying KFI LND transactions
+ * are complete.
+ *
+ * Once the KFI LND device is freed, the module reference taken during device
+ * allocation is dropped.
+ */
+void kfilnd_dev_free(struct kfilnd_dev *dev)
+{
+       int i;
+       int lnet_ncpts;
+
+       if (!dev)
+               return;
+
+       debugfs_remove_recursive(dev->dev_dir);
+
+       /* Change state to shutting down so TNs stop using it */
+       dev->kfd_state = KFILND_STATE_SHUTTING_DOWN;
+
+       /* Cancel all outstanding RX buffers. */
+       for (i = 0; i < dev->kfd_ni->ni_ncpts; i++)
+               kfilnd_ep_cancel_imm_buffers(dev->kfd_endpoints[i]);
+
+       /* Free all endpoints. */
+       for (i = 0; i < dev->kfd_ni->ni_ncpts; i++)
+               kfilnd_ep_free(dev->kfd_endpoints[i]);
+
+       kfilnd_peer_destroy(dev);
+
+       lnet_ncpts = cfs_cpt_number(lnet_cpt_table());
+       LIBCFS_FREE(dev->cpt_to_endpoint,
+                   lnet_ncpts * sizeof(*dev->cpt_to_endpoint));
+
+       LIBCFS_FREE(dev->kfd_endpoints,
+                   dev->kfd_ni->ni_ncpts * sizeof(*dev->kfd_endpoints));
+
+       kfi_close(&dev->kfd_sep->fid);
+       kfi_close(&dev->kfd_av->fid);
+
+       kfilnd_dom_put(dev->dom);
+
+       LIBCFS_FREE(dev, sizeof(*dev));
+
+       module_put(THIS_MODULE);
+}
+
+/**
+ * kfilnd_dev_alloc() - Allocate a new KFI LND device for an LNet NI.
+ * @ni: LNet NI used to allocate the KFI LND device.
+ * @node: Node string which can be passed into kfi_getinfo().
+ *
+ * During KFI LND device allocation, the LNet NID is used to build the node
+ * and service strings. The LNet NID address (IPv4 address) is used for the node
+ * string. The LNet NID net number is used for the service string. Together, the
+ * node and service string define the address of the KFI LND device.
+ *
+ * The node and service strings are used to allocate a KFI scalable endpoint.
+ * The KFI scalable endpoint is later used to allocate KFI LND endpoints.
+ *
+ * For each successful KFI LND device allocation, a reference is taken against
+ * this module to prevent it from being removed prematurely.
+ *
+ * Return: On success, valid pointer. On error, negative errno pointer.
+ */
+struct kfilnd_dev *kfilnd_dev_alloc(struct lnet_ni *ni,
+                                   const char *node)
+{
+       int i;
+       int rc;
+       struct kfi_av_attr av_attr = {};
+       struct kfi_info *dev_info;
+       int cpt;
+       int lnet_ncpts;
+       struct kfilnd_dev *dev;
+
+       if (!ni) {
+               rc = -EINVAL;
+               goto err;
+       }
+
+       /* Start allocating memory and underlying hardware resources for the
+        * LNet NI.
+        */
+       LIBCFS_ALLOC(dev, sizeof(*dev));
+       if (!dev) {
+               rc = -ENOMEM;
+               goto err;
+       }
+
+       dev->kfd_ni = ni;
+       spin_lock_init(&dev->kfd_lock);
+       atomic_set(&dev->session_keys, 0);
+
+       dev->dom = kfilnd_dom_get(ni, node, &dev_info);
+       if (IS_ERR(dev->dom)) {
+               rc = PTR_ERR(dev->dom);
+               CERROR("Failed to get KFI LND domain: rc=%d\n", rc);
+               goto err_free_dev;
+       }
+
+       /* KFI LNet NID address needs to be unique per LNet NID and something
+        * which can be inserted into the KFI AV. The NIC address is one of the
+        * unique components. Local interface NIC address needs to be extracted
+        * and used to build the LNet NID.
+        *
+        * At this point, only the KFI CXI provider is supported.
+        */
+       if (!dev_info->src_addr ||
+           dev_info->src_addrlen != sizeof(struct kcxi_addr)) {
+               rc = -EADDRNOTAVAIL;
+               CERROR("No kfabric source address returned\n");
+               goto err_put_dom;
+       }
+
+       dev->nic_addr = ((struct kcxi_addr *)dev_info->src_addr)->nic;
+
+       /* Create an AV for this device */
+       av_attr.type = KFI_AV_UNSPEC;
+       av_attr.rx_ctx_bits = KFILND_FAB_RX_CTX_BITS;
+       rc = kfi_av_open(dev->dom->domain, &av_attr, &dev->kfd_av, dev);
+       if (rc) {
+               CERROR("Could not open AV, rc = %d\n", rc);
+               goto err_put_dom;
+       }
+
+       /* Create a scalable endpoint to represent the device. */
+       rc = kfi_scalable_ep(dev->dom->domain, dev_info, &dev->kfd_sep, dev);
+       if (rc) {
+               CERROR("Could not create scalable endpoint, rc = %d\n", rc);
+               goto err_free_av;
+       }
+
+       /* Done with info. */
+       kfi_freeinfo(dev_info);
+       dev_info = NULL;
+
+       /* Bind the endpoint to the AV */
+       rc = kfi_scalable_ep_bind(dev->kfd_sep, &dev->kfd_av->fid, 0);
+       if (rc) {
+               CERROR("Could not bind scalable endpoint to AV, rc = %d\n", rc);
+               goto err_free_sep;
+       }
+
+       /* Enable the scalable endpoint */
+       rc = kfi_enable(dev->kfd_sep);
+       if (rc) {
+               CERROR("Could not enable scalable endpoint, rc = %d\n", rc);
+               goto err_free_sep;
+       }
+
+       /* Allocate an array to store all the KFI LND endpoints. */
+       LIBCFS_ALLOC_GFP(dev->kfd_endpoints,
+                        ni->ni_ncpts * sizeof(*dev->kfd_endpoints),
+                        GFP_KERNEL);
+       if (!dev->kfd_endpoints) {
+               rc = -ENOMEM;
+               goto err_free_sep;
+       }
+
+       /* Map of all LNet CPTs to endpoints. */
+       lnet_ncpts = cfs_cpt_number(lnet_cpt_table());
+       LIBCFS_ALLOC_GFP(dev->cpt_to_endpoint,
+                        lnet_ncpts * sizeof(*dev->cpt_to_endpoint),
+                        GFP_KERNEL);
+       if (!dev->cpt_to_endpoint) {
+               rc = -ENOMEM;
+               goto err_free_ep_array;
+       }
+
+       /* Create RX/TX contexts in kfabric for each LNet NI CPT. */
+       for (i = 0; i < ni->ni_ncpts; i++) {
+               cpt = !ni->ni_cpts ? i : ni->ni_cpts[i];
+
+               dev->kfd_endpoints[i] =
+                       kfilnd_ep_alloc(dev, i, cpt,
+                                       ni->ni_net->net_tunables.lct_max_tx_credits,
+                                       KFILND_IMMEDIATE_MSG_SIZE);
+               if (IS_ERR(dev->kfd_endpoints[i])) {
+                       rc = PTR_ERR(dev->kfd_endpoints[i]);
+                       goto err_free_endpoints;
+               }
+
+               dev->cpt_to_endpoint[cpt] = dev->kfd_endpoints[i];
+       }
+
+       kfilnd_peer_init(dev);
+
+       /* Mark that the dev/NI has now been initialized */
+       dev->kfd_state = KFILND_STATE_INITIALIZED;
+
+       /* Initialize debugfs stats. */
+       dev->dev_dir = debugfs_create_dir(libcfs_nidstr(&ni->ni_nid),
+                                         kfilnd_debug_dir);
+       dev->initiator_state_stats_file =
+               debugfs_create_file("initiator_state_stats", 0444,
+                                   dev->dev_dir, dev,
+                                   &kfilnd_initiator_state_stats_file_ops);
+       dev->initiator_stats_file =
+               debugfs_create_file("initiator_stats", 0444,
+                                   dev->dev_dir, dev,
+                                   &kfilnd_initiator_stats_file_ops);
+       dev->target_state_stats_file =
+               debugfs_create_file("target_state_stats", 0444, dev->dev_dir,
+                                   dev, &kfilnd_target_state_stats_file_ops);
+       dev->target_stats_file =
+               debugfs_create_file("target_stats", 0444, dev->dev_dir, dev,
+                                   &kfilnd_target_stats_file_ops);
+       dev->reset_stats_file =
+               debugfs_create_file("reset_stats", 0200, dev->dev_dir, dev,
+                                   &kfilnd_reset_stats_file_ops);
+
+       kfilnd_dev_reset_stats(dev);
+
+       try_module_get(THIS_MODULE);
+
+       return dev;
+
+err_free_endpoints:
+       for (i = 0; i < ni->ni_ncpts; i++)
+               kfilnd_ep_free(dev->kfd_endpoints[i]);
+
+       LIBCFS_FREE(dev->cpt_to_endpoint,
+                   lnet_ncpts * sizeof(*dev->cpt_to_endpoint));
+err_free_ep_array:
+       LIBCFS_FREE(dev->kfd_endpoints,
+                   ni->ni_ncpts * sizeof(*dev->kfd_endpoints));
+err_free_sep:
+       kfi_close(&dev->kfd_sep->fid);
+err_free_av:
+       kfi_close(&dev->kfd_av->fid);
+err_put_dom:
+       kfilnd_dom_put(dev->dom);
+       if (dev_info)
+               kfi_freeinfo(dev_info);
+err_free_dev:
+       LIBCFS_FREE(dev, sizeof(*dev));
+err:
+       return ERR_PTR(rc);
+}
+
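+/**
+ * kfilnd_dev_reset_stats() - Zero all transaction duration statistics.
+ * @dev: KFI LND device to have its statistics reset.
+ *
+ * Both the initiator and target per-message-size statistics and the
+ * per-state statistics are cleared.
+ */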
+void kfilnd_dev_reset_stats(struct kfilnd_dev *dev)
+{
+       unsigned int data_size;
+       enum tn_states state;
+       struct kfilnd_tn_duration_stat *stat;
+
+       for (data_size = 0; data_size < KFILND_DATA_SIZE_BUCKETS; data_size++) {
+               stat = &dev->initiator_stats.data_size[data_size];
+               atomic64_set(&stat->accumulated_duration, 0);
+               atomic_set(&stat->accumulated_count, 0);
+
+               stat = &dev->target_stats.data_size[data_size];
+               atomic64_set(&stat->accumulated_duration, 0);
+               atomic_set(&stat->accumulated_count, 0);
+
+               for (state = 0; state < TN_STATE_MAX; state++) {
+                       stat = &dev->initiator_state_stats.state[state].data_size[data_size];
+                       atomic64_set(&stat->accumulated_duration, 0);
+                       atomic_set(&stat->accumulated_count, 0);
+
+                       stat = &dev->target_state_stats.state[state].data_size[data_size];
+                       atomic64_set(&stat->accumulated_duration, 0);
+                       atomic_set(&stat->accumulated_count, 0);
+               }
+       }
+}
+
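+/**
+ * kfilnd_dev_get_session_key() - Allocate a new session key.
+ * @dev: KFI LND device the session key is allocated against.
+ *
+ * Session keys come from a monotonically increasing per-device counter and
+ * wrap at 32 bits.
+ */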
+u32 kfilnd_dev_get_session_key(struct kfilnd_dev *dev)
+{
+       return (u32)atomic_add_return(1, &dev->session_keys);
+}
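
kfilnd_dev_alloc() reports failure through ERR_PTR() rather than NULL, so callers must test the result with the error-pointer helpers and pair every successful allocation with kfilnd_dev_free(). A minimal sketch of the expected calling pattern (illustrative only; example_ni_startup() is not part of the driver):

#include <linux/err.h>

/* Illustrative only: bring a device up for an LNet NI, then tear it down. */
static int example_ni_startup(struct lnet_ni *ni, const char *node)
{
	struct kfilnd_dev *dev;

	dev = kfilnd_dev_alloc(ni, node);
	if (IS_ERR(dev))
		return PTR_ERR(dev);	/* negative errno, never NULL */

	/* ... post immediate buffers, send, receive ... */

	kfilnd_dev_free(dev);	/* blocks until all transactions complete */
	return 0;
}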
diff --git a/lnet/klnds/kfilnd/kfilnd_dev.h b/lnet/klnds/kfilnd/kfilnd_dev.h
new file mode 100644 (file)
index 0000000..8bd948d
--- /dev/null
@@ -0,0 +1,46 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd device implementation.
+ */
+#ifndef _KFILND_DEV_
+#define _KFILND_DEV_
+
+#include "kfilnd.h"
+
+/* TODO: Module parameters? */
+#define KFILND_CURRENT_HASH_BITS 7
+#define KFILND_MAX_HASH_BITS 12
+
+int kfilnd_dev_post_imm_buffers(struct kfilnd_dev *dev);
+void kfilnd_dev_free(struct kfilnd_dev *dev);
+struct kfilnd_dev *kfilnd_dev_alloc(struct lnet_ni *ni, const char *node);
+void kfilnd_dev_reset_stats(struct kfilnd_dev *dev);
+u32 kfilnd_dev_get_session_key(struct kfilnd_dev *dev);
+
+#endif /* _KFILND_DEV_ */
diff --git a/lnet/klnds/kfilnd/kfilnd_dom.c b/lnet/klnds/kfilnd/kfilnd_dom.c
new file mode 100644 (file)
index 0000000..74cdc7f
--- /dev/null
@@ -0,0 +1,450 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd domain and fabric implementation.
+ */
+#include "kfilnd_dom.h"
+#include "kfilnd_tn.h"
+
+/* Global list of allocated KFI LND fabrics. */
+static LIST_HEAD(fab_list);
+static DEFINE_MUTEX(fab_list_lock);
+
+/**
+ * kfilnd_dom_free() - Free a KFI LND domain.
+ * @kref: Reference counter embedded in the KFI LND domain to be freed.
+ */
+static void kfilnd_dom_free(struct kref *kref)
+{
+       struct kfilnd_dom *dom;
+
+       if (!kref)
+               return;
+
+       dom = container_of(kref, struct kfilnd_dom, cnt);
+
+       mutex_lock(&dom->fab->dom_list_lock);
+       list_del(&dom->entry);
+       mutex_unlock(&dom->fab->dom_list_lock);
+
+       kfi_close(&dom->domain->fid);
+       LIBCFS_FREE(dom, sizeof(*dom));
+}
+
+/**
+ * kfilnd_dom_alloc() - Allocate a new KFI LND domain.
+ * @dom_info: KFI info structure used to allocate the KFI LND domain.
+ * @fab: KFI LND fabric used by the domain.
+ *
+ * A KFI LND domain (and the underlying KFI domain) provides access to a
+ * specific NIC on a fabric. The same KFI LND domain can be used to allocate
+ * different KFI LND devices.
+ *
+ * Return: On success, valid pointer. Else, negative errno pointer.
+ */
+static struct kfilnd_dom *kfilnd_dom_alloc(struct kfi_info *dom_info,
+                                          struct kfilnd_fab *fab)
+{
+       int rc;
+       struct kfilnd_dom *dom;
+
+       if (!dom_info || !fab) {
+               rc = -EINVAL;
+               goto err;
+       }
+
+       LIBCFS_ALLOC_GFP(dom, sizeof(*dom), GFP_KERNEL);
+       if (!dom) {
+               rc = -ENOMEM;
+               goto err;
+       }
+
+       INIT_LIST_HEAD(&dom->dev_list);
+       spin_lock_init(&dom->lock);
+       dom->fab = fab;
+       kref_init(&dom->cnt);
+
+       rc = kfi_domain(fab->fabric, dom_info, &dom->domain, dom);
+       if (rc) {
+               CERROR("Failed to create KFI domain: rc=%d\n", rc);
+               goto err_free_dom;
+       }
+
+       mutex_lock(&fab->dom_list_lock);
+       list_add_tail(&dom->entry, &fab->dom_list);
+       mutex_unlock(&fab->dom_list_lock);
+
+       return dom;
+
+err_free_dom:
+       LIBCFS_FREE(dom, sizeof(*dom));
+err:
+       return ERR_PTR(rc);
+}
+
+/**
+ * kfilnd_dom_reuse() - Attempt to reuse an already allocated domain.
+ * @node: Node string used to match candidate domains.
+ * @service: Service string used to match candidate domains.
+ * @hints: Hints used to allocate KFI info structures.
+ * @fab: Fabric whose domains are candidates for reuse.
+ *
+ * Return: On success (a matching domain is found), a valid pointer is
+ * returned. Else, NULL.
+ */
+struct kfilnd_dom *kfilnd_dom_reuse(const char *node, const char *service,
+                                   struct kfi_info *hints,
+                                   struct kfilnd_fab *fab)
+{
+       struct kfilnd_dom *dom;
+       struct kfi_info *info;
+       int rc;
+
+       if (!node || !service || !hints || !fab)
+               return NULL;
+
+       /* Update the hints domain attribute with an already allocated domain to
+        * see if domains can be reused.
+        */
+       hints->fabric_attr->fabric = fab->fabric;
+
+       mutex_lock(&fab->dom_list_lock);
+       list_for_each_entry(dom, &fab->dom_list, entry) {
+               hints->domain_attr->domain = dom->domain;
+
+               rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints, &info);
+               if (!rc) {
+                       kref_get(&dom->cnt);
+
+                       mutex_unlock(&fab->dom_list_lock);
+
+                       kfi_freeinfo(info);
+
+                       return dom;
+               }
+       }
+       mutex_unlock(&fab->dom_list_lock);
+
+       hints->domain_attr->domain = NULL;
+
+       return NULL;
+}
+
+/**
+ * kfilnd_fab_free() - Free a KFI LND fabric.
+ * @kref: Reference counter embedded in the KFI LND fabric to be freed.
+ */
+static void kfilnd_fab_free(struct kref *kref)
+{
+       struct kfilnd_fab *fab;
+
+       if (!kref)
+               return;
+
+       fab = container_of(kref, struct kfilnd_fab, cnt);
+
+       mutex_lock(&fab_list_lock);
+       list_del(&fab->entry);
+       mutex_unlock(&fab_list_lock);
+
+       kfi_close(&fab->fabric->fid);
+       LIBCFS_FREE(fab, sizeof(*fab));
+}
+
+/**
+ * kfilnd_fab_alloc() - Allocate a new KFI LND fabric.
+ * @attr: KFI fabric attributes used to allocate the underlying KFI fabric.
+ *
+ * A KFI LND fabric (and the underlying KFI fabric) provides access to NICs on
+ * the same fabric. The underlying KFI fabric should be shared between all NICs
+ * (KFI domains) on the same fabric.
+ *
+ * Return: On success, valid pointer. Else, negative errno pointer.
+ */
+static struct kfilnd_fab *kfilnd_fab_alloc(struct kfi_fabric_attr *attr)
+{
+       int rc;
+       struct kfilnd_fab *fab;
+
+       if (!attr) {
+               rc = -EINVAL;
+               goto err;
+       }
+
+       LIBCFS_ALLOC_GFP(fab, sizeof(*fab), GFP_KERNEL);
+       if (!fab) {
+               rc = -ENOMEM;
+               goto err;
+       }
+
+       INIT_LIST_HEAD(&fab->dom_list);
+       mutex_init(&fab->dom_list_lock);
+       kref_init(&fab->cnt);
+
+       rc = kfi_fabric(attr, &fab->fabric, fab);
+       if (rc) {
+               CERROR("Failed to allocate KFI fabric: rc=%d\n", rc);
+               goto err_free_fab;
+       }
+
+       mutex_lock(&fab_list_lock);
+       list_add_tail(&fab->entry, &fab_list);
+       mutex_unlock(&fab_list_lock);
+
+       return fab;
+
+err_free_fab:
+       LIBCFS_FREE(fab, sizeof(*fab));
+err:
+       return ERR_PTR(rc);
+}
+
+/**
+ * kfilnd_fab_reuse() - Attempt to reuse an already allocated fabric.
+ * @node: Node string used to match candidate fabrics.
+ * @service: Service string used to match candidate fabrics.
+ * @hints: Hints used to allocate KFI info structures.
+ *
+ * Return: On success (a matching fabric is found), a valid pointer is
+ * returned. Else, NULL.
+ */
+struct kfilnd_fab *kfilnd_fab_reuse(const char *node, const char *service,
+                                   struct kfi_info *hints)
+{
+       struct kfilnd_fab *fab;
+       struct kfi_info *info;
+       int rc;
+
+       if (!node || !service || !hints)
+               return NULL;
+
+       /* Update the hints fabric attribute with an already allocated fabric to
+        * see if fabrics can be reused.
+        */
+       mutex_lock(&fab_list_lock);
+       list_for_each_entry(fab, &fab_list, entry) {
+               hints->fabric_attr->fabric = fab->fabric;
+
+               rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints, &info);
+               if (!rc) {
+                       kref_get(&fab->cnt);
+
+                       mutex_unlock(&fab_list_lock);
+
+                       kfi_freeinfo(info);
+
+                       return fab;
+               }
+       }
+       mutex_unlock(&fab_list_lock);
+
+       hints->fabric_attr->fabric = NULL;
+
+       return NULL;
+}
+
+/**
+ * kfilnd_dom_put() - Put a KFI LND domain reference.
+ * @dom: KFI LND domain to have its domain and fabric references dropped.
+ */
+void kfilnd_dom_put(struct kfilnd_dom *dom)
+{
+       struct kfilnd_fab *fab;
+
+       if (!dom)
+               return;
+
+       fab = dom->fab;
+
+       kref_put(&dom->cnt, kfilnd_dom_free);
+
+       kref_put(&fab->cnt, kfilnd_fab_free);
+}
+
+/**
+ * kfilnd_dom_get() - Get a KFI LND domain.
+ * @ni: LNet NI used to define the KFI LND domain address.
+ * @node: Node string which can be passed into kfi_getinfo().
+ * @dev_info: KFI info structure which should be used to allocate a KFI LND
+ * device using this domain.
+ *
+ * On success, a KFI info structure is returned to the user in addition to a KFI
+ * LND domain. Callers should free the KFI info structure once done using it.
+ *
+ * Return: On success, dev_info is set to a valid KFI info structure and a valid
+ * KFI LND domain is returned. Else, negative errno pointer is returned.
+ */
+struct kfilnd_dom *kfilnd_dom_get(struct lnet_ni *ni, const char *node,
+                                 struct kfi_info **dev_info)
+{
+       int rc;
+       struct kfi_info *hints;
+       struct kfi_info *info;
+       struct kfi_info *hints_tmp;
+       struct kfi_info *info_tmp;
+       struct kfilnd_fab *fab;
+       struct kfilnd_dom *dom;
+       struct kfi_cxi_fabric_ops *fab_ops;
+       char *service;
+
+       if (!ni || !dev_info) {
+               rc = -EINVAL;
+               goto err;
+       }
+
+       service = kasprintf(GFP_KERNEL, "%u", ni->ni_nid.nid_num);
+       if (!service) {
+               rc = -ENOMEM;
+               goto err;
+       }
+
+       hints = kfi_allocinfo();
+       if (!hints) {
+               rc = -ENOMEM;
+               goto err_free_service;
+       }
+
+       hints->caps = KFI_MSG | KFI_RMA | KFI_SEND | KFI_RECV | KFI_READ |
+               KFI_WRITE | KFI_REMOTE_READ | KFI_REMOTE_WRITE |
+               KFI_MULTI_RECV | KFI_REMOTE_COMM | KFI_NAMED_RX_CTX |
+               KFI_TAGGED | KFI_TAGGED_RMA | KFI_DIRECTED_RECV;
+       hints->fabric_attr->prov_version =
+               KFI_VERSION(ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_prov_major_version,
+                           ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_prov_minor_version);
+       hints->domain_attr->mr_iov_limit = 256; /* 1 MiB LNet message */
+       hints->domain_attr->mr_key_size = sizeof(int);
+       hints->domain_attr->resource_mgmt = KFI_RM_DISABLED;
+       hints->ep_attr->max_msg_size = LNET_MAX_PAYLOAD;
+       hints->rx_attr->op_flags = KFI_COMPLETION | KFI_MULTI_RECV;
+       hints->rx_attr->iov_limit = 256; /* 1 MiB LNet message */
+       hints->tx_attr->op_flags = KFI_COMPLETION;
+       hints->tx_attr->iov_limit = 256; /* 1 MiB LNet message */
+       hints->tx_attr->rma_iov_limit = 256; /* 1 MiB LNet message */
+       hints->ep_attr->auth_key =
+               (void *)&ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key;
+       hints->ep_attr->auth_key_size =
+               sizeof(ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key);
+
+       /* Check if dynamic resource allocation is supported.
+        * Set dynamic resource alloc hints if it is.
+        *
+        * The op must be checked first because, due to a bug, the ctx_cnt
+        * hints cannot simply be set greater than 1 (the default value)
+        * when dynamic resource allocation is unsupported.
+        */
+       hints_tmp = kfi_dupinfo(hints);
+       if (hints_tmp) {
+               rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints_tmp,
+                                &info_tmp);
+               if (!rc) {
+                       fab = kfilnd_fab_alloc(info_tmp->fabric_attr);
+                       if (!IS_ERR(fab)) {
+                               rc = kfi_open_ops(&fab->fabric->fid,
+                                       KFI_CXI_FAB_OPS_1, 0, (void **)&fab_ops,
+                                       NULL);
+                               if (!rc) {
+                                       /* Set dynamic resource alloc hints */
+                                       hints->domain_attr->cq_cnt = ni->ni_ncpts * 2;
+                                       hints->domain_attr->tx_ctx_cnt = ni->ni_ncpts;
+                                       hints->domain_attr->rx_ctx_cnt = ni->ni_ncpts;
+                                       hints->rx_attr->size =
+                                               ni->ni_net->net_tunables.lct_max_tx_credits +
+                                               immediate_rx_buf_count;
+                               }
+                               kref_put(&fab->cnt, kfilnd_fab_free);
+                       }
+                       kfi_freeinfo(info_tmp);
+               }
+               kfi_freeinfo(hints_tmp);
+       }
+
+       /* Check to see if any KFI LND fabrics/domains can be reused. */
+       fab = kfilnd_fab_reuse(node, service, hints);
+       dom = kfilnd_dom_reuse(node, service, hints, fab);
+
+       if (fab)
+               hints->fabric_attr->fabric = fab->fabric;
+       if (dom)
+               hints->domain_attr->domain = dom->domain;
+
+       /* Allocate the official KFI info structure to be used for KFI LND
+        * device allocation.
+        */
+       rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints, &info);
+
+       /* Authorization key information is now stored in the returned kfi_info
+        * structure. Since kfi_freeinfo() will try to free the auth_key pointer
+        * and this memory is owned by the LNet NI, this information must be
+        * zeroed in the hints to prevent LNet NI corruption.
+        */
+       hints->ep_attr->auth_key = NULL;
+       hints->ep_attr->auth_key_size = 0;
+
+       kfi_freeinfo(hints);
+       kfree(service);
+       service = NULL;
+
+       if (rc)
+               goto err_free_service;
+
+       /* Allocate a new KFI LND fabric and domain if necessary. */
+       if (!fab) {
+               fab = kfilnd_fab_alloc(info->fabric_attr);
+               if (IS_ERR(fab)) {
+                       rc = PTR_ERR(fab);
+                       goto err_free_info;
+               }
+       }
+
+       if (!dom) {
+               /* Enable dynamic resource allocation if operation supported */
+               rc = kfi_open_ops(&fab->fabric->fid, KFI_CXI_FAB_OPS_1, 0,
+                                 (void **)&fab_ops, NULL);
+               if (!rc) {
+                       rc = fab_ops->enable_dynamic_rsrc_alloc(&fab->fabric->fid, true);
+                       if (!rc)
+                               CDEBUG(D_NET, "Enabled dynamic resource allocation for KFI domain\n");
+               }
+               dom = kfilnd_dom_alloc(info, fab);
+               if (IS_ERR(dom)) {
+                       rc = PTR_ERR(dom);
+                       goto err_put_fab;
+               }
+       }
+
+       *dev_info = info;
+
+       return dom;
+
+err_put_fab:
+       kref_put(&fab->cnt, kfilnd_fab_free);
+err_free_info:
+       kfi_freeinfo(info);
+err_free_service:
+       kfree(service);
+err:
+       return ERR_PTR(rc);
+}
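
kfilnd_dom_get() hands the caller two things to manage: the domain reference itself, released with kfilnd_dom_put(), and the kfi_info written through dev_info, released with kfi_freeinfo(). A minimal sketch of that contract (illustrative only; example_dom_usage() is not part of the driver):

#include <linux/err.h>

/* Illustrative only: acquire a domain, consume dev_info, release both. */
static int example_dom_usage(struct lnet_ni *ni, const char *node)
{
	struct kfi_info *dev_info;
	struct kfilnd_dom *dom;

	dom = kfilnd_dom_get(ni, node, &dev_info);
	if (IS_ERR(dom))
		return PTR_ERR(dom);

	/* ... open an AV and scalable endpoint against dom->domain ... */

	kfi_freeinfo(dev_info);	/* caller owns the returned info */
	kfilnd_dom_put(dom);	/* drops the domain and fabric references */
	return 0;
}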
diff --git a/lnet/klnds/kfilnd/kfilnd_dom.h b/lnet/klnds/kfilnd/kfilnd_dom.h
new file mode 100644 (file)
index 0000000..94649ad
--- /dev/null
@@ -0,0 +1,40 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd domain implementation.
+ */
+#ifndef _KFILND_DOM_
+#define _KFILND_DOM_
+
+#include "kfilnd.h"
+
+void kfilnd_dom_put(struct kfilnd_dom *dom);
+struct kfilnd_dom *kfilnd_dom_get(struct lnet_ni *ni, const char *node,
+                                 struct kfi_info **dev_info);
+
+#endif /* _KFILND_DOM_ */
diff --git a/lnet/klnds/kfilnd/kfilnd_ep.c b/lnet/klnds/kfilnd/kfilnd_ep.c
new file mode 100644 (file)
index 0000000..72b4e0e
--- /dev/null
@@ -0,0 +1,954 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd endpoint implementation.
+ */
+#include "kfilnd_ep.h"
+#include "kfilnd_dev.h"
+#include "kfilnd_tn.h"
+#include "kfilnd_cq.h"
+
+/**
+ * kfilnd_ep_post_recv() - Post a single receive buffer.
+ * @ep: KFI LND endpoint to have receive buffers posted on.
+ * @buf: Receive buffer to be posted.
+ *
+ * Return: On success, zero. Else, negative errno.
+ */
+static int kfilnd_ep_post_recv(struct kfilnd_ep *ep,
+                              struct kfilnd_immediate_buffer *buf)
+{
+       int rc;
+
+       if (!ep || !buf)
+               return -EINVAL;
+
+       if (buf->immed_no_repost)
+               return 0;
+
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV))
+               return -EIO;
+       else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV_EAGAIN))
+               return -EAGAIN;
+
+       atomic_inc(&buf->immed_ref);
+       rc = kfi_recv(ep->end_rx, buf->immed_buf, buf->immed_buf_size, NULL,
+                     KFI_ADDR_UNSPEC, buf);
+       if (rc)
+               atomic_dec(&buf->immed_ref);
+
+       return rc;
+}
+
+#define KFILND_EP_REPLAY_TIMER_MSEC (100U)
+
+/**
+ * kfilnd_ep_imm_buffer_put() - Decrement the reference count of an immediate
+ * buffer.
+ * @buf: Immediate buffer to have reference count decremented.
+ *
+ * If the immediate buffer's reference count reaches zero, the buffer will
+ * automatically be reposted.
+ */
+void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf)
+{
+       unsigned long expires;
+       int rc;
+
+       if (!buf)
+               return;
+
+       if (atomic_sub_return(1, &buf->immed_ref) != 0)
+               return;
+
+       rc = kfilnd_ep_post_recv(buf->immed_end, buf);
+       switch (rc) {
+       case 0:
+               break;
+
+       /* Return the buffer reference and queue the immediate buffer put to be
+        * replayed.
+        */
+       case -EAGAIN:
+               expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
+                       jiffies;
+               atomic_inc(&buf->immed_ref);
+
+               spin_lock(&buf->immed_end->replay_lock);
+               list_add_tail(&buf->replay_entry,
+                             &buf->immed_end->imm_buffer_replay);
+               atomic_inc(&buf->immed_end->replay_count);
+               spin_unlock(&buf->immed_end->replay_lock);
+
+               if (!timer_pending(&buf->immed_end->replay_timer))
+                       mod_timer(&buf->immed_end->replay_timer, expires);
+               break;
+
+       /* Unexpected error resulting in immediate buffer not being able to be
+        * posted. Since immediate buffers are used to sink incoming messages,
+        * failure to post immediate buffers means failure to communicate.
+        *
+        * TODO: Prevent LNet NI from doing sends/recvs?
+        */
+       default:
+               KFILND_EP_ERROR(buf->immed_end,
+                               "Failed to post immediate receive buffer: rc=%d",
+                               rc);
+       }
+}
+
+/**
+ * kfilnd_ep_post_imm_buffers() - Post all immediate receive buffers.
+ * @ep: KFI LND endpoint to have receive buffers posted on.
+ *
+ * This function should be called only during KFI LND device initialization.
+ *
+ * Return: On success, zero. Else, negative errno.
+ */
+int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep)
+{
+       int rc = 0;
+       int i;
+
+       if (!ep)
+               return -EINVAL;
+
+       for (i = 0; i < immediate_rx_buf_count; i++) {
+               rc = kfilnd_ep_post_recv(ep, &ep->end_immed_bufs[i]);
+               if (rc)
+                       goto out;
+       }
+
+out:
+       return rc;
+}
+
+/**
+ * kfilnd_ep_cancel_imm_buffers() - Cancel all immediate receive buffers.
+ * @ep: KFI LND endpoint to have receive buffers canceled.
+ */
+void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep)
+{
+       int i;
+
+       if (!ep)
+               return;
+
+       for (i = 0; i < immediate_rx_buf_count; i++) {
+               ep->end_immed_bufs[i].immed_no_repost = true;
+
+               /* Since this is called during LNet NI teardown, no need to
+                * pipeline retries. Just spin until -EAGAIN is not returned.
+                */
+               while (kfi_cancel(&ep->end_rx->fid, &ep->end_immed_bufs[i]) ==
+                      -EAGAIN)
+                       schedule();
+       }
+}
+
+static void kfilnd_ep_err_fail_loc_work(struct work_struct *work)
+{
+       struct kfilnd_ep_err_fail_loc_work *err =
+               container_of(work, struct kfilnd_ep_err_fail_loc_work, work);
+
+       kfilnd_cq_process_error(err->ep, &err->err);
+       kfree(err);
+}
+
+static int kfilnd_ep_gen_fake_err(struct kfilnd_ep *ep,
+                                 const struct kfi_cq_err_entry *err)
+{
+       struct kfilnd_ep_err_fail_loc_work *fake_err;
+
+       fake_err = kmalloc(sizeof(*fake_err), GFP_KERNEL);
+       if (!fake_err)
+               return -ENOMEM;
+
+       fake_err->ep = ep;
+       fake_err->err = *err;
+       INIT_WORK(&fake_err->work, kfilnd_ep_err_fail_loc_work);
+       queue_work(kfilnd_wq, &fake_err->work);
+
+       return 0;
+}
+
+static uint64_t gen_init_tag_bits(struct kfilnd_transaction *tn)
+{
+       return (tn->peer->remote_session_key << KFILND_EP_KEY_BITS) |
+               tn->tn_response_mr_key;
+}
+
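
The tag is a two-field bit pattern: the session key sits above bit KFILND_EP_KEY_BITS and the memory-region key below it. A worked example, assuming for illustration that KFILND_EP_KEY_BITS is 16 (the real value is defined elsewhere in the driver):

/* Illustrative only: tag layout assuming KFILND_EP_KEY_BITS == 16. */
uint64_t session_key = 0x2;			/* peer's remote session key */
uint64_t mr_key = 0x1f;				/* tn->tn_response_mr_key */
uint64_t tag = (session_key << 16) | mr_key;	/* tag == 0x2001f */

gen_target_tag_bits() below builds the same layout from the local session key and tn_mr_key, so the initiator and target sides of a transaction arrive at matching tags.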
+/**
+ * kfilnd_ep_post_tagged_send() - Post a tagged send operation.
+ * @ep: KFI LND endpoint used to post the tagged send operation.
+ * @tn: Transaction structure containing the send buffer to be posted.
+ *
+ * The tag for the tagged send operation combines the peer's session key with
+ * the response memory region key associated with the transaction.
+ *
+ * Return: On success, zero. Else, negative errno value.
+ */
+int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep,
+                              struct kfilnd_transaction *tn)
+{
+       struct kfi_cq_err_entry fake_error = {
+               .op_context = tn,
+               .flags = KFI_TAGGED | KFI_SEND,
+               .err = EIO,
+       };
+       int rc;
+
+       if (!ep || !tn)
+               return -EINVAL;
+
+       /* Make sure the device is not being shut down */
+       if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
+               return -EINVAL;
+
+       /* Progress transaction to failure if send should fail. */
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EVENT)) {
+               rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
+               if (!rc)
+                       return 0;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND)) {
+               return -EIO;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EAGAIN)) {
+               return -EAGAIN;
+       }
+
+       rc = kfi_tsenddata(ep->end_tx, NULL, 0, NULL, tn->tagged_data,
+                          tn->tn_target_addr, gen_init_tag_bits(tn), tn);
+       switch (rc) {
+       case 0:
+       case -EAGAIN:
+               KFILND_EP_DEBUG(ep,
+                               "Transaction ID %p: %s tagged send of with tag 0x%x to peer 0x%llx: rc=%d",
+                               tn, rc ? "Failed to post" : "Posted",
+                               tn->tn_response_mr_key, tn->tn_target_addr, rc);
+               break;
+
+       default:
+               KFILND_EP_ERROR(ep,
+                               "Transaction ID %p: Failed to post tagged send with tag 0x%x to peer 0x%llx: rc=%d",
+                               tn, tn->tn_response_mr_key,
+                               tn->tn_target_addr, rc);
+       }
+
+       return rc;
+}
+
+/**
+ * kfilnd_ep_cancel_tagged_recv() - Cancel a tagged recv.
+ * @ep: KFI LND endpoint used to cancel the tagged receive operation.
+ * @tn: Transaction structure containing the receive buffer to be cancelled.
+ *
+ * The tagged receive buffer context pointer is used to cancel a tagged receive
+ * operation. The context pointer is always the transaction pointer.
+ *
+ * Return: 0 on success. -ENOENT if the tagged receive buffer is not found. The
+ * tagged receive buffer may not be found due to a tagged send operation already
+ * landing or the tagged receive buffer never being posted. Negative errno value
+ * on error.
+ */
+int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep,
+                                struct kfilnd_transaction *tn)
+{
+       if (!ep || !tn)
+               return -EINVAL;
+
+       /* Make sure the device is not being shut down */
+       if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
+               return -EINVAL;
+
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN))
+               return -EAGAIN;
+
+       /* The async event count is not decremented for a cancel operation since
+        * it was incremented for the post tagged receive.
+        */
+       return kfi_cancel(&ep->end_rx->fid, tn);
+}
+
+static uint64_t gen_target_tag_bits(struct kfilnd_transaction *tn)
+{
+       return (tn->peer->local_session_key << KFILND_EP_KEY_BITS) |
+               tn->tn_mr_key;
+}
+
+/**
+ * kfilnd_ep_post_tagged_recv() - Post a tagged receive operation.
+ * @ep: KFI LND endpoint used to post the tagged receive operation.
+ * @tn: Transaction structure containing the receive buffer to be posted.
+ *
+ * The tag for the tagged receive operation combines the peer's session key
+ * with the memory region key associated with the transaction.
+ *
+ * Return: On success, zero. Else, negative errno value.
+ */
+int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep,
+                              struct kfilnd_transaction *tn)
+{
+       struct kfi_msg_tagged msg = {
+               .tag = gen_target_tag_bits(tn),
+               .context = tn,
+               .addr = tn->peer->addr,
+       };
+       struct kfi_cq_err_entry fake_error = {
+               .op_context = tn,
+               .flags = KFI_TAGGED | KFI_RECV,
+               .err = EIO,
+       };
+       int rc;
+
+       if (!ep || !tn)
+               return -EINVAL;
+
+       /* Make sure the device is not being shut down */
+       if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
+               return -EINVAL;
+
+       /* Progress transaction to failure if the receive should fail. */
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EVENT)) {
+               rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
+               if (!rc)
+                       return 0;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV)) {
+               return -EIO;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EAGAIN)) {
+               return -EAGAIN;
+       }
+
+       msg.iov_count = tn->tn_num_iovec;
+       msg.type = KFI_BVEC;
+       msg.msg_biov = tn->tn_kiov;
+
+       rc = kfi_trecvmsg(ep->end_rx, &msg, KFI_COMPLETION);
+       switch (rc) {
+       case 0:
+       case -EAGAIN:
+               KFILND_EP_DEBUG(ep,
+                               "Transaction ID %p: %s tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
+                               tn, rc ? "Failed to post" : "Posted",
+                               tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
+               break;
+
+       default:
+               KFILND_EP_ERROR(ep,
+                               "Transaction ID %p: Failed to post tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
+                               tn, tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
+       }
+
+       return rc;
+}
+
+/**
+ * kfilnd_ep_post_send() - Post a send operation.
+ * @ep: KFI LND endpoint used to post the send operation.
+ * @tn: Transaction structure containing the buffer to be sent.
+ *
+ * The target of the send operation is based on the target LNet NID field within
+ * the transaction structure. A lookup of LNet NID to KFI address is performed.
+ *
+ * Return: On success, zero. Else, negative errno value.
+ */
+int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
+{
+       size_t len;
+       void *buf;
+       struct kfi_cq_err_entry fake_error = {
+               .op_context = tn,
+               .flags = KFI_MSG | KFI_SEND,
+               .err = EIO,
+       };
+       int rc;
+
+       if (!ep || !tn)
+               return -EINVAL;
+
+       buf = tn->tn_tx_msg.msg;
+       len = tn->tn_tx_msg.length;
+
+       /* Make sure the device is not being shut down */
+       if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
+               return -EINVAL;
+
+       /* Progress transaction to failure if send should fail. */
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EVENT)) {
+               rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
+               if (!rc)
+                       return 0;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND)) {
+               return -EIO;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EAGAIN)) {
+               return -EAGAIN;
+       }
+
+       rc = kfi_send(ep->end_tx, buf, len, NULL, tn->tn_target_addr, tn);
+       switch (rc) {
+       case 0:
+       case -EAGAIN:
+               KFILND_EP_DEBUG(ep,
+                               "Transaction ID %p: %s send of %lu bytes to peer 0x%llx: rc=%d",
+                               tn, rc ? "Failed to post" : "Posted",
+                               len, tn->tn_target_addr, rc);
+               break;
+
+       default:
+               KFILND_EP_ERROR(ep,
+                               "Transaction ID %p: Failed to post send of %lu bytes to peer 0x%llx: rc=%d",
+                               tn, len, tn->tn_target_addr, rc);
+       }
+
+       return rc;
+}
+
+/**
+ * kfilnd_ep_post_write() - Post a write operation.
+ * @ep: KFI LND endpoint used to post the write operation.
+ * @tn: Transaction structure containing the buffer to be read from.
+ *
+ * The target of the write operation is based on the target LNet NID field
+ * within the transaction structure. A lookup of LNet NID to KFI address is
+ * performed.
+ *
+ * The peer's session key and the response memory region key combine to form
+ * the remote key for the target memory region.
+ *
+ * Return: On success, zero. Else, negative errno value.
+ */
+int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
+{
+       int rc;
+       struct kfi_cq_err_entry fake_error = {
+               .op_context = tn,
+               .flags = KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND,
+               .err = EIO,
+       };
+       struct kfi_rma_iov rma_iov = {
+               .len = tn->tn_nob,
+               .key = gen_init_tag_bits(tn),
+       };
+       struct kfi_msg_rma rma = {
+               .addr = tn->tn_target_addr,
+               .rma_iov = &rma_iov,
+               .rma_iov_count = 1,
+               .context = tn,
+       };
+
+       if (!ep || !tn)
+               return -EINVAL;
+
+       /* Make sure the device is not being shut down */
+       if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
+               return -EINVAL;
+
+       /* Progress transaction to failure if write should fail. */
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EVENT)) {
+               rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
+               if (!rc)
+                       return 0;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE)) {
+               return -EIO;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EAGAIN)) {
+               return -EAGAIN;
+       }
+
+       rma.iov_count = tn->tn_num_iovec;
+       rma.type = KFI_BVEC;
+       rma.msg_biov = tn->tn_kiov;
+
+       rc = kfi_writemsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
+       switch (rc) {
+       case 0:
+       case -EAGAIN:
+               KFILND_EP_DEBUG(ep,
+                               "Transaction ID %p: %s write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
+                               tn, rc ? "Failed to post" : "Posted",
+                               tn->tn_nob, tn->tn_num_iovec,
+                               tn->tn_response_mr_key, tn->tn_target_addr, rc);
+               break;
+
+       default:
+               KFILND_EP_ERROR(ep,
+                               "Transaction ID %p: Failed to post write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
+                               tn, tn->tn_nob, tn->tn_num_iovec,
+                               tn->tn_response_mr_key, tn->tn_target_addr,
+                               rc);
+       }
+
+       return rc;
+}
+
+/**
+ * kfilnd_ep_post_read() - Post a read operation.
+ * @ep: KFI LND endpoint used to post the read operation.
+ * @tn: Transaction structure containing the buffer to be read into.
+ *
+ * The target of the read operation is based on the target LNet NID field within
+ * the transaction structure. A lookup of LNet NID to KFI address is performed.
+ *
+ * The peer's session key and the response memory region key combine to form
+ * the remote key for the target memory region.
+ *
+ * Return: On success, zero. Else, negative errno value.
+ */
+int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
+{
+       int rc;
+       struct kfi_cq_err_entry fake_error = {
+               .op_context = tn,
+               .flags = KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND,
+               .err = EIO,
+       };
+       struct kfi_rma_iov rma_iov = {
+               .len = tn->tn_nob,
+               .key = gen_init_tag_bits(tn),
+       };
+       struct kfi_msg_rma rma = {
+               .addr = tn->tn_target_addr,
+               .rma_iov = &rma_iov,
+               .rma_iov_count = 1,
+               .context = tn,
+       };
+
+       if (!ep || !tn)
+               return -EINVAL;
+
+       /* Make sure the device is not being shut down */
+       if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
+               return -EINVAL;
+
+       /* Progress transaction to failure if read should fail. */
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EVENT)) {
+               rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
+               if (!rc)
+                       return 0;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ)) {
+               return -EIO;
+       } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EAGAIN)) {
+               return -EAGAIN;
+       }
+
+       rma.iov_count = tn->tn_num_iovec;
+       rma.type = KFI_BVEC;
+       rma.msg_biov = tn->tn_kiov;
+
+       rc = kfi_readmsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
+       switch (rc) {
+       case 0:
+       case -EAGAIN:
+               KFILND_EP_DEBUG(ep,
+                               "Transaction ID %p: %s read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
+                               tn, rc ? "Failed to post" : "Posted",
+                               tn->tn_nob, tn->tn_num_iovec,
+                               tn->tn_response_mr_key, tn->tn_target_addr, rc);
+               break;
+
+       default:
+               KFILND_EP_ERROR(ep,
+                               "Transaction ID %p: Failed to post read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
+                               tn, tn->tn_nob, tn->tn_num_iovec,
+                               tn->tn_response_mr_key, tn->tn_target_addr, rc);
+       }
+
+       return rc;
+}
+
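+/**
+ * kfilnd_ep_queue_tn_replay() - Queue a transaction to be replayed.
+ * @ep: KFI LND endpoint the transaction is queued against.
+ * @tn: Transaction to be replayed.
+ *
+ * The replay timer is armed, if not already pending, so queued transactions
+ * are retried after KFILND_EP_REPLAY_TIMER_MSEC milliseconds.
+ */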
+void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep,
+                              struct kfilnd_transaction *tn)
+{
+       unsigned long expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
+               jiffies;
+
+       spin_lock(&ep->replay_lock);
+       list_add_tail(&tn->replay_entry, &ep->tn_replay);
+       atomic_inc(&ep->replay_count);
+       spin_unlock(&ep->replay_lock);
+
+       if (!timer_pending(&ep->replay_timer))
+               mod_timer(&ep->replay_timer, expires);
+}
+
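+/**
+ * kfilnd_ep_flush_replay_queue() - Replay all queued transactions and
+ * immediate buffer reposts for an endpoint.
+ * @ep: KFI LND endpoint to be flushed.
+ */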
+void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep)
+{
+       LIST_HEAD(tn_replay);
+       LIST_HEAD(imm_buf_replay);
+       struct kfilnd_transaction *tn_first;
+       struct kfilnd_transaction *tn_last;
+       struct kfilnd_immediate_buffer *buf_first;
+       struct kfilnd_immediate_buffer *buf_last;
+
+       /* Since the endpoint replay lists can be manipulated while
+        * attempting to do replays, the entire replay list is moved to a
+        * temporary list.
+        */
+       spin_lock(&ep->replay_lock);
+
+       tn_first = list_first_entry_or_null(&ep->tn_replay,
+                                           struct kfilnd_transaction,
+                                           replay_entry);
+       if (tn_first) {
+               tn_last = list_last_entry(&ep->tn_replay,
+                                         struct kfilnd_transaction,
+                                         replay_entry);
+               list_bulk_move_tail(&tn_replay, &tn_first->replay_entry,
+                                   &tn_last->replay_entry);
+               LASSERT(list_empty(&ep->tn_replay));
+       }
+
+       buf_first = list_first_entry_or_null(&ep->imm_buffer_replay,
+                                            struct kfilnd_immediate_buffer,
+                                            replay_entry);
+       if (buf_first) {
+               buf_last = list_last_entry(&ep->imm_buffer_replay,
+                                          struct kfilnd_immediate_buffer,
+                                          replay_entry);
+               list_bulk_move_tail(&imm_buf_replay, &buf_first->replay_entry,
+                                   &buf_last->replay_entry);
+               LASSERT(list_empty(&ep->imm_buffer_replay));
+       }
+
+       spin_unlock(&ep->replay_lock);
+
+       /* Replay all queued transactions. */
+       list_for_each_entry_safe(tn_first, tn_last, &tn_replay, replay_entry) {
+               list_del(&tn_first->replay_entry);
+               atomic_dec(&ep->replay_count);
+               kfilnd_tn_event_handler(tn_first, tn_first->replay_event,
+                                       tn_first->replay_status);
+       }
+
+       list_for_each_entry_safe(buf_first, buf_last, &imm_buf_replay,
+                                replay_entry) {
+               list_del(&buf_first->replay_entry);
+               atomic_dec(&ep->replay_count);
+               kfilnd_ep_imm_buffer_put(buf_first);
+       }
+}
+
+static void kfilnd_ep_replay_work(struct work_struct *work)
+{
+       struct kfilnd_ep *ep =
+               container_of(work, struct kfilnd_ep, replay_work);
+
+       kfilnd_ep_flush_replay_queue(ep);
+}
+
+static void kfilnd_ep_replay_timer(cfs_timer_cb_arg_t data)
+{
+       struct kfilnd_ep *ep = cfs_from_timer(ep, data, replay_timer);
+       unsigned int cpu =
+               cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt));
+
+       queue_work_on(cpu, kfilnd_wq, &ep->replay_work);
+}
+
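+/* KFILND_EP_ALLOC_SIZE sizes a single allocation holding the endpoint
+ * followed by its immediate receive buffer descriptors (referenced below
+ * as the end_immed_bufs[] array). With the default immediate_rx_buf_count
+ * of 2, that is one struct kfilnd_ep plus two buffer descriptors.
+ */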
+#define KFILND_EP_ALLOC_SIZE \
+       (sizeof(struct kfilnd_ep) + \
+        (sizeof(struct kfilnd_immediate_buffer) * immediate_rx_buf_count))
+
+/**
+ * kfilnd_ep_free() - Free a KFI LND endpoint.
+ * @ep: KFI LND endpoint to be freed.
+ *
+ * Safe to call on NULL or error pointer.
+ */
+void kfilnd_ep_free(struct kfilnd_ep *ep)
+{
+       int i;
+       int k = 2;
+
+       if (IS_ERR_OR_NULL(ep))
+               return;
+
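+       /* (k & -k) == k is true exactly when k is a power of two, so the
+        * wait loops below escalate to D_WARNING only when k reaches 4, 8,
+        * 16, ... and log quietly at D_NET otherwise: a simple exponential
+        * backoff for console noise.
+        */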
+       while (atomic_read(&ep->replay_count)) {
+               k++;
+               CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
+                       "Waiting for replay count %d not zero\n",
+                       atomic_read(&ep->replay_count));
+               schedule_timeout_uninterruptible(HZ);
+       }
+
+       /* Cancel any outstanding immediate receive buffers. */
+       kfilnd_ep_cancel_imm_buffers(ep);
+
+       /* Wait for RX buffers to no longer be used and then free them. */
+       for (i = 0; i < immediate_rx_buf_count; i++) {
+               k = 2;
+               while (atomic_read(&ep->end_immed_bufs[i].immed_ref)) {
+                       k++;
+                       CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
+                              "Waiting for RX buffer %d to release\n", i);
+                       schedule_timeout_uninterruptible(HZ);
+               }
+       }
+
+       /* Wait for all transactions to complete. */
+       k = 2;
+       spin_lock(&ep->tn_list_lock);
+       while (!list_empty(&ep->tn_list)) {
+               spin_unlock(&ep->tn_list_lock);
+               k++;
+               CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
+                      "Waiting for transactions to complete\n");
+               schedule_timeout_uninterruptible(HZ);
+               spin_lock(&ep->tn_list_lock);
+       }
+       spin_unlock(&ep->tn_list_lock);
+
+       /* Free all immediate buffers. */
+       for (i = 0; i < immediate_rx_buf_count; i++)
+               __free_pages(ep->end_immed_bufs[i].immed_buf_page,
+                            order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
+
+       kfi_close(&ep->end_tx->fid);
+       kfi_close(&ep->end_rx->fid);
+       kfilnd_cq_free(ep->end_tx_cq);
+       kfilnd_cq_free(ep->end_rx_cq);
+       ida_destroy(&ep->keys);
+       LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
+}
+
+/**
+ * kfilnd_ep_alloc() - Allocate a new KFI LND endpoint.
+ * @dev: KFI LND device used to allocate endpoints.
+ * @context_id: Context ID associated with the endpoint.
+ * @cpt: CPT KFI LND endpoint should be associated with.
+ *
+ * A KFI LND endpoint consists of unique transmit/receive command queues
+ * (contexts) and completion queues. The underlying completion queue interrupt
+ * vector is associated with a core within the CPT.
+ *
+ * Return: On success, valid pointer. Else, negative errno pointer.
+ */
+struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev,
+                                 unsigned int context_id, unsigned int cpt,
+                                 size_t nrx, size_t rx_size)
+{
+       int rc;
+       struct kfi_cq_attr cq_attr = {};
+       struct kfi_rx_attr rx_attr = {};
+       struct kfi_tx_attr tx_attr = {};
+       int ncpts;
+       size_t min_multi_recv = KFILND_IMMEDIATE_MSG_SIZE;
+       struct kfilnd_ep *ep;
+       int i;
+       size_t rx_buf_size;
+
+       if (!dev || !nrx || !rx_size) {
+               rc = -EINVAL;
+               goto err;
+       }
+
+       ncpts = dev->kfd_ni->ni_ncpts;
+
+       LIBCFS_CPT_ALLOC(ep, lnet_cpt_table(), cpt, KFILND_EP_ALLOC_SIZE);
+       if (!ep) {
+               rc = -ENOMEM;
+               goto err;
+       }
+
+       ep->end_dev = dev;
+       ep->end_cpt = cpt;
+       ep->end_context_id = context_id;
+       INIT_LIST_HEAD(&ep->tn_list);
+       spin_lock_init(&ep->tn_list_lock);
+       INIT_LIST_HEAD(&ep->tn_replay);
+       INIT_LIST_HEAD(&ep->imm_buffer_replay);
+       spin_lock_init(&ep->replay_lock);
+       cfs_timer_setup(&ep->replay_timer, kfilnd_ep_replay_timer,
+                       (unsigned long)ep, 0);
+       INIT_WORK(&ep->replay_work, kfilnd_ep_replay_work);
+       atomic_set(&ep->replay_count, 0);
+       ida_init(&ep->keys);
+
+       /* Create a CQ for this CPT */
+       cq_attr.flags = KFI_AFFINITY;
+       cq_attr.format = KFI_CQ_FORMAT_DATA;
+       cq_attr.wait_cond = KFI_CQ_COND_NONE;
+       cq_attr.wait_obj = KFI_WAIT_NONE;
+
+       /* Vector is set to first core in the CPT */
+       cq_attr.signaling_vector =
+               cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), cpt));
+
+       cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
+               rx_cq_scale_factor;
+       ep->end_rx_cq = kfilnd_cq_alloc(ep, &cq_attr);
+       if (IS_ERR(ep->end_rx_cq)) {
+               rc = PTR_ERR(ep->end_rx_cq);
+               CERROR("Failed to allocated KFILND RX CQ: rc=%d\n", rc);
+               goto err_free_ep;
+       }
+
+       cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
+               tx_cq_scale_factor;
+       ep->end_tx_cq = kfilnd_cq_alloc(ep, &cq_attr);
+       if (IS_ERR(ep->end_tx_cq)) {
+               rc = PTR_ERR(ep->end_tx_cq);
+               CERROR("Failed to allocated KFILND TX CQ: rc=%d\n", rc);
+               goto err_free_rx_cq;
+       }
+
+       /* Initialize the RX/TX contexts for the given CPT */
+       rx_attr.op_flags = KFI_COMPLETION | KFI_MULTI_RECV;
+       rx_attr.msg_order = KFI_ORDER_NONE;
+       rx_attr.comp_order = KFI_ORDER_NONE;
+       rx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits +
+               immediate_rx_buf_count;
+       rx_attr.iov_limit = LNET_MAX_IOV;
+       rc = kfi_rx_context(dev->kfd_sep, context_id, &rx_attr, &ep->end_rx,
+                           ep);
+       if (rc) {
+               CERROR("Could not create RX context on CPT %d, rc = %d\n", cpt,
+                      rc);
+               goto err_free_tx_cq;
+       }
+
+       /* Set the lower limit for multi-receive buffers */
+       rc = kfi_setopt(&ep->end_rx->fid, KFI_OPT_ENDPOINT,
+                       KFI_OPT_MIN_MULTI_RECV, &min_multi_recv,
+                       sizeof(min_multi_recv));
+       if (rc) {
+               CERROR("Could not set min_multi_recv on CPT %d, rc = %d\n", cpt,
+                      rc);
+               goto err_free_rx_context;
+       }
+
+       tx_attr.op_flags = KFI_COMPLETION | KFI_TRANSMIT_COMPLETE;
+       tx_attr.msg_order = KFI_ORDER_NONE;
+       tx_attr.comp_order = KFI_ORDER_NONE;
+       tx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
+               tx_scale_factor;
+       tx_attr.iov_limit = LNET_MAX_IOV;
+       tx_attr.rma_iov_limit = LNET_MAX_IOV;
+       rc = kfi_tx_context(dev->kfd_sep, context_id, &tx_attr, &ep->end_tx,
+                           ep);
+       if (rc) {
+               CERROR("Could not create TX context on CPT %d, rc = %d\n", cpt,
+                      rc);
+               goto err_free_rx_context;
+       }
+
+       /* Bind these two contexts to the CPT's CQ */
+       rc = kfi_ep_bind(ep->end_rx, &ep->end_rx_cq->cq->fid, 0);
+       if (rc) {
+               CERROR("Could not bind RX context on CPT %d, rc = %d\n", cpt,
+                      rc);
+               goto err_free_tx_context;
+       }
+
+       rc = kfi_ep_bind(ep->end_tx, &ep->end_tx_cq->cq->fid, 0);
+       if (rc) {
+               CERROR("Could not bind TX context on CPT %d, rc = %d\n", cpt,
+                      rc);
+               goto err_free_tx_context;
+       }
+
+       /* Enable both endpoints */
+       rc = kfi_enable(ep->end_rx);
+       if (rc) {
+               CERROR("Could not enable RX context on CPT %d, rc = %d\n", cpt,
+                      rc);
+               goto err_free_tx_context;
+       }
+
+       rc = kfi_enable(ep->end_tx);
+       if (rc) {
+               CERROR("Could not enable TX context on CPT %d, rc=%d\n", cpt,
+                      rc);
+               goto err_free_tx_context;
+       }
+
+       /* The nrx value is the max number of immediate messages any one peer
+        * can send us.  Given that compute nodes are RPC-based, we should not
+        * see more incoming messages than we are able to send.  As such, nrx
+        * is a good size for each multi-receive buffer.  However, if we are
+        * a server or LNet router, we need a multiplier of this value.  For
+        * now, nrx alone drives the buffer size per CPT.  LNet routers and
+        * servers can then define more CPTs to get a better spread of buffers
+        * for receiving messages from multiple peers.  A better scheme should
+        * be devised in the future.
+        */
+       rx_buf_size = roundup_pow_of_two(max(nrx * rx_size, PAGE_SIZE));
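+
+       /* Illustrative arithmetic (actual values depend on the caller): with
+        * nrx = 128 and rx_size = 8192, rx_buf_size becomes
+        * roundup_pow_of_two(1048576) = 1 MiB, i.e. an order-8 allocation
+        * below with 4 KiB pages.
+        */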
+
+       for (i = 0; i < immediate_rx_buf_count; i++) {
+
+               /* Using physically contiguous allocations can allow for
+                * underlying kfabric providers to use untranslated addressing
+                * instead of having to setup NIC memory mappings. This
+                * typically leads to improved performance.
+                */
+               ep->end_immed_bufs[i].immed_buf_page =
+                       alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
+                                        GFP_KERNEL | __GFP_NOWARN,
+                                        order_base_2(rx_buf_size / PAGE_SIZE));
+               if (!ep->end_immed_bufs[i].immed_buf_page) {
+                       rc = -ENOMEM;
+                       goto err_free_rx_buffers;
+               }
+
+               atomic_set(&ep->end_immed_bufs[i].immed_ref, 0);
+               ep->end_immed_bufs[i].immed_buf =
+                       page_address(ep->end_immed_bufs[i].immed_buf_page);
+               ep->end_immed_bufs[i].immed_buf_size = rx_buf_size;
+               ep->end_immed_bufs[i].immed_end = ep;
+       }
+
+       return ep;
+
+err_free_rx_buffers:
+       for (i = 0; i < immediate_rx_buf_count; i++) {
+               if (ep->end_immed_bufs[i].immed_buf_page)
+                       __free_pages(ep->end_immed_bufs[i].immed_buf_page,
+                                    order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
+       }
+
+err_free_tx_context:
+       kfi_close(&ep->end_tx->fid);
+err_free_rx_context:
+       kfi_close(&ep->end_rx->fid);
+err_free_tx_cq:
+       kfilnd_cq_free(ep->end_tx_cq);
+err_free_rx_cq:
+       kfilnd_cq_free(ep->end_rx_cq);
+err_free_ep:
+       LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
+err:
+       return ERR_PTR(rc);
+}
+
+int kfilnd_ep_get_key(struct kfilnd_ep *ep)
+{
+       return ida_simple_get(&ep->keys, 1, KFILND_EP_KEY_MAX, GFP_KERNEL);
+}
+
+void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key)
+{
+       ida_simple_remove(&ep->keys, key);
+}
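+
+/* Usage sketch (illustrative): memory region keys are drawn from the
+ * per-endpoint IDA over the range [1, KFILND_EP_KEY_MAX):
+ *
+ *	key = kfilnd_ep_get_key(ep);
+ *	if (key < 0)
+ *		return key;
+ *	... use key, e.g. as tn->tn_mr_key ...
+ *	kfilnd_ep_put_key(ep, key);
+ */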
diff --git a/lnet/klnds/kfilnd/kfilnd_ep.h b/lnet/klnds/kfilnd/kfilnd_ep.h
new file mode 100644 (file)
index 0000000..6b83fdf
--- /dev/null
@@ -0,0 +1,70 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+#ifndef _KFILND_EP_
+#define _KFILND_EP_
+
+#include "kfilnd.h"
+
+struct kfilnd_ep_err_fail_loc_work {
+       struct kfilnd_ep *ep;
+       struct work_struct work;
+       struct kfi_cq_err_entry err;
+};
+
+static inline bool kfilnd_ep_replays_pending(struct kfilnd_ep *ep)
+{
+       return atomic_read(&ep->replay_count) > 0;
+}
+
+void kfilnd_ep_dereg_mr(struct kfilnd_ep *ep, struct kfilnd_transaction *tn);
+int kfilnd_ep_reg_mr(struct kfilnd_ep *ep, struct kfilnd_transaction *tn);
+int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep,
+                              struct kfilnd_transaction *tn);
+int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep,
+                                struct kfilnd_transaction *tn);
+int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep,
+                              struct kfilnd_transaction *tn);
+int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn);
+int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn);
+int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn);
+void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf);
+int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep);
+void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep);
+void kfilnd_ep_free(struct kfilnd_ep *ep);
+struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev,
+                                 unsigned int context_id, unsigned int cpt,
+                                 size_t nrx, size_t rx_size);
+void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep);
+void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep,
+                              struct kfilnd_transaction *tn);
+
+int kfilnd_ep_get_key(struct kfilnd_ep *ep);
+void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key);
+
+
+#endif /* _KFILND_EP_ */
diff --git a/lnet/klnds/kfilnd/kfilnd_modparams.c b/lnet/klnds/kfilnd/kfilnd_modparams.c
new file mode 100644 (file)
index 0000000..798983d
--- /dev/null
@@ -0,0 +1,183 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd module parameters
+ */
+
+#include "kfilnd.h"
+
+unsigned int cksum;
+module_param(cksum, uint, 0444);
+MODULE_PARM_DESC(cksum, "Enable checksums for non-zero messages (not RDMA)");
+
+/* Scale factor for TX context queue depth. The factor is applied to the number
+ * of credits to determine queue depth.
+ */
+unsigned int tx_scale_factor = 2;
+module_param(tx_scale_factor, uint, 0444);
+MODULE_PARM_DESC(tx_scale_factor,
+                "Factor applied to credits to determine TX context size");
+
+/* Scale factor for TX and RX completion queue depth. The factor is applied to
+ * the number of credits to determine queue depth.
+ */
+unsigned int rx_cq_scale_factor = 10;
+module_param(rx_cq_scale_factor, uint, 0444);
+MODULE_PARM_DESC(rx_cq_scale_factor,
+                "Factor applied to credits to determine RX CQ size");
+
+unsigned int tx_cq_scale_factor = 10;
+module_param(tx_cq_scale_factor, uint, 0444);
+MODULE_PARM_DESC(tx_cq_scale_factor,
+                "Factor applied to credits to determine TX CQ size");
+
+unsigned int eq_size = 1024;
+module_param(eq_size, uint, 0444);
+MODULE_PARM_DESC(eq_size, "Default event queue size used by all kfi LNet NIs");
+
+unsigned int immediate_rx_buf_count = 2;
+module_param(immediate_rx_buf_count, uint, 0444);
+MODULE_PARM_DESC(immediate_rx_buf_count,
+                "Number of immediate multi-receive buffers posted per CPT");
+
+/* Common LND network tunables. */
+static int credits = 256;
+module_param(credits, int, 0444);
+MODULE_PARM_DESC(credits, "Number of concurrent sends on network");
+
+static int peer_credits = 128;
+module_param(peer_credits, int, 0444);
+MODULE_PARM_DESC(peer_credits, "Number of concurrent sends to 1 peer");
+
+static int peer_buffer_credits = -1;
+module_param(peer_buffer_credits, int, 0444);
+MODULE_PARM_DESC(peer_buffer_credits,
+                "Number of per-peer router buffer credits");
+
+static int peer_timeout = -1;
+module_param(peer_timeout, int, 0444);
+MODULE_PARM_DESC(peer_timeout,
+                "Seconds without aliveness news to declare peer dead (less than or equal to 0 to disable).");
+
+static unsigned int prov_major_version = 1;
+module_param(prov_major_version, int, 0444);
+MODULE_PARM_DESC(prov_major_version,
+                "Default kfabric provider major version kfilnd should use");
+
+static unsigned int prov_minor_version;
+module_param(prov_minor_version, int, 0444);
+MODULE_PARM_DESC(prov_minor_version,
+                "Default kfabric provider minor version kfilnd should use");
+
+static unsigned int auth_key = 255;
+module_param(auth_key, uint, 0444);
+MODULE_PARM_DESC(auth_key, "Default authorization key to be used for LNet NIs");
+
+int kfilnd_tunables_setup(struct lnet_ni *ni)
+{
+       struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables;
+       struct lnet_ioctl_config_kfilnd_tunables *kfilnd_tunables;
+
+       net_tunables = &ni->ni_net->net_tunables;
+       kfilnd_tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi;
+
+       if (!ni->ni_net->net_tunables_set) {
+               net_tunables->lct_max_tx_credits = credits;
+               net_tunables->lct_peer_tx_credits = peer_credits;
+               net_tunables->lct_peer_rtr_credits = peer_buffer_credits;
+               net_tunables->lct_peer_timeout = peer_timeout;
+
+               if (net_tunables->lct_peer_tx_credits >
+                   net_tunables->lct_max_tx_credits)
+                       net_tunables->lct_peer_tx_credits =
+                               net_tunables->lct_max_tx_credits;
+       }
+
+       kfilnd_tunables->lnd_version = KFILND_MSG_VERSION;
+       if (!ni->ni_lnd_tunables_set) {
+               kfilnd_tunables->lnd_prov_major_version = prov_major_version;
+               kfilnd_tunables->lnd_prov_minor_version = prov_minor_version;
+
+               /* Treat zero as uninitialized. */
+               if (ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key == 0)
+                       ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key =
+                               auth_key;
+       }
+
+       if (net_tunables->lct_max_tx_credits > KFILND_EP_KEY_MAX) {
+               CERROR("Credits cannot exceed %lu\n", KFILND_EP_KEY_MAX);
+               return -EINVAL;
+       }
+
+       if (net_tunables->lct_peer_tx_credits > KFILND_EP_KEY_MAX) {
+               CERROR("Peer credits cannot exceed %lu\n", KFILND_EP_KEY_MAX);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
+int kfilnd_tunables_init(void)
+{
+       if (tx_scale_factor < 1) {
+               CERROR("TX context scale factor less than 1");
+               return -EINVAL;
+       }
+
+       if (rx_cq_scale_factor < 1) {
+               CERROR("RX CQ scale factor less than 1");
+               return -EINVAL;
+       }
+
+       if (tx_cq_scale_factor < 1) {
+               CERROR("TX CQ scale factor less than 1");
+               return -EINVAL;
+       }
+
+       if (immediate_rx_buf_count < 2) {
+               CERROR("Immediate multi-receive buffer count less than 2");
+               return -EINVAL;
+       }
+
+       if (auth_key < 1) {
+               CERROR("Authorization key cannot be less than 1");
+               return -EINVAL;
+       }
+
+       if (credits > KFILND_EP_KEY_MAX) {
+               CERROR("Credits cannot exceed %lu\n", KFILND_EP_KEY_MAX);
+               return -EINVAL;
+       }
+
+       if (peer_credits > KFILND_EP_KEY_MAX) {
+               CERROR("Peer credits cannot exceed %lu\n", KFILND_EP_KEY_MAX);
+               return -EINVAL;
+       }
+
+       return 0;
+}
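+
+/* Example (illustrative): the parameters above are set at module load time
+ * in the usual way, e.g.
+ *
+ *	modprobe kfilnd credits=512 peer_credits=128
+ *
+ * with kfilnd_tunables_init() rejecting out-of-range values as above.
+ */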
diff --git a/lnet/klnds/kfilnd/kfilnd_peer.c b/lnet/klnds/kfilnd/kfilnd_peer.c
new file mode 100644 (file)
index 0000000..e38f9d3
--- /dev/null
@@ -0,0 +1,274 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd peer management implementation.
+ */
+#include "kfilnd_peer.h"
+#include "kfilnd_dev.h"
+
+static const struct rhashtable_params peer_cache_params = {
+       .head_offset = offsetof(struct kfilnd_peer, node),
+       .key_offset = offsetof(struct kfilnd_peer, nid),
+       .key_len = sizeof_field(struct kfilnd_peer, nid),
+       .automatic_shrinking = true,
+};
+
+/**
+ * kfilnd_peer_free() - RCU safe way to free a peer.
+ * @ptr: Pointer to peer.
+ * @arg: Unused.
+ */
+static void kfilnd_peer_free(void *ptr, void *arg)
+{
+       struct kfilnd_peer *peer = ptr;
+
+       CDEBUG(D_NET, "%s(0x%llx) peer entry freed\n",
+              libcfs_nid2str(peer->nid), peer->addr);
+
+       kfi_av_remove(peer->dev->kfd_av, &peer->addr, 1, 0);
+
+       kfree_rcu(peer, rcu_head);
+}
+
+/**
+ * kfilnd_peer_down() - Mark a peer as down.
+ * @peer: Peer to be downed.
+ */
+void kfilnd_peer_down(struct kfilnd_peer *peer)
+{
+       if (atomic_cmpxchg(&peer->remove_peer, 0, 1) == 0) {
+               CDEBUG(D_NET, "%s(0x%llx) marked for removal from peer cache\n",
+                      libcfs_nid2str(peer->nid), peer->addr);
+
+               lnet_notify(peer->dev->kfd_ni, peer->nid, false, false,
+                           peer->last_alive);
+       }
+}
+
+/**
+ * kfilnd_peer_put() - Return a reference for a peer.
+ * @peer: Peer whose reference is being returned.
+ */
+void kfilnd_peer_put(struct kfilnd_peer *peer)
+{
+       rcu_read_lock();
+
+       /* Return allocation reference if the peer was marked for removal. */
+       if (atomic_cmpxchg(&peer->remove_peer, 1, 2) == 1) {
+               rhashtable_remove_fast(&peer->dev->peer_cache, &peer->node,
+                                      peer_cache_params);
+               refcount_dec(&peer->cnt);
+
+               CDEBUG(D_NET, "%s(0x%llx) removed from peer cache\n",
+                      libcfs_nid2str(peer->nid), peer->addr);
+       }
+
+       if (refcount_dec_and_test(&peer->cnt))
+               kfilnd_peer_free(peer, NULL);
+
+       rcu_read_unlock();
+}
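+
+/* Reference lifecycle sketch (derived from kfilnd_peer_put() above and
+ * kfilnd_peer_get() below): a peer is born with a refcount of 2, one
+ * allocation reference and one for the caller. kfilnd_peer_down() flips
+ * remove_peer 0 -> 1; the next put flips it 1 -> 2, unhashes the entry and
+ * drops the allocation reference; the final put frees the peer via
+ * kfree_rcu().
+ */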
+
+u16 kfilnd_peer_target_rx_base(struct kfilnd_peer *peer)
+{
+       int cpt = lnet_cpt_of_nid(peer->nid, peer->dev->kfd_ni);
+       struct kfilnd_ep *ep = peer->dev->cpt_to_endpoint[cpt];
+
+       return ep->end_context_id;
+}
+
+/**
+ * kfilnd_peer_get() - Get a reference for a peer.
+ * @dev: Device used to lookup peer.
+ * @nid: LNet NID of peer.
+ *
+ * Return: On success, pointer to a valid peer structure. Else, ERR_PTR.
+ */
+struct kfilnd_peer *kfilnd_peer_get(struct kfilnd_dev *dev, lnet_nid_t nid)
+{
+       char *node;
+       char *service;
+       int rc;
+       u32 nid_addr = LNET_NIDADDR(nid);
+       u32 net_num = LNET_NETNUM(LNET_NIDNET(nid));
+       struct kfilnd_peer *peer;
+       struct kfilnd_peer *clash_peer;
+
+again:
+       /* Check the cache for a match. */
+       rcu_read_lock();
+       peer = rhashtable_lookup_fast(&dev->peer_cache, &nid,
+                                     peer_cache_params);
+       if (peer && !refcount_inc_not_zero(&peer->cnt))
+               peer = NULL;
+       rcu_read_unlock();
+
+       if (peer)
+               return peer;
+
+       /* Allocate a new peer for the cache. */
+       peer = kzalloc(sizeof(*peer), GFP_KERNEL);
+       if (!peer) {
+               rc = -ENOMEM;
+               goto err;
+       }
+
+       node = kasprintf(GFP_KERNEL, "%#x", nid_addr);
+       if (!node) {
+               rc = -ENOMEM;
+               goto err_free_peer;
+       }
+
+       service = kasprintf(GFP_KERNEL, "%u", net_num);
+       if (!service) {
+               rc = -ENOMEM;
+               goto err_free_node_str;
+       }
+
+       /* Use the KFI address vector to translate node and service string into
+        * a KFI address handle.
+        */
+       rc = kfi_av_insertsvc(dev->kfd_av, node, service, &peer->addr, 0, dev);
+
+       kfree(service);
+       kfree(node);
+
+       if (rc < 0) {
+               goto err_free_peer;
+       } else if (rc != 1) {
+               rc = -ECONNABORTED;
+               goto err_free_peer;
+       }
+
+       peer->dev = dev;
+       peer->nid = nid;
+       atomic_set(&peer->rx_base, 0);
+       atomic_set(&peer->remove_peer, 0);
+       peer->local_session_key = kfilnd_dev_get_session_key(dev);
+
+       /* One reference for the allocation and another for get operation
+        * performed for this peer. The allocation reference is returned when
+        * the entry is marked for removal.
+        */
+       refcount_set(&peer->cnt, 2);
+
+       clash_peer = rhashtable_lookup_get_insert_fast(&dev->peer_cache,
+                                                      &peer->node,
+                                                      peer_cache_params);
+
+       if (clash_peer) {
+               kfi_av_remove(dev->kfd_av, &peer->addr, 1, 0);
+               kfree(peer);
+
+               if (IS_ERR(clash_peer)) {
+                       rc = PTR_ERR(clash_peer);
+                       goto err;
+               } else {
+                       goto again;
+               }
+       }
+
+       kfilnd_peer_alive(peer);
+
+       CDEBUG(D_NET, "%s(0x%llx) peer entry allocated\n",
+              libcfs_nid2str(peer->nid), peer->addr);
+
+       return peer;
+
+err_free_node_str:
+       kfree(node);
+err_free_peer:
+       kfree(peer);
+err:
+       return ERR_PTR(rc);
+}
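+
+/* Usage sketch (illustrative): every successful get is paired with a put
+ * once the caller is done with the peer:
+ *
+ *	struct kfilnd_peer *peer = kfilnd_peer_get(dev, nid);
+ *
+ *	if (IS_ERR(peer))
+ *		return PTR_ERR(peer);
+ *	... address the peer via kfilnd_peer_get_kfi_addr(peer) ...
+ *	kfilnd_peer_put(peer);
+ */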
+
+/**
+ * kfilnd_peer_get_kfi_addr() - Return kfi_addr_t used for eager untagged send
+ * kfi operations.
+ * @peer: Peer struct.
+ *
+ * The returned kfi_addr_t is updated to target the peer's advertised RX
+ * context. The address returned by this function should not be used if a
+ * different RX context needs to be targeted (i.e. the response RX context
+ * for a bulk transfer operation).
+ *
+ * Return: kfi_addr_t.
+ */
+kfi_addr_t kfilnd_peer_get_kfi_addr(struct kfilnd_peer *peer)
+{
+       /* TODO: Support RX count by round-robining the generated kfi_addr_t's
+        * across multiple RX contexts using RX base and RX count.
+        */
+       return kfi_rx_addr(KFILND_BASE_ADDR(peer->addr),
+                          atomic_read(&peer->rx_base), KFILND_FAB_RX_CTX_BITS);
+}
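+
+/* kfi_rx_addr() encodes the peer's advertised RX context index (rx_base,
+ * learned from its hello message) into the context bits of the base KFI
+ * address. Bulk transfers instead carry tn_response_rx in the request
+ * message so the response can target a specific RX context.
+ */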
+
+/**
+ * kfilnd_peer_update_rx_contexts() - Update the RX context for a peer.
+ * @peer: Peer to be updated.
+ * @rx_base: New RX base for peer.
+ * @rx_count: New RX count for peer.
+ */
+void kfilnd_peer_update_rx_contexts(struct kfilnd_peer *peer,
+                                   unsigned int rx_base, unsigned int rx_count)
+{
+       /* TODO: Support RX count. */
+       LASSERT(rx_count > 0);
+       atomic_set(&peer->rx_base, rx_base);
+}
+
+/**
+ * kfilnd_peer_alive() - Update when the peer was last alive.
+ * @peer: Peer to be updated.
+ */
+void kfilnd_peer_alive(struct kfilnd_peer *peer)
+{
+       peer->last_alive = ktime_get_seconds();
+
+       /* Ensure timestamp is committed to memory before used. */
+       smp_mb();
+}
+
+/**
+ * kfilnd_peer_destroy() - Destroy peer cache.
+ * @dev: Device peer cache to be destroyed.
+ */
+void kfilnd_peer_destroy(struct kfilnd_dev *dev)
+{
+       rhashtable_free_and_destroy(&dev->peer_cache, kfilnd_peer_free, NULL);
+}
+
+/**
+ * kfilnd_peer_init() - Initialize peer cache.
+ * @dev: Device peer cache to be initialized.
+ */
+void kfilnd_peer_init(struct kfilnd_dev *dev)
+{
+       rhashtable_init(&dev->peer_cache, &peer_cache_params);
+}
diff --git a/lnet/klnds/kfilnd/kfilnd_peer.h b/lnet/klnds/kfilnd/kfilnd_peer.h
new file mode 100644 (file)
index 0000000..27a72bd
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd peer interface.
+ */
+
+#ifndef _KFILND_PEER_
+#define _KFILND_PEER_
+
+#include "kfilnd.h"
+
+void kfilnd_peer_down(struct kfilnd_peer *peer);
+void kfilnd_peer_put(struct kfilnd_peer *peer);
+struct kfilnd_peer *kfilnd_peer_get(struct kfilnd_dev *dev, lnet_nid_t nid);
+void kfilnd_peer_update_rx_contexts(struct kfilnd_peer *peer,
+                                   unsigned int rx_base,
+                                   unsigned int rx_count);
+void kfilnd_peer_alive(struct kfilnd_peer *peer);
+void kfilnd_peer_destroy(struct kfilnd_dev *dev);
+void kfilnd_peer_init(struct kfilnd_dev *dev);
+kfi_addr_t kfilnd_peer_get_kfi_addr(struct kfilnd_peer *peer);
+u16 kfilnd_peer_target_rx_base(struct kfilnd_peer *peer);
+
+#endif /* _KFILND_PEER_ */
diff --git a/lnet/klnds/kfilnd/kfilnd_tn.c b/lnet/klnds/kfilnd/kfilnd_tn.c
new file mode 100644 (file)
index 0000000..ba79c80
--- /dev/null
@@ -0,0 +1,1605 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd transaction and state machine processing.
+ */
+
+#include "kfilnd_tn.h"
+#include "kfilnd_ep.h"
+#include "kfilnd_dev.h"
+#include "kfilnd_dom.h"
+#include "kfilnd_peer.h"
+#include <asm/checksum.h>
+
+static struct kmem_cache *tn_cache;
+static struct kmem_cache *imm_buf_cache;
+
+static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
+{
+       if (cksum)
+               return csum_fold(csum_partial(ptr, nob, 0));
+       return NO_CHECKSUM;
+}
+
+static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type)
+{
+       const int hdr_size = offsetof(struct kfilnd_msg, proto);
+
+       switch (type) {
+       case KFILND_MSG_IMMEDIATE:
+               return offsetof(struct kfilnd_msg, proto.immed.payload[0]);
+
+       case KFILND_MSG_BULK_PUT_REQ:
+       case KFILND_MSG_BULK_GET_REQ:
+               return hdr_size + sizeof(struct kfilnd_bulk_req_msg);
+
+       default:
+               return -1;
+       }
+}
+
+static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
+{
+       struct kfilnd_msg *msg = tn->tn_tx_msg.msg;
+
+       /* Pack the protocol header and payload. */
+       msg->proto.hello.version = KFILND_MSG_VERSION;
+       msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->peer);
+       msg->proto.hello.session_key = tn->peer->local_session_key;
+
+       /* TODO: Support multiple RX contexts per peer. */
+       msg->proto.hello.rx_count = 1;
+
+       /* Pack the transport header. */
+       msg->magic = KFILND_MSG_MAGIC;
+
+       /* Message version zero is only valid for hello messages. */
+       msg->version = 0;
+       msg->type = KFILND_MSG_HELLO_REQ;
+       msg->nob = sizeof(struct kfilnd_hello_msg) +
+               offsetof(struct kfilnd_msg, proto);
+       msg->cksum = NO_CHECKSUM;
+       msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
+       msg->dstnid = tn->peer->nid;
+
+       /* Checksum entire message. */
+       msg->cksum = kfilnd_tn_cksum(msg, msg->nob);
+
+       tn->tn_tx_msg.length = msg->nob;
+}
+
+static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
+{
+       struct kfilnd_msg *msg = tn->tn_tx_msg.msg;
+
+       /* Pack the protocol header and payload. */
+       msg->proto.hello.version = tn->peer->version;
+       msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->peer);
+       msg->proto.hello.session_key = tn->peer->local_session_key;
+
+       /* TODO: Support multiple RX contexts per peer. */
+       msg->proto.hello.rx_count = 1;
+
+       /* Pack the transport header. */
+       msg->magic = KFILND_MSG_MAGIC;
+
+       /* Message version zero is only valid for hello messages. */
+       msg->version = 0;
+       msg->type = KFILND_MSG_HELLO_RSP;
+       msg->nob = sizeof(struct kfilnd_hello_msg) +
+               offsetof(struct kfilnd_msg, proto);
+       msg->cksum = NO_CHECKSUM;
+       msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
+       msg->dstnid = tn->peer->nid;
+
+       /* Checksum entire message. */
+       msg->cksum = kfilnd_tn_cksum(msg, msg->nob);
+
+       tn->tn_tx_msg.length = msg->nob;
+}
+
+static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
+{
+       struct kfilnd_msg *msg = tn->tn_tx_msg.msg;
+
+       /* Pack the protocol header and payload. */
+       lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr);
+       msg->proto.bulk_req.key = tn->tn_mr_key;
+       msg->proto.bulk_req.response_rx = tn->tn_response_rx;
+
+       /* Pack the transport header. */
+       msg->magic = KFILND_MSG_MAGIC;
+       msg->version = KFILND_MSG_VERSION;
+       msg->type = tn->msg_type;
+       msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
+               offsetof(struct kfilnd_msg, proto);
+       msg->cksum = NO_CHECKSUM;
+       msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
+       msg->dstnid = tn->peer->nid;
+
+       /* Checksum entire message. */
+       msg->cksum = kfilnd_tn_cksum(msg, msg->nob);
+
+       tn->tn_tx_msg.length = msg->nob;
+}
+
+static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
+{
+       struct kfilnd_msg *msg = tn->tn_tx_msg.msg;
+
+       /* Pack the protocol header and payload. */
+       lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);
+
+       lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE,
+                           msg,
+                           offsetof(struct kfilnd_msg,
+                                    proto.immed.payload),
+                           tn->tn_num_iovec, tn->tn_kiov, 0,
+                           tn->tn_nob);
+
+       /* Pack the transport header. */
+       msg->magic = KFILND_MSG_MAGIC;
+       msg->version = KFILND_MSG_VERSION;
+       msg->type = tn->msg_type;
+       msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
+       msg->cksum = NO_CHECKSUM;
+       msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
+       msg->dstnid = tn->peer->nid;
+
+       /* Checksum entire message. */
+       msg->cksum = kfilnd_tn_cksum(msg, msg->nob);
+
+       tn->tn_tx_msg.length = msg->nob;
+}
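+
+/* Wire format sketch (as packed by the routines above): every message
+ * carries the transport header fields magic, version, type, nob, cksum,
+ * srcnid and dstnid, followed by a type-specific 'proto' payload (hello,
+ * bulk_req or immediate). 'nob' counts the whole message, and the checksum
+ * is computed with 'cksum' temporarily set to NO_CHECKSUM so that
+ * kfilnd_tn_unpack_msg() can verify it by recomputing over the received
+ * bytes.
+ */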
+
+static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
+                               unsigned int nob)
+{
+       const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);
+
+       if (nob < hdr_size) {
+               KFILND_EP_ERROR(ep, "Short message: %u", nob);
+               return -EPROTO;
+       }
+
+       /* TODO: Support byte swapping on mixed endian systems. */
+       if (msg->magic != KFILND_MSG_MAGIC) {
+               KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
+               return -EPROTO;
+       }
+
+       /* TODO: Allow for older versions. */
+       if (msg->version > KFILND_MSG_VERSION) {
+               KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
+               return -EPROTO;
+       }
+
+       if (msg->nob > nob) {
+               KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
+                               msg->nob);
+               return -EPROTO;
+       }
+
+       /* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
+       if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
+               KFILND_EP_ERROR(ep, "Bad checksum");
+               return -EPROTO;
+       }
+
+       if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
+               KFILND_EP_ERROR(ep, "Bad destination nid: %s",
+                               libcfs_nid2str(msg->dstnid));
+               return -EPROTO;
+       }
+
+       if (msg->srcnid == LNET_NID_ANY) {
+               KFILND_EP_ERROR(ep, "Bad source nid: %s",
+                               libcfs_nid2str(msg->srcnid));
+               return -EPROTO;
+       }
+
+       if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) {
+               KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
+                               msg_type_to_str(msg->type),
+                               msg->nob, kfilnd_tn_msgtype2size(msg->type));
+               return -EPROTO;
+       }
+
+       switch ((enum kfilnd_msg_type)msg->type) {
+       case KFILND_MSG_IMMEDIATE:
+       case KFILND_MSG_BULK_PUT_REQ:
+       case KFILND_MSG_BULK_GET_REQ:
+               if (msg->version == 0) {
+                       KFILND_EP_ERROR(ep,
+                                       "Bad message type and version: type=%s version=%u",
+                                       msg_type_to_str(msg->type),
+                                       msg->version);
+                       return -EPROTO;
+               }
+               break;
+
+       case KFILND_MSG_HELLO_REQ:
+       case KFILND_MSG_HELLO_RSP:
+               if (msg->version != 0) {
+                       KFILND_EP_ERROR(ep,
+                                       "Bad message type and version: type=%s version=%u",
+                                       msg_type_to_str(msg->type),
+                                       msg->version);
+                       return -EPROTO;
+               }
+               break;
+
+       default:
+               CERROR("Unknown message type %x\n", msg->type);
+               return -EPROTO;
+       }
+       return 0;
+}
+
+static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
+{
+       unsigned int data_size_bucket =
+               kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
+       struct kfilnd_tn_duration_stat *stat;
+
+       if (tn->is_initiator)
+               stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
+       else
+               stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];
+
+       atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts)),
+                    &stat->accumulated_duration);
+       atomic_inc(&stat->accumulated_count);
+}
+
+static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
+                                  enum tn_states new_state)
+{
+       KFILND_TN_DEBUG(tn, "%s -> %s state change",
+                       tn_state_to_str(tn->tn_state),
+                       tn_state_to_str(new_state));
+
+       kfilnd_tn_record_state_change(tn);
+
+       tn->tn_state = new_state;
+       tn->tn_state_ts = ktime_get();
+}
+
+static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
+                                   enum lnet_msg_hstatus hstatus)
+{
+       /* Only the first non-ok status will take. */
+       if (tn->tn_status == 0) {
+               KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
+                               status);
+               tn->tn_status = status;
+       }
+
+       if (tn->hstatus == LNET_MSG_STATUS_OK) {
+               KFILND_TN_DEBUG(tn, "%d -> %d health status change",
+                               tn->hstatus, hstatus);
+               tn->hstatus = hstatus;
+       }
+}
+
+static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
+{
+       return tn->tn_status != 0;
+}
+
+/**
+ * kfilnd_tn_process_rx_event() - Process an immediate receive event.
+ *
+ * For each immediate receive, a transaction structure needs to be allocated to
+ * process the receive.
+ */
+void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
+                               struct kfilnd_msg *rx_msg, int msg_size)
+{
+       struct kfilnd_transaction *tn;
+       bool alloc_msg = true;
+       int rc;
+       enum tn_events event = TN_EVENT_RX_HELLO;
+
+       /* Increment buf ref count for this work */
+       atomic_inc(&bufdesc->immed_ref);
+
+       /* Unpack the message */
+       rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size);
+       if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
+               kfilnd_ep_imm_buffer_put(bufdesc);
+               KFILND_EP_ERROR(bufdesc->immed_end,
+                               "Failed to unpack message %d", rc);
+               return;
+       }
+
+       switch ((enum kfilnd_msg_type)rx_msg->type) {
+       case KFILND_MSG_IMMEDIATE:
+       case KFILND_MSG_BULK_PUT_REQ:
+       case KFILND_MSG_BULK_GET_REQ:
+               event = TN_EVENT_RX_OK;
+               fallthrough;
+       case KFILND_MSG_HELLO_RSP:
+               alloc_msg = false;
+               fallthrough;
+       case KFILND_MSG_HELLO_REQ:
+               /* Context points to a received buffer and status is the length.
+                * Allocate a Tn structure, set its values, then launch the
+                * receive.
+                */
+               tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev,
+                                    bufdesc->immed_end->end_cpt,
+                                    rx_msg->srcnid, alloc_msg, false,
+                                    false);
+               if (IS_ERR(tn)) {
+                       kfilnd_ep_imm_buffer_put(bufdesc);
+                       KFILND_EP_ERROR(bufdesc->immed_end,
+                                       "Failed to allocate transaction struct: rc=%ld",
+                                       PTR_ERR(tn));
+                       return;
+               }
+
+               tn->tn_rx_msg.msg = rx_msg;
+               tn->tn_rx_msg.length = msg_size;
+               tn->tn_posted_buf = bufdesc;
+
+               KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u",
+                               msg_type_to_str((enum kfilnd_msg_type)rx_msg->type),
+                               tn->tn_mr_key);
+               break;
+
+       default:
+               KFILND_EP_ERROR(bufdesc->immed_end,
+                               "Unhandled kfilnd message type: %d",
+                               (enum kfilnd_msg_type)rx_msg->type);
+               LBUG();
+       }
+
+       kfilnd_tn_event_handler(tn, event, 0);
+}
+
+static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
+{
+       unsigned int data_size_bucket =
+               kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
+       struct kfilnd_tn_duration_stat *stat;
+
+       if (tn->is_initiator)
+               stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
+       else
+               stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];
+
+       atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts)),
+                    &stat->accumulated_duration);
+       atomic_inc(&stat->accumulated_count);
+}
+
+/**
+ * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
+ *
+ * All state machine functions should call kfilnd_tn_finalize() instead of
+ * kfilnd_tn_free(). Once all expected asynchronous events have been received,
+ * the transaction lock is released (if still held), transaction resources are
+ * cleaned up, and the LNet operation is finalized.
+ */
+static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
+{
+       if (!*tn_released) {
+               mutex_unlock(&tn->tn_lock);
+               *tn_released = true;
+       }
+
+       /* Release the reference on the multi-receive buffer. */
+       if (tn->tn_posted_buf)
+               kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
+
+       /* Finalize LNet operation. */
+       if (tn->tn_lntmsg) {
+               tn->tn_lntmsg->msg_health_status = tn->hstatus;
+               lnet_finalize(tn->tn_lntmsg, tn->tn_status);
+       }
+
+       if (tn->tn_getreply) {
+               tn->tn_getreply->msg_health_status = tn->hstatus;
+               lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
+                                      tn->tn_getreply,
+                                      tn->tn_status ? 0 : tn->tn_nob);
+               lnet_finalize(tn->tn_getreply, tn->tn_status);
+       }
+
+       if (KFILND_TN_PEER_VALID(tn))
+               kfilnd_peer_put(tn->peer);
+
+       kfilnd_tn_record_state_change(tn);
+       kfilnd_tn_record_duration(tn);
+
+       kfilnd_tn_free(tn);
+}
+
+/**
+ * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
+ * @tn: Transaction whose tagged receive should be cancelled.
+ *
+ * Return: 0 on success. Else, negative errno. If an error occurs, resources may
+ * be leaked.
+ */
+static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
+{
+       int rc;
+
+       /* Issue a cancel. A return code of zero means the operation issued an
+        * async cancel. A return code of -ENOENT means the tagged receive was
+        * not found. The assumption here is that a tagged send landed thus
+        * removing the tagged receive buffer from hardware. For both cases,
+        * async events should occur.
+        */
+       rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
+       if (rc != 0 && rc != -ENOENT) {
+               KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
+               return rc;
+       }
+
+       return 0;
+}
+
+static void kfilnd_tn_timeout_work(struct work_struct *work)
+{
+       struct kfilnd_transaction *tn =
+               container_of(work, struct kfilnd_transaction, timeout_work);
+
+       KFILND_TN_ERROR(tn, "Bulk operation timeout");
+       kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
+}
+
+static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
+{
+       struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);
+
+       queue_work(kfilnd_wq, &tn->timeout_work);
+}
+
+static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
+{
+       return del_timer(&tn->timeout_timer);
+}
+
+static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
+{
+       ktime_t remaining_time = max_t(ktime_t, 0,
+                                      tn->deadline - ktime_get_seconds());
+       unsigned long expires = remaining_time * HZ + jiffies;
+
+       if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
+               expires = jiffies;
+
+       cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
+                       (unsigned long)tn, 0);
+       mod_timer(&tn->timeout_timer, expires);
+}
+
+/*  The following are the state machine routines for the transactions. */
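+/*
+ * A rough map of the transitions handled below (derived from the handlers
+ * in this hunk; not exhaustive):
+ *
+ *	IDLE --INIT_IMMEDIATE/TX_HELLO--> IMM_SEND
+ *	IDLE --INIT_BULK--> TAGGED_RECV_POSTED --send posted--> WAIT_COMP
+ *	TAGGED_RECV_POSTED --send fails--> SEND_FAILED --cancel posted--> FAIL
+ *
+ * Any handler may return -EAGAIN, in which case the event is replayed in
+ * the current state via the endpoint replay queue.
+ */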
+static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
+                                      enum tn_events event, int status,
+                                      bool *tn_released)
+{
+       int rc;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_INIT_BULK:
+               /* Need to cancel the tagged receive to prevent resources from
+                * being leaked.
+                */
+               rc = kfilnd_tn_cancel_tag_recv(tn);
+
+               switch (rc) {
+               /* Async event will progress transaction. */
+               case 0:
+                       kfilnd_tn_state_change(tn, TN_STATE_FAIL);
+                       return 0;
+
+               /* Need to replay TN_EVENT_INIT_BULK event while in the
+                * TN_STATE_SEND_FAILED state.
+                */
+               case -EAGAIN:
+                       KFILND_TN_DEBUG(tn,
+                                       "Need to replay cancel tagged recv");
+                       return -EAGAIN;
+
+               default:
+                       KFILND_TN_ERROR(tn,
+                                       "Unexpected error during cancel tagged receive: rc=%d",
+                                       rc);
+                       LBUG();
+               }
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+}
+
+static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
+                                             enum tn_events event, int status,
+                                             bool *tn_released)
+{
+       int rc;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_INIT_BULK:
+               tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->peer);
+               KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
+                               libcfs_nid2str(tn->peer->nid),
+                               tn->tn_target_addr);
+
+               kfilnd_tn_pack_bulk_req(tn);
+
+               rc = kfilnd_ep_post_send(tn->tn_ep, tn);
+               switch (rc) {
+               /* Async event will progress immediate send. */
+               case 0:
+                       kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
+                       return 0;
+
+               /* Need to replay TN_EVENT_INIT_BULK event while in the
+                * TN_STATE_TAGGED_RECV_POSTED state.
+                */
+               case -EAGAIN:
+                       KFILND_TN_DEBUG(tn,
+                                       "Need to replay post send to %s(%#llx)",
+                                       libcfs_nid2str(tn->peer->nid),
+                                       tn->tn_target_addr);
+                       return -EAGAIN;
+
+               /* Need to transition to the TN_STATE_SEND_FAILED to cleanup
+                * posted tagged receive buffer.
+                */
+               default:
+                       KFILND_TN_ERROR(tn,
+                                       "Failed to post send to %s(%#llx): rc=%d",
+                                       libcfs_nid2str(tn->peer->nid),
+                                       tn->tn_target_addr, rc);
+                       kfilnd_tn_status_update(tn, rc,
+                                               LNET_MSG_STATUS_LOCAL_ERROR);
+                       kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);
+
+                       /* Propagate TN_EVENT_INIT_BULK event to
+                        * TN_STATE_SEND_FAILED handler.
+                        */
+                       return kfilnd_tn_state_send_failed(tn, event, rc,
+                                                          tn_released);
+               }
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+}
+
+static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
+                               enum tn_events event, int status,
+                               bool *tn_released)
+{
+       struct kfilnd_msg *msg;
+       int rc;
+       bool finalize = false;
+       ktime_t remaining_time;
+       struct lnet_hdr hdr;
+       struct lnet_nid srcnid;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       /* For new peers, send a hello request message and queue the true LNet
+        * message for replay.
+        */
+       if (kfilnd_peer_is_new_peer(tn->peer) &&
+           (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) {
+               remaining_time = max_t(ktime_t, 0,
+                                      tn->deadline - ktime_get_seconds());
+
+               /* If the transaction deadline has not been met, return
+                * -EAGAIN. This causes the transaction event to be replayed.
+                * During this time, an async message from the peer should
+                * arrive, at which point the kfilnd version is negotiated.
+                */
+               if (remaining_time > 0) {
+                       KFILND_TN_DEBUG(tn, "%s hello response pending",
+                                       libcfs_nid2str(tn->peer->nid));
+                       return -EAGAIN;
+               }
+
+               rc = 0;
+               kfilnd_tn_status_update(tn, -ETIMEDOUT,
+                                       LNET_MSG_STATUS_NETWORK_TIMEOUT);
+               goto out;
+       }
+
+       switch (event) {
+       case TN_EVENT_INIT_IMMEDIATE:
+       case TN_EVENT_TX_HELLO:
+               tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->peer);
+               KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
+                               libcfs_nid2str(tn->peer->nid),
+                               tn->tn_target_addr);
+
+               if (event == TN_EVENT_INIT_IMMEDIATE)
+                       kfilnd_tn_pack_immed_msg(tn);
+               else
+                       kfilnd_tn_pack_hello_req(tn);
+
+               /* Send immediate message. */
+               rc = kfilnd_ep_post_send(tn->tn_ep, tn);
+               switch (rc) {
+               /* Async event will progress immediate send. */
+               case 0:
+                       kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
+                       return 0;
+
+               /* Need to replay the TN_EVENT_INIT_IMMEDIATE or
+                * TN_EVENT_TX_HELLO event while in the TN_STATE_IDLE state.
+                */
+               case -EAGAIN:
+                       KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
+                                       libcfs_nid2str(tn->peer->nid),
+                                       tn->tn_target_addr);
+                       return -EAGAIN;
+
+               default:
+                       KFILND_TN_ERROR(tn,
+                                       "Failed to post send to %s(%#llx): rc=%d",
+                                       libcfs_nid2str(tn->peer->nid),
+                                       tn->tn_target_addr, rc);
+                       kfilnd_tn_status_update(tn, rc,
+                                               LNET_MSG_STATUS_LOCAL_ERROR);
+               }
+               break;
+
+       case TN_EVENT_INIT_BULK:
+               /* Post tagged receive buffer used to land bulk response. */
+               rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);
+
+               switch (rc) {
+               /* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
+               case 0:
+                       kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);
+
+                       /* Propagate the TN_EVENT_INIT_BULK event to the
+                        * TN_STATE_TAGGED_RECV_POSTED handler.
+                        */
+                       return kfilnd_tn_state_tagged_recv_posted(tn, event,
+                                                                 rc,
+                                                                 tn_released);
+
+               /* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
+                * state.
+                */
+               case -EAGAIN:
+                       KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
+                       return -EAGAIN;
+
+               default:
+                       KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
+                                       rc);
+                       kfilnd_tn_status_update(tn, rc,
+                                               LNET_MSG_STATUS_LOCAL_ERROR);
+               }
+               break;
+
+       case TN_EVENT_RX_OK:
+               /* If TN_EVENT_RX_OK occurs on a new peer, the local peer
+                * structure is stale. A stale peer structure requires dropping
+                * the incoming message and initiating a hello handshake.
+                */
+               if (kfilnd_peer_is_new_peer(tn->peer)) {
+                       rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
+                                                      tn->tn_ep->end_cpt,
+                                                      tn->peer->nid);
+                       if (rc)
+                               KFILND_TN_ERROR(tn,
+                                               "Failed to send hello request: rc=%d",
+                                               rc);
+
+                       /* Need to drop this message since it uses a stale
+                        * peer structure.
+                        */
+                       KFILND_TN_ERROR(tn,
+                                       "Dropping message from %s due to stale peer",
+                                       libcfs_nid2str(tn->peer->nid));
+                       kfilnd_tn_status_update(tn, -EPROTO,
+                                               LNET_MSG_STATUS_LOCAL_DROPPED);
+                       rc = 0;
+                       goto out;
+               }
+
+               LASSERT(kfilnd_peer_is_new_peer(tn->peer) == false);
+               msg = tn->tn_rx_msg.msg;
+
+               /* Mark the peer as alive. */
+               kfilnd_peer_alive(tn->peer);
+
+               /* Pass the message up to LNet. The TN will be reused in this
+                * call chain, so the lock on the TN must be released before
+                * proceeding.
+                */
+               KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
+                               tn_state_to_str(tn->tn_state));
+
+               /* TODO: Do not manually update this state change. */
+               tn->tn_state = TN_STATE_IMM_RECV;
+               mutex_unlock(&tn->tn_lock);
+               *tn_released = true;
+               lnet_nid4_to_nid(msg->srcnid, &srcnid);
+               if (msg->type == KFILND_MSG_IMMEDIATE) {
+                       lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
+                       rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
+                                       &hdr, &srcnid, tn, 0);
+               } else {
+                       lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
+                       rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
+                                       &hdr, &srcnid, tn, 1);
+               }
+
+               /* If successful, transaction has been accepted by LNet and we
+                * cannot process the transaction anymore within this context.
+                */
+               if (!rc)
+                       return 0;
+
+               KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
+               kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
+               break;
+
+       case TN_EVENT_RX_HELLO:
+               msg = tn->tn_rx_msg.msg;
+
+               switch (msg->type) {
+               case KFILND_MSG_HELLO_REQ:
+                       kfilnd_peer_update_rx_contexts(tn->peer,
+                                                      msg->proto.hello.rx_base,
+                                                      msg->proto.hello.rx_count);
+                       kfilnd_peer_set_remote_session_key(tn->peer,
+                                                          msg->proto.hello.session_key);
+
+                       /* Negotiate the kfilnd version used between peers by
+                        * falling back to the minimum of the local and peer
+                        * versions.
+                        */
+                       kfilnd_peer_set_version(tn->peer,
+                                               min_t(__u16, KFILND_MSG_VERSION,
+                                                   msg->proto.hello.version));
+                       KFILND_TN_DEBUG(tn,
+                                       "Peer kfilnd version: %u; Local kfilnd version: %u; Negotiated kfilnd version: %u",
+                                       msg->proto.hello.version,
+                                       KFILND_MSG_VERSION, tn->peer->version);
+
+                       tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->peer);
+                       KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
+                                       libcfs_nid2str(tn->peer->nid),
+                                       tn->tn_target_addr);
+
+                       kfilnd_tn_pack_hello_rsp(tn);
+
+                       /* Send immediate message. */
+                       rc = kfilnd_ep_post_send(tn->tn_ep, tn);
+                       switch (rc) {
+                       case 0:
+                               kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
+                               return 0;
+
+                       case -EAGAIN:
+                               KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
+                                               libcfs_nid2str(tn->peer->nid),
+                                               tn->tn_target_addr);
+                               return -EAGAIN;
+
+                       default:
+                               KFILND_TN_ERROR(tn,
+                                               "Failed to post send to %s(%#llx): rc=%d",
+                                               libcfs_nid2str(tn->peer->nid),
+                                               tn->tn_target_addr, rc);
+                               kfilnd_tn_status_update(tn, rc,
+                                                       LNET_MSG_STATUS_LOCAL_ERROR);
+                       }
+                       break;
+
+               case KFILND_MSG_HELLO_RSP:
+                       rc = 0;
+                       kfilnd_peer_update_rx_contexts(tn->peer,
+                                                      msg->proto.hello.rx_base,
+                                                      msg->proto.hello.rx_count);
+                       kfilnd_peer_set_remote_session_key(tn->peer,
+                                                          msg->proto.hello.session_key);
+                       kfilnd_peer_set_version(tn->peer,
+                                               msg->proto.hello.version);
+                       KFILND_TN_DEBUG(tn, "Negotiated kfilnd version: %u",
+                                       msg->proto.hello.version);
+                       finalize = true;
+                       break;
+
+               default:
+                       KFILND_TN_ERROR(tn, "Invalid message type: %s",
+                                       msg_type_to_str(msg->type));
+                       LBUG();
+               }
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+out:
+       if (kfilnd_tn_has_failed(tn))
+               finalize = true;
+
+       if (finalize)
+               kfilnd_tn_finalize(tn, tn_released);
+
+       return rc;
+}
+
+static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
+                                   enum tn_events event, int status,
+                                   bool *tn_released)
+{
+       enum lnet_msg_hstatus hstatus;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_TX_FAIL:
+               if (status == -ETIMEDOUT || status == -EIO)
+                       hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
+               else
+                       hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
+
+               kfilnd_tn_status_update(tn, status, hstatus);
+               kfilnd_peer_down(tn->peer);
+               break;
+
+       case TN_EVENT_TX_OK:
+               kfilnd_peer_alive(tn->peer);
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       kfilnd_tn_finalize(tn, tn_released);
+
+       return 0;
+}
+
+static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
+                                   enum tn_events event, int status,
+                                   bool *tn_released)
+{
+       int rc = 0;
+       bool finalize = false;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_INIT_TAG_RMA:
+       case TN_EVENT_SKIP_TAG_RMA:
+               /* Release the buffer we received the request on. All relevant
+                * information to perform the RMA operation is stored in the
+                * transaction structure. This should be done before the RMA
+                * operation to prevent two contexts from potentially processing
+                * the same transaction.
+                *
+                * TODO: Prevent this from returning -EAGAIN.
+                */
+               if (tn->tn_posted_buf) {
+                       kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
+                       tn->tn_posted_buf = NULL;
+               }
+
+               /* Update the KFI address to use the response RX context. */
+               tn->tn_target_addr =
+                       kfi_rx_addr(KFILND_BASE_ADDR(tn->peer->addr),
+                                   tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
+               KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
+                               libcfs_nid2str(tn->peer->nid),
+                               tn->tn_target_addr);
+
+               /* Initiate the RMA operation to push/pull the LNet payload or
+                * send a tagged message to finalize the bulk operation if the
+                * RMA operation should be skipped.
+                */
+               if (event == TN_EVENT_INIT_TAG_RMA) {
+                       if (tn->sink_buffer)
+                               rc = kfilnd_ep_post_read(tn->tn_ep, tn);
+                       else
+                               rc = kfilnd_ep_post_write(tn->tn_ep, tn);
+
+                       switch (rc) {
+                       /* Async tagged RMA event will progress transaction. */
+                       case 0:
+                               kfilnd_tn_state_change(tn,
+                                                      TN_STATE_WAIT_TAG_RMA_COMP);
+                               return 0;
+
+                       /* Need to replay TN_EVENT_INIT_TAG_RMA event while in
+                        * the TN_STATE_IMM_RECV state.
+                        */
+                       case -EAGAIN:
+                               KFILND_TN_DEBUG(tn,
+                                               "Need to replay tagged %s to %s(%#llx)",
+                                               tn->sink_buffer ? "read" : "write",
+                                               libcfs_nid2str(tn->peer->nid),
+                                               tn->tn_target_addr);
+                               return -EAGAIN;
+
+                       default:
+                               KFILND_TN_ERROR(tn,
+                                               "Failed to post tagged %s to %s(%#llx): rc=%d",
+                                               tn->sink_buffer ? "read" : "write",
+                                               libcfs_nid2str(tn->peer->nid),
+                                               tn->tn_target_addr, rc);
+                               kfilnd_tn_status_update(tn, rc,
+                                                       LNET_MSG_STATUS_LOCAL_ERROR);
+                       }
+               } else {
+                       kfilnd_tn_status_update(tn, status,
+                                               LNET_MSG_STATUS_OK);
+
+                       /* Since the LNet initiator has posted a unique tagged
+                        * buffer specific to this LNet transaction and the
+                        * LNet target has decided not to push/pull to/from the
+                        * LNet initiator's tagged buffer, a zero-byte operation
+                        * is performed on this tagged buffer (i.e. the payload
+                        * transfer size is zero). However, immediate data,
+                        * which contains the LNet target status for the
+                        * transaction, is sent to the LNet initiator. Immediate
+                        * data only appears in the completion event at the LNet
+                        * initiator and not in the tagged buffer.
+                        */
+                       tn->tagged_data = cpu_to_be64(abs(tn->tn_status));
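+                       /* e.g. a tn_status of -ENODATA is carried as
+                        * be64(ENODATA) in the immediate data; the initiator
+                        * is expected to map it back to a negative errno
+                        * (illustrative; the decode side is not shown here).
+                        */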
+
+                       rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
+                       switch (rc) {
+                       /* Async tagged RMA event will progress transaction. */
+                       case 0:
+                               kfilnd_tn_state_change(tn,
+                                                      TN_STATE_WAIT_TAG_COMP);
+                               return 0;
+
+                       /* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
+                        * the TN_STATE_IMM_RECV state.
+                        */
+                       case -EAGAIN:
+                               KFILND_TN_DEBUG(tn,
+                                               "Need to replay tagged send to %s(%#llx)",
+                                               libcfs_nid2str(tn->peer->nid),
+                                               tn->tn_target_addr);
+                               return -EAGAIN;
+
+                       default:
+                               KFILND_TN_ERROR(tn,
+                                               "Failed to post tagged send to %s(%#llx): rc=%d",
+                                               libcfs_nid2str(tn->peer->nid),
+                                               tn->tn_target_addr, rc);
+                               kfilnd_tn_status_update(tn, rc,
+                                                       LNET_MSG_STATUS_LOCAL_ERROR);
+                       }
+               }
+               break;
+
+       case TN_EVENT_RX_OK:
+               finalize = true;
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       if (kfilnd_tn_has_failed(tn))
+               finalize = true;
+
+       if (finalize)
+               kfilnd_tn_finalize(tn, tn_released);
+
+       return rc;
+}
+
+static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
+                                    enum tn_events event, int status,
+                                    bool *tn_released)
+{
+       int rc;
+       enum lnet_msg_hstatus hstatus;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_TX_OK:
+               kfilnd_peer_alive(tn->peer);
+               kfilnd_tn_timeout_enable(tn);
+               kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
+               break;
+
+       case TN_EVENT_TAG_RX_OK:
+               kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
+               break;
+
+       case TN_EVENT_TX_FAIL:
+               if (status == -ETIMEDOUT)
+                       hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
+               else
+                       hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
+
+               kfilnd_tn_status_update(tn, status, hstatus);
+               kfilnd_peer_down(tn->peer);
+
+               /* Need to cancel the tagged receive to prevent resources from
+                * being leaked.
+                */
+               rc = kfilnd_tn_cancel_tag_recv(tn);
+
+               switch (rc) {
+               /* Async cancel event will progress transaction. */
+               case 0:
+                       kfilnd_tn_status_update(tn, status,
+                                               LNET_MSG_STATUS_LOCAL_ERROR);
+                       kfilnd_tn_state_change(tn, TN_STATE_FAIL);
+                       return 0;
+
+               /* Need to replay the TN_EVENT_TX_FAIL event while in the
+                * TN_STATE_WAIT_COMP state.
+                */
+               case -EAGAIN:
+                       KFILND_TN_DEBUG(tn,
+                                       "Need to replay cancel tagged recv");
+                       return -EAGAIN;
+
+               default:
+                       KFILND_TN_ERROR(tn,
+                                       "Unexpected error during cancel tagged receive: rc=%d",
+                                       rc);
+                       LBUG();
+               }
+               break;
+
+       case TN_EVENT_TAG_RX_FAIL:
+               kfilnd_tn_status_update(tn, status,
+                                       LNET_MSG_STATUS_LOCAL_ERROR);
+               kfilnd_tn_state_change(tn, TN_STATE_FAIL);
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       return 0;
+}
+
+static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
+                                         enum tn_events event, int status,
+                                         bool *tn_released)
+{
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       if (event == TN_EVENT_TX_OK) {
+               kfilnd_peer_alive(tn->peer);
+               kfilnd_tn_finalize(tn, tn_released);
+       } else {
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       return 0;
+}
+
+static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
+                                            enum tn_events event, int status,
+                                            bool *tn_released)
+{
+       enum lnet_msg_hstatus hstatus;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_TAG_TX_OK:
+               kfilnd_peer_alive(tn->peer);
+               break;
+
+       case TN_EVENT_TAG_TX_FAIL:
+               if (status == -ETIMEDOUT)
+                       hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
+               else
+                       hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
+
+               kfilnd_tn_status_update(tn, status, hstatus);
+               kfilnd_peer_down(tn->peer);
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       kfilnd_tn_finalize(tn, tn_released);
+
+       return 0;
+}
+
+static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
+                                        enum tn_events event, int status,
+                                        bool *tn_released)
+{
+       int rc;
+       enum lnet_msg_hstatus hstatus;
+
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_TAG_RX_FAIL:
+       case TN_EVENT_TAG_RX_OK:
+               /* Status can be set for both TN_EVENT_TAG_RX_FAIL and
+                * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, a non-zero
+                * status means the LNet target returned -ENODATA.
+                */
+               if (status) {
+                       if (event == TN_EVENT_TAG_RX_FAIL)
+                               kfilnd_tn_status_update(tn, status,
+                                                       LNET_MSG_STATUS_LOCAL_ERROR);
+                       else
+                               kfilnd_tn_status_update(tn, status,
+                                                       LNET_MSG_STATUS_OK);
+               }
+
+               if (!kfilnd_tn_timeout_cancel(tn)) {
+                       kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
+                       return 0;
+               }
+               break;
+
+       case TN_EVENT_TIMEOUT:
+               /* Need to cancel the tagged receive to prevent resources from
+                * being leaked.
+                */
+               rc = kfilnd_tn_cancel_tag_recv(tn);
+
+               switch (rc) {
+               /* Async cancel event will progress transaction. */
+               case 0:
+                       kfilnd_tn_state_change(tn,
+                                              TN_STATE_WAIT_TIMEOUT_TAG_COMP);
+                       return 0;
+
+               /* Need to replay the TN_EVENT_TIMEOUT event while in the
+                * TN_STATE_WAIT_TAG_COMP state.
+                */
+               case -EAGAIN:
+                       KFILND_TN_DEBUG(tn,
+                                       "Need to replay cancel tagged recv");
+                       return -EAGAIN;
+
+               default:
+                       KFILND_TN_ERROR(tn,
+                                       "Unexpected error during cancel tagged receive: rc=%d",
+                                       rc);
+                       LBUG();
+               }
+               break;
+
+       case TN_EVENT_TAG_TX_FAIL:
+               if (status == -ETIMEDOUT)
+                       hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
+               else
+                       hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
+
+               kfilnd_tn_status_update(tn, status, hstatus);
+               kfilnd_peer_down(tn->peer);
+               break;
+
+       case TN_EVENT_TAG_TX_OK:
+               kfilnd_peer_alive(tn->peer);
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       kfilnd_tn_finalize(tn, tn_released);
+
+       return 0;
+}
+
+static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
+                               enum tn_events event, int status,
+                               bool *tn_released)
+{
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_TX_FAIL:
+               kfilnd_peer_down(tn->peer);
+               break;
+
+       case TN_EVENT_TX_OK:
+               kfilnd_peer_alive(tn->peer);
+               break;
+
+       case TN_EVENT_TAG_RX_FAIL:
+       case TN_EVENT_TAG_RX_CANCEL:
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       kfilnd_tn_finalize(tn, tn_released);
+
+       return 0;
+}
+
+static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
+                                                enum tn_events event,
+                                                int status, bool *tn_released)
+{
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       switch (event) {
+       case TN_EVENT_TAG_RX_CANCEL:
+               kfilnd_tn_status_update(tn, -ETIMEDOUT,
+                                       LNET_MSG_STATUS_REMOTE_TIMEOUT);
+               kfilnd_peer_down(tn->peer);
+               break;
+
+       case TN_EVENT_TAG_RX_FAIL:
+               kfilnd_tn_status_update(tn, status,
+                                       LNET_MSG_STATUS_LOCAL_ERROR);
+               break;
+
+       case TN_EVENT_TAG_RX_OK:
+               break;
+
+       default:
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       kfilnd_tn_finalize(tn, tn_released);
+
+       return 0;
+}
+
+static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
+                                            enum tn_events event, int status,
+                                            bool *tn_released)
+{
+       KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
+                       status);
+
+       if (event == TN_EVENT_TIMEOUT) {
+               kfilnd_tn_finalize(tn, tn_released);
+       } else {
+               KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
+               LBUG();
+       }
+
+       return 0;
+}
+
+static int
+(* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
+                                                      enum tn_events event,
+                                                      int status,
+                                                      bool *tn_released) = {
+       [TN_STATE_IDLE] = kfilnd_tn_state_idle,
+       [TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
+       [TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
+       [TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
+       [TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
+       [TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
+       [TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
+       [TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
+       [TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
+               kfilnd_tn_state_wait_timeout_tag_comp,
+       [TN_STATE_FAIL] = kfilnd_tn_state_fail,
+       [TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
+       [TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
+};
+
+/**
+ * kfilnd_tn_event_handler() - Update transaction state machine with an event.
+ * @tn: Transaction to be updated.
+ * @event: Transaction event.
+ * @status: Errno status associated with the event.
+ *
+ * When the transaction event handler is first called on a new transaction,
+ * ownership of the transaction passes to the transaction system. This means it
+ * will be freed by the system as the transaction progresses through the state
+ * machine.
+ */
+void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
+                            enum tn_events event, int status)
+{
+       bool tn_released = false;
+       int rc;
+
+       if (!tn)
+               return;
+
+       mutex_lock(&tn->tn_lock);
+       rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
+                                                         &tn_released);
+       if (rc == -EAGAIN) {
+               tn->replay_event = event;
+               tn->replay_status = status;
+               kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
+       }
+
+       if (!tn_released)
+               mutex_unlock(&tn->tn_lock);
+}
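+
+/* Illustrative usage (sketch): kick off a freshly allocated transaction.
+ * After this call, the transaction is owned by the state machine and must not
+ * be touched by the caller; it may already have been freed.
+ *
+ *     kfilnd_tn_event_handler(tn, TN_EVENT_INIT_IMMEDIATE, 0);
+ */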
+
+/**
+ * kfilnd_tn_free() - Free a transaction.
+ */
+void kfilnd_tn_free(struct kfilnd_transaction *tn)
+{
+       spin_lock(&tn->tn_ep->tn_list_lock);
+       list_del(&tn->tn_entry);
+       spin_unlock(&tn->tn_ep->tn_list_lock);
+
+       KFILND_TN_DEBUG(tn, "Transaction freed");
+
+       if (tn->tn_mr_key)
+               kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);
+
+       /* Free send message buffer if needed. */
+       if (tn->tn_tx_msg.msg)
+               kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
+
+       kmem_cache_free(tn_cache, tn);
+}
+
+/**
+ * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
+ * @dev: KFI LND device used to look up the KFI LND endpoint to associate with
+ * the transaction.
+ * @cpt: CPT of the transaction.
+ * @target_nid: Target NID of the transaction.
+ * @alloc_msg: Allocate an immediate message for the transaction.
+ * @is_initiator: True if this is the initiator of the LNet transaction.
+ * @key: True if the transaction needs a memory region key.
+ *
+ * During transaction allocation, each transaction is associated with a KFI LND
+ * endpoint used to post data transfer operations. The CPT argument is used to
+ * look up the KFI LND endpoint within the KFI LND device.
+ *
+ * Return: On success, valid pointer. Else, negative errno pointer.
+ */
+struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
+                                          lnet_nid_t target_nid,
+                                          bool alloc_msg, bool is_initiator,
+                                          bool key)
+{
+       struct kfilnd_transaction *tn;
+       struct kfilnd_ep *ep;
+       int rc;
+       ktime_t tn_alloc_ts;
+
+       if (!dev) {
+               rc = -EINVAL;
+               goto err;
+       }
+
+       tn_alloc_ts = ktime_get();
+
+       /* If the CPT does not map to a KFI LND endpoint, fall back to the
+        * first endpoint. This should never happen.
+        */
+       ep = dev->cpt_to_endpoint[cpt];
+       if (!ep) {
+               CWARN("%s used invalid cpt=%d\n",
+                     libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
+               ep = dev->kfd_endpoints[0];
+       }
+
+       tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
+       if (!tn) {
+               rc = -ENOMEM;
+               goto err;
+       }
+
+       if (alloc_msg) {
+               tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
+               if (!tn->tn_tx_msg.msg) {
+                       rc = -ENOMEM;
+                       goto err_free_tn;
+               }
+       }
+
+       if (key) {
+               rc = kfilnd_ep_get_key(ep);
+               if (rc < 0)
+                       goto err_free_tn;
+               tn->tn_mr_key = rc;
+       }
+
+       tn->peer = kfilnd_peer_get(dev, target_nid);
+       if (IS_ERR(tn->peer)) {
+               rc = PTR_ERR(tn->peer);
+               goto err_put_mr_key;
+       }
+
+       mutex_init(&tn->tn_lock);
+       tn->tn_ep = ep;
+       tn->tn_response_rx = ep->end_context_id;
+       tn->tn_state = TN_STATE_IDLE;
+       tn->hstatus = LNET_MSG_STATUS_OK;
+       tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
+       tn->is_initiator = is_initiator;
+       INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);
+
+       /* Add the transaction to an endpoint.  This is like
+        * incrementing a ref counter.
+        */
+       spin_lock(&ep->tn_list_lock);
+       list_add_tail(&tn->tn_entry, &ep->tn_list);
+       spin_unlock(&ep->tn_list_lock);
+
+       tn->tn_alloc_ts = tn_alloc_ts;
+       tn->tn_state_ts = ktime_get();
+
+       KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key);
+
+       return tn;
+
+err_put_mr_key:
+       if (key)
+               kfilnd_ep_put_key(ep, tn->tn_mr_key);
+err_free_tn:
+       if (tn->tn_tx_msg.msg)
+               kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
+       kmem_cache_free(tn_cache, tn);
+err:
+       return ERR_PTR(rc);
+}
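+
+/* Illustrative allocation path (sketch; dev, cpt, and target_nid are assumed
+ * to come from the caller's context):
+ *
+ *     tn = kfilnd_tn_alloc(dev, cpt, target_nid, true, true, true);
+ *     if (IS_ERR(tn))
+ *             return PTR_ERR(tn);
+ */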
+
+/**
+ * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
+ *
+ * This function should only be called when there are no outstanding
+ * transactions.
+ */
+void kfilnd_tn_cleanup(void)
+{
+       kmem_cache_destroy(imm_buf_cache);
+       kmem_cache_destroy(tn_cache);
+}
+
+/**
+ * kfilnd_tn_init() - Initialize KFI LND transaction system.
+ *
+ * Return: On success, zero. Else, negative errno.
+ */
+int kfilnd_tn_init(void)
+{
+       tn_cache = kmem_cache_create("kfilnd_tn",
+                                    sizeof(struct kfilnd_transaction), 0,
+                                    SLAB_HWCACHE_ALIGN, NULL);
+       if (!tn_cache)
+               goto err;
+
+       imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
+                                         KFILND_IMMEDIATE_MSG_SIZE, 0,
+                                         SLAB_HWCACHE_ALIGN, NULL);
+       if (!imm_buf_cache)
+               goto err_tn_cache_destroy;
+
+       return 0;
+
+err_tn_cache_destroy:
+       kmem_cache_destroy(tn_cache);
+err:
+       return -ENOMEM;
+}
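+
+/* Typical pairing (sketch): kfilnd_tn_init() at module load and
+ * kfilnd_tn_cleanup() at module unload, once no transactions remain:
+ *
+ *     rc = kfilnd_tn_init();
+ *     if (rc)
+ *             return rc;
+ *     ...
+ *     kfilnd_tn_cleanup();
+ */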
+
+/**
+ * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction.
+ * @tn: Transaction to have buffer set.
+ * @kiov: LNet KIOV buffer.
+ * @num_iov: Number of IOVs.
+ * @offset: Offset into IOVs where the buffer starts.
+ * @len: Length of the buffer.
+ *
+ * This function takes the user-provided IOV, offset, and length, and sets the
+ * transaction buffer. The user-provided IOV is an LNet KIOV. The user-provided
+ * offset is consumed while the transaction buffer is configured (i.e. the
+ * resulting transaction buffer offset is zero).
+ *
+ * Return: 0 on success. -EINVAL if the buffer requires more than LNET_MAX_IOV
+ * entries.
+ */
+int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn,
+                          struct bio_vec *kiov, size_t num_iov,
+                          size_t offset, size_t len)
+{
+       size_t i;
+       size_t cur_len = 0;
+       size_t cur_offset = offset;
+       size_t cur_iov = 0;
+       size_t tmp_len;
+       size_t tmp_offset;
+
+       for (i = 0; (i < num_iov) && (cur_len < len); i++) {
+               /* Skip whole KIOVs that fall entirely before the current
+                * offset.
+                */
+               if (kiov[i].bv_len <= cur_offset) {
+                       cur_offset -= kiov[i].bv_len;
+                       continue;
+               }
+
+               tmp_len = kiov[i].bv_len - cur_offset;
+               tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;
+
+               if (tmp_len + cur_len > len)
+                       tmp_len = len - cur_len;
+
+               /* tn_kiov is an array of size LNET_MAX_IOV */
+               if (cur_iov >= LNET_MAX_IOV)
+                       return -EINVAL;
+
+               tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
+               tn->tn_kiov[cur_iov].bv_len = tmp_len;
+               tn->tn_kiov[cur_iov].bv_offset = tmp_offset;
+
+               cur_iov++;
+               cur_len += tmp_len;
+               cur_offset = 0;
+       }
+
+       tn->tn_num_iovec = cur_iov;
+       tn->tn_nob = cur_len;
+
+       return 0;
+}
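+
+/* Worked example (illustrative): given two 4096-byte KIOV entries, an offset
+ * of 4608, and a len of 1024, the first entry is skipped entirely
+ * (4096 <= 4608), and the second entry yields a single tn_kiov entry with
+ * bv_offset advanced by 512 and bv_len clamped to 1024; tn_num_iovec is 1 and
+ * tn_nob is 1024.
+ */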
diff --git a/lnet/klnds/kfilnd/kfilnd_tn.h b/lnet/klnds/kfilnd/kfilnd_tn.h
new file mode 100644 (file)
index 0000000..237ac68
--- /dev/null
@@ -0,0 +1,50 @@
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but
+ * WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * General Public License version 2 for more details (a copy is included
+ * in the LICENSE file that accompanied this code).
+ *
+ * You should have received a copy of the GNU General Public License
+ * version 2 along with this program; If not, see
+ * http://www.gnu.org/licenses/gpl-2.0.html
+ *
+ * GPL HEADER END
+ */
+/*
+ * Copyright 2022 Hewlett Packard Enterprise Development LP
+ */
+/*
+ * This file is part of Lustre, http://www.lustre.org/
+ */
+/*
+ * kfilnd transaction and state machine processing.
+ */
+#ifndef _KFILND_TN_
+#define _KFILND_TN_
+
+#include "kfilnd.h"
+
+void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
+                               struct kfilnd_msg *rx_msg, int msg_size);
+void kfilnd_tn_free(struct kfilnd_transaction *tn);
+struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
+                                          lnet_nid_t target_nid,
+                                          bool alloc_msg, bool is_initiator,
+                                          bool key);
+void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
+                            enum tn_events event, int status);
+void kfilnd_tn_cleanup(void);
+int kfilnd_tn_init(void);
+int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn, struct bio_vec *kiov,
+                          size_t num_iov, size_t offset, size_t nob);
+
+#endif /* _KFILND_TN_ */
index c5ac18a..ad11da2 100644 (file)
@@ -59,6 +59,7 @@
 #define RDMA_PS_TCP 0x0106
 #endif
 
+#define cxi_nic_addr_path "/sys/class/cxi/cxi%u/device/properties/"
 const char *gmsg_stat_names[] = {"sent_stats", "received_stats",
                                 "dropped_stats"};
 
@@ -1151,6 +1152,40 @@ static int lustre_lnet_queryip(struct lnet_dlc_intf_descr *intf, __u32 *ip)
        return LUSTRE_CFG_RC_NO_ERR;
 }
 
+static int lustre_lnet_kfi_intf2nid(struct lnet_dlc_intf_descr *intf,
+                                   __u32 *nid_addr)
+{
+       unsigned int nic_index;
+       int rc;
+       char *nic_addr_path;
+       char val[128];
+       int size;
+       long int addr;
+
+       rc = sscanf(intf->intf_name, "cxi%u", &nic_index);
+       if (rc != 1)
+               return LUSTRE_CFG_RC_NO_MATCH;
+
+       size = snprintf(NULL, 0, cxi_nic_addr_path, nic_index) + 1;
+       nic_addr_path = malloc(size);
+       if (!nic_addr_path)
+               return LUSTRE_CFG_RC_OUT_OF_MEM;
+       sprintf(nic_addr_path, cxi_nic_addr_path, nic_index);
+
+       rc = read_sysfs_file(nic_addr_path, "nic_addr", val, 1, sizeof(val));
+       free(nic_addr_path);
+       if (rc)
+               return LUSTRE_CFG_RC_NO_MATCH;
+
+       addr = strtol(val, NULL, 16);
+       if (addr == LONG_MIN || addr == LONG_MAX)
+               return LUSTRE_CFG_RC_NO_MATCH;
+
+       *nid_addr = addr;
+
+       return LUSTRE_CFG_RC_NO_ERR;
+}
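+
+/* Example (illustrative): for interface "cxi0" this reads
+ * /sys/class/cxi/cxi0/device/properties/nic_addr and parses its hex value
+ * (e.g. "0x12a") into the NIC address used for the kfi NID.
+ */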
+
 /*
  * for each interface in the array of interfaces find the IP address of
  * that interface, create its nid and add it to an array of NIDs.
@@ -1164,6 +1199,7 @@ static int lustre_lnet_intf2nids(struct lnet_dlc_network_descr *nw,
        struct lnet_dlc_intf_descr *intf;
        char val[LNET_MAX_STR_LEN];
        __u32 ip;
+       __u32 nic_addr;
        int gni_num;
        char *endp;
        unsigned int num;
@@ -1206,6 +1242,21 @@ static int lustre_lnet_intf2nids(struct lnet_dlc_network_descr *nw,
                (*nids)[i] = LNET_MKNID(nw->nw_id, gni_num);
 
                goto out;
+       } else if (LNET_NETTYP(nw->nw_id) == KFILND) {
+               list_for_each_entry(intf, &nw->nw_intflist, intf_on_network) {
+                       rc = lustre_lnet_kfi_intf2nid(intf, &nic_addr);
+                       if (rc != LUSTRE_CFG_RC_NO_ERR) {
+                               snprintf(err_str, str_len,
+                                       "\"couldn't query kfi intf %s\"",
+                                       intf->intf_name);
+                               err_str[str_len - 1] = '\0';
+                               goto failed;
+                       }
+
+                       (*nids)[i] = LNET_MKNID(nw->nw_id, nic_addr);
+                       i++;
+               }
+               goto out;
        }
 
        /* look at the other interfaces */
index 878a184..644a81c 100644 (file)
@@ -85,6 +85,25 @@ lustre_socklnd_show_tun(struct cYAML *lndparams,
        return LUSTRE_CFG_RC_NO_ERR;
 }
 
+static int
+lustre_kfilnd_show_tun(struct cYAML *lndparams,
+                       struct lnet_ioctl_config_kfilnd_tunables *lnd_cfg)
+{
+       if (cYAML_create_number(lndparams, "prov_major_version",
+                               lnd_cfg->lnd_prov_major_version) == NULL)
+               return LUSTRE_CFG_RC_OUT_OF_MEM;
+
+       if (cYAML_create_number(lndparams, "prov_minor_version",
+                               lnd_cfg->lnd_prov_minor_version) == NULL)
+               return LUSTRE_CFG_RC_OUT_OF_MEM;
+
+       if (cYAML_create_number(lndparams, "auth_key",
+                               lnd_cfg->lnd_auth_key) == NULL)
+               return LUSTRE_CFG_RC_OUT_OF_MEM;
+
+       return LUSTRE_CFG_RC_NO_ERR;
+}
+
 int
 lustre_net_show_tunables(struct cYAML *tunables,
                         struct lnet_ioctl_config_lnd_cmn_tunables *cmn)
@@ -129,6 +148,9 @@ lustre_ni_show_tunables(struct cYAML *lnd_tunables,
        else if (net_type == SOCKLND)
                rc = lustre_socklnd_show_tun(lnd_tunables,
                                             &lnd->lnd_tun_u.lnd_sock);
+       else if (net_type == KFILND)
+               rc = lustre_kfilnd_show_tun(lnd_tunables,
+                                           &lnd->lnd_tun_u.lnd_kfi);
 
        return rc;
 }
@@ -176,6 +198,33 @@ yaml_extract_o2ib_tun(struct cYAML *tree,
 
 }
 
+static void
+yaml_extract_kfi_tun(struct cYAML *tree,
+                     struct lnet_ioctl_config_kfilnd_tunables *lnd_cfg)
+{
+       struct cYAML *prov_major_version = NULL;
+       struct cYAML *prov_minor_version = NULL;
+       struct cYAML *auth_key = NULL;
+       struct cYAML *lndparams = NULL;
+
+       lndparams = cYAML_get_object_item(tree, "lnd tunables");
+       if (!lndparams)
+               return;
+
+       prov_major_version =
+               cYAML_get_object_item(lndparams, "prov_major_version");
+       lnd_cfg->lnd_prov_major_version =
+               (prov_major_version) ? prov_major_version->cy_valueint : 0;
+
+       prov_minor_version =
+               cYAML_get_object_item(lndparams, "prov_minor_version");
+       lnd_cfg->lnd_prov_minor_version =
+               (prov_minor_version) ? prov_minor_version->cy_valueint : 0;
+
+       auth_key = cYAML_get_object_item(lndparams, "auth_key");
+       lnd_cfg->lnd_auth_key =
+               (auth_key) ? auth_key->cy_valueint : 0;
+}
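+
+/* Parses an "lnd tunables" YAML block of the form (illustrative values):
+ *
+ *   lnd tunables:
+ *     prov_major_version: 1
+ *     prov_minor_version: 0
+ *     auth_key: 255
+ */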
 
 static void
 yaml_extract_sock_tun(struct cYAML *tree,
@@ -203,6 +252,8 @@ lustre_yaml_extract_lnd_tunables(struct cYAML *tree,
        else if (net_type == SOCKLND)
                yaml_extract_sock_tun(tree,
                                      &tun->lnd_tun_u.lnd_sock);
-
+       else if (net_type == KFILND)
+               yaml_extract_kfi_tun(tree,
+                                    &tun->lnd_tun_u.lnd_kfi);
 }
 
index 0024b16..a0f9de7 100644 (file)
@@ -165,7 +165,8 @@ command_t net_cmds[] = {
         "\t--credits: Network Interface credits\n"
         "\t--cpt: CPU Partitions configured net uses (e.g. [0,1]\n"
         "\t--conns-per-peer: number of connections per peer\n"
-        "\t--skip-mr-route-setup: do not add linux route for the ni\n"},
+        "\t--skip-mr-route-setup: do not add linux route for the ni\n"
+        "\t--auth-key: Network authorization key (kfilnd only)\n"},
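+        /* Example (illustrative; assumes a kfi net over interface cxi0):
+         * lnetctl net add --net kfi --if cxi0 --auth-key 255
+         */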
        {"del", jt_del_ni, 0, "delete a network\n"
         "\t--net: net name (e.g. tcp0)\n"
         "\t--if: physical interface (e.g. eth0)\n"},
@@ -1056,7 +1057,7 @@ static int jt_add_route(int argc, char **argv)
 static int jt_add_ni(int argc, char **argv)
 {
        char *ip2net = NULL;
-       long int pto = -1, pc = -1, pbc = -1, cre = -1, cpp = -1;
+       long int pto = -1, pc = -1, pbc = -1, cre = -1, cpp = -1, auth_key = -1;
        struct cYAML *err_rc = NULL;
        int rc, opt, cpt_rc = -1;
        struct lnet_dlc_network_descr nw_descr;
@@ -1068,8 +1069,9 @@ static int jt_add_ni(int argc, char **argv)
        memset(&tunables, 0, sizeof(tunables));
        lustre_lnet_init_nw_descr(&nw_descr);
 
-       const char *const short_options = "b:c:i:k:m:n:p:r:s:t:";
+       const char *const short_options = "a:b:c:i:k:m:n:p:r:s:t:";
        static const struct option long_options[] = {
+       { .name = "auth-key",     .has_arg = required_argument, .val = 'a' },
        { .name = "peer-buffer-credits",
                                  .has_arg = required_argument, .val = 'b' },
        { .name = "peer-credits", .has_arg = required_argument, .val = 'c' },
@@ -1092,6 +1094,14 @@ static int jt_add_ni(int argc, char **argv)
        while ((opt = getopt_long(argc, argv, short_options,
                                   long_options, NULL)) != -1) {
                switch (opt) {
+               case 'a':
+                       rc = parse_long(optarg, &auth_key);
+                       if (rc != 0) {
+                               /* ignore option */
+                               auth_key = -1;
+                               continue;
+                       }
+                       break;
                case 'b':
                        rc = parse_long(optarg, &pbc);
                        if (rc != 0) {
@@ -1163,6 +1173,11 @@ static int jt_add_ni(int argc, char **argv)
                }
        }
 
+       if (auth_key > 0 && LNET_NETTYP(nw_descr.nw_id) == KFILND) {
+               tunables.lt_tun.lnd_tun_u.lnd_kfi.lnd_auth_key = auth_key;
+               found = true;
+       }
+
        if (pto > 0 || pc > 0 || pbc > 0 || cre > 0 || cpp > -1) {
                tunables.lt_cmn.lct_peer_timeout = pto;
                tunables.lt_cmn.lct_peer_tx_credits = pc;
index 7bc291b..76a6034 100644 (file)
@@ -295,7 +295,8 @@ static struct convert_struct converter[] = {
        [SOCKLND] = { .name = "SOCKLND", .nid2name = ipv4_nid2hostname },
        [O2IBLND] = { .name = "O2IBLND", .nid2name = ipv4_nid2hostname },
        [LOLND]   = { .name = "LOLND",   .nid2name = lolnd_nid2hostname },
-       [PTL4LND] = { .name = "PTL4LND", .nid2name = external_nid2hostname }
+       [PTL4LND] = { .name = "PTL4LND", .nid2name = external_nid2hostname },
+       [KFILND]  = { .name = "KFILND",  .nid2name = ipv4_nid2hostname }
 };
 
 #define LND_MAX         (sizeof(converter) / sizeof(converter[0]))