4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright 2022 Hewlett Packard Enterprise Development LP
26 * This file is part of Lustre, http://www.lustre.org/
29 * kfilnd main interface.
32 #include <linux/delay.h>
34 #include "kfilnd_tn.h"
35 #include "kfilnd_dev.h"
37 /* These are temp constants to get stuff to compile */
/* NOTE(review): KFILND_DEFAULT_DEVICE is not referenced by any code in this
 * file as shown; confirm whether it is still needed.
 */
38 #define KFILND_DEFAULT_DEVICE "eth0"
40 /* Some constants which should be turned into tunables */
/* NOTE(review): KFILND_MAX_WORKER_THREADS and KFILND_MAX_EVENT_QUEUE are not
 * referenced by the functions in this file; confirm where they are used.
 */
41 #define KFILND_MAX_WORKER_THREADS 4
42 #define KFILND_MAX_EVENT_QUEUE 100
/* Flags passed to alloc_workqueue() when kfilnd_init() creates kfilnd_wq. */
44 #define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS)
/* Module-wide workqueue: allocated in kfilnd_init() and destroyed in
 * kfilnd_exit(). It is not static, so presumably other kfilnd source files
 * queue work on it (TODO confirm).
 */
45 struct workqueue_struct *kfilnd_wq;
/* Root "kfilnd" debugfs directory: created in kfilnd_init() and removed
 * recursively in kfilnd_exit(). Not static, so presumably other kfilnd
 * files add entries beneath it (TODO confirm).
 */
46 struct dentry *kfilnd_debug_dir;
/* LND shutdown callback, installed as .lnd_shutdown in the_kfilnd.
 *
 * Looks up the kfilnd device attached to @ni through ni->ni_data.
 * NOTE(review): the teardown performed on that device presumably mirrors
 * kfilnd_startup() (freeing the device) — TODO confirm.
 */
48 static void kfilnd_shutdown(struct lnet_ni *ni)
50 struct kfilnd_dev *dev = ni->ni_data;
/* Pick the CPT to use when sending to @nid on @dev.
 *
 * If @dev has an endpoint on the calling thread's current CPT, use that
 * CPT. Otherwise, select one of the NI's endpoints by taking @nid modulo
 * the number of NI CPTs, and return that endpoint's CPT.
 */
55 static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
59 /* If the current CPT is within the LNet NI CPTs, use that CPT. */
60 cpt = lnet_cpt_current();
61 if (dev->cpt_to_endpoint[cpt])
64 /* Hash to a LNet NI CPT based on target NID. */
65 return dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;
/* Send a HELLO request to peer @kp, allocating the transaction on @cpt.
 *
 * kfilnd_peer_set_check_hello_pending() prevents a second HELLO while one
 * is already outstanding to this peer; in that case only a debug message
 * is logged. If transaction allocation fails, the pending flag is cleared
 * again, presumably so that a later attempt can send a HELLO.
 * On success:
 * - the transaction is tagged KFILND_MSG_HELLO_REQ;
 * - kp->kp_hello_ts is stamped with ktime_get_seconds();
 * - the transaction state machine is driven with TN_EVENT_TX_HELLO.
 *
 * NOTE(review): presumably returns 0 on success and a negative errno
 * (the rc logged on allocation failure) otherwise — TODO confirm.
 */
68 int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt,
69 struct kfilnd_peer *kp)
71 struct kfilnd_transaction *tn;
74 if (kfilnd_peer_set_check_hello_pending(kp)) {
75 CDEBUG(D_NET, "Hello already pending to peer %s(%px)\n",
76 libcfs_nid2str(kp->kp_nid), kp);
80 tn = kfilnd_tn_alloc_for_peer(dev, cpt, kp, true, true, false);
83 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
/* Undo the pending mark taken above, since no HELLO will be sent. */
84 kfilnd_peer_clear_hello_pending(kp);
88 tn->msg_type = KFILND_MSG_HELLO_REQ;
90 kp->kp_hello_ts = ktime_get_seconds();
92 kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);
/* LND send callback, installed as .lnd_send in the_kfilnd.
 *
 * Sending @msg proceeds in these steps:
 * - choose a kfilnd message type for it;
 * - allocate a transaction on the CPT chosen by kfilnd_send_cpt();
 * - send a HELLO first if the peer needs one;
 * - attach the payload buffer;
 * - start the transaction state machine.
 *
 * Message-type selection, as far as the branches below show:
 * - A branch first checks msg_len != 0 and then selects
 *   KFILND_MSG_IMMEDIATE. NOTE(review): presumably this is a message type
 *   that must carry no payload (e.g. ACK) — confirm.
 * - Routed messages, or messages whose target is a router, are sent as
 *   KFILND_MSG_IMMEDIATE. NOTE(review): this check appears to belong to
 *   the GET branch — confirm.
 * - GET branch: if a kfilnd_msg carrying md_length bytes of payload fits
 *   in KFILND_IMMEDIATE_MSG_SIZE, use KFILND_MSG_IMMEDIATE; otherwise use
 *   KFILND_MSG_BULK_GET_REQ.
 * - Payload-carrying branch: if msg_len bytes of payload fit, use
 *   KFILND_MSG_IMMEDIATE; otherwise use KFILND_MSG_BULK_PUT_REQ.
 *
 * The transaction stores @msg in tn_lntmsg so that it is finalised when
 * the transaction completes.
 */
