Whamcloud - gitweb
LU-16213 kfilnd: Allow one HELLO in-flight per peer
[fs/lustre-release.git] / lnet / klnds / kfilnd / kfilnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright 2022 Hewlett Packard Enterprise Development LP
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * kfilnd main interface.
30  */
31
32 #include <linux/delay.h>
33 #include "kfilnd.h"
34 #include "kfilnd_tn.h"
35 #include "kfilnd_dev.h"
36
37 /* These are temp constants to get stuff to compile */
38 #define KFILND_DEFAULT_DEVICE "eth0"
39
40 /* Some constants which should be turned into tunables */
41 #define KFILND_MAX_WORKER_THREADS 4
42 #define KFILND_MAX_EVENT_QUEUE 100
43
44 #define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS)
45 struct workqueue_struct *kfilnd_wq;
46 struct dentry *kfilnd_debug_dir;
47
48 static void kfilnd_shutdown(struct lnet_ni *ni)
49 {
50         struct kfilnd_dev *dev = ni->ni_data;
51
52         kfilnd_dev_free(dev);
53 }
54
55 static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
56 {
57         int cpt;
58
59         /* If the current CPT has is within the LNet NI CPTs, use that CPT. */
60         cpt = lnet_cpt_current();
61         if (dev->cpt_to_endpoint[cpt])
62                 return cpt;
63
64         /* Hash to a LNet NI CPT based on target NID. */
65         return  dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;
66 }
67
68 int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt,
69                               struct kfilnd_peer *kp)
70 {
71         struct kfilnd_transaction *tn;
72         int rc;
73
74         if (kfilnd_peer_set_check_hello_pending(kp)) {
75                 CDEBUG(D_NET, "Hello already pending to peer %s(%px)\n",
76                        libcfs_nid2str(kp->kp_nid), kp);
77                 return 0;
78         }
79
80         tn = kfilnd_tn_alloc_for_peer(dev, cpt, kp, true, true, false);
81         if (IS_ERR(tn)) {
82                 rc = PTR_ERR(tn);
83                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
84                 kfilnd_peer_clear_hello_pending(kp);
85                 return rc;
86         }
87
88         tn->msg_type = KFILND_MSG_HELLO_REQ;
89
90         kp->kp_hello_ts = ktime_get_seconds();
91
92         kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);
93
94         return 0;
95 }
96
97 static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
98 {
99         int type = msg->msg_type;
100         struct lnet_processid *target = &msg->msg_target;
101         struct kfilnd_transaction *tn;
102         int nob;
103         struct kfilnd_dev *dev = ni->ni_data;
104         enum kfilnd_msg_type lnd_msg_type;
105         int cpt;
106         enum tn_events event = TN_EVENT_INVALID;
107         int rc;
108         bool tn_key = false;
109         lnet_nid_t tgt_nid4;
110
111         switch (type) {
112         default:
113                 return -EIO;
114
115         case LNET_MSG_ACK:
116                 if (msg->msg_len != 0)
117                         return -EINVAL;
118                 lnd_msg_type = KFILND_MSG_IMMEDIATE;
119                 break;
120
121         case LNET_MSG_GET:
122                 if (msg->msg_routing || msg->msg_target_is_router) {
123                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
124                         break;
125                 }
126
127                 nob = offsetof(struct kfilnd_msg,
128                                proto.immed.payload[msg->msg_md->md_length]);
129                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
130                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
131                         break;
132                 }
133
134                 lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
135                 tn_key = true;
136                 break;
137
138         case LNET_MSG_REPLY:
139         case LNET_MSG_PUT:
140                 nob = offsetof(struct kfilnd_msg,
141                                proto.immed.payload[msg->msg_len]);
142                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
143                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
144                         break;
145                 }
146
147                 lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
148                 tn_key = true;
149                 break;
150         }
151
152         tgt_nid4 = lnet_nid_to_nid4(&target->nid);
153
154         cpt = kfilnd_send_cpt(dev, tgt_nid4);
155         tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
156         if (IS_ERR(tn)) {
157                 rc = PTR_ERR(tn);
158                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
159                 return rc;
160         }
161
162         if (kfilnd_peer_needs_hello(tn->tn_kp)) {
163                 rc = kfilnd_send_hello_request(dev, cpt, tn->tn_kp);
164                 if (rc) {
165                         kfilnd_tn_free(tn);
166                         return rc;
167                 }
168         }
169
170         switch (lnd_msg_type) {
171         case KFILND_MSG_IMMEDIATE:
172                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
173                                             msg->msg_offset, msg->msg_len);
174                 if (rc) {
175                         CERROR("Failed to setup immediate buffer rc %d\n", rc);
176                         kfilnd_tn_free(tn);
177                         return rc;
178                 }
179
180                 event = TN_EVENT_INIT_IMMEDIATE;
181                 break;
182
183         case KFILND_MSG_BULK_PUT_REQ:
184                 tn->sink_buffer = false;
185                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
186                                             msg->msg_offset, msg->msg_len);
187                 if (rc) {
188                         CERROR("Failed to setup PUT source buffer rc %d\n", rc);
189                         kfilnd_tn_free(tn);
190                         return rc;
191                 }
192
193                 event = TN_EVENT_INIT_BULK;
194                 break;
195
196         case KFILND_MSG_BULK_GET_REQ:
197                 /* We need to create a reply message to inform LNet our
198                  * optimized GET is done.
199                  */
200                 tn->tn_getreply = lnet_create_reply_msg(ni, msg);
201                 if (!tn->tn_getreply) {
202                         CERROR("Can't create reply for GET -> %s\n",
203                                libcfs_nidstr(&target->nid));
204                         kfilnd_tn_free(tn);
205                         return -EIO;
206                 }
207
208                 tn->sink_buffer = true;
209                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov,
210                                             msg->msg_md->md_niov,
211                                             msg->msg_md->md_offset,
212                                             msg->msg_md->md_length);
213                 if (rc) {
214                         CERROR("Failed to setup GET sink buffer rc %d\n", rc);
215                         kfilnd_tn_free(tn);
216                         return rc;
217                 }
218                 event = TN_EVENT_INIT_BULK;
219                 break;
220
221         default:
222                 kfilnd_tn_free(tn);
223                 return -EIO;
224         }
225
226         tn->msg_type = lnd_msg_type;
227         tn->tn_lntmsg = msg;    /* finalise msg on completion */
228         tn->lnet_msg_len = tn->tn_nob;
229
230         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
231                         msg_type_to_str(lnd_msg_type), tn->tn_nob,
232                         tn->tn_num_iovec);
233
234         /* Start the state machine processing this transaction */
235         kfilnd_tn_event_handler(tn, event, 0);
236
237         return 0;
238 }
239
240 static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
241                        int delayed, unsigned int niov,
242                        struct bio_vec *kiov,
243                        unsigned int offset, unsigned int mlen,
244                        unsigned int rlen)
245 {
246         struct kfilnd_transaction *tn = private;
247         struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
248         int nob;
249         int rc = 0;
250         int status = 0;
251         enum tn_events event;
252
253         if (mlen > rlen)
254                 return -EINVAL;
255
256         /* Transaction must be in receive state */
257         if (tn->tn_state != TN_STATE_IMM_RECV)
258                 return -EINVAL;
259
260         tn->tn_lntmsg = msg;
261         tn->lnet_msg_len = rlen;
262
263         switch (rxmsg->type) {
264         case KFILND_MSG_IMMEDIATE:
265                 nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
266                 if (nob > tn->tn_rx_msg.length) {
267                         CERROR("Immediate message from %s too big: %d(%lu)\n",
268                                libcfs_nidstr(&msg->msg_hdr.src_nid),
269                                nob, tn->tn_rx_msg.length);
270                         return -EPROTO;
271                 }
272                 tn->tn_nob = nob;
273
274                 lnet_copy_flat2kiov(niov, kiov, offset,
275                                     KFILND_IMMEDIATE_MSG_SIZE, rxmsg,
276                                     offsetof(struct kfilnd_msg,
277                                              proto.immed.payload),
278                                     mlen);
279
280                 kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
281                 return 0;
282
283         case KFILND_MSG_BULK_PUT_REQ:
284                 if (mlen == 0) {
285                         event = TN_EVENT_SKIP_TAG_RMA;
286                 } else {
287                         /* Post the buffer given us as a sink  */
288                         tn->sink_buffer = true;
289                         rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset,
290                                                     mlen);
291                         if (rc) {
292                                 CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
293                                 kfilnd_tn_free(tn);
294                                 return rc;
295                         }
296                         event = TN_EVENT_INIT_TAG_RMA;
297                 }
298                 break;
299
300         case KFILND_MSG_BULK_GET_REQ:
301                 if (!msg) {
302                         event = TN_EVENT_SKIP_TAG_RMA;
303                         status = -ENODATA;
304                 } else {
305                         /* Post the buffer given to us as a source  */
306                         tn->sink_buffer = false;
307                         rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov,
308                                                     msg->msg_niov,
309                                                     msg->msg_offset,
310                                                     msg->msg_len);
311                         if (rc) {
312                                 CERROR("Failed to setup GET source buffer rc %d\n", rc);
313                                 kfilnd_tn_free(tn);
314                                 return rc;
315                         }
316                         event = TN_EVENT_INIT_TAG_RMA;
317                 }
318                 break;
319
320         default:
321                 /* TODO: TN leaks here. */
322                 CERROR("Invalid message type = %d\n", rxmsg->type);
323                 return -EINVAL;
324         }
325
326         /* Store relevant fields to generate a bulk response. */
327         tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
328         tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
329
330 #if 0
331         tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
332                                                  KFILND_MSG_BULK_RSP,
333                                                  sizeof(struct kfilnd_bulk_rsp),
334                                                  ni);
335 #endif
336
337         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
338                         msg_type_to_str(rxmsg->type), tn->tn_nob,
339                         tn->tn_num_iovec);
340
341         kfilnd_tn_event_handler(tn, event, status);
342
343         return rc;
344 }
345
346 static const struct ln_key_list kfilnd_tunables_keys = {
347         .lkl_maxattr                    = LNET_NET_KFILND_TUNABLES_ATTR_MAX,
348         .lkl_list                       = {
349                 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR]      = {
350                         .lkp_value      = "prov_major_version",
351                         .lkp_data_type  = NLA_S32
352                 },
353                 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR]  = {
354                         .lkp_value      = "prov_minor_version",
355                         .lkp_data_type  = NLA_S32
356                 },
357                 [LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY]  = {
358                         .lkp_value      = "auth_key",
359                         .lkp_data_type  = NLA_S32
360                 },
361         },
362 };
363
364 static int
365 kfilnd_nl_set(int cmd, struct nlattr *attr, int type, void *data)
366 {
367         struct lnet_lnd_tunables *tunables = data;
368         int rc = 0;
369
370         if (cmd != LNET_CMD_NETS)
371                 return -EOPNOTSUPP;
372
373         switch (type) {
374         case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR:
375                 tunables->lnd_tun_u.lnd_kfi.lnd_prov_major_version = nla_get_s64(attr);
376                 break;
377         case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR:
378                 tunables->lnd_tun_u.lnd_kfi.lnd_prov_minor_version = nla_get_s64(attr);
379                 break;
380         case LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY:
381                 tunables->lnd_tun_u.lnd_kfi.lnd_auth_key = nla_get_s64(attr);
382                 break;
383         default:
384                 rc = -EINVAL;
385                 break;
386         }
387
388         return rc;
389 }
390
391 static int kfilnd_startup(struct lnet_ni *ni);
392
393 static const struct lnet_lnd the_kfilnd = {
394         .lnd_type       = KFILND,
395         .lnd_startup    = kfilnd_startup,
396         .lnd_shutdown   = kfilnd_shutdown,
397         .lnd_send       = kfilnd_send,
398         .lnd_recv       = kfilnd_recv,
399         .lnd_nl_set     = kfilnd_nl_set,
400         .lnd_keys       = &kfilnd_tunables_keys,
401 };
402
403 static int kfilnd_startup(struct lnet_ni *ni)
404 {
405         const char *node;
406         int rc;
407         struct kfilnd_dev *kfdev;
408
409         if (!ni)
410                 return -EINVAL;
411
412         if (ni->ni_net->net_lnd != &the_kfilnd) {
413                 CERROR("Wrong lnd type\n");
414                 return -EINVAL;
415         }
416
417         kfilnd_tunables_setup(ni);
418
419         /* Only a single interface is supported. */
420         if (!ni->ni_interface) {
421                 rc = -ENODEV;
422                 CERROR("No LNet network interface address defined\n");
423                 goto err;
424         }
425
426         node = ni->ni_interface;
427
428         kfdev = kfilnd_dev_alloc(ni, node);
429         if (IS_ERR(kfdev)) {
430                 rc = PTR_ERR(kfdev);
431                 CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node,
432                        rc);
433                 goto err;
434         }
435
436         ni->ni_data = kfdev;
437         ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr));
438
439         /* Post a series of immediate receive buffers */
440         rc = kfilnd_dev_post_imm_buffers(kfdev);
441         if (rc) {
442                 CERROR("Can't post buffers, rc = %d\n", rc);
443                 goto err_free_dev;
444         }
445
446         return 0;
447
448 err_free_dev:
449         kfilnd_dev_free(kfdev);
450 err:
451         return rc;
452 }
453
454 static void __exit kfilnd_exit(void)
455 {
456         destroy_workqueue(kfilnd_wq);
457
458         kfilnd_tn_cleanup();
459
460         lnet_unregister_lnd(&the_kfilnd);
461
462         debugfs_remove_recursive(kfilnd_debug_dir);
463 }
464
465 static int __init kfilnd_init(void)
466 {
467         int rc;
468
469         kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);
470
471         rc = kfilnd_tunables_init();
472         if (rc)
473                 goto err;
474
475         /* Do any initialization of the transaction system */
476         rc = kfilnd_tn_init();
477         if (rc) {
478                 CERROR("Cannot initialize transaction system\n");
479                 goto err;
480         }
481
482         kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS,
483                                     WQ_MAX_ACTIVE);
484         if (!kfilnd_wq) {
485                 rc = -ENOMEM;
486                 CERROR("Failed to allocated kfilnd work queue\n");
487                 goto err_tn_cleanup;
488         }
489
490         lnet_register_lnd(&the_kfilnd);
491
492         return 0;
493
494 err_tn_cleanup:
495         kfilnd_tn_cleanup();
496 err:
497         return rc;
498 }
499
500 MODULE_AUTHOR("Cray Inc.");
501 MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
502 MODULE_VERSION(KFILND_VERSION);
503 MODULE_LICENSE("GPL");
504
505 module_init(kfilnd_init);
506 module_exit(kfilnd_exit);