Whamcloud - gitweb
e4efdd4acf6c8e1d18de626b66f54a9d0dc651f9
[fs/lustre-release.git] / lnet / klnds / kfilnd / kfilnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright 2022 Hewlett Packard Enterprise Development LP
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * kfilnd main interface.
30  */
31
32 #include <linux/delay.h>
33 #include "kfilnd.h"
34 #include "kfilnd_tn.h"
35 #include "kfilnd_dev.h"
36
37 /* These are temp constants to get stuff to compile */
38 #define KFILND_DEFAULT_DEVICE "eth0"
39
40 /* Some constants which should be turned into tunables */
41 #define KFILND_MAX_WORKER_THREADS 4
42 #define KFILND_MAX_EVENT_QUEUE 100
43
44 #define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS)
45 struct workqueue_struct *kfilnd_wq;
46 struct dentry *kfilnd_debug_dir;
47
48 static void kfilnd_shutdown(struct lnet_ni *ni)
49 {
50         struct kfilnd_dev *dev = ni->ni_data;
51
52         kfilnd_dev_free(dev);
53 }
54
55 static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
56 {
57         int cpt;
58
59         /* If the current CPT has is within the LNet NI CPTs, use that CPT. */
60         cpt = lnet_cpt_current();
61         if (dev->cpt_to_endpoint[cpt])
62                 return cpt;
63
64         /* Hash to a LNet NI CPT based on target NID. */
65         return  dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;
66 }
67
68 int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt,
69                               struct kfilnd_peer *kp)
70 {
71         struct kfilnd_transaction *tn;
72         int rc;
73
74         if (kfilnd_peer_set_check_hello_pending(kp)) {
75                 CDEBUG(D_NET, "Hello already pending to peer %s(%px)\n",
76                        libcfs_nid2str(kp->kp_nid), kp);
77                 return 0;
78         }
79
80         tn = kfilnd_tn_alloc_for_peer(dev, cpt, kp, true, true, false);
81         if (IS_ERR(tn)) {
82                 rc = PTR_ERR(tn);
83                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
84                 kfilnd_peer_clear_hello_pending(kp);
85                 return rc;
86         }
87
88         tn->msg_type = KFILND_MSG_HELLO_REQ;
89
90         kp->kp_hello_ts = ktime_get_seconds();
91
92         kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);
93
94         return 0;
95 }
96
97 static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
98 {
99         int type = msg->msg_type;
100         struct lnet_processid *target = &msg->msg_target;
101         struct kfilnd_transaction *tn;
102         int nob;
103         struct kfilnd_dev *dev = ni->ni_data;
104         enum kfilnd_msg_type lnd_msg_type;
105         int cpt;
106         enum tn_events event = TN_EVENT_INVALID;
107         int rc;
108         bool tn_key = false;
109         lnet_nid_t tgt_nid4;
110
111         switch (type) {
112         default:
113                 return -EIO;
114
115         case LNET_MSG_ACK:
116                 if (msg->msg_len != 0)
117                         return -EINVAL;
118                 lnd_msg_type = KFILND_MSG_IMMEDIATE;
119                 break;
120
121         case LNET_MSG_GET:
122                 if (msg->msg_routing || msg->msg_target_is_router) {
123                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
124                         break;
125                 }
126
127                 nob = offsetof(struct kfilnd_msg,
128                                proto.immed.payload[msg->msg_md->md_length]);
129                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
130                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
131                         break;
132                 }
133
134                 lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
135                 tn_key = true;
136                 break;
137
138         case LNET_MSG_REPLY:
139         case LNET_MSG_PUT:
140                 nob = offsetof(struct kfilnd_msg,
141                                proto.immed.payload[msg->msg_len]);
142                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
143                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
144                         break;
145                 }
146
147                 lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
148                 tn_key = true;
149                 break;
150         }
151
152         tgt_nid4 = lnet_nid_to_nid4(&target->nid);
153
154         cpt = kfilnd_send_cpt(dev, tgt_nid4);
155         tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
156         if (IS_ERR(tn)) {
157                 rc = PTR_ERR(tn);
158                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
159                 return rc;
160         }
161
162         if (kfilnd_peer_needs_hello(tn->tn_kp)) {
163                 rc = kfilnd_send_hello_request(dev, cpt, tn->tn_kp);
164                 if (rc && kfilnd_peer_is_new_peer(tn->tn_kp)) {
165                         /* Only fail the send if this is a new peer. Otherwise
166                          * attempt the send using our stale peer information
167                          */
168                         kfilnd_tn_free(tn);
169                         return rc;
170                 }
171         }
172
173         switch (lnd_msg_type) {
174         case KFILND_MSG_IMMEDIATE:
175                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
176                                             msg->msg_offset, msg->msg_len);
177                 if (rc) {
178                         CERROR("Failed to setup immediate buffer rc %d\n", rc);
179                         kfilnd_tn_free(tn);
180                         return rc;
181                 }
182
183                 event = TN_EVENT_INIT_IMMEDIATE;
184                 break;
185
186         case KFILND_MSG_BULK_PUT_REQ:
187                 tn->sink_buffer = false;
188                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
189                                             msg->msg_offset, msg->msg_len);
190                 if (rc) {
191                         CERROR("Failed to setup PUT source buffer rc %d\n", rc);
192                         kfilnd_tn_free(tn);
193                         return rc;
194                 }
195
196                 event = TN_EVENT_INIT_BULK;
197                 break;
198
199         case KFILND_MSG_BULK_GET_REQ:
200                 /* We need to create a reply message to inform LNet our
201                  * optimized GET is done.
202                  */
203                 tn->tn_getreply = lnet_create_reply_msg(ni, msg);
204                 if (!tn->tn_getreply) {
205                         CERROR("Can't create reply for GET -> %s\n",
206                                libcfs_nidstr(&target->nid));
207                         kfilnd_tn_free(tn);
208                         return -EIO;
209                 }
210
211                 tn->sink_buffer = true;
212                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov,
213                                             msg->msg_md->md_niov,
214                                             msg->msg_md->md_offset,
215                                             msg->msg_md->md_length);
216                 if (rc) {
217                         CERROR("Failed to setup GET sink buffer rc %d\n", rc);
218                         kfilnd_tn_free(tn);
219                         return rc;
220                 }
221                 event = TN_EVENT_INIT_BULK;
222                 break;
223
224         default:
225                 kfilnd_tn_free(tn);
226                 return -EIO;
227         }
228
229         tn->msg_type = lnd_msg_type;
230         tn->tn_lntmsg = msg;    /* finalise msg on completion */
231         tn->lnet_msg_len = tn->tn_nob;
232
233         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
234                         msg_type_to_str(lnd_msg_type), tn->tn_nob,
235                         tn->tn_num_iovec);
236
237         /* Start the state machine processing this transaction */
238         kfilnd_tn_event_handler(tn, event, 0);
239
240         return 0;
241 }
242
243 static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
244                        int delayed, unsigned int niov,
245                        struct bio_vec *kiov,
246                        unsigned int offset, unsigned int mlen,
247                        unsigned int rlen)
248 {
249         struct kfilnd_transaction *tn = private;
250         struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
251         int nob;
252         int rc = 0;
253         int status = 0;
254         enum tn_events event;
255
256         if (mlen > rlen)
257                 return -EINVAL;
258
259         /* Transaction must be in receive state */
260         if (tn->tn_state != TN_STATE_IMM_RECV)
261                 return -EINVAL;
262
263         tn->tn_lntmsg = msg;
264         tn->lnet_msg_len = rlen;
265
266         switch (rxmsg->type) {
267         case KFILND_MSG_IMMEDIATE:
268                 nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
269                 if (nob > tn->tn_rx_msg.length) {
270                         CERROR("Immediate message from %s too big: %d(%lu)\n",
271                                libcfs_nidstr(&msg->msg_hdr.src_nid),
272                                nob, tn->tn_rx_msg.length);
273                         return -EPROTO;
274                 }
275                 tn->tn_nob = nob;
276
277                 lnet_copy_flat2kiov(niov, kiov, offset,
278                                     KFILND_IMMEDIATE_MSG_SIZE, rxmsg,
279                                     offsetof(struct kfilnd_msg,
280                                              proto.immed.payload),
281                                     mlen);
282
283                 kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
284                 return 0;
285
286         case KFILND_MSG_BULK_PUT_REQ:
287                 if (mlen == 0) {
288                         event = TN_EVENT_SKIP_TAG_RMA;
289                 } else {
290                         /* Post the buffer given us as a sink  */
291                         tn->sink_buffer = true;
292                         rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset,
293                                                     mlen);
294                         if (rc) {
295                                 CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
296                                 kfilnd_tn_free(tn);
297                                 return rc;
298                         }
299                         event = TN_EVENT_INIT_TAG_RMA;
300                 }
301                 break;
302
303         case KFILND_MSG_BULK_GET_REQ:
304                 if (!msg) {
305                         event = TN_EVENT_SKIP_TAG_RMA;
306                         status = -ENODATA;
307                 } else {
308                         /* Post the buffer given to us as a source  */
309                         tn->sink_buffer = false;
310                         rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov,
311                                                     msg->msg_niov,
312                                                     msg->msg_offset,
313                                                     msg->msg_len);
314                         if (rc) {
315                                 CERROR("Failed to setup GET source buffer rc %d\n", rc);
316                                 kfilnd_tn_free(tn);
317                                 return rc;
318                         }
319                         event = TN_EVENT_INIT_TAG_RMA;
320                 }
321                 break;
322
323         default:
324                 /* TODO: TN leaks here. */
325                 CERROR("Invalid message type = %d\n", rxmsg->type);
326                 return -EINVAL;
327         }
328
329         /* Store relevant fields to generate a bulk response. */
330         tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
331         tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
332
333 #if 0
334         tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
335                                                  KFILND_MSG_BULK_RSP,
336                                                  sizeof(struct kfilnd_bulk_rsp),
337                                                  ni);
338 #endif
339
340         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
341                         msg_type_to_str(rxmsg->type), tn->tn_nob,
342                         tn->tn_num_iovec);
343
344         kfilnd_tn_event_handler(tn, event, status);
345
346         return rc;
347 }
348
349 static const struct ln_key_list kfilnd_tunables_keys = {
350         .lkl_maxattr                    = LNET_NET_KFILND_TUNABLES_ATTR_MAX,
351         .lkl_list                       = {
352                 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR]      = {
353                         .lkp_value      = "prov_major_version",
354                         .lkp_data_type  = NLA_S32
355                 },
356                 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR]  = {
357                         .lkp_value      = "prov_minor_version",
358                         .lkp_data_type  = NLA_S32
359                 },
360                 [LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY]  = {
361                         .lkp_value      = "auth_key",
362                         .lkp_data_type  = NLA_S32
363                 },
364         },
365 };
366
367 static int
368 kfilnd_nl_set(int cmd, struct nlattr *attr, int type, void *data)
369 {
370         struct lnet_lnd_tunables *tunables = data;
371         int rc = 0;
372
373         if (cmd != LNET_CMD_NETS)
374                 return -EOPNOTSUPP;
375
376         switch (type) {
377         case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR:
378                 tunables->lnd_tun_u.lnd_kfi.lnd_prov_major_version = nla_get_s64(attr);
379                 break;
380         case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR:
381                 tunables->lnd_tun_u.lnd_kfi.lnd_prov_minor_version = nla_get_s64(attr);
382                 break;
383         case LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY:
384                 tunables->lnd_tun_u.lnd_kfi.lnd_auth_key = nla_get_s64(attr);
385                 break;
386         default:
387                 rc = -EINVAL;
388                 break;
389         }
390
391         return rc;
392 }
393
394 static int kfilnd_startup(struct lnet_ni *ni);
395
396 static const struct lnet_lnd the_kfilnd = {
397         .lnd_type       = KFILND,
398         .lnd_startup    = kfilnd_startup,
399         .lnd_shutdown   = kfilnd_shutdown,
400         .lnd_send       = kfilnd_send,
401         .lnd_recv       = kfilnd_recv,
402         .lnd_nl_set     = kfilnd_nl_set,
403         .lnd_keys       = &kfilnd_tunables_keys,
404 };
405
406 static int kfilnd_startup(struct lnet_ni *ni)
407 {
408         const char *node;
409         int rc;
410         struct kfilnd_dev *kfdev;
411
412         if (!ni)
413                 return -EINVAL;
414
415         if (ni->ni_net->net_lnd != &the_kfilnd) {
416                 CERROR("Wrong lnd type\n");
417                 return -EINVAL;
418         }
419
420         kfilnd_tunables_setup(ni);
421
422         /* Only a single interface is supported. */
423         if (!ni->ni_interface) {
424                 rc = -ENODEV;
425                 CERROR("No LNet network interface address defined\n");
426                 goto err;
427         }
428
429         node = ni->ni_interface;
430
431         kfdev = kfilnd_dev_alloc(ni, node);
432         if (IS_ERR(kfdev)) {
433                 rc = PTR_ERR(kfdev);
434                 CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node,
435                        rc);
436                 goto err;
437         }
438
439         ni->ni_data = kfdev;
440         ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr));
441
442         /* Post a series of immediate receive buffers */
443         rc = kfilnd_dev_post_imm_buffers(kfdev);
444         if (rc) {
445                 CERROR("Can't post buffers, rc = %d\n", rc);
446                 goto err_free_dev;
447         }
448
449         return 0;
450
451 err_free_dev:
452         kfilnd_dev_free(kfdev);
453 err:
454         return rc;
455 }
456
457 static void __exit kfilnd_exit(void)
458 {
459         destroy_workqueue(kfilnd_wq);
460
461         kfilnd_tn_cleanup();
462
463         lnet_unregister_lnd(&the_kfilnd);
464
465         debugfs_remove_recursive(kfilnd_debug_dir);
466 }
467
468 static int __init kfilnd_init(void)
469 {
470         int rc;
471
472         kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);
473
474         rc = kfilnd_tunables_init();
475         if (rc)
476                 goto err;
477
478         /* Do any initialization of the transaction system */
479         rc = kfilnd_tn_init();
480         if (rc) {
481                 CERROR("Cannot initialize transaction system\n");
482                 goto err;
483         }
484
485         kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS,
486                                     WQ_MAX_ACTIVE);
487         if (!kfilnd_wq) {
488                 rc = -ENOMEM;
489                 CERROR("Failed to allocated kfilnd work queue\n");
490                 goto err_tn_cleanup;
491         }
492
493         lnet_register_lnd(&the_kfilnd);
494
495         return 0;
496
497 err_tn_cleanup:
498         kfilnd_tn_cleanup();
499 err:
500         return rc;
501 }
502
503 MODULE_AUTHOR("Cray Inc.");
504 MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
505 MODULE_VERSION(KFILND_VERSION);
506 MODULE_LICENSE("GPL");
507
508 module_init(kfilnd_init);
509 module_exit(kfilnd_exit);