97 static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
99 int type = msg->msg_type;
100 struct lnet_processid *target = &msg->msg_target;
101 struct kfilnd_transaction *tn;
103 struct kfilnd_dev *dev = ni->ni_data;
104 enum kfilnd_msg_type lnd_msg_type;
106 enum tn_events event = TN_EVENT_INVALID;
116 if (msg->msg_len != 0)
118 lnd_msg_type = KFILND_MSG_IMMEDIATE;
122 if (msg->msg_routing || msg->msg_target_is_router) {
123 lnd_msg_type = KFILND_MSG_IMMEDIATE;
/* Size of an immediate message carrying the whole reply MD payload. */
127 nob = offsetof(struct kfilnd_msg,
128 proto.immed.payload[msg->msg_md->md_length]);
129 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
130 lnd_msg_type = KFILND_MSG_IMMEDIATE;
134 lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
/* Size of an immediate message carrying msg_len bytes of payload. */
140 nob = offsetof(struct kfilnd_msg,
141 proto.immed.payload[msg->msg_len]);
142 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
143 lnd_msg_type = KFILND_MSG_IMMEDIATE;
147 lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
/* kfilnd_send_cpt() and kfilnd_tn_alloc() take the 4-byte NID form. */
152 tgt_nid4 = lnet_nid_to_nid4(&target->nid);
154 cpt = kfilnd_send_cpt(dev, tgt_nid4);
155 tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
158 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
/* Kick off a HELLO handshake with the peer if one is due. */
162 if (kfilnd_peer_needs_hello(tn->tn_kp)) {
163 rc = kfilnd_send_hello_request(dev, cpt, tn->tn_kp);
170 switch (lnd_msg_type) {
171 case KFILND_MSG_IMMEDIATE:
/* The LNet message's own kiov supplies the inline payload. */
172 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
173 msg->msg_offset, msg->msg_len);
175 CERROR("Failed to setup immediate buffer rc %d\n", rc);
180 event = TN_EVENT_INIT_IMMEDIATE;
183 case KFILND_MSG_BULK_PUT_REQ:
/* For a bulk PUT the local buffer is the RMA source, not the sink. */
184 tn->sink_buffer = false;
185 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
186 msg->msg_offset, msg->msg_len);
188 CERROR("Failed to setup PUT source buffer rc %d\n", rc);
193 event = TN_EVENT_INIT_BULK;
196 case KFILND_MSG_BULK_GET_REQ:
197 /* We need to create a reply message to inform LNet our
198 * optimized GET is done.
200 tn->tn_getreply = lnet_create_reply_msg(ni, msg);
201 if (!tn->tn_getreply) {
202 CERROR("Can't create reply for GET -> %s\n",
203 libcfs_nidstr(&target->nid));
/* For a bulk GET the local MD's kiov is the RMA sink for the reply data. */
208 tn->sink_buffer = true;
209 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov,
210 msg->msg_md->md_niov,
211 msg->msg_md->md_offset,
212 msg->msg_md->md_length);
214 CERROR("Failed to setup GET sink buffer rc %d\n", rc);
218 event = TN_EVENT_INIT_BULK;
226 tn->msg_type = lnd_msg_type;
227 tn->tn_lntmsg = msg; /* finalise msg on completion */
228 tn->lnet_msg_len = tn->tn_nob;
230 KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
231 msg_type_to_str(lnd_msg_type), tn->tn_nob,
234 /* Start the state machine processing this transaction */
235 kfilnd_tn_event_handler(tn, event, 0);
/* LND receive callback, installed as .lnd_recv in the_kfilnd.
 *
 * @private is the kfilnd transaction holding the received message. It must
 * be in TN_STATE_IMM_RECV. @rlen is recorded in tn->lnet_msg_len, and then
 * handling depends on the received kfilnd message type:
 * - KFILND_MSG_IMMEDIATE: the payload size implied by @rlen is checked
 *   against the received buffer length. The inline payload is then copied
 *   into @kiov and TN_EVENT_RX_OK is raised.
 * - KFILND_MSG_BULK_PUT_REQ: @kiov is posted as the RMA sink and
 *   TN_EVENT_INIT_TAG_RMA is raised. Otherwise TN_EVENT_SKIP_TAG_RMA is
 *   used. NOTE(review): the skip condition is presumably mlen == 0 —
 *   confirm.
 * - KFILND_MSG_BULK_GET_REQ: msg->msg_kiov is posted as the RMA source,
 *   with the same INIT/SKIP choice.
 * - Any other type is logged as invalid (see the TODO about the leaked
 *   transaction).
 *
 * After the switch, which is presumably reached only for the bulk types
 * since it reads rxmsg->proto.bulk_req:
 * - the requester's memory key and response RX are saved;
 * - a bulk response of sizeof(struct kfilnd_bulk_rsp) is initialised in
 *   tn->tn_tx_msg;
 * - the chosen event is delivered to the state machine with a local
 *   status.
 */
