4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright 2022 Hewlett Packard Enterprise Development LP
26 * This file is part of Lustre, http://www.lustre.org/
29 * kfilnd main interface.
32 #include <linux/delay.h>
34 #include "kfilnd_tn.h"
35 #include "kfilnd_dev.h"
37 /* These are temp constants to get stuff to compile */
38 #define KFILND_DEFAULT_DEVICE "eth0"
40 /* Some constants which should be turned into tunables */
41 #define KFILND_MAX_WORKER_THREADS 4
42 #define KFILND_MAX_EVENT_QUEUE 100
/* Workqueue flags: usable under memory reclaim, high priority, sysfs-visible */
44 #define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS)
/* Module-wide workqueue; allocated in kfilnd_init(), destroyed in kfilnd_exit() */
45 struct workqueue_struct *kfilnd_wq;
/* Root debugfs directory for kfilnd; created in kfilnd_init(), removed in kfilnd_exit() */
46 struct dentry *kfilnd_debug_dir;
/*
 * kfilnd_shutdown() - LND shutdown hook (struct lnet_lnd .lnd_shutdown).
 * @ni: LNet network interface being torn down.
 *
 * Recovers the kfilnd device that kfilnd_startup() stashed in ni->ni_data
 * so it can be released.  (Teardown body elided in this view; presumably
 * frees the device — confirm against kfilnd_dev_free().)
 */
48 static void kfilnd_shutdown(struct lnet_ni *ni)
50 	struct kfilnd_dev *dev = ni->ni_data;
/*
 * kfilnd_send_cpt() - Select the CPT (CPU partition) to issue a send on.
 * @dev: kfilnd device the send will go through.
 * @nid: target NID, used as the hash input for the fallback path.
 *
 * Prefer the calling CPU's current CPT when the device has an endpoint
 * mapped for it; otherwise hash the target NID across the LNet NI's CPTs
 * and use that endpoint's CPT.  Return value is the chosen CPT index.
 */
55 static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
59 	/* If the current CPT is within the LNet NI CPTs, use that CPT. */
60 	cpt = lnet_cpt_current();
61 	if (dev->cpt_to_endpoint[cpt])
64 	/* Hash to a LNet NI CPT based on target NID. */
65 	return dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;
/*
 * kfilnd_send_hello_request() - Issue a HELLO handshake request to a peer.
 * @dev: kfilnd device to send through.
 * @cpt: CPT to allocate the transaction on.
 * @kp:  target peer.
 *
 * Only one HELLO may be outstanding per peer: an atomic check-and-set on
 * the peer's hello-pending flag makes a duplicate request a debug-logged
 * no-op.  On transaction allocation failure the pending flag is cleared
 * again so a later caller can retry.  On success the HELLO send is handed
 * to the transaction state machine via TN_EVENT_TX_HELLO, and the send
 * timestamp is recorded in kp->kp_hello_ts.
 */
68 int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt,
69 			      struct kfilnd_peer *kp)
71 	struct kfilnd_transaction *tn;
	/* Check-and-set: a true return means a HELLO is already in flight. */
74 	if (kfilnd_peer_set_check_hello_pending(kp)) {
75 		CDEBUG(D_NET, "Hello already pending to peer %s(%px)\n",
76 		       libcfs_nid2str(kp->kp_nid), kp);
80 	tn = kfilnd_tn_alloc_for_peer(dev, cpt, kp, true, true, false);
83 		CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
		/* Undo the pending flag so the HELLO can be retried later. */
84 		kfilnd_peer_clear_hello_pending(kp);
88 	tn->msg_type = KFILND_MSG_HELLO_REQ;
90 	kp->kp_hello_ts = ktime_get_seconds();
	/* Hand off to the transaction state machine to perform the send. */
92 	kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);
/*
 * kfilnd_send() - LND send hook (struct lnet_lnd .lnd_send).
 * @ni:      LNet NI the message is sent on.
 * @private: unused by the visible paths here.
 * @msg:     LNet message to transmit.
 *
 * Classifies the outgoing LNet message into a kfilnd wire message type:
 *  - zero-length or routed/router-targeted messages go as IMMEDIATE;
 *  - otherwise, if the payload fits in KFILND_IMMEDIATE_MSG_SIZE it goes
 *    as IMMEDIATE, else as a bulk PUT or GET request.
 * It then allocates a transaction on a CPT chosen by kfilnd_send_cpt(),
 * sends a HELLO first if the peer needs one (failing the send only for a
 * brand-new peer), attaches the message buffer to the transaction, and
 * kicks the transaction state machine with the matching init event.
 */
