From 2612cf4ad8a892768b26abe6ce60bb240edf3bdf Mon Sep 17 00:00:00 2001 From: Doug Oucharek Date: Tue, 16 Oct 2018 15:51:21 -0700 Subject: [PATCH] LU-16035 kfilnd: Initial kfilnd implementation Initial implementation of the kfabric Lustre Network Driver. Test-Parameters: trivial HPE-bug-id: LUS-6565 Signed-off-by: Doug Oucharek Signed-off-by: Ian Ziemba Signed-off-by: Chris Horn Change-Id: I48a070ca0ba37e4923cd6dcb3327676ae6ddaae1 Reviewed-on: https://review.whamcloud.com/48009 Tested-by: jenkins Tested-by: Maloo Reviewed-by: Ron Gredvig Reviewed-by: Oleg Drokin --- lnet/autoconf/lustre-lnet.m4 | 42 + lnet/include/uapi/linux/lnet/lnet-dlc.h | 8 + lnet/include/uapi/linux/lnet/lnet-idl.h | 1 + lnet/klnds/Makefile.in | 1 + lnet/klnds/autoMakefile.am | 2 +- lnet/klnds/kfilnd/Makefile.in | 16 + lnet/klnds/kfilnd/autoMakefile.am | 10 + lnet/klnds/kfilnd/kfilnd.c | 443 ++++++++ lnet/klnds/kfilnd/kfilnd.h | 692 +++++++++++++ lnet/klnds/kfilnd/kfilnd_cq.c | 253 +++++ lnet/klnds/kfilnd/kfilnd_cq.h | 42 + lnet/klnds/kfilnd/kfilnd_debugfs.c | 204 ++++ lnet/klnds/kfilnd/kfilnd_dev.c | 335 ++++++ lnet/klnds/kfilnd/kfilnd_dev.h | 46 + lnet/klnds/kfilnd/kfilnd_dom.c | 450 ++++++++ lnet/klnds/kfilnd/kfilnd_dom.h | 40 + lnet/klnds/kfilnd/kfilnd_ep.c | 954 +++++++++++++++++ lnet/klnds/kfilnd/kfilnd_ep.h | 70 ++ lnet/klnds/kfilnd/kfilnd_modparams.c | 183 ++++ lnet/klnds/kfilnd/kfilnd_peer.c | 274 +++++ lnet/klnds/kfilnd/kfilnd_peer.h | 49 + lnet/klnds/kfilnd/kfilnd_tn.c | 1605 +++++++++++++++++++++++++++++ lnet/klnds/kfilnd/kfilnd_tn.h | 50 + lnet/utils/lnetconfig/liblnetconfig.c | 51 + lnet/utils/lnetconfig/liblnetconfig_lnd.c | 53 +- lnet/utils/lnetctl.c | 21 +- lustre/utils/gss/lsupport.c | 3 +- 27 files changed, 5892 insertions(+), 6 deletions(-) create mode 100644 lnet/klnds/kfilnd/Makefile.in create mode 100644 lnet/klnds/kfilnd/autoMakefile.am create mode 100644 lnet/klnds/kfilnd/kfilnd.c create mode 100644 lnet/klnds/kfilnd/kfilnd.h create mode 100644 lnet/klnds/kfilnd/kfilnd_cq.c create mode 100644 lnet/klnds/kfilnd/kfilnd_cq.h create mode 100644 lnet/klnds/kfilnd/kfilnd_debugfs.c create mode 100644 lnet/klnds/kfilnd/kfilnd_dev.c create mode 100644 lnet/klnds/kfilnd/kfilnd_dev.h create mode 100644 lnet/klnds/kfilnd/kfilnd_dom.c create mode 100644 lnet/klnds/kfilnd/kfilnd_dom.h create mode 100644 lnet/klnds/kfilnd/kfilnd_ep.c create mode 100644 lnet/klnds/kfilnd/kfilnd_ep.h create mode 100644 lnet/klnds/kfilnd/kfilnd_modparams.c create mode 100644 lnet/klnds/kfilnd/kfilnd_peer.c create mode 100644 lnet/klnds/kfilnd/kfilnd_peer.h create mode 100644 lnet/klnds/kfilnd/kfilnd_tn.c create mode 100644 lnet/klnds/kfilnd/kfilnd_tn.h diff --git a/lnet/autoconf/lustre-lnet.m4 b/lnet/autoconf/lustre-lnet.m4 index 90b7700..f76bf5a 100644 --- a/lnet/autoconf/lustre-lnet.m4 +++ b/lnet/autoconf/lustre-lnet.m4 @@ -651,6 +651,44 @@ AC_SUBST(GNICPPFLAGS) AC_SUBST(GNILND) ]) # LN_CONFIG_GNILND +# +# LN_CONFIG_KFILND +# +# check whether to use the kfabric Network Interface lnd +# +AC_DEFUN([LN_CONFIG_KFILND], [ +AC_ARG_WITH([kfi], + AC_HELP_STRING([--with-kfi=], [Kfabric build path for kfilnd]), + [ + AC_CHECK_FILE([$with_kfi/Module.symvers], + [ + # KFICPPFLAGS was set in spec file + KFICPPFLAGS="-I$with_kfi/include" + EXTRA_KCFLAGS_save="$EXTRA_KCFLAGS" + EXTRA_KCFLAGS="$EXTRA_KCFLAGS $KFICPPFLAGS" + KBUILD_EXTRA_SYMBOLS="$KBUILD_EXTRA_SYMBOLS $with_kfi/Module.symvers" + LB_CHECK_COMPILE([if kfabric headers are present], KFI_header, + [ + #include + ],[ + struct kfi_info *hints; + hints = kfi_allocinfo(); + ],[ + 
KFILND="kfilnd" + AC_MSG_NOTICE([adding $with_kfi/Module.symvers to Symbol Path]) + EXTRA_SYMBOLS="$EXTRA_SYMBOLS $with_kfi/Module.symvers" + ],[ + AC_MSG_ERROR([can't compile kfilnd with given KFICPPFLAGS: $KFICPPFLAGS]) + ]) + ],[ + AC_MSG_ERROR(["$with_kfi/Module.symvers does not exist"]) + ]) + ],[]) +AC_SUBST(KFICPPFLAGS) +AC_SUBST(KFILND) +AC_SUBST(EXTRA_SYMBOLS) +]) # LN_CONFIG_KFILND + # LN_CONFIG_STRSCPY_EXISTS # # If strscpy exists, prefer it over strlcpy @@ -897,6 +935,7 @@ LN_CONFIG_BACKOFF LN_CONFIG_O2IB LN_CONFIG_GNILND LN_CONFIG_STRSCPY_EXISTS +LN_CONFIG_KFILND # 3.10 LN_EXPORT_KMAP_TO_PAGE # 3.15 @@ -1038,6 +1077,7 @@ LN_USR_NLMSGERR AC_DEFUN([LN_CONDITIONALS], [ AM_CONDITIONAL(BUILD_O2IBLND, test x$O2IBLND = "xo2iblnd") AM_CONDITIONAL(BUILD_GNILND, test x$GNILND = "xgnilnd") +AM_CONDITIONAL(BUILD_KFILND, test x$KFILND = "xkfilnd") ]) # LN_CONDITIONALS # @@ -1062,6 +1102,8 @@ lnet/klnds/gnilnd/Makefile lnet/klnds/gnilnd/autoMakefile lnet/klnds/socklnd/Makefile lnet/klnds/socklnd/autoMakefile +lnet/klnds/kfilnd/Makefile +lnet/klnds/kfilnd/autoMakefile lnet/lnet/Makefile lnet/lnet/autoMakefile lnet/selftest/Makefile diff --git a/lnet/include/uapi/linux/lnet/lnet-dlc.h b/lnet/include/uapi/linux/lnet/lnet-dlc.h index 2b2c05f..9b6cbe3 100644 --- a/lnet/include/uapi/linux/lnet/lnet-dlc.h +++ b/lnet/include/uapi/linux/lnet/lnet-dlc.h @@ -81,6 +81,13 @@ struct lnet_ioctl_config_o2iblnd_tunables { __u16 lnd_ntx; }; +struct lnet_ioctl_config_kfilnd_tunables { + __u32 lnd_version; + __u32 lnd_prov_major_version; + __u32 lnd_prov_minor_version; + __u32 lnd_auth_key; +}; + struct lnet_ioctl_config_socklnd_tunables { __u32 lnd_version; __u16 lnd_conns_per_peer; @@ -91,6 +98,7 @@ struct lnet_lnd_tunables { union { struct lnet_ioctl_config_o2iblnd_tunables lnd_o2ib; struct lnet_ioctl_config_socklnd_tunables lnd_sock; + struct lnet_ioctl_config_kfilnd_tunables lnd_kfi; } lnd_tun_u; }; diff --git a/lnet/include/uapi/linux/lnet/lnet-idl.h b/lnet/include/uapi/linux/lnet/lnet-idl.h index 7ed58a3..d10b588 100644 --- a/lnet/include/uapi/linux/lnet/lnet-idl.h +++ b/lnet/include/uapi/linux/lnet/lnet-idl.h @@ -203,6 +203,7 @@ struct lnet_magicversion { #define LNET_PROTO_IB_MAGIC 0x0be91b91 #define LNET_PROTO_GNI_MAGIC 0xb00fbabe /* ask Kim */ #define LNET_PROTO_TCP_MAGIC 0xeebc0ded +#define LNET_PROTO_KFI_MAGIC 0xdeadbeef #define LNET_PROTO_ACCEPTOR_MAGIC 0xacce7100 #define LNET_PROTO_PING_MAGIC 0x70696E67 /* 'ping' */ diff --git a/lnet/klnds/Makefile.in b/lnet/klnds/Makefile.in index d968d82..b2f23ae 100644 --- a/lnet/klnds/Makefile.in +++ b/lnet/klnds/Makefile.in @@ -1,5 +1,6 @@ @BUILD_GNILND_TRUE@obj-m += gnilnd/ @BUILD_O2IBLND_TRUE@obj-m += o2iblnd/ +@BUILD_KFILND_TRUE@obj-m += kfilnd/ obj-m += socklnd/ @INCLUDE_RULES@ diff --git a/lnet/klnds/autoMakefile.am b/lnet/klnds/autoMakefile.am index eca1c05..0ef56f0 100644 --- a/lnet/klnds/autoMakefile.am +++ b/lnet/klnds/autoMakefile.am @@ -29,4 +29,4 @@ # This file is part of Lustre, http://www.lustre.org/ # -SUBDIRS = socklnd gnilnd o2iblnd +SUBDIRS = socklnd gnilnd o2iblnd kfilnd diff --git a/lnet/klnds/kfilnd/Makefile.in b/lnet/klnds/kfilnd/Makefile.in new file mode 100644 index 0000000..045f292 --- /dev/null +++ b/lnet/klnds/kfilnd/Makefile.in @@ -0,0 +1,16 @@ +MODULES := kkfilnd + +kkfilnd-objs := \ + kfilnd.o \ + kfilnd_modparams.o \ + kfilnd_tn.o \ + kfilnd_ep.o \ + kfilnd_dev.o \ + kfilnd_dom.o \ + kfilnd_peer.o \ + kfilnd_cq.o \ + kfilnd_debugfs.o \ + +EXTRA_POST_CFLAGS += @KFICPPFLAGS@ + +@INCLUDE_RULES@ diff --git 
a/lnet/klnds/kfilnd/autoMakefile.am b/lnet/klnds/kfilnd/autoMakefile.am new file mode 100644 index 0000000..3bf6bae --- /dev/null +++ b/lnet/klnds/kfilnd/autoMakefile.am @@ -0,0 +1,10 @@ +if MODULES +if BUILD_KFILND +modulenet_DATA := kkfilnd$(KMODEXT) +endif # BUILD_KFILND +endif # MODULES + +EXTRA_DIST := $(kkfilnd-objs:%.o=%.c) kfilnd.h kfilnd_dev.h kfilnd_dom.h \ + kfilnd_ep.h kfilnd_peer.h kfilnd_tn.h kfilnd_cq.h + +MOSTLYCLEANFILES = @MOSTLYCLEANFILES@ diff --git a/lnet/klnds/kfilnd/kfilnd.c b/lnet/klnds/kfilnd/kfilnd.c new file mode 100644 index 0000000..e8547a1 --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd.c @@ -0,0 +1,443 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd main interface. + */ + +#include +#include "kfilnd.h" +#include "kfilnd_tn.h" +#include "kfilnd_dev.h" + +/* These are temp constants to get stuff to compile */ +#define KFILND_DEFAULT_DEVICE "eth0" + +/* Some constants which should be turned into tunables */ +#define KFILND_MAX_WORKER_THREADS 4 +#define KFILND_MAX_EVENT_QUEUE 100 + +#define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS) +struct workqueue_struct *kfilnd_wq; +struct dentry *kfilnd_debug_dir; + +static void kfilnd_shutdown(struct lnet_ni *ni) +{ + struct kfilnd_dev *dev = ni->ni_data; + + kfilnd_dev_free(dev); +} + +static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid) +{ + int cpt; + + /* If the current CPT has is within the LNet NI CPTs, use that CPT. */ + cpt = lnet_cpt_current(); + if (dev->cpt_to_endpoint[cpt]) + return cpt; + + /* Hash to a LNet NI CPT based on target NID. 
*/ + return dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt; +} + +int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt, lnet_nid_t nid) +{ + struct kfilnd_transaction *tn; + int rc; + + tn = kfilnd_tn_alloc(dev, cpt, nid, true, true, false); + if (IS_ERR(tn)) { + rc = PTR_ERR(tn); + CERROR("Failed to allocate transaction struct: rc=%d\n", rc); + return rc; + } + + kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0); + + return 0; +} + +static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg) +{ + int type = msg->msg_type; + struct lnet_processid *target = &msg->msg_target; + struct kfilnd_transaction *tn; + int nob; + struct kfilnd_dev *dev = ni->ni_data; + enum kfilnd_msg_type lnd_msg_type; + int cpt; + enum tn_events event = TN_EVENT_INVALID; + int rc; + bool tn_key = false; + lnet_nid_t tgt_nid4; + + switch (type) { + default: + return -EIO; + + case LNET_MSG_ACK: + if (msg->msg_len != 0) + return -EINVAL; + lnd_msg_type = KFILND_MSG_IMMEDIATE; + break; + + case LNET_MSG_GET: + nob = offsetof(struct kfilnd_msg, + proto.immed.payload[msg->msg_md->md_length]); + if (nob <= KFILND_IMMEDIATE_MSG_SIZE) { + lnd_msg_type = KFILND_MSG_IMMEDIATE; + break; + } + + lnd_msg_type = KFILND_MSG_BULK_GET_REQ; + tn_key = true; + break; + + case LNET_MSG_REPLY: + case LNET_MSG_PUT: + nob = offsetof(struct kfilnd_msg, + proto.immed.payload[msg->msg_len]); + if (nob <= KFILND_IMMEDIATE_MSG_SIZE) { + lnd_msg_type = KFILND_MSG_IMMEDIATE; + break; + } + + lnd_msg_type = KFILND_MSG_BULK_PUT_REQ; + tn_key = true; + break; + } + + tgt_nid4 = lnet_nid_to_nid4(&target->nid); + + cpt = kfilnd_send_cpt(dev, tgt_nid4); + tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key); + if (IS_ERR(tn)) { + rc = PTR_ERR(tn); + CERROR("Failed to allocate transaction struct: rc=%d\n", rc); + return rc; + } + + /* Need to fire off special transaction if this is a new peer. */ + if (kfilnd_peer_is_new_peer(tn->peer)) { + rc = kfilnd_send_hello_request(dev, cpt, tgt_nid4); + if (rc) { + kfilnd_tn_free(tn); + return 0; + } + } + + switch (lnd_msg_type) { + case KFILND_MSG_IMMEDIATE: + rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov, + msg->msg_offset, msg->msg_len); + if (rc) { + CERROR("Failed to setup immediate buffer rc %d\n", rc); + kfilnd_tn_free(tn); + return rc; + } + + event = TN_EVENT_INIT_IMMEDIATE; + break; + + case KFILND_MSG_BULK_PUT_REQ: + tn->sink_buffer = false; + rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov, + msg->msg_offset, msg->msg_len); + if (rc) { + CERROR("Failed to setup PUT source buffer rc %d\n", rc); + kfilnd_tn_free(tn); + return rc; + } + + event = TN_EVENT_INIT_BULK; + break; + + case KFILND_MSG_BULK_GET_REQ: + /* We need to create a reply message to inform LNet our + * optimized GET is done. 
+ */ + tn->tn_getreply = lnet_create_reply_msg(ni, msg); + if (!tn->tn_getreply) { + CERROR("Can't create reply for GET -> %s\n", + libcfs_nidstr(&target->nid)); + kfilnd_tn_free(tn); + return -EIO; + } + + tn->sink_buffer = true; + rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov, + msg->msg_md->md_niov, + msg->msg_md->md_offset, + msg->msg_md->md_length); + if (rc) { + CERROR("Failed to setup GET sink buffer rc %d\n", rc); + kfilnd_tn_free(tn); + return rc; + } + event = TN_EVENT_INIT_BULK; + break; + + default: + kfilnd_tn_free(tn); + return -EIO; + } + + tn->msg_type = lnd_msg_type; + tn->tn_lntmsg = msg; /* finalise msg on completion */ + tn->lnet_msg_len = tn->tn_nob; + + KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags", + msg_type_to_str(lnd_msg_type), tn->tn_nob, + tn->tn_num_iovec); + + /* Start the state machine processing this transaction */ + kfilnd_tn_event_handler(tn, event, 0); + + return 0; +} + +static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg, + int delayed, unsigned int niov, + struct bio_vec *kiov, + unsigned int offset, unsigned int mlen, + unsigned int rlen) +{ + struct kfilnd_transaction *tn = private; + struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg; + int nob; + int rc = 0; + int status = 0; + enum tn_events event; + + if (mlen > rlen) + return -EINVAL; + + /* Transaction must be in receive state */ + if (tn->tn_state != TN_STATE_IMM_RECV) + return -EINVAL; + + tn->tn_lntmsg = msg; + tn->lnet_msg_len = rlen; + + switch (rxmsg->type) { + case KFILND_MSG_IMMEDIATE: + nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]); + if (nob > tn->tn_rx_msg.length) { + CERROR("Immediate message from %s too big: %d(%lu)\n", + libcfs_nidstr(&msg->msg_hdr.src_nid), + nob, tn->tn_rx_msg.length); + return -EPROTO; + } + tn->tn_nob = nob; + + lnet_copy_flat2kiov(niov, kiov, offset, + KFILND_IMMEDIATE_MSG_SIZE, rxmsg, + offsetof(struct kfilnd_msg, + proto.immed.payload), + mlen); + + kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0); + return 0; + + case KFILND_MSG_BULK_PUT_REQ: + if (mlen == 0) { + event = TN_EVENT_SKIP_TAG_RMA; + } else { + /* Post the buffer given us as a sink */ + tn->sink_buffer = true; + rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset, + mlen); + if (rc) { + CERROR("Failed to setup PUT sink buffer rc %d\n", rc); + kfilnd_tn_free(tn); + return rc; + } + event = TN_EVENT_INIT_TAG_RMA; + } + break; + + case KFILND_MSG_BULK_GET_REQ: + if (!msg) { + event = TN_EVENT_SKIP_TAG_RMA; + status = -ENODATA; + } else { + /* Post the buffer given to us as a source */ + tn->sink_buffer = false; + rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, + msg->msg_niov, + msg->msg_offset, + msg->msg_len); + if (rc) { + CERROR("Failed to setup GET source buffer rc %d\n", rc); + kfilnd_tn_free(tn); + return rc; + } + event = TN_EVENT_INIT_TAG_RMA; + } + break; + + default: + /* TODO: TN leaks here. */ + CERROR("Invalid message type = %d\n", rxmsg->type); + return -EINVAL; + } + + /* Store relevant fields to generate a bulk response. 
*/ + tn->tn_response_mr_key = rxmsg->proto.bulk_req.key; + tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx; + +#if 0 + tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg, + KFILND_MSG_BULK_RSP, + sizeof(struct kfilnd_bulk_rsp), + ni); +#endif + + KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags", + msg_type_to_str(rxmsg->type), tn->tn_nob, + tn->tn_num_iovec); + + kfilnd_tn_event_handler(tn, event, status); + + return rc; +} + +static int kfilnd_startup(struct lnet_ni *ni); + +static const struct lnet_lnd the_kfilnd = { + .lnd_type = KFILND, + .lnd_startup = kfilnd_startup, + .lnd_shutdown = kfilnd_shutdown, + .lnd_send = kfilnd_send, + .lnd_recv = kfilnd_recv, +}; + +static int kfilnd_startup(struct lnet_ni *ni) +{ + const char *node; + int rc; + struct kfilnd_dev *kfdev; + + if (!ni) + return -EINVAL; + + if (ni->ni_net->net_lnd != &the_kfilnd) { + CERROR("Wrong lnd type\n"); + return -EINVAL; + } + + kfilnd_tunables_setup(ni); + + /* Only a single interface is supported. */ + if (!ni->ni_interface) { + rc = -ENODEV; + CERROR("No LNet network interface address defined\n"); + goto err; + } + + node = ni->ni_interface; + + kfdev = kfilnd_dev_alloc(ni, node); + if (IS_ERR(kfdev)) { + rc = PTR_ERR(kfdev); + CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node, + rc); + goto err; + } + + ni->ni_data = kfdev; + ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr)); + + /* Post a series of immediate receive buffers */ + rc = kfilnd_dev_post_imm_buffers(kfdev); + if (rc) { + CERROR("Can't post buffers, rc = %d\n", rc); + goto err_free_dev; + } + + return 0; + +err_free_dev: + kfilnd_dev_free(kfdev); +err: + return rc; +} + +static void __exit kfilnd_exit(void) +{ + destroy_workqueue(kfilnd_wq); + + kfilnd_tn_cleanup(); + + lnet_unregister_lnd(&the_kfilnd); + + debugfs_remove_recursive(kfilnd_debug_dir); +} + +static int __init kfilnd_init(void) +{ + int rc; + + kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL); + + rc = kfilnd_tunables_init(); + if (rc) + goto err; + + /* Do any initialization of the transaction system */ + rc = kfilnd_tn_init(); + if (rc) { + CERROR("Cannot initialize transaction system\n"); + goto err; + } + + kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS, + WQ_MAX_ACTIVE); + if (!kfilnd_wq) { + rc = -ENOMEM; + CERROR("Failed to allocated kfilnd work queue\n"); + goto err_tn_cleanup; + } + + lnet_register_lnd(&the_kfilnd); + + return 0; + +err_tn_cleanup: + kfilnd_tn_cleanup(); +err: + return rc; +} + +MODULE_AUTHOR("Cray Inc."); +MODULE_DESCRIPTION("Kfabric Lustre Network Driver"); +MODULE_VERSION(KFILND_VERSION); +MODULE_LICENSE("GPL"); + +module_init(kfilnd_init); +module_exit(kfilnd_exit); diff --git a/lnet/klnds/kfilnd/kfilnd.h b/lnet/klnds/kfilnd/kfilnd.h new file mode 100644 index 0000000..a1a47a4 --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd.h @@ -0,0 +1,692 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd main interface. + */ + +#ifndef _KFILND_ +#define _KFILND_ + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include + +#define KFILND_VERSION "0.2.0" + +#define DEBUG_SUBSYSTEM S_LND + +#include +#include +#include "kfi_endpoint.h" +#include "kfi_errno.h" +#include "kfi_rma.h" +#include "kfi_tagged.h" +#include "kfi_cxi_ext.h" + +/* KFILND CFS fail range 0xF100 - 0xF1FF. */ + +#define CFS_KFI_FAIL_SEND_EVENT 0xF100 +#define CFS_KFI_FAIL_READ_EVENT 0xF101 +#define CFS_KFI_FAIL_WRITE_EVENT 0xF102 +#define CFS_KFI_FAIL_TAGGED_SEND_EVENT 0xF103 +#define CFS_KFI_FAIL_TAGGED_RECV_EVENT 0xF104 +#define CFS_KFI_FAIL_BULK_TIMEOUT 0xF105 +#define CFS_KFI_FAIL_SEND 0xF106 +#define CFS_KFI_FAIL_READ 0xF107 +#define CFS_KFI_FAIL_WRITE 0xF108 +#define CFS_KFI_FAIL_TAGGED_SEND 0xF109 +#define CFS_KFI_FAIL_TAGGED_RECV 0xF10A +#define CFS_KFI_FAIL_SEND_EAGAIN 0xF10B +#define CFS_KFI_FAIL_READ_EAGAIN 0xF10C +#define CFS_KFI_FAIL_WRITE_EAGAIN 0xF10D +#define CFS_KFI_FAIL_TAGGED_SEND_EAGAIN 0xF10E +#define CFS_KFI_FAIL_TAGGED_RECV_EAGAIN 0xF10F +#define CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN 0xF110 +#define CFS_KFI_FAIL_RECV_EAGAIN 0xF111 +#define CFS_KFI_FAIL_RECV 0xF112 +#define CFS_KFI_FAIL_MSG_UNPACK 0xF113 + +/* Maximum number of transaction keys supported. */ +#define KFILND_EP_KEY_BITS 16U +#define KFILND_EP_KEY_MAX (BIT(KFILND_EP_KEY_BITS) - 1) + +/* Some constants which should be turned into tunables */ +#define KFILND_IMMEDIATE_MSG_SIZE 4096 + +#define KFILND_MY_PROCID 49152 + +/* 256 Rx contexts max */ +#define KFILND_FAB_RX_CTX_BITS 8 + +/* Get the KFI base address from a KFI RX address. RX context information is + * stored in the MSBs of the KFI address. 
+ */ +#define KFILND_BASE_ADDR(addr) \ + ((addr) & ((1UL << (64 - KFILND_FAB_RX_CTX_BITS)) - 1)) + +/* States used by all kfilnd structures */ +enum kfilnd_object_states { + KFILND_STATE_UNINITIALIZED, + KFILND_STATE_INITIALIZED, + KFILND_STATE_SHUTTING_DOWN +}; + +extern struct dentry *kfilnd_debug_dir; +extern const struct file_operations kfilnd_initiator_state_stats_file_ops; +extern const struct file_operations kfilnd_target_state_stats_file_ops; +extern const struct file_operations kfilnd_target_stats_file_ops; +extern const struct file_operations kfilnd_initiator_stats_file_ops; +extern const struct file_operations kfilnd_reset_stats_file_ops; + +extern struct workqueue_struct *kfilnd_wq; + +extern unsigned int cksum; +extern unsigned int tx_scale_factor; +extern unsigned int rx_cq_scale_factor; +extern unsigned int tx_cq_scale_factor; +extern unsigned int eq_size; +extern unsigned int immediate_rx_buf_count; + +int kfilnd_tunables_setup(struct lnet_ni *ni); +int kfilnd_tunables_init(void); + +struct kfilnd_transaction; +struct kfilnd_ep; +struct kfilnd_dev; + +/* Multi-receive buffers for immediate receives */ +struct kfilnd_immediate_buffer { + void *immed_buf; + size_t immed_buf_size; + struct page *immed_buf_page; + atomic_t immed_ref; + bool immed_no_repost; + struct list_head replay_entry; + struct kfilnd_ep *immed_end; +}; + +extern atomic_t kfilnd_rx_count; + +struct kfilnd_cq; + +struct kfilnd_cq_work { + struct kfilnd_cq *cq; + unsigned int work_cpu; + struct work_struct work; +}; + +struct kfilnd_cq { + struct kfilnd_ep *ep; + struct kfid_cq *cq; + unsigned int cq_work_count; + struct kfilnd_cq_work cq_works[]; +}; + +struct kfilnd_ep { + /* The contexts for this CPT */ + struct kfid_ep *end_tx; + struct kfid_ep *end_rx; + + /* Corresponding CQs */ + struct kfilnd_cq *end_tx_cq; + struct kfilnd_cq *end_rx_cq; + + /* Specific config values for this endpoint */ + struct kfilnd_dev *end_dev; + int end_cpt; + int end_context_id; + + /* List of transactions. */ + struct list_head tn_list; + spinlock_t tn_list_lock; + + /* Replay queues. */ + struct list_head tn_replay; + struct list_head imm_buffer_replay; + spinlock_t replay_lock; + struct timer_list replay_timer; + struct work_struct replay_work; + atomic_t replay_count; + + /* Key used to build the tag for tagged buffers. 
*/ + struct ida keys; + + /* Pre-posted immediate buffers */ + struct kfilnd_immediate_buffer end_immed_bufs[]; +}; + +struct kfilnd_peer { + struct rhash_head node; + struct rcu_head rcu_head; + struct kfilnd_dev *dev; + lnet_nid_t nid; + kfi_addr_t addr; + atomic_t rx_base; + atomic_t remove_peer; + refcount_t cnt; + time64_t last_alive; + u16 version; + u32 local_session_key; + u32 remote_session_key; +}; + +static inline bool kfilnd_peer_is_new_peer(struct kfilnd_peer *peer) +{ + return peer->version == 0; +} + +static inline void kfilnd_peer_set_version(struct kfilnd_peer *peer, + u16 version) +{ + peer->version = version; +} + +static inline void kfilnd_peer_set_remote_session_key(struct kfilnd_peer *peer, + u32 session_key) +{ + peer->remote_session_key = session_key; +} + +struct kfilnd_fab { + struct list_head entry; + struct list_head dom_list; + struct mutex dom_list_lock; + struct kfid_fabric *fabric; + struct kref cnt; +}; + +struct kfilnd_dom { + struct list_head entry; + struct list_head dev_list; + spinlock_t lock; + struct kfilnd_fab *fab; + struct kfid_domain *domain; + struct kref cnt; +}; + +/* Transaction States */ +enum tn_states { + TN_STATE_INVALID, + + /* Shared initiator and target states. */ + TN_STATE_IDLE, + TN_STATE_WAIT_TAG_COMP, + + /* Initiator immediate states. */ + TN_STATE_IMM_SEND, + + /* Initiator bulk states. */ + TN_STATE_TAGGED_RECV_POSTED, + TN_STATE_SEND_FAILED, + TN_STATE_WAIT_COMP, + TN_STATE_WAIT_TIMEOUT_COMP, + TN_STATE_WAIT_SEND_COMP, + TN_STATE_WAIT_TIMEOUT_TAG_COMP, + TN_STATE_FAIL, + + /* Target states. */ + TN_STATE_IMM_RECV, + TN_STATE_WAIT_TAG_RMA_COMP, + + /* Invalid max value. */ + TN_STATE_MAX, +}; + +/* Base duration state stats. */ +struct kfilnd_tn_duration_stat { + atomic64_t accumulated_duration; + atomic_t accumulated_count; +}; + +/* Transaction state stats group into 22 buckets. Bucket zero corresponds to + * LNet message size of 0 bytes and buckets 1 through 21 correspond to LNet + * message sizes of 1 to 1048576 bytes increasing by a power of 2. LNet message + * sizes are round up to the nearest power of 2. + */ +#define KFILND_DATA_SIZE_BUCKETS 22U +#define KFILND_DATA_SIZE_MAX_SIZE (1U << (KFILND_DATA_SIZE_BUCKETS - 2)) +struct kfilnd_tn_data_size_duration_stats { + struct kfilnd_tn_duration_stat data_size[KFILND_DATA_SIZE_BUCKETS]; +}; + +static inline unsigned int kfilnd_msg_len_to_data_size_bucket(size_t size) +{ + u64 bit; + + if (size == 0) + return 0; + if (size >= KFILND_DATA_SIZE_MAX_SIZE) + return KFILND_DATA_SIZE_BUCKETS - 1; + + /* Round size up to the nearest power of 2. */ + bit = fls64(size); + if (BIT(bit) < size) + bit++; + + return (unsigned int)bit; +} + +/* One data size duraction state bucket for each transaction state. */ +struct kfilnd_tn_state_data_size_duration_stats { + struct kfilnd_tn_data_size_duration_stats state[TN_STATE_MAX]; +}; + +struct kfilnd_dev { + struct list_head kfd_list; /* chain on kfid_devs */ + struct lnet_ni *kfd_ni; + enum kfilnd_object_states kfd_state; + + /* KFI LND domain the device is associated with. */ + struct kfilnd_dom *dom; + + /* Fields specific to kfabric operation */ + spinlock_t kfd_lock; + struct kfid_ep *kfd_sep; + struct kfid_av *kfd_av; + struct kfilnd_ep **kfd_endpoints; + + /* Map of LNet NI CPTs to endpoints. */ + struct kfilnd_ep **cpt_to_endpoint; + + /* Hash of LNet NIDs to KFI addresses. */ + struct rhashtable peer_cache; + + /* Per LNet NI states. 
*/ + struct kfilnd_tn_state_data_size_duration_stats initiator_state_stats; + struct kfilnd_tn_state_data_size_duration_stats target_state_stats; + struct kfilnd_tn_data_size_duration_stats initiator_stats; + struct kfilnd_tn_data_size_duration_stats target_stats; + + /* Per LNet NI debugfs stats. */ + struct dentry *dev_dir; + struct dentry *initiator_state_stats_file; + struct dentry *initiator_stats_file; + struct dentry *target_state_stats_file; + struct dentry *target_stats_file; + struct dentry *reset_stats_file; + + /* Physical NIC address. */ + unsigned int nic_addr; + atomic_t session_keys; +}; + +/* Invalid checksum value is treated as no checksum. */ +/* TODO: Module parameter to disable checksum? */ +#define NO_CHECKSUM 0x0 + +/* Hello message header. */ +struct kfilnd_hello_msg { + /* Support kfilnd version. */ + __u16 version; + + /* Base RX context peer should used. */ + __u16 rx_base; + + /* Session key used by peer. */ + __u32 session_key; + + /* RX context count peer can target. */ + __u16 rx_count; +} __packed; + +/* Immediate message header. */ +struct kfilnd_immed_msg { + /* Entire LNet header needed by the destination to match incoming + * message. + */ + struct lnet_hdr_nid4 hdr; + + /* Entire LNet message payload. */ + char payload[0]; +} __packed; + +/* Bulk request message header. */ +struct kfilnd_bulk_req_msg { + /* Entire LNet header needed by the destination to match incoming + * message. + */ + struct lnet_hdr_nid4 hdr; + + /* Specific RX context the target must target to push/pull LNet + * payload. + */ + __u32 response_rx; + + /* Memory key needed by the target to push/pull LNet payload. */ + __u16 key; +} __packed; + +/* Kfilnd message. Includes base transport header plus embedded protocol + * message. + */ +struct kfilnd_msg { + /* Unique kfilnd magic. */ + __u32 magic; + + /* Version of the kfilnd protocol. */ + __u16 version; + + /* Specific kfilnd protocol type. */ + __u8 type; + + /* Unused 8 bits. */ + __u8 reserved; + + /* Number of bytes in message. */ + __u16 nob; + + /* Checksum of entire message. 0 is checksum disabled. */ + __sum16 cksum; + + /* Message LNet source NID. */ + __u64 srcnid; + + /* Message LNet target NID. */ + __u64 dstnid; + + /* Embedded protocol headers. Must remain at bottom. */ + union { + struct kfilnd_immed_msg immed; + struct kfilnd_bulk_req_msg bulk_req; + struct kfilnd_hello_msg hello; + } __packed proto; +} __packed; + +#define KFILND_MSG_MAGIC LNET_PROTO_KFI_MAGIC /* unique magic */ + +#define KFILND_MSG_VERSION_1 0x1 +#define KFILND_MSG_VERSION KFILND_MSG_VERSION_1 + +/* Get the KFI RX context from a KFI RX address. RX context information is + * stored in the MSBs of the KFI address. + */ +#define KFILND_RX_CONTEXT(addr) ((addr) >> (64 - KFILND_FAB_RX_CTX_BITS)) + +#define KFILND_EP_DEBUG(ep, fmt, ...) \ + CDEBUG(D_NET, "%s:%d " fmt "\n", \ + libcfs_nidstr(&(ep)->end_dev->kfd_ni->ni_nid), \ + (ep)->end_context_id, ##__VA_ARGS__) + +#define KFILND_EP_ERROR(ep, fmt, ...) \ + CNETERR("%s:%d " fmt "\n", \ + libcfs_nidstr(&(ep)->end_dev->kfd_ni->ni_nid), \ + (ep)->end_context_id, ##__VA_ARGS__) + +#define KFILND_TN_PEER_VALID(tn) \ + !IS_ERR_OR_NULL((tn)->peer) + +#define KFILND_TN_DIR_DEBUG(tn, fmt, dir, ...) \ + CDEBUG(D_NET, "Transaction ID %p: %s:%u %s %s:%llu " fmt "\n", \ + (tn), \ + libcfs_nidstr(&(tn)->tn_ep->end_dev->kfd_ni->ni_nid), \ + (tn)->tn_ep->end_context_id, dir, \ + libcfs_nid2str((tn)->peer->nid), \ + KFILND_TN_PEER_VALID(tn) ? 
\ + KFILND_RX_CONTEXT((tn)->peer->addr) : 0, \ + ##__VA_ARGS__) + +#define KFILND_TN_DEBUG(tn, fmt, ...) \ + do { \ + if ((tn)->is_initiator) \ + KFILND_TN_DIR_DEBUG(tn, fmt, "->", ##__VA_ARGS__); \ + else \ + KFILND_TN_DIR_DEBUG(tn, fmt, "<-", ##__VA_ARGS__); \ + } while (0) + +#define KFILND_TN_DIR_ERROR(tn, fmt, dir, ...) \ + CNETERR("Transaction ID %p: %s:%u %s %s:%llu " fmt "\n", \ + (tn), \ + libcfs_nidstr(&(tn)->tn_ep->end_dev->kfd_ni->ni_nid), \ + (tn)->tn_ep->end_context_id, dir, \ + libcfs_nid2str((tn)->peer->nid), \ + KFILND_TN_PEER_VALID(tn) ? \ + KFILND_RX_CONTEXT((tn)->peer->addr) : 0, \ + ##__VA_ARGS__) + +#define KFILND_TN_ERROR(tn, fmt, ...) \ + do { \ + if ((tn)->is_initiator) \ + KFILND_TN_DIR_ERROR(tn, fmt, "->", ##__VA_ARGS__); \ + else \ + KFILND_TN_DIR_ERROR(tn, fmt, "<-", ##__VA_ARGS__); \ + } while (0) + +/* TODO: Support NOOPs? */ +enum kfilnd_msg_type { + /* Valid message types start at 1. */ + KFILND_MSG_INVALID, + + /* Valid message types. */ + KFILND_MSG_IMMEDIATE, + KFILND_MSG_BULK_PUT_REQ, + KFILND_MSG_BULK_GET_REQ, + KFILND_MSG_HELLO_REQ, + KFILND_MSG_HELLO_RSP, + + /* Invalid max value. */ + KFILND_MSG_MAX, +}; + +static inline const char *msg_type_to_str(enum kfilnd_msg_type type) +{ + static const char *str[KFILND_MSG_MAX] = { + [KFILND_MSG_INVALID] = "KFILND_MSG_INVALID", + [KFILND_MSG_IMMEDIATE] = "KFILND_MSG_IMMEDIATE", + [KFILND_MSG_BULK_PUT_REQ] = "KFILND_MSG_BULK_PUT_REQ", + [KFILND_MSG_BULK_GET_REQ] = "KFILND_MSG_BULK_GET_REQ", + [KFILND_MSG_HELLO_REQ] = "KFILND_MSG_HELLO_REQ", + [KFILND_MSG_HELLO_RSP] = "KFILND_MSG_HELLO_RSP", + }; + + if (type >= KFILND_MSG_MAX) + return "KFILND_MSG_INVALID"; + + return str[type]; +}; + +static inline const char *tn_state_to_str(enum tn_states type) +{ + static const char *str[TN_STATE_MAX] = { + [TN_STATE_INVALID] = "TN_STATE_INVALID", + [TN_STATE_IDLE] = "TN_STATE_IDLE", + [TN_STATE_WAIT_TAG_COMP] = "TN_STATE_WAIT_TAG_COMP", + [TN_STATE_IMM_SEND] = "TN_STATE_IMM_SEND", + [TN_STATE_TAGGED_RECV_POSTED] = "TN_STATE_TAGGED_RECV_POSTED", + [TN_STATE_SEND_FAILED] = "TN_STATE_SEND_FAILED", + [TN_STATE_WAIT_COMP] = "TN_STATE_WAIT_COMP", + [TN_STATE_WAIT_TIMEOUT_COMP] = "TN_STATE_WAIT_TIMEOUT_COMP", + [TN_STATE_WAIT_SEND_COMP] = "TN_STATE_WAIT_SEND_COMP", + [TN_STATE_WAIT_TIMEOUT_TAG_COMP] = "TN_STATE_WAIT_TIMEOUT_TAG_COMP", + [TN_STATE_FAIL] = "TN_STATE_FAIL", + [TN_STATE_IMM_RECV] = "TN_STATE_IMM_RECV", + [TN_STATE_WAIT_TAG_RMA_COMP] = "TN_STATE_WAIT_TAG_RMA_COMP", + }; + + return str[type]; +}; + +/* Transaction Events */ +enum tn_events { + TN_EVENT_INVALID, + + /* Initiator events. */ + TN_EVENT_INIT_IMMEDIATE, + TN_EVENT_INIT_BULK, + TN_EVENT_TX_HELLO, + TN_EVENT_TX_OK, + TN_EVENT_TX_FAIL, + TN_EVENT_TAG_RX_OK, + TN_EVENT_TAG_RX_FAIL, + TN_EVENT_TAG_RX_CANCEL, + TN_EVENT_TIMEOUT, + + /* Target events. */ + TN_EVENT_RX_HELLO, + TN_EVENT_RX_OK, + TN_EVENT_RX_FAIL, + TN_EVENT_INIT_TAG_RMA, + TN_EVENT_SKIP_TAG_RMA, + TN_EVENT_TAG_TX_OK, + TN_EVENT_TAG_TX_FAIL, + + /* Invalid max value. 
*/ + TN_EVENT_MAX, +}; + +static inline const char *tn_event_to_str(enum tn_events type) +{ + static const char *str[TN_EVENT_MAX] = { + [TN_EVENT_INVALID] = "TN_EVENT_INVALID", + [TN_EVENT_INIT_IMMEDIATE] = "TN_EVENT_INIT_IMMEDIATE", + [TN_EVENT_INIT_BULK] = "TN_EVENT_INIT_BULK", + [TN_EVENT_TX_HELLO] = "TN_EVENT_TX_HELLO", + [TN_EVENT_TX_OK] = "TN_EVENT_TX_OK", + [TN_EVENT_TX_FAIL] = "TN_EVENT_TX_FAIL", + [TN_EVENT_TAG_RX_OK] = "TN_EVENT_TAG_RX_OK", + [TN_EVENT_TAG_RX_FAIL] = "TN_EVENT_TAG_RX_FAIL", + [TN_EVENT_TAG_RX_CANCEL] = "TN_EVENT_TAG_RX_CANCEL", + [TN_EVENT_TIMEOUT] = "TN_EVENT_TIMEOUT", + [TN_EVENT_RX_HELLO] = "TN_EVENT_RX_HELLO", + [TN_EVENT_RX_OK] = "TN_EVENT_RX_OK", + [TN_EVENT_RX_FAIL] = "TN_EVENT_RX_FAIL", + [TN_EVENT_INIT_TAG_RMA] = "TN_EVENT_INIT_TAG_RMA", + [TN_EVENT_SKIP_TAG_RMA] = "TN_EVENT_SKIP_TAG_RMA", + [TN_EVENT_TAG_TX_FAIL] = "TN_EVENT_TAG_TX_FAIL", + }; + + return str[type]; +}; + +struct kfilnd_transaction_msg { + struct kfilnd_msg *msg; + size_t length; +}; + +/* Initiator and target transaction structure. */ +struct kfilnd_transaction { + /* Endpoint list transaction lives on. */ + struct list_head tn_entry; + struct mutex tn_lock; /* to serialize events */ + int tn_status; /* return code from ops */ + struct kfilnd_ep *tn_ep; /* endpoint we operate under */ + enum tn_states tn_state; /* current state of Tn */ + struct lnet_msg *tn_lntmsg; /* LNet msg to finalize */ + struct lnet_msg *tn_getreply; /* GET LNet msg to finalize */ + + bool is_initiator; /* Initiated LNet transfer. */ + + /* Transaction send message and target address. */ + kfi_addr_t tn_target_addr; + struct kfilnd_peer *peer; + struct kfilnd_transaction_msg tn_tx_msg; + + /* Transaction multi-receive buffer and associated receive message. */ + struct kfilnd_immediate_buffer *tn_posted_buf; + struct kfilnd_transaction_msg tn_rx_msg; + + /* LNet buffer used to register a memory region or perform a RMA + * operation. + */ + struct bio_vec tn_kiov[LNET_MAX_IOV]; + unsigned int tn_num_iovec; + + /* LNet transaction payload byte count. */ + unsigned int tn_nob; + + /* Bulk transaction buffer is sink or source buffer. */ + bool sink_buffer; + + /* Memory region and remote key used to cover initiator's buffer. */ + u16 tn_mr_key; + + /* RX context used to perform response operations to a Put/Get + * request. This is required since the request initiator locks in a + * transactions to a specific RX context. + */ + u16 tn_response_mr_key; + u8 tn_response_rx; + + /* Immediate data used to convey transaction state from LNet target to + * LNet intiator. + */ + u64 tagged_data; + + /* Bulk operation timeout timer. */ + struct timer_list timeout_timer; + struct work_struct timeout_work; + + /* Transaction health status. */ + enum lnet_msg_hstatus hstatus; + + /* Transaction deadline. */ + ktime_t deadline; + + ktime_t tn_alloc_ts; + ktime_t tn_state_ts; + size_t lnet_msg_len; + + /* Fields used to replay transaction. */ + struct list_head replay_entry; + enum tn_events replay_event; + int replay_status; + + enum kfilnd_msg_type msg_type; +}; + +int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt, lnet_nid_t nid); + +#endif /* _KFILND_ */ diff --git a/lnet/klnds/kfilnd/kfilnd_cq.c b/lnet/klnds/kfilnd/kfilnd_cq.c new file mode 100644 index 0000000..d070afe --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_cq.c @@ -0,0 +1,253 @@ + +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. 
+ * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd completion queue. + */ +#include +#include +#include + +#include "kfilnd_cq.h" +#include "kfilnd_tn.h" +#include "kfilnd_ep.h" + +void kfilnd_cq_process_error(struct kfilnd_ep *ep, + struct kfi_cq_err_entry *error) +{ + struct kfilnd_immediate_buffer *buf; + struct kfilnd_transaction *tn; + enum tn_events tn_event; + int status; + + switch (error->flags) { + case KFI_MSG | KFI_RECV: + if (error->err != ECANCELED) { + KFILND_EP_ERROR(ep, "Dropping error receive event %d\n", + -error->err); + return; + } + fallthrough; + case KFI_MSG | KFI_RECV | KFI_MULTI_RECV: + buf = error->op_context; + kfilnd_ep_imm_buffer_put(buf); + return; + + case KFI_TAGGED | KFI_RECV: + case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA: + case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV: + case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV: + tn = error->op_context; + if (error->err == ECANCELED) { + tn_event = TN_EVENT_TAG_RX_CANCEL; + status = 0; + } else { + tn_event = TN_EVENT_TAG_RX_FAIL; + status = -error->err; + } + break; + + case KFI_MSG | KFI_SEND: + tn = error->op_context; + tn_event = TN_EVENT_TX_FAIL; + status = -error->err; + break; + + case KFI_TAGGED | KFI_SEND: + case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND: + case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND: + tn = error->op_context; + tn_event = TN_EVENT_TAG_TX_FAIL; + status = -error->err; + break; + + default: + LBUG(); + } + + kfilnd_tn_event_handler(tn, tn_event, status); +} + +static void kfilnd_cq_process_event(struct kfi_cq_data_entry *event) +{ + struct kfilnd_immediate_buffer *buf; + struct kfilnd_msg *rx_msg; + struct kfilnd_transaction *tn; + enum tn_events tn_event; + int64_t status = 0; + + switch (event->flags) { + case KFI_MSG | KFI_RECV: + case KFI_MSG | KFI_RECV | KFI_MULTI_RECV: + buf = event->op_context; + rx_msg = event->buf; + + kfilnd_tn_process_rx_event(buf, rx_msg, event->len); + + /* If the KFI_MULTI_RECV flag is set, the buffer was + * unlinked. 
+ */ + if (event->flags & KFI_MULTI_RECV) + kfilnd_ep_imm_buffer_put(buf); + return; + + case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA: + status = -1 * (int64_t)be64_to_cpu(event->data); + fallthrough; + case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV: + case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV: + tn_event = TN_EVENT_TAG_RX_OK; + tn = event->op_context; + break; + + case KFI_TAGGED | KFI_SEND: + case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND: + case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND: + tn = event->op_context; + tn_event = TN_EVENT_TAG_TX_OK; + break; + + case KFI_MSG | KFI_SEND: + tn = event->op_context; + tn_event = TN_EVENT_TX_OK; + break; + + default: + LBUG(); + } + + kfilnd_tn_event_handler(tn, tn_event, status); +} + +static void kfilnd_cq_process_completion(struct work_struct *work) +{ + struct kfilnd_cq_work *cq_work = + container_of(work, struct kfilnd_cq_work, work); + struct kfilnd_cq *kfilnd_cq = cq_work->cq; + struct kfid_cq *cq = kfilnd_cq->cq; + struct kfi_cq_data_entry event; + struct kfi_cq_err_entry error; + ssize_t rc; + bool done = false; + + /* Drain the KFI completion queue of all events and errors. */ + while (!done) { + rc = kfi_cq_read(cq, &event, 1); + if (rc == -KFI_EAVAIL) { + while (kfi_cq_readerr(cq, &error, 0) == 1) + kfilnd_cq_process_error(kfilnd_cq->ep, &error); + } else if (rc == 1) { + kfilnd_cq_process_event(&event); + } else if (rc == -EAGAIN) { + done = true; + } else { + KFILND_EP_ERROR(kfilnd_cq->ep, "Unexpected rc = %ld", + rc); + done = true; + } + } + + if (kfilnd_ep_replays_pending(kfilnd_cq->ep)) + kfilnd_ep_flush_replay_queue(kfilnd_cq->ep); +} + +static void kfilnd_cq_completion(struct kfid_cq *cq, void *context) +{ + struct kfilnd_cq *kfilnd_cq = context; + struct kfilnd_cq_work *cq_work; + unsigned int i; + + for (i = 0; i < kfilnd_cq->cq_work_count; i++) { + cq_work = &kfilnd_cq->cq_works[i]; + queue_work_on(cq_work->work_cpu, kfilnd_wq, &cq_work->work); + } +} + +#define CQ_ALLOC_SIZE(cpu_count) \ + (sizeof(struct kfilnd_cq) + \ + (sizeof(struct kfilnd_cq_work) * (cpu_count))) + +struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep, + struct kfi_cq_attr *attr) +{ + struct kfilnd_cq *cq; + cpumask_var_t *cpu_mask; + int rc; + unsigned int cpu_count = 0; + unsigned int cpu; + unsigned int i; + size_t alloc_size; + struct kfilnd_cq_work *cq_work; + + cpu_mask = cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt); + for_each_cpu(cpu, *cpu_mask) + cpu_count++; + + alloc_size = CQ_ALLOC_SIZE(cpu_count); + LIBCFS_CPT_ALLOC(cq, lnet_cpt_table(), ep->end_cpt, alloc_size); + if (!cq) { + rc = -ENOMEM; + KFILND_EP_ERROR(ep, "Failed to allocate memory: rc=%d", rc); + goto err; + } + + memset(cq, 0, alloc_size); + + rc = kfi_cq_open(ep->end_dev->dom->domain, attr, &cq->cq, + kfilnd_cq_completion, cq); + if (rc) { + KFILND_EP_ERROR(ep, "Failed to open KFI CQ: rc=%d", rc); + goto err_free_kfilnd_cq; + } + + i = 0; + for_each_cpu(cpu, *cpu_mask) { + cq_work = &cq->cq_works[i]; + cq_work->cq = cq; + cq_work->work_cpu = cpu; + INIT_WORK(&cq_work->work, kfilnd_cq_process_completion); + i++; + } + + cq->ep = ep; + cq->cq_work_count = cpu_count; + + return cq; + +err_free_kfilnd_cq: + LIBCFS_FREE(cq, alloc_size); +err: + return ERR_PTR(rc); +} + +void kfilnd_cq_free(struct kfilnd_cq *cq) +{ + flush_workqueue(kfilnd_wq); + kfi_close(&cq->cq->fid); + LIBCFS_FREE(cq, CQ_ALLOC_SIZE(cq->cq_work_count)); +} diff --git a/lnet/klnds/kfilnd/kfilnd_cq.h b/lnet/klnds/kfilnd/kfilnd_cq.h new file mode 100644 index 0000000..2aad454 --- /dev/null +++ 
b/lnet/klnds/kfilnd/kfilnd_cq.h @@ -0,0 +1,42 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd completion queue. + */ +#ifndef _KFILND_CQ_ +#define _KFILND_CQ_ + +#include "kfilnd.h" + +void kfilnd_cq_process_error(struct kfilnd_ep *ep, + struct kfi_cq_err_entry *error); +struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep, + struct kfi_cq_attr *attr); +void kfilnd_cq_free(struct kfilnd_cq *cq); + +#endif /*_KFILND_CQ_ */ diff --git a/lnet/klnds/kfilnd/kfilnd_debugfs.c b/lnet/klnds/kfilnd/kfilnd_debugfs.c new file mode 100644 index 0000000..21504c5 --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_debugfs.c @@ -0,0 +1,204 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd device implementation. 
+ */ +#include "kfilnd.h" +#include "kfilnd_dev.h" + +#define TIME_MAX 0xFFFFFFFFFFFF +static s64 print_duration(struct kfilnd_tn_duration_stat *stat) +{ + s64 duration; + + if (!atomic_read(&stat->accumulated_count)) + return 0; + + duration = atomic64_read(&stat->accumulated_duration) / + atomic_read(&stat->accumulated_count); + + return min_t(s64, duration, TIME_MAX); +} + +static void seq_print_tn_state_stats(struct seq_file *s, struct kfilnd_dev *dev, + bool initiator) +{ + struct kfilnd_tn_state_data_size_duration_stats *state_stats; + unsigned int data_size; + + if (initiator) + state_stats = &dev->initiator_state_stats; + else + state_stats = &dev->target_state_stats; + + seq_printf(s, "%-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s %-20s\n", + "MSG_SIZE", "IDLE", "WAIT_TAG_COMP", "IMM_SEND", + "TAGGED_RECV_POSTED", "SEND_FAILED", "WAIT_COMP", + "WAIT_TOUT_COMP", "SEND_COMP", "WAIT_TOUT_TAG_COMP", "FAIL", + "IMM_RECV", "WAIT_TAG_RMA_COMP"); + + for (data_size = 0; data_size < KFILND_DATA_SIZE_BUCKETS; data_size++) { + seq_printf(s, "%-20lu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu %-20llu\n", + data_size == 0 ? 0 : BIT(data_size - 1), + print_duration(&state_stats->state[TN_STATE_IDLE].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_WAIT_TAG_COMP].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_IMM_SEND].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_TAGGED_RECV_POSTED].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_SEND_FAILED].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_WAIT_COMP].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_WAIT_TIMEOUT_COMP].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_WAIT_SEND_COMP].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_WAIT_TIMEOUT_TAG_COMP].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_FAIL].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_IMM_RECV].data_size[data_size]), + print_duration(&state_stats->state[TN_STATE_WAIT_TAG_RMA_COMP].data_size[data_size])); + } +} + +static int kfilnd_initiator_state_stats_file_show(struct seq_file *s, + void *unused) +{ + seq_print_tn_state_stats(s, s->private, true); + + return 0; +} + +static int kfilnd_initiator_state_stats_file_open(struct inode *inode, + struct file *file) +{ + return single_open(file, kfilnd_initiator_state_stats_file_show, + inode->i_private); +} + +const struct file_operations kfilnd_initiator_state_stats_file_ops = { + .owner = THIS_MODULE, + .open = kfilnd_initiator_state_stats_file_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static int kfilnd_target_state_stats_file_show(struct seq_file *s, + void *unused) +{ + seq_print_tn_state_stats(s, s->private, false); + + return 0; +} + +static int kfilnd_target_state_stats_file_open(struct inode *inode, + struct file *file) +{ + return single_open(file, kfilnd_target_state_stats_file_show, + inode->i_private); +} + +const struct file_operations kfilnd_target_state_stats_file_ops = { + .owner = THIS_MODULE, + .open = kfilnd_target_state_stats_file_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static void seq_print_tn_stats(struct seq_file *s, struct kfilnd_dev *dev, + bool initiator) +{ + struct kfilnd_tn_data_size_duration_stats *stats; + unsigned int data_size; + + if (initiator) 
+ stats = &dev->initiator_stats; + else + stats = &dev->target_stats; + + seq_printf(s, "%-16s %-16s\n", "MSG_SIZE", "DURATION"); + + for (data_size = 0; data_size < KFILND_DATA_SIZE_BUCKETS; data_size++) { + seq_printf(s, "%-16lu %-16llu\n", + data_size == 0 ? 0 : BIT(data_size - 1), + print_duration(&stats->data_size[data_size])); + } +} + +static int kfilnd_initiator_stats_file_show(struct seq_file *s, void *unused) +{ + seq_print_tn_stats(s, s->private, true); + + return 0; +} + +static int kfilnd_initiator_stats_file_open(struct inode *inode, + struct file *file) +{ + return single_open(file, kfilnd_initiator_stats_file_show, + inode->i_private); +} + +const struct file_operations kfilnd_initiator_stats_file_ops = { + .owner = THIS_MODULE, + .open = kfilnd_initiator_stats_file_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static int kfilnd_target_stats_file_show(struct seq_file *s, void *unused) +{ + seq_print_tn_stats(s, s->private, false); + + return 0; +} + +static int kfilnd_target_stats_file_open(struct inode *inode, struct file *file) +{ + return single_open(file, kfilnd_target_stats_file_show, + inode->i_private); +} + +const struct file_operations kfilnd_target_stats_file_ops = { + .owner = THIS_MODULE, + .open = kfilnd_target_stats_file_open, + .read = seq_read, + .llseek = seq_lseek, + .release = seq_release, +}; + +static ssize_t kfilnd_reset_stats_file_write(struct file *filp, + const char __user *buf, + size_t count, loff_t *loff) +{ + kfilnd_dev_reset_stats(filp->f_inode->i_private); + + return count; +} + +const struct file_operations kfilnd_reset_stats_file_ops = { + .owner = THIS_MODULE, + .write = kfilnd_reset_stats_file_write, +}; diff --git a/lnet/klnds/kfilnd/kfilnd_dev.c b/lnet/klnds/kfilnd/kfilnd_dev.c new file mode 100644 index 0000000..5113551 --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_dev.c @@ -0,0 +1,335 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd device implementation. + */ +#include "kfilnd_dev.h" +#include "kfilnd_ep.h" +#include "kfilnd_dom.h" +#include "kfilnd_peer.h" + +/** + * kfilnd_dev_post_imm_buffers() - Post all immediate receive buffers on each + * KFI LND endpoint. + * @dev: KFI LND device to have all endpoint receive buffers posted. + * + * This function should be called only during KFI LND device initialization. + * + * Return: On success, zero. Else, negative errno. 
+ */ +int kfilnd_dev_post_imm_buffers(struct kfilnd_dev *dev) +{ + int i; + int rc; + + if (!dev) + return -EINVAL; + + for (i = 0; i < dev->kfd_ni->ni_ncpts; i++) { + rc = kfilnd_ep_post_imm_buffers(dev->kfd_endpoints[i]); + if (rc) + return rc; + } + + return 0; +} + +/** + * kfilnd_dev_free() - Free a KFI LND device. + * + * This function will not complete until all underlying KFI LND transactions are + * complete. + * + * Once the KFI LND device is freed, a reference is returned to the module. + */ +void kfilnd_dev_free(struct kfilnd_dev *dev) +{ + int i; + int lnet_ncpts; + + if (!dev) + return; + + debugfs_remove_recursive(dev->dev_dir); + + /* Change state to shutting down so TNs stop using it */ + dev->kfd_state = KFILND_STATE_SHUTTING_DOWN; + + /* Cancel all outstanding RX buffers. */ + for (i = 0; i < dev->kfd_ni->ni_ncpts; i++) + kfilnd_ep_cancel_imm_buffers(dev->kfd_endpoints[i]); + + /* Free all endpoints. */ + for (i = 0; i < dev->kfd_ni->ni_ncpts; i++) + kfilnd_ep_free(dev->kfd_endpoints[i]); + + kfilnd_peer_destroy(dev); + + lnet_ncpts = cfs_cpt_number(lnet_cpt_table()); + LIBCFS_FREE(dev->cpt_to_endpoint, + lnet_ncpts * sizeof(*dev->cpt_to_endpoint)); + + LIBCFS_FREE(dev->kfd_endpoints, + dev->kfd_ni->ni_ncpts * sizeof(*dev->kfd_endpoints)); + + kfi_close(&dev->kfd_sep->fid); + kfi_close(&dev->kfd_av->fid); + + kfilnd_dom_put(dev->dom); + + LIBCFS_FREE(dev, sizeof(*dev)); + + module_put(THIS_MODULE); +} + +/** + * kfilnd_dev_alloc() - Allocate a new KFI LND device a LNet NI. + * @ni: LNet NI used to allocate the KFI LND device. + * @node: Node string which can be passed into kfi_getinfo(). + * + * During KFI LND device allocation, the LNet NID NID is used to build node + * and service string. The LNet NID address (IPv4 address) is used for the node + * string. The LNet NID net number is used for the service string. Together, the + * node and service string define the address of the KFI LND device. + * + * The node and service strings are used to allocate a KFI scalable endpoint. + * The KFI scalable endpoint is later used to allocate KFI LND endpoints. + * + * For each successful KFI LND device allocation, a reference is taken against + * this module to it free being prematurely removed. + * + * Return: On success, valid pointer. On error, negative errno pointer. + */ +struct kfilnd_dev *kfilnd_dev_alloc(struct lnet_ni *ni, + const char *node) +{ + int i; + int rc; + struct kfi_av_attr av_attr = {}; + struct kfi_info *dev_info; + int cpt; + int lnet_ncpts; + struct kfilnd_dev *dev; + + if (!ni) { + rc = -EINVAL; + goto err; + } + + /* Start allocating memory and underlying hardware resources for the + * LNet NI. + */ + LIBCFS_ALLOC(dev, sizeof(*dev)); + if (!dev) { + rc = -ENOMEM; + goto err; + } + + dev->kfd_ni = ni; + spin_lock_init(&dev->kfd_lock); + atomic_set(&dev->session_keys, 0); + + dev->dom = kfilnd_dom_get(ni, node, &dev_info); + if (IS_ERR(dev->dom)) { + rc = PTR_ERR(dev->dom); + CERROR("Failed to get KFI LND domain: rc=%d\n", rc); + goto err_free_dev; + } + + /* KFI LNet NID address needs to be unique per LNet NID and something + * which can be inserted into the KFI AV. The NIC address is one of the + * unique components. Local interface NIC address needs to be extracted + * and used to build the LNet NID. + * + * At this point, only the KFI CXI provider is supported. 
+ */ + if (!dev_info->src_addr || + dev_info->src_addrlen != sizeof(struct kcxi_addr)) { + rc = -EADDRNOTAVAIL; + CERROR("No kfabric source address returned\n"); + goto err_put_dom; + } + + dev->nic_addr = ((struct kcxi_addr *)dev_info->src_addr)->nic; + + /* Create an AV for this device */ + av_attr.type = KFI_AV_UNSPEC; + av_attr.rx_ctx_bits = KFILND_FAB_RX_CTX_BITS; + rc = kfi_av_open(dev->dom->domain, &av_attr, &dev->kfd_av, dev); + if (rc) { + CERROR("Could not open AV, rc = %d\n", rc); + goto err_put_dom; + } + + /* Create a scalable endpont to represent the device. */ + rc = kfi_scalable_ep(dev->dom->domain, dev_info, &dev->kfd_sep, dev); + if (rc) { + CERROR("Could not create scalable endpoint, rc = %d\n", rc); + goto err_free_av; + } + + /* Done with info. */ + kfi_freeinfo(dev_info); + dev_info = NULL; + + /* Bind the endpoint to the AV */ + rc = kfi_scalable_ep_bind(dev->kfd_sep, &dev->kfd_av->fid, 0); + if (rc) { + CERROR("Could not bind scalable endpoint to AV, rc = %d\n", rc); + goto err_free_sep; + } + + /* Enable the scalable endpoint */ + rc = kfi_enable(dev->kfd_sep); + if (rc) { + CERROR("Could not enable scalable endpoint, rc = %d\n", rc); + goto err_free_sep; + } + + /* Allocate an array to store all the KFI LND endpoints. */ + LIBCFS_ALLOC_GFP(dev->kfd_endpoints, + ni->ni_ncpts * sizeof(*dev->kfd_endpoints), + GFP_KERNEL); + if (!dev->kfd_endpoints) { + rc = -ENOMEM; + goto err_free_sep; + } + + /* Map of all LNet CPTs to endpoints. */ + lnet_ncpts = cfs_cpt_number(lnet_cpt_table()); + LIBCFS_ALLOC_GFP(dev->cpt_to_endpoint, + lnet_ncpts * sizeof(*dev->cpt_to_endpoint), + GFP_KERNEL); + if (!dev->cpt_to_endpoint) { + rc = -ENOMEM; + goto err_free_ep_array; + } + + /* Create RX/TX contexts in kfabric for each LNet NI CPT. */ + for (i = 0; i < ni->ni_ncpts; i++) { + cpt = !ni->ni_cpts ? i : ni->ni_cpts[i]; + + dev->kfd_endpoints[i] = + kfilnd_ep_alloc(dev, i, cpt, + ni->ni_net->net_tunables.lct_max_tx_credits, + KFILND_IMMEDIATE_MSG_SIZE); + if (IS_ERR(dev->kfd_endpoints[i])) { + rc = PTR_ERR(dev->kfd_endpoints[i]); + goto err_free_endpoints; + } + + dev->cpt_to_endpoint[cpt] = dev->kfd_endpoints[i]; + } + + kfilnd_peer_init(dev); + + /* Mark that the dev/NI has now been initialized */ + dev->kfd_state = KFILND_STATE_INITIALIZED; + + /* Initialize debugfs stats. 
*/ + dev->dev_dir = debugfs_create_dir(libcfs_nidstr(&ni->ni_nid), + kfilnd_debug_dir); + dev->initiator_state_stats_file = + debugfs_create_file("initiator_state_stats", 0444, + dev->dev_dir, dev, + &kfilnd_initiator_state_stats_file_ops); + dev->initiator_state_stats_file = + debugfs_create_file("initiator_stats", 0444, + dev->dev_dir, dev, + &kfilnd_initiator_stats_file_ops); + dev->initiator_state_stats_file = + debugfs_create_file("target_state_stats", 0444, dev->dev_dir, + dev, &kfilnd_target_state_stats_file_ops); + dev->initiator_state_stats_file = + debugfs_create_file("target_stats", 0444, dev->dev_dir, dev, + &kfilnd_target_stats_file_ops); + dev->initiator_state_stats_file = + debugfs_create_file("reset_stats", 0444, dev->dev_dir, dev, + &kfilnd_reset_stats_file_ops); + + kfilnd_dev_reset_stats(dev); + + try_module_get(THIS_MODULE); + + return dev; + +err_free_endpoints: + for (i = 0; i < ni->ni_ncpts; i++) + kfilnd_ep_free(dev->kfd_endpoints[i]); + + LIBCFS_FREE(dev->cpt_to_endpoint, + lnet_ncpts * sizeof(*dev->cpt_to_endpoint)); +err_free_ep_array: + LIBCFS_FREE(dev->kfd_endpoints, + ni->ni_ncpts * sizeof(*dev->kfd_endpoints)); +err_free_sep: + kfi_close(&dev->kfd_sep->fid); +err_free_av: + kfi_close(&dev->kfd_av->fid); +err_put_dom: + kfilnd_dom_put(dev->dom); + if (dev_info) + kfi_freeinfo(dev_info); +err_free_dev: + LIBCFS_FREE(dev, sizeof(*dev)); +err: + return ERR_PTR(rc); +} + + +void kfilnd_dev_reset_stats(struct kfilnd_dev *dev) +{ + unsigned int data_size; + enum tn_states state; + struct kfilnd_tn_duration_stat *stat; + + for (data_size = 0; data_size < KFILND_DATA_SIZE_BUCKETS; data_size++) { + stat = &dev->initiator_stats.data_size[data_size]; + atomic64_set(&stat->accumulated_duration, 0); + atomic_set(&stat->accumulated_count, 0); + + stat = &dev->target_stats.data_size[data_size]; + atomic64_set(&stat->accumulated_duration, 0); + atomic_set(&stat->accumulated_count, 0); + + for (state = 0; state < TN_STATE_MAX; state++) { + stat = &dev->initiator_state_stats.state[state].data_size[data_size]; + atomic64_set(&stat->accumulated_duration, 0); + atomic_set(&stat->accumulated_count, 0); + + stat = &dev->target_state_stats.state[state].data_size[data_size]; + atomic64_set(&stat->accumulated_duration, 0); + atomic_set(&stat->accumulated_count, 0); + } + } +} + +u32 kfilnd_dev_get_session_key(struct kfilnd_dev *dev) +{ + return (u32)atomic_add_return(1, &dev->session_keys); +} diff --git a/lnet/klnds/kfilnd/kfilnd_dev.h b/lnet/klnds/kfilnd/kfilnd_dev.h new file mode 100644 index 0000000..8bd948d --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_dev.h @@ -0,0 +1,46 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd device implementation. + */ +#ifndef _KFILND_DEV_ +#define _KFILND_DEV_ + +#include "kfilnd.h" + +/* TODO: Module parameters? */ +#define KFILND_CURRENT_HASH_BITS 7 +#define KFILND_MAX_HASH_BITS 12 + +int kfilnd_dev_post_imm_buffers(struct kfilnd_dev *dev); +void kfilnd_dev_free(struct kfilnd_dev *dev); +struct kfilnd_dev *kfilnd_dev_alloc(struct lnet_ni *ni, const char *node); +void kfilnd_dev_reset_stats(struct kfilnd_dev *dev); +u32 kfilnd_dev_get_session_key(struct kfilnd_dev *dev); + +#endif /* _KFILND_DEV_ */ diff --git a/lnet/klnds/kfilnd/kfilnd_dom.c b/lnet/klnds/kfilnd/kfilnd_dom.c new file mode 100644 index 0000000..74cdc7f --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_dom.c @@ -0,0 +1,450 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd domain and fabric implementation. + */ +#include "kfilnd_dom.h" +#include "kfilnd_tn.h" + +/* Global list of allocated KFI LND fabrics. */ +static LIST_HEAD(fab_list); +static DEFINE_MUTEX(fab_list_lock); + +/** + * kfilnd_dom_free() - Free a KFI LND domain. + * @dom: KFI LND domain to be freed. + */ +static void kfilnd_dom_free(struct kref *kref) +{ + struct kfilnd_dom *dom; + + if (!kref) + return; + + dom = container_of(kref, struct kfilnd_dom, cnt); + + mutex_lock(&dom->fab->dom_list_lock); + list_del(&dom->entry); + mutex_unlock(&dom->fab->dom_list_lock); + + kfi_close(&dom->domain->fid); + LIBCFS_FREE(dom, sizeof(*dom)); +} + +/** + * kfilnd_dom_alloc() - Allocate a new KFI LND domain. + * @dom_info: KFI info structure used to allocate the KFI LND domain. + * @fab: KFI LND fabric used by the domain. + * + * A KFI LND domain (and the underlying KFI domain) provides access to a + * specific NIC on a fabric. The same KFI LND domain can be used to allocate + * different KFI LND devices. + * + * Return: On success, valid pointer. Else, negative errno pointer. 
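The "negative errno pointer" return convention above means callers test the result with IS_ERR()/PTR_ERR() rather than checking for NULL. A minimal caller-side sketch (hypothetical caller, assuming the kfilnd headers are included; the in-tree caller is kfilnd_dom_get() later in this file):

```c
/* Caller-side sketch of the ERR_PTR convention used by these helpers. */
struct kfilnd_dom *dom;

dom = kfilnd_dom_alloc(dom_info, fab);
if (IS_ERR(dom)) {
	/* PTR_ERR() recovers the negative errno, e.g. -ENOMEM */
	CERROR("Failed to allocate KFI LND domain: rc=%ld\n", PTR_ERR(dom));
	return PTR_ERR(dom);
}
```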
+ */ +static struct kfilnd_dom *kfilnd_dom_alloc(struct kfi_info *dom_info, + struct kfilnd_fab *fab) +{ + int rc; + struct kfilnd_dom *dom; + + if (!dom_info || !fab) { + rc = -EINVAL; + goto err; + } + + LIBCFS_ALLOC_GFP(dom, sizeof(*dom), GFP_KERNEL); + if (!dom) { + rc = -ENOMEM; + goto err; + } + + INIT_LIST_HEAD(&dom->dev_list); + spin_lock_init(&dom->lock); + dom->fab = fab; + kref_init(&dom->cnt); + + rc = kfi_domain(fab->fabric, dom_info, &dom->domain, dom); + if (rc) { + CERROR("Failed to create KFI domain: rc=%d\n", rc); + goto err_free_dom; + } + + mutex_lock(&fab->dom_list_lock); + list_add_tail(&dom->entry, &fab->dom_list); + mutex_unlock(&fab->dom_list_lock); + + return dom; + +err_free_dom: + LIBCFS_FREE(dom, sizeof(*dom)); +err: + return ERR_PTR(rc); +} + +/** + * kfilnd_dom_reuse() - Attempt to reuse an already allocated domain. + * @node: Node string used to limit domains to. + * @service: Service string used to limit domains to. + * @hints: Hints used to allocate KFI info structures. + * @fab: Fabric used to limit domains to. + * + * Return: On success (matching domain is found), valid pointer is returned. + * Else, NULL. + */ +struct kfilnd_dom *kfilnd_dom_reuse(const char *node, const char *service, + struct kfi_info *hints, + struct kfilnd_fab *fab) +{ + struct kfilnd_dom *dom; + struct kfi_info *info; + int rc; + + if (!node || !service || !hints || !fab) + return NULL; + + /* Update the hints domain attribute with an already allocated domain to + * see if domains can be reused. + */ + hints->fabric_attr->fabric = fab->fabric; + + mutex_lock(&fab->dom_list_lock); + list_for_each_entry(dom, &fab->dom_list, entry) { + hints->domain_attr->domain = dom->domain; + + rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints, &info); + if (!rc) { + kref_get(&dom->cnt); + + mutex_unlock(&fab->dom_list_lock); + + kfi_freeinfo(info); + + return dom; + } + } + mutex_unlock(&fab->dom_list_lock); + + hints->domain_attr->domain = NULL; + + return NULL; +} + +/** + * kfilnd_fab_free() - Free KFI LND fabric. + */ +static void kfilnd_fab_free(struct kref *kref) +{ + struct kfilnd_fab *fab; + + if (!kref) + return; + + fab = container_of(kref, struct kfilnd_fab, cnt); + + mutex_lock(&fab_list_lock); + list_del(&fab->entry); + mutex_unlock(&fab_list_lock); + + kfi_close(&fab->fabric->fid); + LIBCFS_FREE(fab, sizeof(*fab)); +} + +/** + * kfilnd_fab_alloc() - Allocate a new KFI LND fabric. + * @attr: KFI fabric attributes used to allocate the underlying KFI fabric. + * + * A KFI LND fabric (and the underlying KFI fabric) providers access to NICs on + * the same fabric. The underlying KFI fabric should be shared between all NICs + * (KFI domains) on the same fabric. + * + * Return: On success, valid pointer. Else, negative errno pointer. 
+ */ +static struct kfilnd_fab *kfilnd_fab_alloc(struct kfi_fabric_attr *attr) +{ + int rc; + struct kfilnd_fab *fab; + + if (!attr) { + rc = -EINVAL; + goto err; + } + + LIBCFS_ALLOC_GFP(fab, sizeof(*fab), GFP_KERNEL); + if (!fab) { + rc = -ENOMEM; + goto err; + } + + INIT_LIST_HEAD(&fab->dom_list); + mutex_init(&fab->dom_list_lock); + kref_init(&fab->cnt); + + rc = kfi_fabric(attr, &fab->fabric, fab); + if (rc) { + CERROR("Failed to allocate KFI fabric: rc=%d\n", rc); + goto err_free_fab; + } + + mutex_lock(&fab_list_lock); + list_add_tail(&fab->entry, &fab_list); + mutex_unlock(&fab_list_lock); + + return fab; + +err_free_fab: + LIBCFS_FREE(fab, sizeof(*fab)); +err: + return ERR_PTR(rc); +} + +/** + * kfilnd_fab_reuse() - Attempt to reuse an already allocated fabric. + * @node: Node string used to limit fabrics to. + * @service: Service string used to limit fabrics to. + * @hints: Hints used to allocate KFI info structures. + * + * Return: On success (matching fabric is found), valid pointer is returned. + * Else, NULL. + */ +struct kfilnd_fab *kfilnd_fab_reuse(const char *node, const char *service, + struct kfi_info *hints) +{ + struct kfilnd_fab *fab; + struct kfi_info *info; + int rc; + + if (!node || !service || !hints) + return NULL; + + /* Update the hints fabric attribute with an already allocated fabric to + * see if fabrics can be reused. + */ + mutex_lock(&fab_list_lock); + list_for_each_entry(fab, &fab_list, entry) { + hints->fabric_attr->fabric = fab->fabric; + + rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints, &info); + if (!rc) { + kref_get(&fab->cnt); + + mutex_unlock(&fab_list_lock); + + kfi_freeinfo(info); + + return fab; + } + } + mutex_unlock(&fab_list_lock); + + hints->fabric_attr->fabric = NULL; + + return NULL; +} + +/** + * kfi_domain_put() - Put a KFI LND domain reference. + */ +void kfilnd_dom_put(struct kfilnd_dom *dom) +{ + struct kfilnd_fab *fab; + + if (!dom) + return; + + fab = dom->fab; + + kref_put(&dom->cnt, kfilnd_dom_free); + + kref_put(&fab->cnt, kfilnd_fab_free); +} + +/** + * kfilnd_dom_get() - Get a KFI LND domain. + * @ni: LNet NI used to define the KFI LND domain address. + * @node: Node string which can be passed into kfi_getinfo(). + * @dev_info: KFI info structure which should be used to allocate a KFI LND + * device using this domain. + * + * On success, a KFI info structure is returned to the user in addition to a KFI + * LND domain. Callers should free the KFI info structure once done using it. + * + * Return: On success, dev_info is set to a valid KFI info structure and a valid + * KFI LND domain is returned. Else, negative errno pointer is returned. 
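A brief usage sketch of this contract, mirroring what kfilnd_dev_alloc() does earlier in the patch: the caller owns the returned kfi_info and eventually drops the domain with kfilnd_dom_put(). The function below is purely illustrative.

```c
/* Sketch: typical kfilnd_dom_get()/kfilnd_dom_put() pairing (hypothetical
 * caller; error handling trimmed).
 */
static int kfilnd_example_setup(struct lnet_ni *ni, const char *node)
{
	struct kfi_info *dev_info;
	struct kfilnd_dom *dom;

	dom = kfilnd_dom_get(ni, node, &dev_info);
	if (IS_ERR(dom))
		return PTR_ERR(dom);

	/* ... open the AV and scalable endpoint using dev_info ... */

	kfi_freeinfo(dev_info);	/* caller owns the returned info */

	/* ... on teardown ... */
	kfilnd_dom_put(dom);	/* drops the domain and fabric references */
	return 0;
}
```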
+ */ +struct kfilnd_dom *kfilnd_dom_get(struct lnet_ni *ni, const char *node, + struct kfi_info **dev_info) +{ + int rc; + struct kfi_info *hints; + struct kfi_info *info; + struct kfi_info *hints_tmp; + struct kfi_info *info_tmp; + struct kfilnd_fab *fab; + struct kfilnd_dom *dom; + struct kfi_cxi_fabric_ops *fab_ops; + char *service; + + if (!ni || !dev_info) { + rc = -EINVAL; + goto err; + } + + service = kasprintf(GFP_KERNEL, "%u", ni->ni_nid.nid_num); + if (!service) { + rc = -ENOMEM; + goto err; + } + + hints = kfi_allocinfo(); + if (!hints) { + rc = -ENOMEM; + goto err_free_service; + } + + hints->caps = KFI_MSG | KFI_RMA | KFI_SEND | KFI_RECV | KFI_READ | + KFI_WRITE | KFI_REMOTE_READ | KFI_REMOTE_WRITE | + KFI_MULTI_RECV | KFI_REMOTE_COMM | KFI_NAMED_RX_CTX | + KFI_TAGGED | KFI_TAGGED_RMA | KFI_DIRECTED_RECV; + hints->fabric_attr->prov_version = + KFI_VERSION(ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_prov_major_version, + ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_prov_minor_version); + hints->domain_attr->mr_iov_limit = 256; /* 1 MiB LNet message */ + hints->domain_attr->mr_key_size = sizeof(int); + hints->domain_attr->resource_mgmt = KFI_RM_DISABLED; + hints->ep_attr->max_msg_size = LNET_MAX_PAYLOAD; + hints->rx_attr->op_flags = KFI_COMPLETION | KFI_MULTI_RECV; + hints->rx_attr->iov_limit = 256; /* 1 MiB LNet message */ + hints->tx_attr->op_flags = KFI_COMPLETION; + hints->tx_attr->iov_limit = 256; /* 1 MiB LNet message */ + hints->tx_attr->rma_iov_limit = 256; /* 1 MiB LNet message */ + hints->ep_attr->auth_key = + (void *)&ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key; + hints->ep_attr->auth_key_size = + sizeof(ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key); + + /* Check if dynamic resource allocation is supported. + * Set dynamic resource alloc hints if it is. + * + * Need to check if op is supported since due to a bug can't + * simply set ctx_cnts greater than 1 (default value) if it isn't. + */ + hints_tmp = kfi_dupinfo(hints); + if (hints_tmp) { + rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints_tmp, + &info_tmp); + if (!rc) { + fab = kfilnd_fab_alloc(info_tmp->fabric_attr); + if (!IS_ERR(fab)) { + rc = kfi_open_ops(&fab->fabric->fid, + KFI_CXI_FAB_OPS_1, 0, (void **)&fab_ops, + NULL); + if (!rc) { + /* Set dynamic resource alloc hints */ + hints->domain_attr->cq_cnt = ni->ni_ncpts * 2; + hints->domain_attr->tx_ctx_cnt = ni->ni_ncpts; + hints->domain_attr->rx_ctx_cnt = ni->ni_ncpts; + hints->rx_attr->size = + ni->ni_net->net_tunables.lct_max_tx_credits + + immediate_rx_buf_count; + } + kref_put(&fab->cnt, kfilnd_fab_free); + } + kfi_freeinfo(info_tmp); + } + kfi_freeinfo(hints_tmp); + } + + /* Check to see if any KFI LND fabrics/domains can be reused. */ + fab = kfilnd_fab_reuse(node, service, hints); + dom = kfilnd_dom_reuse(node, service, hints, fab); + + if (fab) + hints->fabric_attr->fabric = fab->fabric; + if (dom) + hints->domain_attr->domain = dom->domain; + + /* Allocate the official KFI info structure to be used for KFI LND + * device allocation. + */ + rc = kfi_getinfo(0, node, service, KFI_SOURCE, hints, &info); + + /* Authorization key information is now stored in the returned kfi_info + * structure. Since kfi_freeinfo() will try to free the auth_key pointer + * and this memory is owned as part of the LNet NI, need to zero this + * information in the hints to prevent LNet NI corruption. 
+ */ + hints->ep_attr->auth_key = NULL; + hints->ep_attr->auth_key_size = 0; + + kfi_freeinfo(hints); + kfree(service); + node = NULL; + service = NULL; + + if (rc) + goto err_free_service; + + /* Allocate a new KFI LND fabric and domain if necessary. */ + if (!fab) { + fab = kfilnd_fab_alloc(info->fabric_attr); + if (IS_ERR(fab)) { + rc = PTR_ERR(fab); + goto err_free_info; + } + } + + if (!dom) { + /* Enable dynamic resource allocation if operation supported */ + rc = kfi_open_ops(&fab->fabric->fid, KFI_CXI_FAB_OPS_1, 0, + (void **)&fab_ops, NULL); + if (!rc) { + rc = fab_ops->enable_dynamic_rsrc_alloc(&fab->fabric->fid, true); + if (!rc) + CDEBUG(D_NET, "Enabled dynamic resource allocation for KFI domain\n"); + } + dom = kfilnd_dom_alloc(info, fab); + if (IS_ERR(dom)) { + rc = PTR_ERR(dom); + goto err_put_fab; + } + } + + *dev_info = info; + + return dom; + +err_put_fab: + kref_put(&fab->cnt, kfilnd_fab_free); +err_free_info: + kfi_freeinfo(info); +err_free_service: + kfree(service); +err: + return ERR_PTR(rc); +} diff --git a/lnet/klnds/kfilnd/kfilnd_dom.h b/lnet/klnds/kfilnd/kfilnd_dom.h new file mode 100644 index 0000000..94649ad --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_dom.h @@ -0,0 +1,40 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd domain implementation. + */ +#ifndef _KFILND_DOM_ +#define _KFILND_DOM_ + +#include "kfilnd.h" + +void kfilnd_dom_put(struct kfilnd_dom *dom); +struct kfilnd_dom *kfilnd_dom_get(struct lnet_ni *ni, const char *node, + struct kfi_info **dev_info); + +#endif /* _KFILND_DOM_ */ diff --git a/lnet/klnds/kfilnd/kfilnd_ep.c b/lnet/klnds/kfilnd/kfilnd_ep.c new file mode 100644 index 0000000..72b4e0e --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_ep.c @@ -0,0 +1,954 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd endpoint implementation. + */ +#include "kfilnd_ep.h" +#include "kfilnd_dev.h" +#include "kfilnd_tn.h" +#include "kfilnd_cq.h" + +/** + * kfilnd_ep_post_recv() - Post a single receive buffer. + * @ep: KFI LND endpoint to have receive buffers posted on. + * @buf: Receive buffer to be posted. + * + * Return: On succes, zero. Else, negative errno. + */ +static int kfilnd_ep_post_recv(struct kfilnd_ep *ep, + struct kfilnd_immediate_buffer *buf) +{ + int rc; + + if (!ep || !buf) + return -EINVAL; + + if (buf->immed_no_repost) + return 0; + + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV)) + return -EIO; + else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV_EAGAIN)) + return -EAGAIN; + + atomic_inc(&buf->immed_ref); + rc = kfi_recv(ep->end_rx, buf->immed_buf, buf->immed_buf_size, NULL, + KFI_ADDR_UNSPEC, buf); + if (rc) + atomic_dec(&buf->immed_ref); + + return rc; +} + +#define KFILND_EP_REPLAY_TIMER_MSEC (100U) + +/** + * kfilnd_ep_imm_buffer_put() - Decrement the immediate buffer count reference + * counter. + * @buf: Immediate buffer to have reference count decremented. + * + * If the immediate buffer's reference count reaches zero, the buffer will + * automatically be reposted. + */ +void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf) +{ + unsigned long expires; + int rc; + + if (!buf) + return; + + if (atomic_sub_return(1, &buf->immed_ref) != 0) + return; + + rc = kfilnd_ep_post_recv(buf->immed_end, buf); + switch (rc) { + case 0: + break; + + /* Return the buffer reference and queue the immediate buffer put to be + * replayed. + */ + case -EAGAIN: + expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) + + jiffies; + atomic_inc(&buf->immed_ref); + + spin_lock(&buf->immed_end->replay_lock); + list_add_tail(&buf->replay_entry, + &buf->immed_end->imm_buffer_replay); + atomic_inc(&buf->immed_end->replay_count); + spin_unlock(&buf->immed_end->replay_lock); + + if (!timer_pending(&buf->immed_end->replay_timer)) + mod_timer(&buf->immed_end->replay_timer, expires); + break; + + /* Unexpected error resulting in immediate buffer not being able to be + * posted. Since immediate buffers are used to sink incoming messages, + * failure to post immediate buffers means failure to communicate. + * + * TODO: Prevent LNet NI from doing sends/recvs? + */ + default: + KFILND_EP_ERROR(buf->immed_end, + "Failed to post immediate receive buffer: rc=%d", + rc); + } +} + +/** + * kfilnd_ep_post_imm_buffers() - Post all immediate receive buffers. + * @ep: KFI LND endpoint to have receive buffers posted on. + * + * This function should be called only during KFI LND device initialization. + * + * Return: On success, zero. Else, negative errno. + */ +int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep) +{ + int rc = 0; + int i; + + if (!ep) + return -EINVAL; + + for (i = 0; i < immediate_rx_buf_count; i++) { + rc = kfilnd_ep_post_recv(ep, &ep->end_immed_bufs[i]); + if (rc) + goto out; + } + +out: + return rc; +} + +/** + * kfilnd_ep_cancel_imm_buffers() - Cancel all immediate receive buffers. + * @ep: KFI LND endpoint to have receive buffers canceled. 
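To make the immediate-buffer reference counting above concrete: whoever still needs a message that landed in a multi-receive buffer holds a reference on the buffer, and the final kfilnd_ep_imm_buffer_put() reposts it. The consumer below is hypothetical, shown only to illustrate the contract.

```c
/* Illustrative consumer of a message landed in an immediate buffer. */
static void kfilnd_example_consume(struct kfilnd_immediate_buffer *buf,
				   void *msg, size_t len)
{
	/* Pin the buffer while the message is being processed. */
	atomic_inc(&buf->immed_ref);

	/* ... hand msg/len off to the transaction state machine ... */

	/* Last put reposts the buffer (or queues a replay on -EAGAIN). */
	kfilnd_ep_imm_buffer_put(buf);
}
```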
+ */ +void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep) +{ + int i; + + if (!ep) + return; + + for (i = 0; i < immediate_rx_buf_count; i++) { + ep->end_immed_bufs[i].immed_no_repost = true; + + /* Since this is called during LNet NI teardown, no need to + * pipeline retries. Just spin until -EAGAIN is not returned. + */ + while (kfi_cancel(&ep->end_rx->fid, &ep->end_immed_bufs[i]) == + -EAGAIN) + schedule(); + } +} + +static void kfilnd_ep_err_fail_loc_work(struct work_struct *work) +{ + struct kfilnd_ep_err_fail_loc_work *err = + container_of(work, struct kfilnd_ep_err_fail_loc_work, work); + + kfilnd_cq_process_error(err->ep, &err->err); + kfree(err); +} + +static int kfilnd_ep_gen_fake_err(struct kfilnd_ep *ep, + const struct kfi_cq_err_entry *err) +{ + struct kfilnd_ep_err_fail_loc_work *fake_err; + + fake_err = kmalloc(sizeof(*fake_err), GFP_KERNEL); + if (!fake_err) + return -ENOMEM; + + fake_err->ep = ep; + fake_err->err = *err; + INIT_WORK(&fake_err->work, kfilnd_ep_err_fail_loc_work); + queue_work(kfilnd_wq, &fake_err->work); + + return 0; +} + +static uint64_t gen_init_tag_bits(struct kfilnd_transaction *tn) +{ + return (tn->peer->remote_session_key << KFILND_EP_KEY_BITS) | + tn->tn_response_mr_key; +} + +/** + * kfilnd_ep_post_tagged_send() - Post a tagged send operation. + * @ep: KFI LND endpoint used to post the tagged receivce operation. + * @tn: Transaction structure containing the send buffer to be posted. + * + * The tag for the post tagged send operation is the response memory region key + * associated with the transaction. + * + * Return: On success, zero. Else, negative errno value. + */ +int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn) +{ + struct kfi_cq_err_entry fake_error = { + .op_context = tn, + .flags = KFI_TAGGED | KFI_SEND, + .err = EIO, + }; + int rc; + + if (!ep || !tn) + return -EINVAL; + + /* Make sure the device is not being shut down */ + if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED) + return -EINVAL; + + /* Progress transaction to failure if send should fail. */ + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EVENT)) { + rc = kfilnd_ep_gen_fake_err(ep, &fake_error); + if (!rc) + return 0; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND)) { + return -EIO; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EAGAIN)) { + return -EAGAIN; + } + + rc = kfi_tsenddata(ep->end_tx, NULL, 0, NULL, tn->tagged_data, + tn->tn_target_addr, gen_init_tag_bits(tn), tn); + switch (rc) { + case 0: + case -EAGAIN: + KFILND_EP_DEBUG(ep, + "Transaction ID %p: %s tagged send of with tag 0x%x to peer 0x%llx: rc=%d", + tn, rc ? "Failed to post" : "Posted", + tn->tn_response_mr_key, tn->tn_target_addr, rc); + break; + + default: + KFILND_EP_ERROR(ep, + "Transaction ID %p: Failed to post tagged send with tag 0x%x to peer 0x%llx: rc=%d", + tn, tn->tn_response_mr_key, + tn->tn_target_addr, rc); + } + + return rc; +} + +/** + * kfilnd_ep_cancel_tagged_recv() - Cancel a tagged recv. + * @ep: KFI LND endpoint used to cancel the tagged receivce operation. + * @tn: Transaction structure containing the receive buffer to be cancelled. + * + * The tagged receive buffer context pointer is used to cancel a tagged receive + * operation. The context pointer is always the transaction pointer. + * + * Return: 0 on success. -ENOENT if the tagged receive buffer is not found. The + * tagged receive buffer may not be found due to a tagged send operation already + * landing or the tagged receive buffer never being posted. 
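An aside on the tag layout used by gen_init_tag_bits() above and its receive-side counterpart gen_target_tag_bits() below: a 64-bit tag carries a session key in the upper bits and a memory-region key in the low KFILND_EP_KEY_BITS bits. KFILND_EP_KEY_BITS is defined in kfilnd.h (not shown in this excerpt); the helpers below are illustrative only.

```c
/* Illustrative tag composition/decomposition. The sender's (session key,
 * MR key) pair must match the pair the receiver used when posting its
 * tagged receive for the two sides to compute the same tag.
 */
static inline uint64_t kfilnd_example_make_tag(u32 session_key, u32 mr_key)
{
	return ((uint64_t)session_key << KFILND_EP_KEY_BITS) | mr_key;
}

static inline u32 kfilnd_example_tag_to_mr_key(uint64_t tag)
{
	return tag & (BIT_ULL(KFILND_EP_KEY_BITS) - 1);
}
```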
Negative errno value + * on error. + */ +int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn) +{ + if (!ep || !tn) + return -EINVAL; + + /* Make sure the device is not being shut down */ + if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED) + return -EINVAL; + + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN)) + return -EAGAIN; + + /* The async event count is not decremented for a cancel operation since + * it was incremented for the post tagged receive. + */ + return kfi_cancel(&ep->end_rx->fid, tn); +} + +static uint64_t gen_target_tag_bits(struct kfilnd_transaction *tn) +{ + return (tn->peer->local_session_key << KFILND_EP_KEY_BITS) | + tn->tn_mr_key; +} + +/** + * kfilnd_ep_post_tagged_recv() - Post a tagged receive operation. + * @ep: KFI LND endpoint used to post the tagged receivce operation. + * @tn: Transaction structure containing the receive buffer to be posted. + * + * The tag for the post tagged receive operation is the memory region key + * associated with the transaction. + * + * Return: On success, zero. Else, negative errno value. + */ +int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn) +{ + struct kfi_msg_tagged msg = { + .tag = gen_target_tag_bits(tn), + .context = tn, + .addr = tn->peer->addr, + }; + struct kfi_cq_err_entry fake_error = { + .op_context = tn, + .flags = KFI_TAGGED | KFI_RECV, + .err = EIO, + }; + int rc; + + if (!ep || !tn) + return -EINVAL; + + /* Make sure the device is not being shut down */ + if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED) + return -EINVAL; + + /* Progress transaction to failure if send should fail. */ + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EVENT)) { + rc = kfilnd_ep_gen_fake_err(ep, &fake_error); + if (!rc) + return 0; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV)) { + return -EIO; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EAGAIN)) { + return -EAGAIN; + } + + msg.iov_count = tn->tn_num_iovec; + msg.type = KFI_BVEC; + msg.msg_biov = tn->tn_kiov; + + rc = kfi_trecvmsg(ep->end_rx, &msg, KFI_COMPLETION); + switch (rc) { + case 0: + case -EAGAIN: + KFILND_EP_DEBUG(ep, + "Transaction ID %p: %s tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d", + tn, rc ? "Failed to post" : "Posted", + tn->tn_nob, tn->tn_num_iovec, msg.tag, rc); + break; + + default: + KFILND_EP_ERROR(ep, + "Transaction ID %p: Failed to post tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d", + tn, tn->tn_nob, tn->tn_num_iovec, msg.tag, rc); + } + + return rc; +} + +/** + * kfilnd_ep_post_send() - Post a send operation. + * @ep: KFI LND endpoint used to post the send operation. + * @tn: Transaction structure containing the buffer to be sent. + * + * The target of the send operation is based on the target LNet NID field within + * the transaction structure. A lookup of LNet NID to KFI address is performed. + * + * Return: On success, zero. Else, negative errno value. + */ +int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn) +{ + size_t len; + void *buf; + struct kfi_cq_err_entry fake_error = { + .op_context = tn, + .flags = KFI_MSG | KFI_SEND, + .err = EIO, + }; + int rc; + + if (!ep || !tn) + return -EINVAL; + + buf = tn->tn_tx_msg.msg; + len = tn->tn_tx_msg.length; + + /* Make sure the device is not being shut down */ + if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED) + return -EINVAL; + + /* Progress transaction to failure if send should fail. 
*/ + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EVENT)) { + rc = kfilnd_ep_gen_fake_err(ep, &fake_error); + if (!rc) + return 0; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND)) { + return -EIO; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EAGAIN)) { + return -EAGAIN; + } + + rc = kfi_send(ep->end_tx, buf, len, NULL, tn->tn_target_addr, tn); + switch (rc) { + case 0: + case -EAGAIN: + KFILND_EP_DEBUG(ep, + "Transaction ID %p: %s send of %lu bytes to peer 0x%llx: rc=%d", + tn, rc ? "Failed to post" : "Posted", + len, tn->tn_target_addr, rc); + break; + + default: + KFILND_EP_ERROR(ep, + "Transaction ID %p: Failed to post send of %lu bytes to peer 0x%llx: rc=%d", + tn, len, tn->tn_target_addr, rc); + } + + return rc; +} + +/** + * kfilnd_ep_post_write() - Post a write operation. + * @ep: KFI LND endpoint used to post the write operation. + * @tn: Transaction structure containing the buffer to be read from. + * + * The target of the write operation is based on the target LNet NID field + * within the transaction structure. A lookup of LNet NID to KFI address is + * performed. + * + * The transaction cookie is used as the remote key for the target memory + * region. + * + * Return: On success, zero. Else, negative errno value. + */ +int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn) +{ + int rc; + struct kfi_cq_err_entry fake_error = { + .op_context = tn, + .flags = KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND, + .err = EIO, + }; + struct kfi_rma_iov rma_iov = { + .len = tn->tn_nob, + .key = gen_init_tag_bits(tn), + }; + struct kfi_msg_rma rma = { + .addr = tn->tn_target_addr, + .rma_iov = &rma_iov, + .rma_iov_count = 1, + .context = tn, + }; + + if (!ep || !tn) + return -EINVAL; + + /* Make sure the device is not being shut down */ + if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED) + return -EINVAL; + + /* Progress transaction to failure if read should fail. */ + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EVENT)) { + rc = kfilnd_ep_gen_fake_err(ep, &fake_error); + if (!rc) + return 0; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE)) { + return -EIO; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EAGAIN)) { + return -EAGAIN; + } + + rma.iov_count = tn->tn_num_iovec; + rma.type = KFI_BVEC; + rma.msg_biov = tn->tn_kiov; + + rc = kfi_writemsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION); + switch (rc) { + case 0: + case -EAGAIN: + KFILND_EP_DEBUG(ep, + "Transaction ID %p: %s write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d", + tn, rc ? "Failed to post" : "Posted", + tn->tn_nob, tn->tn_num_iovec, + tn->tn_response_mr_key, tn->tn_target_addr, rc); + break; + + default: + KFILND_EP_ERROR(ep, + "Transaction ID %p: Failed to post write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d", + tn, tn->tn_nob, tn->tn_num_iovec, + tn->tn_response_mr_key, tn->tn_target_addr, + rc); + } + + return rc; +} + +/** + * kfilnd_ep_post_read() - Post a read operation. + * @ep: KFI LND endpoint used to post the read operation. + * @tn: Transaction structure containing the buffer to be read into. + * + * The target of the read operation is based on the target LNet NID field within + * the transaction structure. A lookup of LNet NID to KFI address is performed. + * + * The transaction cookie is used as the remote key for the target memory + * region. + * + * Return: On success, zero. Else, negative errno value. 
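The kfilnd_ep_post_*() functions in this file all share the same three-way fault-injection preamble: generate a fake asynchronous completion error, fail the post outright, or return -EAGAIN to force the replay path. A condensed sketch of that pattern, using the fail_loc constants and helpers already present in this patch (the function name itself is illustrative):

```c
/* Sketch of the fault-injection pattern used by the kfilnd_ep_post_*()
 * functions; only the preamble is shown, the real kfi post call follows.
 */
static int kfilnd_ep_example_post(struct kfilnd_ep *ep,
				  struct kfilnd_transaction *tn)
{
	struct kfi_cq_err_entry fake_error = {
		.op_context = tn,
		.flags = KFI_MSG | KFI_SEND,
		.err = EIO,
	};

	/* Queue a fake completion error instead of posting. */
	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EVENT)) {
		if (!kfilnd_ep_gen_fake_err(ep, &fake_error))
			return 0;
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND)) {
		return -EIO;		/* hard failure at post time */
	} else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EAGAIN)) {
		return -EAGAIN;		/* exercises the replay machinery */
	}

	/* ... real kfi_send()/kfi_writemsg()/kfi_readmsg() post goes here ... */
	return 0;
}
```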
+ */ +int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn) +{ + int rc; + struct kfi_cq_err_entry fake_error = { + .op_context = tn, + .flags = KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND, + .err = EIO, + }; + struct kfi_rma_iov rma_iov = { + .len = tn->tn_nob, + .key = gen_init_tag_bits(tn), + }; + struct kfi_msg_rma rma = { + .addr = tn->tn_target_addr, + .rma_iov = &rma_iov, + .rma_iov_count = 1, + .context = tn, + }; + + if (!ep || !tn) + return -EINVAL; + + /* Make sure the device is not being shut down */ + if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED) + return -EINVAL; + + /* Progress transaction to failure if read should fail. */ + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EVENT)) { + rc = kfilnd_ep_gen_fake_err(ep, &fake_error); + if (!rc) + return 0; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ)) { + return -EIO; + } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EAGAIN)) { + return -EAGAIN; + } + + rma.iov_count = tn->tn_num_iovec; + rma.type = KFI_BVEC; + rma.msg_biov = tn->tn_kiov; + + rc = kfi_readmsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION); + switch (rc) { + case 0: + case -EAGAIN: + KFILND_EP_DEBUG(ep, + "Transaction ID %p: %s read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d", + tn, rc ? "Failed to post" : "Posted", + tn->tn_nob, tn->tn_num_iovec, + tn->tn_response_mr_key, tn->tn_target_addr, rc); + break; + + default: + KFILND_EP_ERROR(ep, + "Transaction ID %p: Failed to post read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d", + tn, tn->tn_nob, tn->tn_num_iovec, + tn->tn_response_mr_key, tn->tn_target_addr, rc); + } + + return rc; +} + +void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn) +{ + unsigned long expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) + + jiffies; + + spin_lock(&ep->replay_lock); + list_add_tail(&tn->replay_entry, &ep->tn_replay); + atomic_inc(&ep->replay_count); + spin_unlock(&ep->replay_lock); + + if (!timer_pending(&ep->replay_timer)) + mod_timer(&ep->replay_timer, expires); +} + +void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep) +{ + LIST_HEAD(tn_replay); + LIST_HEAD(imm_buf_replay); + struct kfilnd_transaction *tn_first; + struct kfilnd_transaction *tn_last; + struct kfilnd_immediate_buffer *buf_first; + struct kfilnd_immediate_buffer *buf_last; + + /* Since the endpoint replay lists can be manipulated while + * attempting to do replays, the entire replay list is moved to a + * temporary list. + */ + spin_lock(&ep->replay_lock); + + tn_first = list_first_entry_or_null(&ep->tn_replay, + struct kfilnd_transaction, + replay_entry); + if (tn_first) { + tn_last = list_last_entry(&ep->tn_replay, + struct kfilnd_transaction, + replay_entry); + list_bulk_move_tail(&tn_replay, &tn_first->replay_entry, + &tn_last->replay_entry); + LASSERT(list_empty(&ep->tn_replay)); + } + + buf_first = list_first_entry_or_null(&ep->imm_buffer_replay, + struct kfilnd_immediate_buffer, + replay_entry); + if (buf_first) { + buf_last = list_last_entry(&ep->imm_buffer_replay, + struct kfilnd_immediate_buffer, + replay_entry); + list_bulk_move_tail(&imm_buf_replay, &buf_first->replay_entry, + &buf_last->replay_entry); + LASSERT(list_empty(&ep->imm_buffer_replay)); + } + + spin_unlock(&ep->replay_lock); + + /* Replay all queued transactions. 
*/ + list_for_each_entry_safe(tn_first, tn_last, &tn_replay, replay_entry) { + list_del(&tn_first->replay_entry); + atomic_dec(&ep->replay_count); + kfilnd_tn_event_handler(tn_first, tn_first->replay_event, + tn_first->replay_status); + } + + list_for_each_entry_safe(buf_first, buf_last, &imm_buf_replay, + replay_entry) { + list_del(&buf_first->replay_entry); + atomic_dec(&ep->replay_count); + kfilnd_ep_imm_buffer_put(buf_first); + } +} + +static void kfilnd_ep_replay_work(struct work_struct *work) +{ + struct kfilnd_ep *ep = + container_of(work, struct kfilnd_ep, replay_work); + + kfilnd_ep_flush_replay_queue(ep); +} + +static void kfilnd_ep_replay_timer(cfs_timer_cb_arg_t data) +{ + struct kfilnd_ep *ep = cfs_from_timer(ep, data, replay_timer); + unsigned int cpu = + cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt)); + + queue_work_on(cpu, kfilnd_wq, &ep->replay_work); +} + +#define KFILND_EP_ALLOC_SIZE \ + (sizeof(struct kfilnd_ep) + \ + (sizeof(struct kfilnd_immediate_buffer) * immediate_rx_buf_count)) + +/** + * kfilnd_ep_free() - Free a KFI LND endpoint. + * @ep: KFI LND endpoint to be freed. + * + * Safe to call on NULL or error pointer. + */ +void kfilnd_ep_free(struct kfilnd_ep *ep) +{ + int i; + int k = 2; + + if (IS_ERR_OR_NULL(ep)) + return; + + while (atomic_read(&ep->replay_count)) { + k++; + CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET, + "Waiting for replay count %d not zero\n", + atomic_read(&ep->replay_count)); + schedule_timeout_uninterruptible(HZ); + } + + /* Cancel any outstanding immediate receive buffers. */ + kfilnd_ep_cancel_imm_buffers(ep); + + /* Wait for RX buffers to no longer be used and then free them. */ + for (i = 0; i < immediate_rx_buf_count; i++) { + k = 2; + while (atomic_read(&ep->end_immed_bufs[i].immed_ref)) { + k++; + CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET, + "Waiting for RX buffer %d to release\n", i); + schedule_timeout_uninterruptible(HZ); + } + } + + /* Wait for all transactions to complete. */ + k = 2; + spin_lock(&ep->tn_list_lock); + while (!list_empty(&ep->tn_list)) { + spin_unlock(&ep->tn_list_lock); + k++; + CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET, + "Waiting for transactions to complete\n"); + schedule_timeout_uninterruptible(HZ); + spin_lock(&ep->tn_list_lock); + } + spin_unlock(&ep->tn_list_lock); + + /* Free all immediate buffers. */ + for (i = 0; i < immediate_rx_buf_count; i++) + __free_pages(ep->end_immed_bufs[i].immed_buf_page, + order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE)); + + kfi_close(&ep->end_tx->fid); + kfi_close(&ep->end_rx->fid); + kfilnd_cq_free(ep->end_tx_cq); + kfilnd_cq_free(ep->end_rx_cq); + ida_destroy(&ep->keys); + LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE); +} + +/** + * kfilnd_ep_alloc() - Allocate a new KFI LND endpoint. + * @dev: KFI LND device used to allocate endpoints. + * @context_id: Context ID associated with the endpoint. + * @cpt: CPT KFI LND endpoint should be associated with. + * + * An KFI LND endpoint consists of unique transmit/receive command queues + * (contexts) and completion queues. The underlying completion queue interrupt + * vector is associated with a core within the CPT. + * + * Return: On success, valid pointer. Else, negative errno pointer. 
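One detail worth calling out from the wait loops in kfilnd_ep_free() above: the `(k & (-k)) == k` test is true only when k is a power of two, so the "Waiting for ..." messages escalate to D_WARNING on iterations 4, 8, 16, and so on, and stay at D_NET otherwise. A minimal standalone illustration (name is illustrative):

```c
/* True when v is a power of two; the wait loops above start at v >= 3,
 * so the v == 0 corner case never occurs there.
 */
static inline bool kfilnd_example_log_loudly(int v)
{
	return (v & (-v)) == v;
}
```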
+ */ +struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev, + unsigned int context_id, unsigned int cpt, + size_t nrx, size_t rx_size) +{ + int rc; + struct kfi_cq_attr cq_attr = {}; + struct kfi_rx_attr rx_attr = {}; + struct kfi_tx_attr tx_attr = {}; + int ncpts; + size_t min_multi_recv = KFILND_IMMEDIATE_MSG_SIZE; + struct kfilnd_ep *ep; + int i; + size_t rx_buf_size; + + if (!dev || !nrx || !rx_size) { + rc = -EINVAL; + goto err; + } + + ncpts = dev->kfd_ni->ni_ncpts; + + LIBCFS_CPT_ALLOC(ep, lnet_cpt_table(), cpt, KFILND_EP_ALLOC_SIZE); + if (!ep) { + rc = -ENOMEM; + goto err; + } + + ep->end_dev = dev; + ep->end_cpt = cpt; + ep->end_context_id = context_id; + INIT_LIST_HEAD(&ep->tn_list); + spin_lock_init(&ep->tn_list_lock); + INIT_LIST_HEAD(&ep->tn_replay); + INIT_LIST_HEAD(&ep->imm_buffer_replay); + spin_lock_init(&ep->replay_lock); + cfs_timer_setup(&ep->replay_timer, kfilnd_ep_replay_timer, + (unsigned long)ep, 0); + INIT_WORK(&ep->replay_work, kfilnd_ep_replay_work); + atomic_set(&ep->replay_count, 0); + ida_init(&ep->keys); + + /* Create a CQ for this CPT */ + cq_attr.flags = KFI_AFFINITY; + cq_attr.format = KFI_CQ_FORMAT_DATA; + cq_attr.wait_cond = KFI_CQ_COND_NONE; + cq_attr.wait_obj = KFI_WAIT_NONE; + + /* Vector is set to first core in the CPT */ + cq_attr.signaling_vector = + cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), cpt)); + + cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits * + rx_cq_scale_factor; + ep->end_rx_cq = kfilnd_cq_alloc(ep, &cq_attr); + if (IS_ERR(ep->end_rx_cq)) { + rc = PTR_ERR(ep->end_rx_cq); + CERROR("Failed to allocated KFILND RX CQ: rc=%d\n", rc); + goto err_free_ep; + } + + cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits * + tx_cq_scale_factor; + ep->end_tx_cq = kfilnd_cq_alloc(ep, &cq_attr); + if (IS_ERR(ep->end_tx_cq)) { + rc = PTR_ERR(ep->end_tx_cq); + CERROR("Failed to allocated KFILND TX CQ: rc=%d\n", rc); + goto err_free_rx_cq; + } + + /* Initialize the RX/TX contexts for the given CPT */ + rx_attr.op_flags = KFI_COMPLETION | KFI_MULTI_RECV; + rx_attr.msg_order = KFI_ORDER_NONE; + rx_attr.comp_order = KFI_ORDER_NONE; + rx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits + + immediate_rx_buf_count; + rx_attr.iov_limit = LNET_MAX_IOV; + rc = kfi_rx_context(dev->kfd_sep, context_id, &rx_attr, &ep->end_rx, + ep); + if (rc) { + CERROR("Could not create RX context on CPT %d, rc = %d\n", cpt, + rc); + goto err_free_tx_cq; + } + + /* Set the lower limit for multi-receive buffers */ + rc = kfi_setopt(&ep->end_rx->fid, KFI_OPT_ENDPOINT, + KFI_OPT_MIN_MULTI_RECV, &min_multi_recv, + sizeof(min_multi_recv)); + if (rc) { + CERROR("Could not set min_multi_recv on CPT %d, rc = %d\n", cpt, + rc); + goto err_free_rx_context; + } + + tx_attr.op_flags = KFI_COMPLETION | KFI_TRANSMIT_COMPLETE; + tx_attr.msg_order = KFI_ORDER_NONE; + tx_attr.comp_order = KFI_ORDER_NONE; + tx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits * + tx_scale_factor; + tx_attr.iov_limit = LNET_MAX_IOV; + tx_attr.rma_iov_limit = LNET_MAX_IOV; + rc = kfi_tx_context(dev->kfd_sep, context_id, &tx_attr, &ep->end_tx, + ep); + if (rc) { + CERROR("Could not create TX context on CPT %d, rc = %d\n", cpt, + rc); + goto err_free_rx_context; + } + + /* Bind these two contexts to the CPT's CQ */ + rc = kfi_ep_bind(ep->end_rx, &ep->end_rx_cq->cq->fid, 0); + if (rc) { + CERROR("Could not bind RX context on CPT %d, rc = %d\n", cpt, + rc); + goto err_free_tx_context; + } + + rc = kfi_ep_bind(ep->end_tx, &ep->end_tx_cq->cq->fid, 
0); + if (rc) { + CERROR("Could not bind TX context on CPT %d, rc = %d\n", cpt, + rc); + goto err_free_tx_context; + } + + /* Enable both endpoints */ + rc = kfi_enable(ep->end_rx); + if (rc) { + CERROR("Could not enable RX context on CPT %d, rc = %d\n", cpt, + rc); + goto err_free_tx_context; + } + + rc = kfi_enable(ep->end_tx); + if (rc) { + CERROR("Could not enable TX context on CPT %d, rc=%d\n", cpt, + rc); + goto err_free_tx_context; + } + + /* The nrx value is the max number of immediate messages any one peer + * can send us. Given that compute nodes are RPC-based, we should not + * see any more incoming messages than we are able to send. A such, nrx + * is a good size for each multi-receive buffer. However, if we are + * a server or LNet router, we need a multiplier of this value. For + * now, we will just have nrx drive the buffer size per CPT. Then, + * LNet routers and servers can just define more CPTs to get a better + * spread of buffers to receive messages from multiple peers. A better + * way should be devised in the future. + */ + rx_buf_size = roundup_pow_of_two(max(nrx * rx_size, PAGE_SIZE)); + + for (i = 0; i < immediate_rx_buf_count; i++) { + + /* Using physically contiguous allocations can allow for + * underlying kfabric providers to use untranslated addressing + * instead of having to setup NIC memory mappings. This + * typically leads to improved performance. + */ + ep->end_immed_bufs[i].immed_buf_page = + alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt), + GFP_KERNEL | __GFP_NOWARN, + order_base_2(rx_buf_size / PAGE_SIZE)); + if (!ep->end_immed_bufs[i].immed_buf_page) { + rc = -ENOMEM; + goto err_free_rx_buffers; + } + + atomic_set(&ep->end_immed_bufs[i].immed_ref, 0); + ep->end_immed_bufs[i].immed_buf = + page_address(ep->end_immed_bufs[i].immed_buf_page); + ep->end_immed_bufs[i].immed_buf_size = rx_buf_size; + ep->end_immed_bufs[i].immed_end = ep; + } + + return ep; + +err_free_rx_buffers: + for (i = 0; i < immediate_rx_buf_count; i++) { + if (ep->end_immed_bufs[i].immed_buf_page) + __free_pages(ep->end_immed_bufs[i].immed_buf_page, + order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE)); + } + +err_free_tx_context: + kfi_close(&ep->end_tx->fid); +err_free_rx_context: + kfi_close(&ep->end_rx->fid); +err_free_tx_cq: + kfilnd_cq_free(ep->end_tx_cq); +err_free_rx_cq: + kfilnd_cq_free(ep->end_rx_cq); +err_free_ep: + LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE); +err: + return ERR_PTR(rc); +} + +int kfilnd_ep_get_key(struct kfilnd_ep *ep) +{ + return ida_simple_get(&ep->keys, 1, KFILND_EP_KEY_MAX, GFP_KERNEL); +} + +void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key) +{ + ida_simple_remove(&ep->keys, key); +} diff --git a/lnet/klnds/kfilnd/kfilnd_ep.h b/lnet/klnds/kfilnd/kfilnd_ep.h new file mode 100644 index 0000000..6b83fdf --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_ep.h @@ -0,0 +1,70 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +#ifndef _KFILND_EP_ +#define _KFILND_EP_ + +#include "kfilnd.h" + +struct kfilnd_ep_err_fail_loc_work { + struct kfilnd_ep *ep; + struct work_struct work; + struct kfi_cq_err_entry err; +}; + +static inline bool kfilnd_ep_replays_pending(struct kfilnd_ep *ep) +{ + return atomic_read(&ep->replay_count) > 0; +}; + +void kfilnd_ep_dereg_mr(struct kfilnd_ep *ep, struct kfilnd_transaction *tn); +int kfilnd_ep_reg_mr(struct kfilnd_ep *ep, struct kfilnd_transaction *tn); +int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn); +int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn); +int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn); +int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn); +int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn); +int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn); +void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf); +int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep); +void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep); +void kfilnd_ep_free(struct kfilnd_ep *ep); +struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev, + unsigned int context_id, unsigned int cpt, + size_t nrx, size_t rx_size); +void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep); +void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep, + struct kfilnd_transaction *tn); + +int kfilnd_ep_get_key(struct kfilnd_ep *ep); +void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key); + + +#endif /* _KFILND_EP_ */ diff --git a/lnet/klnds/kfilnd/kfilnd_modparams.c b/lnet/klnds/kfilnd/kfilnd_modparams.c new file mode 100644 index 0000000..798983d --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_modparams.c @@ -0,0 +1,183 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd module parameters + */ + +#include "kfilnd.h" + +unsigned int cksum; +module_param(cksum, uint, 0444); +MODULE_PARM_DESC(cksum, "Enable checksums for non-zero messages (not RDMA)"); + +/* Scale factor for TX context queue depth. The factor is applied to the number + * of credits to determine queue depth. 
+ */ +unsigned int tx_scale_factor = 2; +module_param(tx_scale_factor, uint, 0444); +MODULE_PARM_DESC(tx_scale_factor, + "Factor applied to credits to determine TX context size"); + +/* Scale factor for TX and RX completion queue depth. The factor is applied to + * the number of credits to determine queue depth. + */ +unsigned int rx_cq_scale_factor = 10; +module_param(rx_cq_scale_factor, uint, 0444); +MODULE_PARM_DESC(rx_cq_scale_factor, + "Factor applied to credits to determine RX CQ size"); + +unsigned int tx_cq_scale_factor = 10; +module_param(tx_cq_scale_factor, uint, 0444); +MODULE_PARM_DESC(tx_cq_scale_factor, + "Factor applied to credits to determine TX CQ size"); + +unsigned int eq_size = 1024; +module_param(eq_size, uint, 0444); +MODULE_PARM_DESC(eq_size, "Default event queue size used by all kfi LNet NIs"); + +unsigned int immediate_rx_buf_count = 2; +module_param(immediate_rx_buf_count, uint, 0444); +MODULE_PARM_DESC(immediate_rx_buf_count, + "Number of immediate multi-receive buffers posted per CPT"); + +/* Common LND network tunables. */ +static int credits = 256; +module_param(credits, int, 0444); +MODULE_PARM_DESC(credits, "Number of concurrent sends on network"); + +static int peer_credits = 128; +module_param(peer_credits, int, 0444); +MODULE_PARM_DESC(peer_credits, "Number of concurrent sends to 1 peer"); + +static int peer_buffer_credits = -1; +module_param(peer_buffer_credits, int, 0444); +MODULE_PARM_DESC(peer_buffer_credits, + "Number of per-peer router buffer credits"); + +static int peer_timeout = -1; +module_param(peer_timeout, int, 0444); +MODULE_PARM_DESC(peer_timeout, + "Seconds without aliveness news to declare peer dead (less than or equal to 0 to disable)."); + +static unsigned int prov_major_version = 1; +module_param(prov_major_version, int, 0444); +MODULE_PARM_DESC(prov_major_version, + "Default kfabric provider major version kfilnd should use"); + +static unsigned int prov_minor_version; +module_param(prov_minor_version, int, 0444); +MODULE_PARM_DESC(prov_minor_version, + "Default kfabric provider minor version kfilnd should use"); + +static unsigned int auth_key = 255; +module_param(auth_key, uint, 0444); +MODULE_PARM_DESC(auth_key, "Default authorization key to be used for LNet NIs"); + +int kfilnd_tunables_setup(struct lnet_ni *ni) +{ + struct lnet_ioctl_config_lnd_cmn_tunables *net_tunables; + struct lnet_ioctl_config_kfilnd_tunables *kfilnd_tunables; + + net_tunables = &ni->ni_net->net_tunables; + kfilnd_tunables = &ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi; + + if (!ni->ni_net->net_tunables_set) { + net_tunables->lct_max_tx_credits = credits; + net_tunables->lct_peer_tx_credits = peer_credits; + net_tunables->lct_peer_rtr_credits = peer_buffer_credits; + net_tunables->lct_peer_timeout = peer_timeout; + + if (net_tunables->lct_peer_tx_credits > + net_tunables->lct_max_tx_credits) + net_tunables->lct_peer_tx_credits = + net_tunables->lct_max_tx_credits; + } + + kfilnd_tunables->lnd_version = KFILND_MSG_VERSION; + if (!ni->ni_lnd_tunables_set) { + kfilnd_tunables->lnd_prov_major_version = prov_major_version; + kfilnd_tunables->lnd_prov_minor_version = prov_minor_version; + + /* Treat zero as uninitialized. 
*/ + if (ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key == 0) + ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_auth_key = + auth_key; + } + + if (net_tunables->lct_max_tx_credits > KFILND_EP_KEY_MAX) { + CERROR("Credits cannot exceed %lu\n", KFILND_EP_KEY_MAX); + return -EINVAL; + } + + if (net_tunables->lct_peer_tx_credits > KFILND_EP_KEY_MAX) { + CERROR("Peer credits cannot exceed %lu\n", KFILND_EP_KEY_MAX); + return -EINVAL; + } + + return 0; +} + +int kfilnd_tunables_init(void) +{ + if (tx_scale_factor < 1) { + CERROR("TX context scale factor less than 1"); + return -EINVAL; + } + + if (rx_cq_scale_factor < 1) { + CERROR("RX CQ scale factor less than 1"); + return -EINVAL; + } + + if (tx_cq_scale_factor < 1) { + CERROR("TX CQ scale factor less than 1"); + return -EINVAL; + } + + if (immediate_rx_buf_count < 2) { + CERROR("Immediate multi-receive buffer count less than 2"); + return -EINVAL; + } + + if (auth_key < 1) { + CERROR("Authorization key cannot be less than 1"); + return -EINVAL; + } + + if (credits > KFILND_EP_KEY_MAX) { + CERROR("Credits cannot exceed %lu\n", KFILND_EP_KEY_MAX); + return -EINVAL; + } + + if (peer_credits > KFILND_EP_KEY_MAX) { + CERROR("Peer credits cannot exceed %lu\n", KFILND_EP_KEY_MAX); + return -EINVAL; + } + + return 0; +} diff --git a/lnet/klnds/kfilnd/kfilnd_peer.c b/lnet/klnds/kfilnd/kfilnd_peer.c new file mode 100644 index 0000000..e38f9d3 --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_peer.c @@ -0,0 +1,274 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd peer management implementation. + */ +#include "kfilnd_peer.h" +#include "kfilnd_dev.h" + +static const struct rhashtable_params peer_cache_params = { + .head_offset = offsetof(struct kfilnd_peer, node), + .key_offset = offsetof(struct kfilnd_peer, nid), + .key_len = sizeof_field(struct kfilnd_peer, nid), + .automatic_shrinking = true, +}; + +/** + * kfilnd_peer_free() - RCU safe way to free a peer. + * @ptr: Pointer to peer. + * @arg: Unused. + */ +static void kfilnd_peer_free(void *ptr, void *arg) +{ + struct kfilnd_peer *peer = ptr; + + CDEBUG(D_NET, "%s(0x%llx) peer entry freed\n", + libcfs_nid2str(peer->nid), peer->addr); + + kfi_av_remove(peer->dev->kfd_av, &peer->addr, 1, 0); + + kfree_rcu(peer, rcu_head); +} + +/** + * kfilnd_peer_down() - Mark a peer as down. + * @peer: Peer to be downed. 
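For readers unfamiliar with the rhashtable parameters defined earlier in this file: the peer cache is keyed directly on the peer's lnet_nid_t, and lookups run under RCU with a refcount guard so an entry being torn down is treated as a miss. The sketch below simply mirrors the lookup side of kfilnd_peer_get() further down; it is not additional patch code.

```c
/* Lookup sketch for the NID-keyed peer cache. */
static struct kfilnd_peer *kfilnd_example_lookup(struct kfilnd_dev *dev,
						 lnet_nid_t nid)
{
	struct kfilnd_peer *peer;

	rcu_read_lock();
	peer = rhashtable_lookup_fast(&dev->peer_cache, &nid,
				      peer_cache_params);
	/* A zero refcount means the entry is mid-teardown: treat as a miss. */
	if (peer && !refcount_inc_not_zero(&peer->cnt))
		peer = NULL;
	rcu_read_unlock();

	return peer;	/* caller drops the reference with kfilnd_peer_put() */
}
```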
+ */ +void kfilnd_peer_down(struct kfilnd_peer *peer) +{ + if (atomic_cmpxchg(&peer->remove_peer, 0, 1) == 0) { + CDEBUG(D_NET, "%s(0x%llx) marked for removal from peer cache\n", + libcfs_nid2str(peer->nid), peer->addr); + + lnet_notify(peer->dev->kfd_ni, peer->nid, false, false, + peer->last_alive); + } +} + +/** + * kfilnd_peer_put() - Return a reference for a peer. + * @peer: Peer where the reference should be returned. + */ +void kfilnd_peer_put(struct kfilnd_peer *peer) +{ + rcu_read_lock(); + + /* Return allocation reference if the peer was marked for removal. */ + if (atomic_cmpxchg(&peer->remove_peer, 1, 2) == 1) { + rhashtable_remove_fast(&peer->dev->peer_cache, &peer->node, + peer_cache_params); + refcount_dec(&peer->cnt); + + CDEBUG(D_NET, "%s(0x%llx) removed from peer cache\n", + libcfs_nid2str(peer->nid), peer->addr); + } + + if (refcount_dec_and_test(&peer->cnt)) + kfilnd_peer_free(peer, NULL); + + rcu_read_unlock(); +} + +u16 kfilnd_peer_target_rx_base(struct kfilnd_peer *peer) +{ + int cpt = lnet_cpt_of_nid(peer->nid, peer->dev->kfd_ni); + struct kfilnd_ep *ep = peer->dev->cpt_to_endpoint[cpt]; + + return ep->end_context_id; +} + +/** + * kfilnd_peer_get() - Get a reference for a peer. + * @dev: Device used to lookup peer. + * @nid: LNet NID of peer. + * + * Return: On success, pointer to a valid peer structed. Else, ERR_PTR. + */ +struct kfilnd_peer *kfilnd_peer_get(struct kfilnd_dev *dev, lnet_nid_t nid) +{ + char *node; + char *service; + int rc; + u32 nid_addr = LNET_NIDADDR(nid); + u32 net_num = LNET_NETNUM(LNET_NIDNET(nid)); + struct kfilnd_peer *peer; + struct kfilnd_peer *clash_peer; + +again: + /* Check the cache for a match. */ + rcu_read_lock(); + peer = rhashtable_lookup_fast(&dev->peer_cache, &nid, + peer_cache_params); + if (peer && !refcount_inc_not_zero(&peer->cnt)) + peer = NULL; + rcu_read_unlock(); + + if (peer) + return peer; + + /* Allocate a new peer for the cache. */ + peer = kzalloc(sizeof(*peer), GFP_KERNEL); + if (!peer) { + rc = -ENOMEM; + goto err; + } + + node = kasprintf(GFP_KERNEL, "%#x", nid_addr); + if (!node) { + rc = -ENOMEM; + goto err_free_peer; + } + + service = kasprintf(GFP_KERNEL, "%u", net_num); + if (!service) { + rc = -ENOMEM; + goto err_free_node_str; + } + + /* Use the KFI address vector to translate node and service string into + * a KFI address handle. + */ + rc = kfi_av_insertsvc(dev->kfd_av, node, service, &peer->addr, 0, dev); + + kfree(service); + kfree(node); + + if (rc < 0) { + goto err_free_peer; + } else if (rc != 1) { + rc = -ECONNABORTED; + goto err_free_peer; + } + + peer->dev = dev; + peer->nid = nid; + atomic_set(&peer->rx_base, 0); + atomic_set(&peer->remove_peer, 0); + peer->local_session_key = kfilnd_dev_get_session_key(dev); + + /* One reference for the allocation and another for get operation + * performed for this peer. The allocation reference is returned when + * the entry is marked for removal. 
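To make the reference scheme described in the comment above easier to follow, here is the peer lifecycle as inferred from kfilnd_peer_get()/put()/down() in this file. The remove_peer values are not named constants in the patch; this is an editorial summary, not new code.

```c
/*
 * Peer lifecycle (editorial summary):
 *
 *   allocation:   refcount = 2  (one for the cache, one for the caller)
 *   each get():   refcount + 1  (refcount_inc_not_zero under RCU)
 *   each put():   refcount - 1
 *
 *   remove_peer:  0 -> 1 in kfilnd_peer_down()  (marked for removal)
 *                 1 -> 2 in the next kfilnd_peer_put(), which also removes
 *                        the entry from the cache and drops the cache's
 *                        reference
 *
 *   final put:    refcount reaches 0 and kfilnd_peer_free() releases the
 *                 KFI AV entry and frees the peer via kfree_rcu()
 */
```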
+ */ + refcount_set(&peer->cnt, 2); + + clash_peer = rhashtable_lookup_get_insert_fast(&dev->peer_cache, + &peer->node, + peer_cache_params); + + if (clash_peer) { + kfi_av_remove(dev->kfd_av, &peer->addr, 1, 0); + kfree(peer); + + if (IS_ERR(clash_peer)) { + rc = PTR_ERR(clash_peer); + goto err; + } else { + goto again; + } + } + + kfilnd_peer_alive(peer); + + CDEBUG(D_NET, "%s(0x%llx) peer entry allocated\n", + libcfs_nid2str(peer->nid), peer->addr); + + return peer; + +err_free_node_str: + kfree(node); +err_free_peer: + kfree(peer); +err: + return ERR_PTR(rc); +} + +/** + * kfilnd_peer_get_kfi_addr() - Return kfi_addr_t used for eager untagged send + * kfi operations. + * @peer: Peer struct. + * + * The returned kfi_addr_t is updated to target a specific RX context. The + * address return by this function should not be used if a specific RX context + * needs to be targeted (i/e the response RX context for a bulk transfer + * operation). + * + * Return: kfi_addr_t. + */ +kfi_addr_t kfilnd_peer_get_kfi_addr(struct kfilnd_peer *peer) +{ + /* TODO: Support RX count by round-robining the generated kfi_addr_t's + * across multiple RX contexts using RX base and RX count. + */ + return kfi_rx_addr(KFILND_BASE_ADDR(peer->addr), + atomic_read(&peer->rx_base), KFILND_FAB_RX_CTX_BITS); +} + +/** + * kfilnd_peer_update_rx_contexts() - Update the RX context for a peer. + * @peer: Peer to be updated. + * @rx_base: New RX base for peer. + * @rx_count: New RX count for peer. + */ +void kfilnd_peer_update_rx_contexts(struct kfilnd_peer *peer, + unsigned int rx_base, unsigned int rx_count) +{ + /* TODO: Support RX count. */ + LASSERT(rx_count > 0); + atomic_set(&peer->rx_base, rx_base); +} + +/** + * kfilnd_peer_alive() - Update when the peer was last alive. + * @peer: Peer to be updated. + */ +void kfilnd_peer_alive(struct kfilnd_peer *peer) +{ + peer->last_alive = ktime_get_seconds(); + + /* Ensure timestamp is committed to memory before used. */ + smp_mb(); +} + +/** + * kfilnd_peer_destroy() - Destroy peer cache. + * @dev: Device peer cache to be destroyed. + */ +void kfilnd_peer_destroy(struct kfilnd_dev *dev) +{ + rhashtable_free_and_destroy(&dev->peer_cache, kfilnd_peer_free, NULL); +} + +/** + * kfilnd_peer_init() - Initialize peer cache. + * @dev: Device peer cache to be initialized. + */ +void kfilnd_peer_init(struct kfilnd_dev *dev) +{ + rhashtable_init(&dev->peer_cache, &peer_cache_params); +} diff --git a/lnet/klnds/kfilnd/kfilnd_peer.h b/lnet/klnds/kfilnd/kfilnd_peer.h new file mode 100644 index 0000000..27a72bd --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_peer.h @@ -0,0 +1,49 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). 
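kfilnd_peer_get_kfi_addr() folds the peer's preferred RX context into the kfi_addr_t itself, so a single address vector entry can be steered at any RX context of a scalable endpoint. The sketch below shows the address math under the assumption that kfi_rx_addr() follows the libfabric fi_rx_addr() convention of storing the RX index in the top rx_ctx_bits of the address; the macro and helper names here are stand-ins, not the kfabric definitions.

#include <stdint.h>
#include <stdio.h>

typedef uint64_t kfi_addr_t;

#define FAKE_RX_CTX_BITS	8	/* stand-in for KFILND_FAB_RX_CTX_BITS */

/* Assumed fi_rx_addr()-style encoding: RX index in the top bits. */
static kfi_addr_t fake_rx_addr(kfi_addr_t base, uint64_t rx_index, int bits)
{
	return ((uint64_t)rx_index << (64 - bits)) | base;
}

/* Stand-in for KFILND_BASE_ADDR(): strip the RX-context bits again. */
static kfi_addr_t fake_base_addr(kfi_addr_t addr, int bits)
{
	return addr & ((UINT64_C(1) << (64 - bits)) - 1);
}

int main(void)
{
	kfi_addr_t base = 0x1234;
	kfi_addr_t addr = fake_rx_addr(base, 3, FAKE_RX_CTX_BITS);

	printf("addr=%#llx base=%#llx rx=%llu\n",
	       (unsigned long long)addr,
	       (unsigned long long)fake_base_addr(addr, FAKE_RX_CTX_BITS),
	       (unsigned long long)(addr >> (64 - FAKE_RX_CTX_BITS)));
	return 0;
}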
+ * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd peer interface. + */ + +#ifndef _KFILND_PEER_ +#define _KFILND_PEER_ + +#include "kfilnd.h" + +void kfilnd_peer_down(struct kfilnd_peer *peer); +void kfilnd_peer_put(struct kfilnd_peer *peer); +struct kfilnd_peer *kfilnd_peer_get(struct kfilnd_dev *dev, lnet_nid_t nid); +void kfilnd_peer_update_rx_contexts(struct kfilnd_peer *peer, + unsigned int rx_base, + unsigned int rx_count); +void kfilnd_peer_alive(struct kfilnd_peer *peer); +void kfilnd_peer_destroy(struct kfilnd_dev *dev); +void kfilnd_peer_init(struct kfilnd_dev *dev); +kfi_addr_t kfilnd_peer_get_kfi_addr(struct kfilnd_peer *peer); +u16 kfilnd_peer_target_rx_base(struct kfilnd_peer *peer); + +#endif /* _KFILND_PEER_ */ diff --git a/lnet/klnds/kfilnd/kfilnd_tn.c b/lnet/klnds/kfilnd/kfilnd_tn.c new file mode 100644 index 0000000..ba79c80 --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_tn.c @@ -0,0 +1,1605 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd transaction and state machine processing. + */ + +#include "kfilnd_tn.h" +#include "kfilnd_ep.h" +#include "kfilnd_dev.h" +#include "kfilnd_dom.h" +#include "kfilnd_peer.h" +#include + +static struct kmem_cache *tn_cache; +static struct kmem_cache *imm_buf_cache; + +static __sum16 kfilnd_tn_cksum(void *ptr, int nob) +{ + if (cksum) + return csum_fold(csum_partial(ptr, nob, 0)); + return NO_CHECKSUM; +} + +static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type) +{ + const int hdr_size = offsetof(struct kfilnd_msg, proto); + + switch (type) { + case KFILND_MSG_IMMEDIATE: + return offsetof(struct kfilnd_msg, proto.immed.payload[0]); + + case KFILND_MSG_BULK_PUT_REQ: + case KFILND_MSG_BULK_GET_REQ: + return hdr_size + sizeof(struct kfilnd_bulk_req_msg); + + default: + return -1; + } +} + +static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn) +{ + struct kfilnd_msg *msg = tn->tn_tx_msg.msg; + + /* Pack the protocol header and payload. */ + msg->proto.hello.version = KFILND_MSG_VERSION; + msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->peer); + msg->proto.hello.session_key = tn->peer->local_session_key; + + /* TODO: Support multiple RX contexts per peer. */ + msg->proto.hello.rx_count = 1; + + /* Pack the transport header. 
*/ + msg->magic = KFILND_MSG_MAGIC; + + /* Mesage version zero is only valid for hello requests. */ + msg->version = 0; + msg->type = KFILND_MSG_HELLO_REQ; + msg->nob = sizeof(struct kfilnd_hello_msg) + + offsetof(struct kfilnd_msg, proto); + msg->cksum = NO_CHECKSUM; + msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid); + msg->dstnid = tn->peer->nid; + + /* Checksum entire message. */ + msg->cksum = kfilnd_tn_cksum(msg, msg->nob); + + tn->tn_tx_msg.length = msg->nob; +} + +static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn) +{ + struct kfilnd_msg *msg = tn->tn_tx_msg.msg; + + /* Pack the protocol header and payload. */ + msg->proto.hello.version = tn->peer->version; + msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->peer); + msg->proto.hello.session_key = tn->peer->local_session_key; + + /* TODO: Support multiple RX contexts per peer. */ + msg->proto.hello.rx_count = 1; + + /* Pack the transport header. */ + msg->magic = KFILND_MSG_MAGIC; + + /* Mesage version zero is only valid for hello requests. */ + msg->version = 0; + msg->type = KFILND_MSG_HELLO_RSP; + msg->nob = sizeof(struct kfilnd_hello_msg) + + offsetof(struct kfilnd_msg, proto); + msg->cksum = NO_CHECKSUM; + msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid); + msg->dstnid = tn->peer->nid; + + /* Checksum entire message. */ + msg->cksum = kfilnd_tn_cksum(msg, msg->nob); + + tn->tn_tx_msg.length = msg->nob; +} + +static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn) +{ + struct kfilnd_msg *msg = tn->tn_tx_msg.msg; + + /* Pack the protocol header and payload. */ + lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr); + msg->proto.bulk_req.key = tn->tn_mr_key; + msg->proto.bulk_req.response_rx = tn->tn_response_rx; + + /* Pack the transport header. */ + msg->magic = KFILND_MSG_MAGIC; + msg->version = KFILND_MSG_VERSION; + msg->type = tn->msg_type; + msg->nob = sizeof(struct kfilnd_bulk_req_msg) + + offsetof(struct kfilnd_msg, proto); + msg->cksum = NO_CHECKSUM; + msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid); + msg->dstnid = tn->peer->nid; + + /* Checksum entire message. */ + msg->cksum = kfilnd_tn_cksum(msg, msg->nob); + + tn->tn_tx_msg.length = msg->nob; +} + +static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn) +{ + struct kfilnd_msg *msg = tn->tn_tx_msg.msg; + + /* Pack the protocol header and payload. */ + lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr); + + lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE, + msg, + offsetof(struct kfilnd_msg, + proto.immed.payload), + tn->tn_num_iovec, tn->tn_kiov, 0, + tn->tn_nob); + + /* Pack the transport header. */ + msg->magic = KFILND_MSG_MAGIC; + msg->version = KFILND_MSG_VERSION; + msg->type = tn->msg_type; + msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]); + msg->cksum = NO_CHECKSUM; + msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid); + msg->dstnid = tn->peer->nid; + + /* Checksum entire message. */ + msg->cksum = kfilnd_tn_cksum(msg, msg->nob); + + tn->tn_tx_msg.length = msg->nob; +} + +static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg, + unsigned int nob) +{ + const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto); + + if (nob < hdr_size) { + KFILND_EP_ERROR(ep, "Short message: %u", nob); + return -EPROTO; + } + + /* TODO: Support byte swapping on mixed endian systems. 
*/ + if (msg->magic != KFILND_MSG_MAGIC) { + KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic); + return -EPROTO; + } + + /* TODO: Allow for older versions. */ + if (msg->version > KFILND_MSG_VERSION) { + KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version); + return -EPROTO; + } + + if (msg->nob > nob) { + KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob, + msg->nob); + return -EPROTO; + } + + /* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */ + if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) { + KFILND_EP_ERROR(ep, "Bad checksum"); + return -EPROTO; + } + + if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) { + KFILND_EP_ERROR(ep, "Bad destination nid: %s", + libcfs_nid2str(msg->dstnid)); + return -EPROTO; + } + + if (msg->srcnid == LNET_NID_ANY) { + KFILND_EP_ERROR(ep, "Bad source nid: %s", + libcfs_nid2str(msg->srcnid)); + return -EPROTO; + } + + if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) { + KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n", + msg_type_to_str(msg->type), + msg->nob, kfilnd_tn_msgtype2size(msg->type)); + return -EPROTO; + } + + switch ((enum kfilnd_msg_type)msg->type) { + case KFILND_MSG_IMMEDIATE: + case KFILND_MSG_BULK_PUT_REQ: + case KFILND_MSG_BULK_GET_REQ: + if (msg->version == 0) { + KFILND_EP_ERROR(ep, + "Bad message type and version: type=%s version=%u", + msg_type_to_str(msg->type), + msg->version); + return -EPROTO; + } + break; + + case KFILND_MSG_HELLO_REQ: + case KFILND_MSG_HELLO_RSP: + if (msg->version != 0) { + KFILND_EP_ERROR(ep, + "Bad message type and version: type=%s version=%u", + msg_type_to_str(msg->type), + msg->version); + return -EPROTO; + } + break; + + default: + CERROR("Unknown message type %x\n", msg->type); + return -EPROTO; + } + return 0; +} + +static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn) +{ + unsigned int data_size_bucket = + kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len); + struct kfilnd_tn_duration_stat *stat; + + if (tn->is_initiator) + stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket]; + else + stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket]; + + atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts)), + &stat->accumulated_duration); + atomic_inc(&stat->accumulated_count); +} + +static void kfilnd_tn_state_change(struct kfilnd_transaction *tn, + enum tn_states new_state) +{ + KFILND_TN_DEBUG(tn, "%s -> %s state change", + tn_state_to_str(tn->tn_state), + tn_state_to_str(new_state)); + + kfilnd_tn_record_state_change(tn); + + tn->tn_state = new_state; + tn->tn_state_ts = ktime_get(); +} + +static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status, + enum lnet_msg_hstatus hstatus) +{ + /* Only the first non-ok status will take. */ + if (tn->tn_status == 0) { + KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status, + status); + tn->tn_status = status; + } + + if (tn->hstatus == LNET_MSG_STATUS_OK) { + KFILND_TN_DEBUG(tn, "%d -> %d health status change", + tn->hstatus, hstatus); + tn->hstatus = hstatus; + } +} + +static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn) +{ + return tn->tn_status != 0; +} + +/** + * kfilnd_tn_process_rx_event() - Process an immediate receive event. + * + * For each immediate receive, a transaction structure needs to be allocated to + * process the receive. 
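The checksum test in kfilnd_tn_unpack_msg() relies on the usual ones'-complement property: the sender stores csum_fold(csum_partial(...)) computed over the whole message with the checksum field pre-initialized, so re-checksumming the received bytes, stored checksum included, folds to zero only for an intact message. Below is a small user-space model of that pack/verify round trip; csum32()/fold16() only imitate what csum_partial()/csum_fold() do and are not the kernel implementations.

#include <stdint.h>
#include <stdio.h>

/* Ones'-complement sum of a byte buffer (models csum_partial()). */
static uint32_t csum32(const uint8_t *p, size_t len)
{
	uint32_t sum = 0;

	for (; len > 1; p += 2, len -= 2)
		sum += ((uint32_t)p[0] << 8) | p[1];
	if (len)
		sum += (uint32_t)p[0] << 8;
	return sum;
}

/* Fold to 16 bits and complement (models csum_fold()). */
static uint16_t fold16(uint32_t sum)
{
	while (sum >> 16)
		sum = (sum & 0xffff) + (sum >> 16);
	return (uint16_t)~sum;
}

int main(void)
{
	uint8_t msg[8] = { 0, 0, 'k', 'f', 'i', 'l', 'n', 'd' };
	uint16_t cksum;

	/* Pack: checksum field holds zero while the sum is computed. */
	cksum = fold16(csum32(msg, sizeof(msg)));
	msg[0] = cksum >> 8;
	msg[1] = cksum & 0xff;

	/* Verify: a clean message folds to zero, a corrupt one does not. */
	printf("clean=%#x\n", fold16(csum32(msg, sizeof(msg))));
	msg[5] ^= 1;
	printf("corrupt=%#x\n", fold16(csum32(msg, sizeof(msg))));
	return 0;
}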
+ */ +void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc, + struct kfilnd_msg *rx_msg, int msg_size) +{ + struct kfilnd_transaction *tn; + bool alloc_msg = true; + int rc; + enum tn_events event = TN_EVENT_RX_HELLO; + + /* Increment buf ref count for this work */ + atomic_inc(&bufdesc->immed_ref); + + /* Unpack the message */ + rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size); + if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) { + kfilnd_ep_imm_buffer_put(bufdesc); + KFILND_EP_ERROR(bufdesc->immed_end, + "Failed to unpack message %d", rc); + return; + } + + switch ((enum kfilnd_msg_type)rx_msg->type) { + case KFILND_MSG_IMMEDIATE: + case KFILND_MSG_BULK_PUT_REQ: + case KFILND_MSG_BULK_GET_REQ: + event = TN_EVENT_RX_OK; + fallthrough; + case KFILND_MSG_HELLO_RSP: + alloc_msg = false; + fallthrough; + case KFILND_MSG_HELLO_REQ: + /* Context points to a received buffer and status is the length. + * Allocate a Tn structure, set its values, then launch the + * receive. + */ + tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev, + bufdesc->immed_end->end_cpt, + rx_msg->srcnid, alloc_msg, false, + false); + if (IS_ERR(tn)) { + kfilnd_ep_imm_buffer_put(bufdesc); + KFILND_EP_ERROR(bufdesc->immed_end, + "Failed to allocate transaction struct: rc=%ld", + PTR_ERR(tn)); + return; + } + + tn->tn_rx_msg.msg = rx_msg; + tn->tn_rx_msg.length = msg_size; + tn->tn_posted_buf = bufdesc; + + KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u", + msg_type_to_str((enum kfilnd_msg_type)rx_msg->type), + tn->tn_mr_key); + break; + + default: + KFILND_EP_ERROR(bufdesc->immed_end, + "Unhandled kfilnd message type: %d", + (enum kfilnd_msg_type)rx_msg->type); + LBUG(); + }; + + kfilnd_tn_event_handler(tn, event, 0); +} + +static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn) +{ + unsigned int data_size_bucket = + kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len); + struct kfilnd_tn_duration_stat *stat; + + if (tn->is_initiator) + stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket]; + else + stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket]; + + atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts)), + &stat->accumulated_duration); + atomic_inc(&stat->accumulated_count); +} + +/** + * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation. + * + * All state machine functions should call kfilnd_tn_finalize() instead of + * kfilnd_tn_free(). Once all expected asynchronous events have been received, + * if the transaction lock has not been released, it will now be released, + * transaction resources cleaned up, and LNet finalized will be called. + */ +static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released) +{ + if (!*tn_released) { + mutex_unlock(&tn->tn_lock); + *tn_released = true; + } + + /* Release the reference on the multi-receive buffer. */ + if (tn->tn_posted_buf) + kfilnd_ep_imm_buffer_put(tn->tn_posted_buf); + + /* Finalize LNet operation. */ + if (tn->tn_lntmsg) { + tn->tn_lntmsg->msg_health_status = tn->hstatus; + lnet_finalize(tn->tn_lntmsg, tn->tn_status); + } + + if (tn->tn_getreply) { + tn->tn_getreply->msg_health_status = tn->hstatus; + lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni, + tn->tn_getreply, + tn->tn_status ? 
0 : tn->tn_nob); + lnet_finalize(tn->tn_getreply, tn->tn_status); + } + + if (KFILND_TN_PEER_VALID(tn)) + kfilnd_peer_put(tn->peer); + + kfilnd_tn_record_state_change(tn); + kfilnd_tn_record_duration(tn); + + kfilnd_tn_free(tn); +} + +/** + * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive. + * @tn: Transaction to have tagged received cancelled. + * + * Return: 0 on success. Else, negative errno. If an error occurs, resources may + * be leaked. + */ +static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn) +{ + int rc; + + /* Issue a cancel. A return code of zero means the operation issued an + * async cancel. A return code of -ENOENT means the tagged receive was + * not found. The assumption here is that a tagged send landed thus + * removing the tagged receive buffer from hardware. For both cases, + * async events should occur. + */ + rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn); + if (rc != 0 && rc != -ENOENT) { + KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak."); + return rc; + } + + return 0; +} + +static void kfilnd_tn_timeout_work(struct work_struct *work) +{ + struct kfilnd_transaction *tn = + container_of(work, struct kfilnd_transaction, timeout_work); + + KFILND_TN_ERROR(tn, "Bulk operation timeout"); + kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0); +} + +static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data) +{ + struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer); + + queue_work(kfilnd_wq, &tn->timeout_work); +} + +static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn) +{ + return del_timer(&tn->timeout_timer); +} + +static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn) +{ + ktime_t remaining_time = max_t(ktime_t, 0, + tn->deadline - ktime_get_seconds()); + unsigned long expires = remaining_time * HZ + jiffies; + + if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT)) + expires = jiffies; + + cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout, + (unsigned long)tn, 0); + mod_timer(&tn->timeout_timer, expires); +} + +/* The following are the state machine routines for the transactions. */ +static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + int rc; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_INIT_BULK: + /* Need to cancel the tagged receive to prevent resources from + * being leaked. + */ + rc = kfilnd_tn_cancel_tag_recv(tn); + + switch (rc) { + /* Async event will progress transaction. */ + case 0: + kfilnd_tn_state_change(tn, TN_STATE_FAIL); + return 0; + + /* Need to replay TN_EVENT_INIT_BULK event while in the + * TN_STATE_SEND_FAILED state. 
+ */ + case -EAGAIN: + KFILND_TN_DEBUG(tn, + "Need to replay cancel tagged recv"); + return -EAGAIN; + + default: + KFILND_TN_ERROR(tn, + "Unexpected error during cancel tagged receive: rc=%d", + rc); + LBUG(); + } + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } +} + +static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + int rc; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_INIT_BULK: + tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->peer); + KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr); + + kfilnd_tn_pack_bulk_req(tn); + + rc = kfilnd_ep_post_send(tn->tn_ep, tn); + switch (rc) { + /* Async event will progress immediate send. */ + case 0: + kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP); + return 0; + + /* Need to replay TN_EVENT_INIT_BULK event while in the + * TN_STATE_TAGGED_RECV_POSTED state. + */ + case -EAGAIN: + KFILND_TN_DEBUG(tn, + "Need to replay post send to %s(%#llx)", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr); + return -EAGAIN; + + /* Need to transition to the TN_STATE_SEND_FAILED to cleanup + * posted tagged receive buffer. + */ + default: + KFILND_TN_ERROR(tn, + "Failed to post send to %s(%#llx): rc=%d", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr, rc); + kfilnd_tn_status_update(tn, rc, + LNET_MSG_STATUS_LOCAL_ERROR); + kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED); + + /* Propogate TN_EVENT_INIT_BULK event to + * TN_STATE_SEND_FAILED handler. + */ + return kfilnd_tn_state_send_failed(tn, event, rc, + tn_released); + } + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } +} + +static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + struct kfilnd_msg *msg; + int rc; + bool finalize = false; + ktime_t remaining_time; + struct lnet_hdr hdr; + struct lnet_nid srcnid; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + /* For new peers, send a hello request message and queue the true LNet + * message for replay. + */ + if (kfilnd_peer_is_new_peer(tn->peer) && + (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) { + remaining_time = max_t(ktime_t, 0, + tn->deadline - ktime_get_seconds()); + + /* If transaction deadline has not be met, return -EAGAIN. This + * will cause this transaction event to be replayed. During this + * time, an async message from the peer should occur at which + * point the kfilnd version should be negotiated. + */ + if (remaining_time > 0) { + KFILND_TN_DEBUG(tn, "%s hello response pending", + libcfs_nid2str(tn->peer->nid)); + return -EAGAIN; + } + + rc = 0; + kfilnd_tn_status_update(tn, -ETIMEDOUT, + LNET_MSG_STATUS_NETWORK_TIMEOUT); + goto out; + } + + switch (event) { + case TN_EVENT_INIT_IMMEDIATE: + case TN_EVENT_TX_HELLO: + tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->peer); + KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr); + + if (event == TN_EVENT_INIT_IMMEDIATE) + kfilnd_tn_pack_immed_msg(tn); + else + kfilnd_tn_pack_hello_req(tn); + + /* Send immediate message. */ + rc = kfilnd_ep_post_send(tn->tn_ep, tn); + switch (rc) { + /* Async event will progress immediate send. 
*/ + case 0: + kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND); + return 0; + + /* Need to TN_EVENT_INIT_IMMEDIATE event while in TN_STATE_IDLE + * state. + */ + case -EAGAIN: + KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr); + return -EAGAIN; + + default: + KFILND_TN_ERROR(tn, + "Failed to post send to %s(%#llx): rc=%d", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr, rc); + kfilnd_tn_status_update(tn, rc, + LNET_MSG_STATUS_LOCAL_ERROR); + } + break; + + case TN_EVENT_INIT_BULK: + /* Post tagged receive buffer used to land bulk response. */ + rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn); + + switch (rc) { + /* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */ + case 0: + kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED); + + /* Propogate TN_EVENT_INIT_BULK event to + * TN_STATE_TAGGED_RECV_POSTED handler. + */ + return kfilnd_tn_state_tagged_recv_posted(tn, event, + rc, + tn_released); + + /* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE + * state. + */ + case -EAGAIN: + KFILND_TN_DEBUG(tn, "Need to replay tagged recv"); + return -EAGAIN; + + default: + KFILND_TN_ERROR(tn, "Failed to post tagged recv %d", + rc); + kfilnd_tn_status_update(tn, rc, + LNET_MSG_STATUS_LOCAL_ERROR); + } + break; + + case TN_EVENT_RX_OK: + /* If TN_EVENT_RX_OK occurs on a new peer, this is a sign of a + * peer having a stale peer structure. Stale peer structures + * requires dropping the incoming message and initiating a hello + * handshake. + */ + if (kfilnd_peer_is_new_peer(tn->peer)) { + rc = kfilnd_send_hello_request(tn->tn_ep->end_dev, + tn->tn_ep->end_cpt, + tn->peer->nid); + if (rc) + KFILND_TN_ERROR(tn, + "Failed to send hello request: rc=%d", + rc); + + /* Need to drop this message since it is uses stale + * peer. + */ + KFILND_TN_ERROR(tn, + "Dropping message from %s due to stale peer", + libcfs_nid2str(tn->peer->nid)); + kfilnd_tn_status_update(tn, -EPROTO, + LNET_MSG_STATUS_LOCAL_DROPPED); + rc = 0; + goto out; + } + + LASSERT(kfilnd_peer_is_new_peer(tn->peer) == false); + msg = tn->tn_rx_msg.msg; + + /* Update the NID address with the new preferred RX context. */ + kfilnd_peer_alive(tn->peer); + + /* Pass message up to LNet + * The TN will be reused in this call chain so we need to + * release the lock on the TN before proceeding. + */ + KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change", + tn_state_to_str(tn->tn_state)); + + /* TODO: Do not manually update this state change. */ + tn->tn_state = TN_STATE_IMM_RECV; + mutex_unlock(&tn->tn_lock); + *tn_released = true; + lnet_nid4_to_nid(msg->srcnid, &srcnid); + if (msg->type == KFILND_MSG_IMMEDIATE) { + lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr); + rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni, + &hdr, &srcnid, tn, 0); + } else { + lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr); + rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni, + &hdr, &srcnid, tn, 1); + } + + /* If successful, transaction has been accepted by LNet and we + * cannot process the transaction anymore within this context. 
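The new-peer handling in kfilnd_tn_state_idle() above defers real traffic behind the hello handshake: the handler keeps returning -EAGAIN (so the event is replayed) until either a hello response negotiates a version or the transaction deadline expires with -ETIMEDOUT. A compact model of that defer-until-handshake-or-deadline rule follows; the names are illustrative and a bounded replay counter stands in for the asynchronous replay queue and deadline.

#include <stdio.h>

struct fake_peer {
	unsigned int version;	/* 0 means the handshake is not done yet */
};

/* Models the idle-state rule: replay until a version is known or the
 * deadline (replay budget) runs out.
 */
static int try_send(const struct fake_peer *peer, int replays_left)
{
	while (!peer->version) {
		if (replays_left-- <= 0)
			return -1;	/* models -ETIMEDOUT at the deadline */
		/* models returning -EAGAIN and being replayed later */
	}
	return 0;			/* version negotiated: message goes out */
}

int main(void)
{
	struct fake_peer peer = { .version = 0 };

	printf("before handshake: %d\n", try_send(&peer, 3));
	peer.version = 1;		/* hello response arrived */
	printf("after handshake:  %d\n", try_send(&peer, 3));
	return 0;
}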
+ */ + if (!rc) + return 0; + + KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc); + kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR); + break; + + case TN_EVENT_RX_HELLO: + msg = tn->tn_rx_msg.msg; + + switch (msg->type) { + case KFILND_MSG_HELLO_REQ: + kfilnd_peer_update_rx_contexts(tn->peer, + msg->proto.hello.rx_base, + msg->proto.hello.rx_count); + kfilnd_peer_set_remote_session_key(tn->peer, + msg->proto.hello.session_key); + + /* Negotiate kfilnd version used between peers. Fallback + * to the minimum implemented kfilnd version. + */ + kfilnd_peer_set_version(tn->peer, + min_t(__u16, KFILND_MSG_VERSION, + msg->proto.hello.version)); + KFILND_TN_DEBUG(tn, + "Peer kfilnd version: %u; Local kfilnd version: %u; Negotiated kfilnd verions: %u", + msg->proto.hello.version, + KFILND_MSG_VERSION, tn->peer->version); + + tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->peer); + KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr); + + kfilnd_tn_pack_hello_rsp(tn); + + /* Send immediate message. */ + rc = kfilnd_ep_post_send(tn->tn_ep, tn); + switch (rc) { + case 0: + kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND); + return 0; + + case -EAGAIN: + KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr); + return -EAGAIN; + + default: + KFILND_TN_ERROR(tn, + "Failed to post send to %s(%#llx): rc=%d", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr, rc); + kfilnd_tn_status_update(tn, rc, + LNET_MSG_STATUS_LOCAL_ERROR); + } + break; + + case KFILND_MSG_HELLO_RSP: + rc = 0; + kfilnd_peer_update_rx_contexts(tn->peer, + msg->proto.hello.rx_base, + msg->proto.hello.rx_count); + kfilnd_peer_set_remote_session_key(tn->peer, + msg->proto.hello.session_key); + kfilnd_peer_set_version(tn->peer, + msg->proto.hello.version); + KFILND_TN_DEBUG(tn, "Negotiated kfilnd version: %u", + msg->proto.hello.version); + finalize = true; + break; + + default: + KFILND_TN_ERROR(tn, "Invalid message type: %s", + msg_type_to_str(msg->type)); + LBUG(); + } + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + +out: + if (kfilnd_tn_has_failed(tn)) + finalize = true; + + if (finalize) + kfilnd_tn_finalize(tn, tn_released); + + return rc; +} + +static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + enum lnet_msg_hstatus hstatus; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_TX_FAIL: + if (status == -ETIMEDOUT || status == -EIO) + hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT; + else + hstatus = LNET_MSG_STATUS_REMOTE_ERROR; + + kfilnd_tn_status_update(tn, status, hstatus); + kfilnd_peer_down(tn->peer); + break; + + case TN_EVENT_TX_OK: + kfilnd_peer_alive(tn->peer); + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + kfilnd_tn_finalize(tn, tn_released); + + return 0; +} + +static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + int rc = 0; + bool finalize = false; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_INIT_TAG_RMA: + case TN_EVENT_SKIP_TAG_RMA: + /* Release the buffer we received the request on. All relevant + * information to perform the RMA operation is stored in the + * transaction structure. 
This should be done before the RMA
+		 * operation to prevent two contexts from potentially processing
+		 * the same transaction.
+		 *
+		 * TODO: Prevent this from returning -EAGAIN.
+		 */
+		if (tn->tn_posted_buf) {
+			kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
+			tn->tn_posted_buf = NULL;
+		}
+
+		/* Update the KFI address to use the response RX context. */
+		tn->tn_target_addr =
+			kfi_rx_addr(KFILND_BASE_ADDR(tn->peer->addr),
+				    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
+		KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
+				libcfs_nid2str(tn->peer->nid),
+				tn->tn_target_addr);
+
+		/* Initiate the RMA operation to push/pull the LNet payload or
+		 * send a tagged message to finalize the bulk operation if the
+		 * RMA operation should be skipped.
+		 */
+		if (event == TN_EVENT_INIT_TAG_RMA) {
+			if (tn->sink_buffer)
+				rc = kfilnd_ep_post_read(tn->tn_ep, tn);
+			else
+				rc = kfilnd_ep_post_write(tn->tn_ep, tn);
+
+			switch (rc) {
+			/* Async tagged RMA event will progress transaction. */
+			case 0:
+				kfilnd_tn_state_change(tn,
+						       TN_STATE_WAIT_TAG_RMA_COMP);
+				return 0;
+
+			/* Need to replay TN_EVENT_INIT_TAG_RMA event while in
+			 * the TN_STATE_IMM_RECV state.
+			 */
+			case -EAGAIN:
+				KFILND_TN_DEBUG(tn,
+						"Need to replay tagged %s to %s(%#llx)",
+						tn->sink_buffer ? "read" : "write",
+						libcfs_nid2str(tn->peer->nid),
+						tn->tn_target_addr);
+				return -EAGAIN;
+
+			default:
+				KFILND_TN_ERROR(tn,
+						"Failed to post tagged %s to %s(%#llx): rc=%d",
+						tn->sink_buffer ? "read" : "write",
+						libcfs_nid2str(tn->peer->nid),
+						tn->tn_target_addr, rc);
+				kfilnd_tn_status_update(tn, rc,
+							LNET_MSG_STATUS_LOCAL_ERROR);
+			}
+		} else {
+			kfilnd_tn_status_update(tn, status,
+						LNET_MSG_STATUS_OK);
+
+			/* Since the LNet initiator has posted a unique tagged
+			 * buffer specific for this LNet transaction and the
+			 * LNet target has decided not to push/pull to/from the
+			 * LNet initiator tagged buffer, a noop operation is
+			 * done to this tagged buffer (i.e. payload transfer
+			 * size is zero). But, immediate data, which contains
+			 * the LNet target status for the transaction, is sent
+			 * to the LNet initiator. Immediate data only appears
+			 * in the completion event at the LNet initiator and
+			 * not in the tagged buffer.
+			 */
+			tn->tagged_data = cpu_to_be64(abs(tn->tn_status));
+
+			rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
+			switch (rc) {
+			/* Async tagged send event will progress transaction. */
+			case 0:
+				kfilnd_tn_state_change(tn,
+						       TN_STATE_WAIT_TAG_COMP);
+				return 0;
+
+			/* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
+			 * the TN_STATE_IMM_RECV state.
+ */ + case -EAGAIN: + KFILND_TN_DEBUG(tn, + "Need to replay tagged send to %s(%#llx)", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr); + return -EAGAIN; + + default: + KFILND_TN_ERROR(tn, + "Failed to post tagged send to %s(%#llx): rc=%d", + libcfs_nid2str(tn->peer->nid), + tn->tn_target_addr, rc); + kfilnd_tn_status_update(tn, rc, + LNET_MSG_STATUS_LOCAL_ERROR); + } + } + break; + + case TN_EVENT_RX_OK: + finalize = true; + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + if (kfilnd_tn_has_failed(tn)) + finalize = true; + + if (finalize) + kfilnd_tn_finalize(tn, tn_released); + + return rc; +} + +static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + int rc; + enum lnet_msg_hstatus hstatus; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_TX_OK: + kfilnd_peer_alive(tn->peer); + kfilnd_tn_timeout_enable(tn); + kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP); + break; + + case TN_EVENT_TAG_RX_OK: + kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP); + break; + + case TN_EVENT_TX_FAIL: + if (status == -ETIMEDOUT) + hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT; + else + hstatus = LNET_MSG_STATUS_REMOTE_ERROR; + + kfilnd_tn_status_update(tn, status, hstatus); + kfilnd_peer_down(tn->peer); + + /* Need to cancel the tagged receive to prevent resources from + * being leaked. + */ + rc = kfilnd_tn_cancel_tag_recv(tn); + + switch (rc) { + /* Async cancel event will progress transaction. */ + case 0: + kfilnd_tn_status_update(tn, status, + LNET_MSG_STATUS_LOCAL_ERROR); + kfilnd_tn_state_change(tn, TN_STATE_FAIL); + return 0; + + /* Need to replay TN_EVENT_INIT_BULK event while in the + * TN_STATE_SEND_FAILED state. 
+ */ + case -EAGAIN: + KFILND_TN_DEBUG(tn, + "Need to replay cancel tagged recv"); + return -EAGAIN; + + default: + KFILND_TN_ERROR(tn, + "Unexpected error during cancel tagged receive: rc=%d", + rc); + LBUG(); + } + break; + + case TN_EVENT_TAG_RX_FAIL: + kfilnd_tn_status_update(tn, status, + LNET_MSG_STATUS_LOCAL_ERROR); + kfilnd_tn_state_change(tn, TN_STATE_FAIL); + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + return 0; +} + +static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + if (event == TN_EVENT_TX_OK) { + kfilnd_peer_alive(tn->peer); + kfilnd_tn_finalize(tn, tn_released); + } else { + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + return 0; +} + +static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + enum lnet_msg_hstatus hstatus; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_TAG_TX_OK: + kfilnd_peer_alive(tn->peer); + break; + + case TN_EVENT_TAG_TX_FAIL: + if (status == -ETIMEDOUT) + hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT; + else + hstatus = LNET_MSG_STATUS_REMOTE_ERROR; + + kfilnd_tn_status_update(tn, status, hstatus); + kfilnd_peer_down(tn->peer); + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + kfilnd_tn_finalize(tn, tn_released); + + return 0; +} + +static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + int rc; + enum lnet_msg_hstatus hstatus; + + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_TAG_RX_FAIL: + case TN_EVENT_TAG_RX_OK: + /* Status can be set for both TN_EVENT_TAG_RX_FAIL and + * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set, + * LNet target returned -ENODATA. + */ + if (status) { + if (event == TN_EVENT_TAG_RX_FAIL) + kfilnd_tn_status_update(tn, status, + LNET_MSG_STATUS_LOCAL_ERROR); + else + kfilnd_tn_status_update(tn, status, + LNET_MSG_STATUS_OK); + } + + if (!kfilnd_tn_timeout_cancel(tn)) { + kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP); + return 0; + } + break; + + case TN_EVENT_TIMEOUT: + /* Need to cancel the tagged receive to prevent resources from + * being leaked. + */ + rc = kfilnd_tn_cancel_tag_recv(tn); + + switch (rc) { + /* Async cancel event will progress transaction. */ + case 0: + kfilnd_tn_state_change(tn, + TN_STATE_WAIT_TIMEOUT_TAG_COMP); + return 0; + + /* Need to replay TN_EVENT_INIT_BULK event while in the + * TN_STATE_WAIT_TAG_COMP state. 
+ */ + case -EAGAIN: + KFILND_TN_DEBUG(tn, + "Need to replay cancel tagged recv"); + return -EAGAIN; + + default: + KFILND_TN_ERROR(tn, + "Unexpected error during cancel tagged receive: rc=%d", + rc); + LBUG(); + } + break; + + case TN_EVENT_TAG_TX_FAIL: + if (status == -ETIMEDOUT) + hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT; + else + hstatus = LNET_MSG_STATUS_REMOTE_ERROR; + + kfilnd_tn_status_update(tn, status, hstatus); + kfilnd_peer_down(tn->peer); + break; + + case TN_EVENT_TAG_TX_OK: + kfilnd_peer_alive(tn->peer); + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + kfilnd_tn_finalize(tn, tn_released); + + return 0; +} + +static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_TX_FAIL: + kfilnd_peer_down(tn->peer); + break; + + case TN_EVENT_TX_OK: + kfilnd_peer_alive(tn->peer); + break; + + case TN_EVENT_TAG_RX_FAIL: + case TN_EVENT_TAG_RX_CANCEL: + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + kfilnd_tn_finalize(tn, tn_released); + + return 0; +} + +static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn, + enum tn_events event, + int status, bool *tn_released) +{ + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + switch (event) { + case TN_EVENT_TAG_RX_CANCEL: + kfilnd_tn_status_update(tn, -ETIMEDOUT, + LNET_MSG_STATUS_REMOTE_TIMEOUT); + kfilnd_peer_down(tn->peer); + break; + + case TN_EVENT_TAG_RX_FAIL: + kfilnd_tn_status_update(tn, status, + LNET_MSG_STATUS_LOCAL_ERROR); + break; + + case TN_EVENT_TAG_RX_OK: + break; + + default: + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + kfilnd_tn_finalize(tn, tn_released); + + return 0; +} + +static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn, + enum tn_events event, int status, + bool *tn_released) +{ + KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event), + status); + + if (event == TN_EVENT_TIMEOUT) { + kfilnd_tn_finalize(tn, tn_released); + } else { + KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event)); + LBUG(); + } + + return 0; +} + +static int +(* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn, + enum tn_events event, + int status, + bool *tn_released) = { + [TN_STATE_IDLE] = kfilnd_tn_state_idle, + [TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp, + [TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send, + [TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted, + [TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed, + [TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp, + [TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp, + [TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp, + [TN_STATE_WAIT_TIMEOUT_TAG_COMP] = + kfilnd_tn_state_wait_timeout_tag_comp, + [TN_STATE_FAIL] = kfilnd_tn_state_fail, + [TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv, + [TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp, +}; + +/** + * kfilnd_tn_event_handler() - Update transaction state machine with an event. + * @tn: Transaction to be updated. + * @event: Transaction event. + * @status: Errno status associated with the event. 
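The dispatch table above is the whole state machine: the handler for the current tn_state is indexed directly, and any handler that cannot make progress returns -EAGAIN so kfilnd_tn_event_handler() queues the same event for replay. A minimal stand-alone model of that dispatch-and-replay shape follows; the states, events and the hard-coded -11 for EAGAIN are illustrative only.

#include <stdio.h>

enum fake_state { ST_IDLE, ST_SEND, ST_MAX };
enum fake_event { EV_START, EV_DONE };

struct fake_tn {
	enum fake_state state;
	int busy;	/* when set, handlers ask for a replay */
};

static int idle_handler(struct fake_tn *tn, enum fake_event ev)
{
	(void)ev;
	if (tn->busy)
		return -11;	/* -EAGAIN: replay this event later */
	tn->state = ST_SEND;
	return 0;
}

static int send_handler(struct fake_tn *tn, enum fake_event ev)
{
	(void)ev;
	tn->state = ST_IDLE;
	return 0;
}

/* One handler per state, indexed exactly like the kfilnd table above. */
static int (*const dispatch[ST_MAX])(struct fake_tn *, enum fake_event) = {
	[ST_IDLE] = idle_handler,
	[ST_SEND] = send_handler,
};

int main(void)
{
	struct fake_tn tn = { .state = ST_IDLE, .busy = 1 };

	if (dispatch[tn.state](&tn, EV_START) == -11) {
		printf("replaying event\n");	/* kfilnd queues it on the endpoint */
		tn.busy = 0;
		dispatch[tn.state](&tn, EV_START);
	}
	dispatch[tn.state](&tn, EV_DONE);
	printf("final state %d\n", tn.state);
	return 0;
}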
+ *
+ * When the transaction event handler is first called on a new transaction, the
+ * transaction is now owned by the transaction system. This means that it will
+ * be freed by the system as the transaction progresses through the state
+ * machine.
+ */
+void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
+			     enum tn_events event, int status)
+{
+	bool tn_released = false;
+	int rc;
+
+	if (!tn)
+		return;
+
+	mutex_lock(&tn->tn_lock);
+	rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
+							  &tn_released);
+	if (rc == -EAGAIN) {
+		tn->replay_event = event;
+		tn->replay_status = status;
+		kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
+	}
+
+	if (!tn_released)
+		mutex_unlock(&tn->tn_lock);
+}
+
+/**
+ * kfilnd_tn_free() - Free a transaction.
+ */
+void kfilnd_tn_free(struct kfilnd_transaction *tn)
+{
+	spin_lock(&tn->tn_ep->tn_list_lock);
+	list_del(&tn->tn_entry);
+	spin_unlock(&tn->tn_ep->tn_list_lock);
+
+	KFILND_TN_DEBUG(tn, "Transaction freed");
+
+	if (tn->tn_mr_key)
+		kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);
+
+	/* Free send message buffer if needed. */
+	if (tn->tn_tx_msg.msg)
+		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
+
+	kmem_cache_free(tn_cache, tn);
+}
+
+/**
+ * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
+ * @dev: KFI LND device used to look up the KFI LND endpoint to associate with
+ * the transaction.
+ * @cpt: CPT of the transaction.
+ * @target_nid: Target NID of the transaction.
+ * @alloc_msg: Allocate an immediate message for the transaction.
+ * @is_initiator: True if this is the initiator of the LNet transaction.
+ * @key: True if the transaction needs a memory region key.
+ *
+ * During transaction allocation, each transaction is associated with a KFI LND
+ * endpoint used to post data transfer operations. The CPT argument is used to
+ * look up the KFI LND endpoint within the KFI LND device.
+ *
+ * Return: On success, valid pointer. Else, negative errno pointer.
+ */
+struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
+					   lnet_nid_t target_nid,
+					   bool alloc_msg, bool is_initiator,
+					   bool key)
+{
+	struct kfilnd_transaction *tn;
+	struct kfilnd_ep *ep;
+	int rc;
+	ktime_t tn_alloc_ts;
+
+	if (!dev) {
+		rc = -EINVAL;
+		goto err;
+	}
+
+	tn_alloc_ts = ktime_get();
+
+	/* If the CPT does not fall into the LNet NI CPT range, force the CPT
+	 * into the LNet NI CPT range. This should never happen.
+	 */
+	ep = dev->cpt_to_endpoint[cpt];
+	if (!ep) {
+		CWARN("%s used invalid cpt=%d\n",
+		      libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
+		ep = dev->kfd_endpoints[0];
+	}
+
+	tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
+	if (!tn) {
+		rc = -ENOMEM;
+		goto err;
+	}
+
+	if (alloc_msg) {
+		tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
+		if (!tn->tn_tx_msg.msg) {
+			rc = -ENOMEM;
+			goto err_free_tn;
+		}
+	}
+
+	if (key) {
+		rc = kfilnd_ep_get_key(ep);
+		if (rc < 0)
+			goto err_free_tn;
+		tn->tn_mr_key = rc;
+	}
+
+	tn->peer = kfilnd_peer_get(dev, target_nid);
+	if (IS_ERR(tn->peer)) {
+		rc = PTR_ERR(tn->peer);
+		goto err_put_mr_key;
+	}
+
+	mutex_init(&tn->tn_lock);
+	tn->tn_ep = ep;
+	tn->tn_response_rx = ep->end_context_id;
+	tn->tn_state = TN_STATE_IDLE;
+	tn->hstatus = LNET_MSG_STATUS_OK;
+	tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
+	tn->is_initiator = is_initiator;
+	INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);
+
+	/* Add the transaction to an endpoint. This is like
+	 * incrementing a ref counter.
+ */ + spin_lock(&ep->tn_list_lock); + list_add_tail(&tn->tn_entry, &ep->tn_list); + spin_unlock(&ep->tn_list_lock); + + tn->tn_alloc_ts = tn_alloc_ts; + tn->tn_state_ts = ktime_get(); + + KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key); + + return tn; + +err_put_mr_key: + if (key) + kfilnd_ep_put_key(ep, tn->tn_mr_key); +err_free_tn: + if (tn->tn_tx_msg.msg) + kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg); + kmem_cache_free(tn_cache, tn); +err: + return ERR_PTR(rc); +} + +/** + * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system. + * + * This function should only be called when there are no outstanding + * transactions. + */ +void kfilnd_tn_cleanup(void) +{ + kmem_cache_destroy(imm_buf_cache); + kmem_cache_destroy(tn_cache); +} + +/** + * kfilnd_tn_init() - Initialize KFI LND transaction system. + * + * Return: On success, zero. Else, negative errno. + */ +int kfilnd_tn_init(void) +{ + tn_cache = kmem_cache_create("kfilnd_tn", + sizeof(struct kfilnd_transaction), 0, + SLAB_HWCACHE_ALIGN, NULL); + if (!tn_cache) + goto err; + + imm_buf_cache = kmem_cache_create("kfilnd_imm_buf", + KFILND_IMMEDIATE_MSG_SIZE, 0, + SLAB_HWCACHE_ALIGN, NULL); + if (!imm_buf_cache) + goto err_tn_cache_destroy; + + return 0; + +err_tn_cache_destroy: + kmem_cache_destroy(tn_cache); +err: + return -ENOMEM; +} + +/** + * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction. + * @tn: Transaction to have buffer set. + * @kiov: LNet KIOV buffer. + * @num_iov: Number of IOVs. + * @offset: Offset into IOVs where the buffer starts. + * @len: Length of the buffer. + * + * This function takes the user provided IOV, offset, and len, and sets the + * transaction buffer. The user provided IOV is an LNet KIOV. When the + * transaction buffer is configured, the user provided offset is applied + * when the transaction buffer is configured (i.e. the transaction buffer + * offset is zero). + */ +int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn, + struct bio_vec *kiov, size_t num_iov, + size_t offset, size_t len) +{ + size_t i; + size_t cur_len = 0; + size_t cur_offset = offset; + size_t cur_iov = 0; + size_t tmp_len; + size_t tmp_offset; + + for (i = 0; (i < num_iov) && (cur_len < len); i++) { + /* Skip KIOVs until a KIOV with a length less than the current + * offset is found. + */ + if (kiov[i].bv_len <= cur_offset) { + cur_offset -= kiov[i].bv_len; + continue; + } + + tmp_len = kiov[i].bv_len - cur_offset; + tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset; + + if (tmp_len + cur_len > len) + tmp_len = len - cur_len; + + /* tn_kiov is an array of size LNET_MAX_IOV */ + if (cur_iov >= LNET_MAX_IOV) + return -EINVAL; + + tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page; + tn->tn_kiov[cur_iov].bv_len = tmp_len; + tn->tn_kiov[cur_iov].bv_offset = tmp_offset; + + cur_iov++; + cur_len += tmp_len; + cur_offset = 0; + } + + tn->tn_num_iovec = cur_iov; + tn->tn_nob = cur_len; + + return 0; +} diff --git a/lnet/klnds/kfilnd/kfilnd_tn.h b/lnet/klnds/kfilnd/kfilnd_tn.h new file mode 100644 index 0000000..237ac68 --- /dev/null +++ b/lnet/klnds/kfilnd/kfilnd_tn.h @@ -0,0 +1,50 @@ +/* + * GPL HEADER START + * + * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER. + * + * This program is free software; you can redistribute it and/or modify + * it under the terms of the GNU General Public License version 2 only, + * as published by the Free Software Foundation. 
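kfilnd_tn_set_kiov_buf() folds the caller's (offset, len) pair into the fragment list itself: fragments wholly consumed by the offset are skipped (the skip test is bv_len <= cur_offset, i.e. it skips until a fragment extends past the offset), the first surviving fragment is trimmed at the front, and the total is clipped so at most len bytes are described, leaving a list that starts at offset zero. A user-space model of the same clipping over plain {len, offset} fragments follows; struct frag stands in for struct bio_vec.

#include <stdio.h>
#include <stddef.h>

struct frag {
	size_t len;
	size_t off;
};

/* Apply (offset, len) to a fragment list, producing a list that starts at
 * offset zero, as kfilnd_tn_set_kiov_buf() does for bio_vecs.
 */
static size_t clip_frags(const struct frag *in, size_t nin,
			 size_t offset, size_t len,
			 struct frag *out, size_t nout)
{
	size_t i, n = 0, got = 0;

	for (i = 0; i < nin && got < len && n < nout; i++) {
		size_t take;

		if (in[i].len <= offset) {	/* fragment entirely skipped */
			offset -= in[i].len;
			continue;
		}

		take = in[i].len - offset;	/* trim the front */
		if (got + take > len)
			take = len - got;	/* clip the tail */

		out[n].off = in[i].off + offset;
		out[n].len = take;
		n++;
		got += take;
		offset = 0;
	}
	return n;
}

int main(void)
{
	struct frag in[] = { { 4096, 0 }, { 4096, 0 }, { 4096, 0 } };
	struct frag out[3];
	size_t i, n = clip_frags(in, 3, 5000, 3000, out, 3);

	for (i = 0; i < n; i++)
		printf("frag %zu: off=%zu len=%zu\n", i, out[i].off, out[i].len);
	return 0;
}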
+ * + * This program is distributed in the hope that it will be useful, but + * WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU + * General Public License version 2 for more details (a copy is included + * in the LICENSE file that accompanied this code). + * + * You should have received a copy of the GNU General Public License + * version 2 along with this program; If not, see + * http://www.gnu.org/licenses/gpl-2.0.html + * + * GPL HEADER END + */ +/* + * Copyright 2022 Hewlett Packard Enterprise Development LP + */ +/* + * This file is part of Lustre, http://www.lustre.org/ + */ +/* + * kfilnd transaction and state machine processing. + */ +#ifndef _KFILND_TN_ +#define _KFILND_TN_ + +#include "kfilnd.h" + +void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc, + struct kfilnd_msg *rx_msg, int msg_size); +void kfilnd_tn_free(struct kfilnd_transaction *tn); +struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt, + lnet_nid_t target_nid, + bool alloc_msg, bool is_initiator, + bool key); +void kfilnd_tn_event_handler(struct kfilnd_transaction *tn, + enum tn_events event, int status); +void kfilnd_tn_cleanup(void); +int kfilnd_tn_init(void); +int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn, struct bio_vec *kiov, + size_t num_iov, size_t offset, size_t nob); + +#endif /* _KFILND_TN_ */ diff --git a/lnet/utils/lnetconfig/liblnetconfig.c b/lnet/utils/lnetconfig/liblnetconfig.c index c5ac18a..ad11da2 100644 --- a/lnet/utils/lnetconfig/liblnetconfig.c +++ b/lnet/utils/lnetconfig/liblnetconfig.c @@ -59,6 +59,7 @@ #define RDMA_PS_TCP 0x0106 #endif +#define cxi_nic_addr_path "/sys/class/cxi/cxi%u/device/properties/" const char *gmsg_stat_names[] = {"sent_stats", "received_stats", "dropped_stats"}; @@ -1151,6 +1152,40 @@ static int lustre_lnet_queryip(struct lnet_dlc_intf_descr *intf, __u32 *ip) return LUSTRE_CFG_RC_NO_ERR; } +static int lustre_lnet_kfi_intf2nid(struct lnet_dlc_intf_descr *intf, + __u32 *nid_addr) +{ + unsigned int nic_index; + int rc; + char *nic_addr_path; + char val[128]; + int size; + long int addr; + + rc = sscanf(intf->intf_name, "cxi%u", &nic_index); + if (rc != 1) + return LUSTRE_CFG_RC_NO_MATCH; + + size = snprintf(NULL, 0, cxi_nic_addr_path, nic_index) + 1; + nic_addr_path = malloc(size); + if (!nic_addr_path) + return LUSTRE_CFG_RC_OUT_OF_MEM; + sprintf(nic_addr_path, cxi_nic_addr_path, nic_index); + + rc = read_sysfs_file(nic_addr_path, "nic_addr", val, 1, sizeof(val)); + free(nic_addr_path); + if (rc) + return LUSTRE_CFG_RC_NO_MATCH; + + addr = strtol(val, NULL, 16); + if (addr == LONG_MIN || addr == LONG_MAX) + return LUSTRE_CFG_RC_NO_MATCH; + + *nid_addr = addr; + + return LUSTRE_CFG_RC_NO_ERR; +} + /* * for each interface in the array of interfaces find the IP address of * that interface, create its nid and add it to an array of NIDs. 
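lustre_lnet_kfi_intf2nid() above derives the kfi NID address by parsing the cxi device index out of the interface name and reading the hex fabric address from the nic_addr file under the sysfs path defined at the top of this file. Below is a stand-alone sketch of the same parse-and-read flow using plain stdio instead of liblnetconfig's read_sysfs_file(); error handling is reduced and the helper name is illustrative.

#include <stdio.h>
#include <stdlib.h>

/* Parse "cxiN", read /sys/class/cxi/cxiN/device/properties/nic_addr as hex. */
static int kfi_intf_to_addr(const char *intf, unsigned int *addr)
{
	char path[128], val[32];
	unsigned int idx;
	FILE *f;

	if (sscanf(intf, "cxi%u", &idx) != 1)
		return -1;

	snprintf(path, sizeof(path),
		 "/sys/class/cxi/cxi%u/device/properties/nic_addr", idx);
	f = fopen(path, "r");
	if (!f)
		return -1;
	if (!fgets(val, sizeof(val), f)) {
		fclose(f);
		return -1;
	}
	fclose(f);

	*addr = (unsigned int)strtol(val, NULL, 16);
	return 0;
}

int main(int argc, char **argv)
{
	unsigned int addr;

	if (argc > 1 && !kfi_intf_to_addr(argv[1], &addr))
		printf("%s: nic_addr=%#x\n", argv[1], addr);
	return 0;
}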
@@ -1164,6 +1199,7 @@ static int lustre_lnet_intf2nids(struct lnet_dlc_network_descr *nw, struct lnet_dlc_intf_descr *intf; char val[LNET_MAX_STR_LEN]; __u32 ip; + __u32 nic_addr; int gni_num; char *endp; unsigned int num; @@ -1206,6 +1242,21 @@ static int lustre_lnet_intf2nids(struct lnet_dlc_network_descr *nw, (*nids)[i] = LNET_MKNID(nw->nw_id, gni_num); goto out; + } else if (LNET_NETTYP(nw->nw_id) == KFILND) { + list_for_each_entry(intf, &nw->nw_intflist, intf_on_network) { + rc = lustre_lnet_kfi_intf2nid(intf, &nic_addr); + if (rc != LUSTRE_CFG_RC_NO_ERR) { + snprintf(err_str, str_len, + "\"couldn't query kfi intf %s\"", + intf->intf_name); + err_str[str_len - 1] = '\0'; + goto failed; + } + + (*nids)[i] = LNET_MKNID(nw->nw_id, nic_addr); + i++; + } + goto out; } /* look at the other interfaces */ diff --git a/lnet/utils/lnetconfig/liblnetconfig_lnd.c b/lnet/utils/lnetconfig/liblnetconfig_lnd.c index 878a184..644a81c 100644 --- a/lnet/utils/lnetconfig/liblnetconfig_lnd.c +++ b/lnet/utils/lnetconfig/liblnetconfig_lnd.c @@ -85,6 +85,25 @@ lustre_socklnd_show_tun(struct cYAML *lndparams, return LUSTRE_CFG_RC_NO_ERR; } +static int +lustre_kfilnd_show_tun(struct cYAML *lndparams, + struct lnet_ioctl_config_kfilnd_tunables *lnd_cfg) +{ + if (cYAML_create_number(lndparams, "prov_major_version", + lnd_cfg->lnd_prov_major_version) == NULL) + return LUSTRE_CFG_RC_OUT_OF_MEM; + + if (cYAML_create_number(lndparams, "prov_minor_version", + lnd_cfg->lnd_prov_minor_version) == NULL) + return LUSTRE_CFG_RC_OUT_OF_MEM; + + if (cYAML_create_number(lndparams, "auth_key", + lnd_cfg->lnd_auth_key) == NULL) + return LUSTRE_CFG_RC_OUT_OF_MEM; + + return LUSTRE_CFG_RC_NO_ERR; +} + int lustre_net_show_tunables(struct cYAML *tunables, struct lnet_ioctl_config_lnd_cmn_tunables *cmn) @@ -129,6 +148,9 @@ lustre_ni_show_tunables(struct cYAML *lnd_tunables, else if (net_type == SOCKLND) rc = lustre_socklnd_show_tun(lnd_tunables, &lnd->lnd_tun_u.lnd_sock); + else if (net_type == KFILND) + rc = lustre_kfilnd_show_tun(lnd_tunables, + &lnd->lnd_tun_u.lnd_kfi); return rc; } @@ -176,6 +198,33 @@ yaml_extract_o2ib_tun(struct cYAML *tree, } +static void +yaml_extract_kfi_tun(struct cYAML *tree, + struct lnet_ioctl_config_kfilnd_tunables *lnd_cfg) +{ + struct cYAML *prov_major_version = NULL; + struct cYAML *prov_minor_version = NULL; + struct cYAML *auth_key = NULL; + struct cYAML *lndparams = NULL; + + lndparams = cYAML_get_object_item(tree, "lnd tunables"); + if (!lndparams) + return; + + prov_major_version = + cYAML_get_object_item(lndparams, "prov_major_version"); + lnd_cfg->lnd_prov_major_version = + (prov_major_version) ? prov_major_version->cy_valueint : 0; + + prov_minor_version = + cYAML_get_object_item(lndparams, "prov_minor_version"); + lnd_cfg->lnd_prov_minor_version = + (prov_minor_version) ? prov_minor_version->cy_valueint : 0; + + auth_key = cYAML_get_object_item(lndparams, "auth_key"); + lnd_cfg->lnd_auth_key = + (auth_key) ? 
auth_key->cy_valueint : 0; +} static void yaml_extract_sock_tun(struct cYAML *tree, @@ -203,6 +252,8 @@ lustre_yaml_extract_lnd_tunables(struct cYAML *tree, else if (net_type == SOCKLND) yaml_extract_sock_tun(tree, &tun->lnd_tun_u.lnd_sock); - + else if (net_type == KFILND) + yaml_extract_kfi_tun(tree, + &tun->lnd_tun_u.lnd_kfi); } diff --git a/lnet/utils/lnetctl.c b/lnet/utils/lnetctl.c index 0024b16..a0f9de7 100644 --- a/lnet/utils/lnetctl.c +++ b/lnet/utils/lnetctl.c @@ -165,7 +165,8 @@ command_t net_cmds[] = { "\t--credits: Network Interface credits\n" "\t--cpt: CPU Partitions configured net uses (e.g. [0,1]\n" "\t--conns-per-peer: number of connections per peer\n" - "\t--skip-mr-route-setup: do not add linux route for the ni\n"}, + "\t--skip-mr-route-setup: do not add linux route for the ni\n" + "\t--auth-key: Network authorization key (kfilnd only)\n"}, {"del", jt_del_ni, 0, "delete a network\n" "\t--net: net name (e.g. tcp0)\n" "\t--if: physical interface (e.g. eth0)\n"}, @@ -1056,7 +1057,7 @@ static int jt_add_route(int argc, char **argv) static int jt_add_ni(int argc, char **argv) { char *ip2net = NULL; - long int pto = -1, pc = -1, pbc = -1, cre = -1, cpp = -1; + long int pto = -1, pc = -1, pbc = -1, cre = -1, cpp = -1, auth_key = -1; struct cYAML *err_rc = NULL; int rc, opt, cpt_rc = -1; struct lnet_dlc_network_descr nw_descr; @@ -1068,8 +1069,9 @@ static int jt_add_ni(int argc, char **argv) memset(&tunables, 0, sizeof(tunables)); lustre_lnet_init_nw_descr(&nw_descr); - const char *const short_options = "b:c:i:k:m:n:p:r:s:t:"; + const char *const short_options = "a:b:c:i:k:m:n:p:r:s:t:"; static const struct option long_options[] = { + { .name = "auth-key", .has_arg = required_argument, .val = 'a' }, { .name = "peer-buffer-credits", .has_arg = required_argument, .val = 'b' }, { .name = "peer-credits", .has_arg = required_argument, .val = 'c' }, @@ -1092,6 +1094,14 @@ static int jt_add_ni(int argc, char **argv) while ((opt = getopt_long(argc, argv, short_options, long_options, NULL)) != -1) { switch (opt) { + case 'a': + rc = parse_long(optarg, &auth_key); + if (rc != 0) { + /* ignore option */ + auth_key = -1; + continue; + } + break; case 'b': rc = parse_long(optarg, &pbc); if (rc != 0) { @@ -1163,6 +1173,11 @@ static int jt_add_ni(int argc, char **argv) } } + if (auth_key > 0 && LNET_NETTYP(nw_descr.nw_id) == KFILND) { + tunables.lt_tun.lnd_tun_u.lnd_kfi.lnd_auth_key = auth_key; + found = true; + } + if (pto > 0 || pc > 0 || pbc > 0 || cre > 0 || cpp > -1) { tunables.lt_cmn.lct_peer_timeout = pto; tunables.lt_cmn.lct_peer_tx_credits = pc; diff --git a/lustre/utils/gss/lsupport.c b/lustre/utils/gss/lsupport.c index 7bc291b..76a6034 100644 --- a/lustre/utils/gss/lsupport.c +++ b/lustre/utils/gss/lsupport.c @@ -295,7 +295,8 @@ static struct convert_struct converter[] = { [SOCKLND] = { .name = "SOCKLND", .nid2name = ipv4_nid2hostname }, [O2IBLND] = { .name = "O2IBLND", .nid2name = ipv4_nid2hostname }, [LOLND] = { .name = "LOLND", .nid2name = lolnd_nid2hostname }, - [PTL4LND] = { .name = "PTL4LND", .nid2name = external_nid2hostname } + [PTL4LND] = { .name = "PTL4LND", .nid2name = external_nid2hostname }, + [KFILND] = { .name = "KFILND", .nid2name = ipv4_nid2hostname } }; #define LND_MAX (sizeof(converter) / sizeof(converter[0])) -- 1.8.3.1
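With the lnetctl option and YAML handling added above, a kfi NI can be configured either from the command line or from an imported YAML file. The snippet below is a usage sketch rather than output from a real system: the interface name and key value are examples, the surrounding YAML layout follows the usual lnetctl import format, and it assumes the kfilnd network type is written as "kfi" (as in kfi0); only the --auth-key option and the "lnd tunables" keys come from this patch.

# non-default authorization key on a kfi NI over the cxi0 interface
lnetctl net add --net kfi --if cxi0 --auth-key 255

# equivalent YAML for "lnetctl import"
net:
    - net type: kfi
      local NI(s):
        - interfaces:
              0: cxi0
          lnd tunables:
              auth_key: 255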