240 static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
241 int delayed, unsigned int niov,
242 struct bio_vec *kiov,
243 unsigned int offset, unsigned int mlen,
246 struct kfilnd_transaction *tn = private;
247 struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
251 enum tn_events event;
256 /* Transaction must be in receive state */
257 if (tn->tn_state != TN_STATE_IMM_RECV)
261 tn->lnet_msg_len = rlen;
263 switch (rxmsg->type) {
264 case KFILND_MSG_IMMEDIATE:
/* Reject an immediate payload larger than what was actually received. */
265 nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
266 if (nob > tn->tn_rx_msg.length) {
267 CERROR("Immediate message from %s too big: %d(%lu)\n",
268 libcfs_nidstr(&msg->msg_hdr.src_nid),
269 nob, tn->tn_rx_msg.length);
/* Copy the inline payload (starting after the kfilnd_msg header fields)
 * from the flat receive buffer into the caller's kiov.
 */
274 lnet_copy_flat2kiov(niov, kiov, offset,
275 KFILND_IMMEDIATE_MSG_SIZE, rxmsg,
276 offsetof(struct kfilnd_msg,
277 proto.immed.payload),
280 kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
283 case KFILND_MSG_BULK_PUT_REQ:
285 event = TN_EVENT_SKIP_TAG_RMA;
287 /* Post the buffer given us as a sink */
288 tn->sink_buffer = true;
289 rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset,
292 CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
296 event = TN_EVENT_INIT_TAG_RMA;
300 case KFILND_MSG_BULK_GET_REQ:
302 event = TN_EVENT_SKIP_TAG_RMA;
305 /* Post the buffer given to us as a source */
306 tn->sink_buffer = false;
307 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov,
312 CERROR("Failed to setup GET source buffer rc %d\n", rc);
316 event = TN_EVENT_INIT_TAG_RMA;
321 /* TODO: TN leaks here. */
322 CERROR("Invalid message type = %d\n", rxmsg->type);
326 /* Store relevant fields to generate a bulk response. */
327 tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
328 tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
331 tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
333 sizeof(struct kfilnd_bulk_rsp),
337 KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
338 msg_type_to_str(rxmsg->type), tn->tn_nob,
341 kfilnd_tn_event_handler(tn, event, status);
/* Netlink key list describing the kfilnd LND tunables. It is exposed
 * through .lnd_keys in the_kfilnd.
 *
 * NOTE(review): all three keys are declared NLA_S32, but kfilnd_nl_set()
 * reads them with nla_get_s64(). Confirm that the sender encodes these
 * attributes as 64-bit; otherwise the read width does not match the
 * payload.
 */
346 static const struct ln_key_list kfilnd_tunables_keys = {
347 .lkl_maxattr = LNET_NET_KFILND_TUNABLES_ATTR_MAX,
349 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR] = {
350 .lkp_value = "prov_major_version",
351 .lkp_data_type = NLA_S32
353 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR] = {
354 .lkp_value = "prov_minor_version",
355 .lkp_data_type = NLA_S32
357 [LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY] = {
358 .lkp_value = "auth_key",
359 .lkp_data_type = NLA_S32
/* LND netlink "set" callback, installed as .lnd_nl_set in the_kfilnd.
 *
 * Handles only LNET_CMD_NETS. It stores the value of netlink attribute
 * @attr, identified by @type, into the matching kfilnd field of the
 * struct lnet_lnd_tunables passed in @data.
 *
 * NOTE(review): the values are read with nla_get_s64() while
 * kfilnd_tunables_keys declares them NLA_S32. Confirm the attribute width
 * the sender uses.
 */
365 kfilnd_nl_set(int cmd, struct nlattr *attr, int type, void *data)
367 struct lnet_lnd_tunables *tunables = data;
370 if (cmd != LNET_CMD_NETS)
374 case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR:
375 tunables->lnd_tun_u.lnd_kfi.lnd_prov_major_version = nla_get_s64(attr);
377 case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR:
378 tunables->lnd_tun_u.lnd_kfi.lnd_prov_minor_version = nla_get_s64(attr);
380 case LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY:
381 tunables->lnd_tun_u.lnd_kfi.lnd_auth_key = nla_get_s64(attr);
/* Forward declaration. kfilnd_startup() compares the NI's LND against
 * &the_kfilnd, so the ops table must be declared before the function body.
 */
391 static int kfilnd_startup(struct lnet_ni *ni);
/* kfilnd LND operations table. kfilnd_init() registers it with LNet and
 * kfilnd_exit() unregisters it.
 */
393 static const struct lnet_lnd the_kfilnd = {
395 .lnd_startup = kfilnd_startup,
396 .lnd_shutdown = kfilnd_shutdown,
397 .lnd_send = kfilnd_send,
398 .lnd_recv = kfilnd_recv,
399 .lnd_nl_set = kfilnd_nl_set,
400 .lnd_keys = &kfilnd_tunables_keys,
/* LND startup callback, installed as .lnd_startup in the_kfilnd.
 *
 * Bringing up @ni proceeds in these steps:
 * - reject @ni if its net is not bound to the_kfilnd;
 * - apply the kfilnd tunables to @ni;
 * - require ni->ni_interface to be set (only a single interface is
 *   supported);
 * - allocate the kfilnd device for that interface;
 * - derive the first NID address word from the device's NIC address;
 * - post the device's immediate receive buffers.
 * If posting the buffers fails, the error path frees the device.
 */
403 static int kfilnd_startup(struct lnet_ni *ni)
407 struct kfilnd_dev *kfdev;
412 if (ni->ni_net->net_lnd != &the_kfilnd) {
413 CERROR("Wrong lnd type\n");
417 kfilnd_tunables_setup(ni);
419 /* Only a single interface is supported. */
420 if (!ni->ni_interface) {
422 CERROR("No LNet network interface address defined\n");
426 node = ni->ni_interface;
428 kfdev = kfilnd_dev_alloc(ni, node);
431 CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node,
/* NID address words are stored big-endian, hence cpu_to_be32(). */
437 ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr));
439 /* Post a series of immediate receive buffers */
440 rc = kfilnd_dev_post_imm_buffers(kfdev);
442 CERROR("Can't post buffers, rc = %d\n", rc);
449 kfilnd_dev_free(kfdev);
/* Module exit. Destroys the kfilnd workqueue, unregisters the kfilnd LND
 * from LNet, and removes the kfilnd debugfs tree created in kfilnd_init().
 */
454 static void __exit kfilnd_exit(void)
456 destroy_workqueue(kfilnd_wq);
460 lnet_unregister_lnd(&the_kfilnd);
462 debugfs_remove_recursive(kfilnd_debug_dir);
/* Module init.
 *
 * Initialisation proceeds in these steps:
 * - create the "kfilnd" debugfs directory;
 * - initialise the tunables and the transaction subsystem;
 * - allocate kfilnd_wq with KFILND_WQ_FLAGS;
 * - register the_kfilnd with LNet.
 *
 * NOTE(review): the workqueue-failure message reads "allocated" where
 * "allocate" is meant. It is left as-is here because it is runtime output.
 */
465 static int __init kfilnd_init(void)
469 kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);
471 rc = kfilnd_tunables_init();
475 /* Do any initialization of the transaction system */
476 rc = kfilnd_tn_init();
478 CERROR("Cannot initialize transaction system\n");
482 kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS,
486 CERROR("Failed to allocated kfilnd work queue\n");
490 lnet_register_lnd(&the_kfilnd);
/* Module metadata and the kernel entry/exit hooks. */
500 MODULE_AUTHOR("Cray Inc.");
501 MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
502 MODULE_VERSION(KFILND_VERSION);
503 MODULE_LICENSE("GPL");
505 module_init(kfilnd_init);
506 module_exit(kfilnd_exit);