97 static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
99 	int type = msg->msg_type;
100 	struct lnet_processid *target = &msg->msg_target;
101 	struct kfilnd_transaction *tn;
103 	struct kfilnd_dev *dev = ni->ni_data;
104 	enum kfilnd_msg_type lnd_msg_type;
106 	enum tn_events event = TN_EVENT_INVALID;
116 	if (msg->msg_len != 0)
	/* Zero-length payload always fits an immediate message. */
118 		lnd_msg_type = KFILND_MSG_IMMEDIATE;
	/* Routed traffic is sent immediate; the router copies the payload. */
122 	if (msg->msg_routing || msg->msg_target_is_router) {
123 		lnd_msg_type = KFILND_MSG_IMMEDIATE;
	/* GET side: size the would-be immediate reply against the MD length. */
127 		nob = offsetof(struct kfilnd_msg,
128 			       proto.immed.payload[msg->msg_md->md_length]);
129 		if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
130 			lnd_msg_type = KFILND_MSG_IMMEDIATE;
134 		lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
	/* PUT side: size the immediate candidate against the message length. */
140 		nob = offsetof(struct kfilnd_msg,
141 			       proto.immed.payload[msg->msg_len]);
142 		if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
143 			lnd_msg_type = KFILND_MSG_IMMEDIATE;
147 		lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
152 	tgt_nid4 = lnet_nid_to_nid4(&target->nid);
154 	cpt = kfilnd_send_cpt(dev, tgt_nid4);
155 	tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
158 		CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
	/* Handshake first if we have no (fresh) peer info. */
162 	if (kfilnd_peer_needs_hello(tn->tn_kp)) {
163 		rc = kfilnd_send_hello_request(dev, cpt, tn->tn_kp);
164 		if (rc && kfilnd_peer_is_new_peer(tn->tn_kp)) {
165 			/* Only fail the send if this is a new peer. Otherwise
166 			 * attempt the send using our stale peer information
	/* Attach the payload buffer and pick the state-machine init event. */
173 	switch (lnd_msg_type) {
174 	case KFILND_MSG_IMMEDIATE:
175 		rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
176 					    msg->msg_offset, msg->msg_len);
178 			CERROR("Failed to setup immediate buffer rc %d\n", rc);
183 		event = TN_EVENT_INIT_IMMEDIATE;
186 	case KFILND_MSG_BULK_PUT_REQ:
		/* PUT: local buffer is the source the peer will read from. */
187 		tn->sink_buffer = false;
188 		rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
189 					    msg->msg_offset, msg->msg_len);
191 			CERROR("Failed to setup PUT source buffer rc %d\n", rc);
196 		event = TN_EVENT_INIT_BULK;
199 	case KFILND_MSG_BULK_GET_REQ:
200 		/* We need to create a reply message to inform LNet our
201 		 * optimized GET is done.
203 		tn->tn_getreply = lnet_create_reply_msg(ni, msg);
204 		if (!tn->tn_getreply) {
205 			CERROR("Can't create reply for GET -> %s\n",
206 			       libcfs_nidstr(&target->nid));
		/* GET: local MD is the sink the peer will write into. */
211 		tn->sink_buffer = true;
212 		rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov,
213 					    msg->msg_md->md_niov,
214 					    msg->msg_md->md_offset,
215 					    msg->msg_md->md_length);
217 			CERROR("Failed to setup GET sink buffer rc %d\n", rc);
221 		event = TN_EVENT_INIT_BULK;
229 	tn->msg_type = lnd_msg_type;
230 	tn->tn_lntmsg = msg;	/* finalise msg on completion */
231 	tn->lnet_msg_len = tn->tn_nob;
233 	KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
234 			msg_type_to_str(lnd_msg_type), tn->tn_nob,
237 	/* Start the state machine processing this transaction */
238 	kfilnd_tn_event_handler(tn, event, 0);
/*
 * kfilnd_recv() - LND receive hook (struct lnet_lnd .lnd_recv).
 * @ni:      LNet NI the message arrived on.
 * @private: the receiving kfilnd transaction (set by the event path).
 * @msg:     LNet message descriptor for the delivery.
 * @delayed, @niov, @kiov, @offset, @mlen: LNet delivery parameters.
 *
 * Dispatches on the received wire message type:
 *  - IMMEDIATE: validate the payload fits the receive buffer, then copy
 *    it straight into the caller's kiov and complete via TN_EVENT_RX_OK;
 *  - BULK_PUT_REQ: register the caller's kiov as the RMA sink;
 *  - BULK_GET_REQ: register the message buffer as the RMA source;
 * bulk cases then stash the peer's MR key / response RX from the request,
 * build the bulk response header, and fire the tagged-RMA init (or skip)
 * event into the state machine.
 */
243 static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
244 		       int delayed, unsigned int niov,
245 		       struct bio_vec *kiov,
246 		       unsigned int offset, unsigned int mlen,
249 	struct kfilnd_transaction *tn = private;
250 	struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
254 	enum tn_events event;
259 	/* Transaction must be in receive state */
260 	if (tn->tn_state != TN_STATE_IMM_RECV)
264 	tn->lnet_msg_len = rlen;
266 	switch (rxmsg->type) {
267 	case KFILND_MSG_IMMEDIATE:
		/* Reject payloads larger than the posted receive buffer. */
268 		nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
269 		if (nob > tn->tn_rx_msg.length) {
270 			CERROR("Immediate message from %s too big: %d(%lu)\n",
271 			       libcfs_nidstr(&msg->msg_hdr.src_nid),
272 			       nob, tn->tn_rx_msg.length);
		/* Copy the inline payload out of the receive buffer. */
277 		lnet_copy_flat2kiov(niov, kiov, offset,
278 				    KFILND_IMMEDIATE_MSG_SIZE, rxmsg,
279 				    offsetof(struct kfilnd_msg,
280 					     proto.immed.payload),
283 		kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
286 	case KFILND_MSG_BULK_PUT_REQ:
288 			event = TN_EVENT_SKIP_TAG_RMA;
290 			/* Post the buffer given us as a sink */
291 			tn->sink_buffer = true;
292 			rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset,
295 				CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
299 			event = TN_EVENT_INIT_TAG_RMA;
303 	case KFILND_MSG_BULK_GET_REQ:
305 			event = TN_EVENT_SKIP_TAG_RMA;
308 			/* Post the buffer given to us as a source */
309 			tn->sink_buffer = false;
310 			rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov,
315 				CERROR("Failed to setup GET source buffer rc %d\n", rc);
319 			event = TN_EVENT_INIT_TAG_RMA;
324 		/* TODO: TN leaks here. */
325 		CERROR("Invalid message type = %d\n", rxmsg->type);
329 	/* Store relevant fields to generate a bulk response. */
330 	tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
331 	tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
	/* Build the bulk-response header that will complete the RMA. */
334 	tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
336 						 sizeof(struct kfilnd_bulk_rsp),
340 	KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
341 			msg_type_to_str(rxmsg->type), tn->tn_nob,
344 	kfilnd_tn_event_handler(tn, event, status);
/*
 * Netlink key list describing the kfilnd tunables exported over the LNet
 * netlink interface (consumed via the_kfilnd.lnd_keys).  Each entry maps
 * a LNET_NET_KFILND_TUNABLES_ATTR_* attribute to its name and data type.
 */
349 static const struct ln_key_list kfilnd_tunables_keys = {
350 	.lkl_maxattr			= LNET_NET_KFILND_TUNABLES_ATTR_MAX,
352 		[LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR]  = {
353 			.lkp_value	= "prov_major_version",
354 			.lkp_data_type	= NLA_S32
356 		[LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR]  = {
357 			.lkp_value	= "prov_minor_version",
358 			.lkp_data_type	= NLA_S32
360 		[LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY]  = {
361 			.lkp_value	= "auth_key",
362 			.lkp_data_type	= NLA_S32
/*
 * kfilnd_nl_set() - Netlink tunable setter (struct lnet_lnd .lnd_nl_set).
 * @cmd:  netlink command; only LNET_CMD_NETS is accepted.
 * @attr: attribute carrying the value.
 * @type: which LNET_NET_KFILND_TUNABLES_ATTR_* is being set.
 * @data: struct lnet_lnd_tunables to update.
 *
 * NOTE(review): values are read with nla_get_s64() although the key table
 * declares these attributes NLA_S32 — presumably LNet packs them as s64;
 * confirm against the lnet netlink encoder.
 */
368 kfilnd_nl_set(int cmd, struct nlattr *attr, int type, void *data)
370 	struct lnet_lnd_tunables *tunables = data;
373 	if (cmd != LNET_CMD_NETS)
377 	case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR:
378 		tunables->lnd_tun_u.lnd_kfi.lnd_prov_major_version = nla_get_s64(attr);
380 	case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR:
381 		tunables->lnd_tun_u.lnd_kfi.lnd_prov_minor_version = nla_get_s64(attr);
383 	case LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY:
384 		tunables->lnd_tun_u.lnd_kfi.lnd_auth_key = nla_get_s64(attr);
/* Forward declaration: kfilnd_startup checks ni->ni_net->net_lnd against
 * &the_kfilnd, so the ops table must be defined before the function body. */
394 static int kfilnd_startup(struct lnet_ni *ni);

/* LND operations table registered with LNet in kfilnd_init(). */
396 static const struct lnet_lnd the_kfilnd = {
398 	.lnd_startup	= kfilnd_startup,
399 	.lnd_shutdown	= kfilnd_shutdown,
400 	.lnd_send	= kfilnd_send,
401 	.lnd_recv	= kfilnd_recv,
402 	.lnd_nl_set	= kfilnd_nl_set,
403 	.lnd_keys	= &kfilnd_tunables_keys,
/*
 * kfilnd_startup() - LND startup hook (struct lnet_lnd .lnd_startup).
 * @ni: LNet network interface to bring up.
 *
 * Validates that the NI really is a kfilnd NI and that exactly one
 * interface name was configured, applies the tunables, allocates the
 * kfilnd device for the named interface, publishes the device's NIC
 * address as the NID address, and posts the initial immediate receive
 * buffers.  On buffer-posting failure the device is freed again.
 */
406 static int kfilnd_startup(struct lnet_ni *ni)
410 	struct kfilnd_dev *kfdev;
	/* Sanity check: this NI must be bound to our LND ops table. */
415 	if (ni->ni_net->net_lnd != &the_kfilnd) {
416 		CERROR("Wrong lnd type\n");
420 	kfilnd_tunables_setup(ni);
422 	/* Only a single interface is supported. */
423 	if (!ni->ni_interface) {
425 		CERROR("No LNet network interface address defined\n");
429 	node = ni->ni_interface;
431 	kfdev = kfilnd_dev_alloc(ni, node);
434 		CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node,
	/* Expose the device's NIC address as this NI's NID address. */
440 	ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr));
442 	/* Post a series of immediate receive buffers */
443 	rc = kfilnd_dev_post_imm_buffers(kfdev);
445 		CERROR("Can't post buffers, rc = %d\n", rc);
	/* Error path: undo the device allocation. */
452 	kfilnd_dev_free(kfdev);
/*
 * kfilnd_exit() - Module unload: tear down everything kfilnd_init() set
 * up — the workqueue, the LND registration, and the debugfs directory.
 */
457 static void __exit kfilnd_exit(void)
459 	destroy_workqueue(kfilnd_wq);
463 	lnet_unregister_lnd(&the_kfilnd);
465 	debugfs_remove_recursive(kfilnd_debug_dir);
/*
 * kfilnd_init() - Module load: create the debugfs directory, initialize
 * the tunables and the transaction subsystem, allocate the module-wide
 * workqueue, and finally register the LND with LNet.  (Error-unwind
 * paths elided in this view — confirm each step is undone on failure.)
 */
468 static int __init kfilnd_init(void)
472 	kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);
474 	rc = kfilnd_tunables_init();
478 	/* Do any initialization of the transaction system */
479 	rc = kfilnd_tn_init();
481 		CERROR("Cannot initialize transaction system\n");
485 	kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS,
489 		CERROR("Failed to allocated kfilnd work queue\n");
	/* Registration is last so LNet never sees a half-initialized LND. */
493 	lnet_register_lnd(&the_kfilnd);
/* Standard kernel module metadata and entry/exit hookup. */
503 MODULE_AUTHOR("Cray Inc.");
504 MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
505 MODULE_VERSION(KFILND_VERSION);
506 MODULE_LICENSE("GPL");

508 module_init(kfilnd_init);
509 module_exit(kfilnd_exit);