Whamcloud - gitweb
5213a6596b012a321c8f74450db689504b23da20
[fs/lustre-release.git] / lnet / klnds / kfilnd / kfilnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright 2022 Hewlett Packard Enterprise Development LP
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * kfilnd main interface.
30  */
31
32 #include <linux/delay.h>
33 #include "kfilnd.h"
34 #include "kfilnd_tn.h"
35 #include "kfilnd_dev.h"
36
37 /* These are temp constants to get stuff to compile */
38 #define KFILND_DEFAULT_DEVICE "eth0"
39
40 /* Some constants which should be turned into tunables */
41 #define KFILND_MAX_WORKER_THREADS 4
42 #define KFILND_MAX_EVENT_QUEUE 100
43
44 #define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS)
45 struct workqueue_struct *kfilnd_wq;
46 struct dentry *kfilnd_debug_dir;
47
48 static void kfilnd_shutdown(struct lnet_ni *ni)
49 {
50         struct kfilnd_dev *dev = ni->ni_data;
51
52         kfilnd_dev_free(dev);
53 }
54
55 static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
56 {
57         int cpt;
58
59         /* If the current CPT has is within the LNet NI CPTs, use that CPT. */
60         cpt = lnet_cpt_current();
61         if (dev->cpt_to_endpoint[cpt])
62                 return cpt;
63
64         /* Hash to a LNet NI CPT based on target NID. */
65         return  dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;
66 }
67
68 int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt, lnet_nid_t nid)
69 {
70         struct kfilnd_transaction *tn;
71         int rc;
72
73         tn = kfilnd_tn_alloc(dev, cpt, nid, true, true, false);
74         if (IS_ERR(tn)) {
75                 rc = PTR_ERR(tn);
76                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
77                 return rc;
78         }
79
80         kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);
81
82         return 0;
83 }
84
85 static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
86 {
87         int type = msg->msg_type;
88         struct lnet_processid *target = &msg->msg_target;
89         struct kfilnd_transaction *tn;
90         int nob;
91         struct kfilnd_dev *dev = ni->ni_data;
92         enum kfilnd_msg_type lnd_msg_type;
93         int cpt;
94         enum tn_events event = TN_EVENT_INVALID;
95         int rc;
96         bool tn_key = false;
97         lnet_nid_t tgt_nid4;
98
99         switch (type) {
100         default:
101                 return -EIO;
102
103         case LNET_MSG_ACK:
104                 if (msg->msg_len != 0)
105                         return -EINVAL;
106                 lnd_msg_type = KFILND_MSG_IMMEDIATE;
107                 break;
108
109         case LNET_MSG_GET:
110                 nob = offsetof(struct kfilnd_msg,
111                                proto.immed.payload[msg->msg_md->md_length]);
112                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
113                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
114                         break;
115                 }
116
117                 lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
118                 tn_key = true;
119                 break;
120
121         case LNET_MSG_REPLY:
122         case LNET_MSG_PUT:
123                 nob = offsetof(struct kfilnd_msg,
124                                proto.immed.payload[msg->msg_len]);
125                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
126                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
127                         break;
128                 }
129
130                 lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
131                 tn_key = true;
132                 break;
133         }
134
135         tgt_nid4 = lnet_nid_to_nid4(&target->nid);
136
137         cpt = kfilnd_send_cpt(dev, tgt_nid4);
138         tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
139         if (IS_ERR(tn)) {
140                 rc = PTR_ERR(tn);
141                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
142                 return rc;
143         }
144
145         /* Need to fire off special transaction if this is a new peer. */
146         if (kfilnd_peer_is_new_peer(tn->peer)) {
147                 rc = kfilnd_send_hello_request(dev, cpt, tgt_nid4);
148                 if (rc) {
149                         kfilnd_tn_free(tn);
150                         return 0;
151                 }
152         }
153
154         switch (lnd_msg_type) {
155         case KFILND_MSG_IMMEDIATE:
156                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
157                                             msg->msg_offset, msg->msg_len);
158                 if (rc) {
159                         CERROR("Failed to setup immediate buffer rc %d\n", rc);
160                         kfilnd_tn_free(tn);
161                         return rc;
162                 }
163
164                 event = TN_EVENT_INIT_IMMEDIATE;
165                 break;
166
167         case KFILND_MSG_BULK_PUT_REQ:
168                 tn->sink_buffer = false;
169                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
170                                             msg->msg_offset, msg->msg_len);
171                 if (rc) {
172                         CERROR("Failed to setup PUT source buffer rc %d\n", rc);
173                         kfilnd_tn_free(tn);
174                         return rc;
175                 }
176
177                 event = TN_EVENT_INIT_BULK;
178                 break;
179
180         case KFILND_MSG_BULK_GET_REQ:
181                 /* We need to create a reply message to inform LNet our
182                  * optimized GET is done.
183                  */
184                 tn->tn_getreply = lnet_create_reply_msg(ni, msg);
185                 if (!tn->tn_getreply) {
186                         CERROR("Can't create reply for GET -> %s\n",
187                                libcfs_nidstr(&target->nid));
188                         kfilnd_tn_free(tn);
189                         return -EIO;
190                 }
191
192                 tn->sink_buffer = true;
193                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov,
194                                             msg->msg_md->md_niov,
195                                             msg->msg_md->md_offset,
196                                             msg->msg_md->md_length);
197                 if (rc) {
198                         CERROR("Failed to setup GET sink buffer rc %d\n", rc);
199                         kfilnd_tn_free(tn);
200                         return rc;
201                 }
202                 event = TN_EVENT_INIT_BULK;
203                 break;
204
205         default:
206                 kfilnd_tn_free(tn);
207                 return -EIO;
208         }
209
210         tn->msg_type = lnd_msg_type;
211         tn->tn_lntmsg = msg;    /* finalise msg on completion */
212         tn->lnet_msg_len = tn->tn_nob;
213
214         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
215                         msg_type_to_str(lnd_msg_type), tn->tn_nob,
216                         tn->tn_num_iovec);
217
218         /* Start the state machine processing this transaction */
219         kfilnd_tn_event_handler(tn, event, 0);
220
221         return 0;
222 }
223
224 static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
225                        int delayed, unsigned int niov,
226                        struct bio_vec *kiov,
227                        unsigned int offset, unsigned int mlen,
228                        unsigned int rlen)
229 {
230         struct kfilnd_transaction *tn = private;
231         struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
232         int nob;
233         int rc = 0;
234         int status = 0;
235         enum tn_events event;
236
237         if (mlen > rlen)
238                 return -EINVAL;
239
240         /* Transaction must be in receive state */
241         if (tn->tn_state != TN_STATE_IMM_RECV)
242                 return -EINVAL;
243
244         tn->tn_lntmsg = msg;
245         tn->lnet_msg_len = rlen;
246
247         switch (rxmsg->type) {
248         case KFILND_MSG_IMMEDIATE:
249                 nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
250                 if (nob > tn->tn_rx_msg.length) {
251                         CERROR("Immediate message from %s too big: %d(%lu)\n",
252                                libcfs_nidstr(&msg->msg_hdr.src_nid),
253                                nob, tn->tn_rx_msg.length);
254                         return -EPROTO;
255                 }
256                 tn->tn_nob = nob;
257
258                 lnet_copy_flat2kiov(niov, kiov, offset,
259                                     KFILND_IMMEDIATE_MSG_SIZE, rxmsg,
260                                     offsetof(struct kfilnd_msg,
261                                              proto.immed.payload),
262                                     mlen);
263
264                 kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
265                 return 0;
266
267         case KFILND_MSG_BULK_PUT_REQ:
268                 if (mlen == 0) {
269                         event = TN_EVENT_SKIP_TAG_RMA;
270                 } else {
271                         /* Post the buffer given us as a sink  */
272                         tn->sink_buffer = true;
273                         rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset,
274                                                     mlen);
275                         if (rc) {
276                                 CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
277                                 kfilnd_tn_free(tn);
278                                 return rc;
279                         }
280                         event = TN_EVENT_INIT_TAG_RMA;
281                 }
282                 break;
283
284         case KFILND_MSG_BULK_GET_REQ:
285                 if (!msg) {
286                         event = TN_EVENT_SKIP_TAG_RMA;
287                         status = -ENODATA;
288                 } else {
289                         /* Post the buffer given to us as a source  */
290                         tn->sink_buffer = false;
291                         rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov,
292                                                     msg->msg_niov,
293                                                     msg->msg_offset,
294                                                     msg->msg_len);
295                         if (rc) {
296                                 CERROR("Failed to setup GET source buffer rc %d\n", rc);
297                                 kfilnd_tn_free(tn);
298                                 return rc;
299                         }
300                         event = TN_EVENT_INIT_TAG_RMA;
301                 }
302                 break;
303
304         default:
305                 /* TODO: TN leaks here. */
306                 CERROR("Invalid message type = %d\n", rxmsg->type);
307                 return -EINVAL;
308         }
309
310         /* Store relevant fields to generate a bulk response. */
311         tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
312         tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
313
314 #if 0
315         tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
316                                                  KFILND_MSG_BULK_RSP,
317                                                  sizeof(struct kfilnd_bulk_rsp),
318                                                  ni);
319 #endif
320
321         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
322                         msg_type_to_str(rxmsg->type), tn->tn_nob,
323                         tn->tn_num_iovec);
324
325         kfilnd_tn_event_handler(tn, event, status);
326
327         return rc;
328 }
329
330 static const struct ln_key_list kfilnd_tunables_keys = {
331         .lkl_maxattr                    = LNET_NET_KFILND_TUNABLES_ATTR_MAX,
332         .lkl_list                       = {
333                 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR]      = {
334                         .lkp_value      = "prov_major_version",
335                         .lkp_data_type  = NLA_S32
336                 },
337                 [LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR]  = {
338                         .lkp_value      = "prov_minor_version",
339                         .lkp_data_type  = NLA_S32
340                 },
341                 [LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY]  = {
342                         .lkp_value      = "auth_key",
343                         .lkp_data_type  = NLA_S32
344                 },
345         },
346 };
347
348 static int
349 kfilnd_nl_set(int cmd, struct nlattr *attr, int type, void *data)
350 {
351         struct lnet_lnd_tunables *tunables = data;
352         int rc = 0;
353
354         if (cmd != LNET_CMD_NETS)
355                 return -EOPNOTSUPP;
356
357         switch (type) {
358         case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MAJOR:
359                 tunables->lnd_tun_u.lnd_kfi.lnd_prov_major_version = nla_get_s64(attr);
360                 break;
361         case LNET_NET_KFILND_TUNABLES_ATTR_PROV_MINOR:
362                 tunables->lnd_tun_u.lnd_kfi.lnd_prov_minor_version = nla_get_s64(attr);
363                 break;
364         case LNET_NET_KFILND_TUNABLES_ATTR_AUTH_KEY:
365                 tunables->lnd_tun_u.lnd_kfi.lnd_auth_key = nla_get_s64(attr);
366                 break;
367         default:
368                 rc = -EINVAL;
369                 break;
370         }
371
372         return rc;
373 }
374
375 static int kfilnd_startup(struct lnet_ni *ni);
376
377 static const struct lnet_lnd the_kfilnd = {
378         .lnd_type       = KFILND,
379         .lnd_startup    = kfilnd_startup,
380         .lnd_shutdown   = kfilnd_shutdown,
381         .lnd_send       = kfilnd_send,
382         .lnd_recv       = kfilnd_recv,
383         .lnd_nl_set     = kfilnd_nl_set,
384         .lnd_keys       = &kfilnd_tunables_keys,
385 };
386
387 static int kfilnd_startup(struct lnet_ni *ni)
388 {
389         const char *node;
390         int rc;
391         struct kfilnd_dev *kfdev;
392
393         if (!ni)
394                 return -EINVAL;
395
396         if (ni->ni_net->net_lnd != &the_kfilnd) {
397                 CERROR("Wrong lnd type\n");
398                 return -EINVAL;
399         }
400
401         kfilnd_tunables_setup(ni);
402
403         /* Only a single interface is supported. */
404         if (!ni->ni_interface) {
405                 rc = -ENODEV;
406                 CERROR("No LNet network interface address defined\n");
407                 goto err;
408         }
409
410         node = ni->ni_interface;
411
412         kfdev = kfilnd_dev_alloc(ni, node);
413         if (IS_ERR(kfdev)) {
414                 rc = PTR_ERR(kfdev);
415                 CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node,
416                        rc);
417                 goto err;
418         }
419
420         ni->ni_data = kfdev;
421         ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr));
422
423         /* Post a series of immediate receive buffers */
424         rc = kfilnd_dev_post_imm_buffers(kfdev);
425         if (rc) {
426                 CERROR("Can't post buffers, rc = %d\n", rc);
427                 goto err_free_dev;
428         }
429
430         return 0;
431
432 err_free_dev:
433         kfilnd_dev_free(kfdev);
434 err:
435         return rc;
436 }
437
438 static void __exit kfilnd_exit(void)
439 {
440         destroy_workqueue(kfilnd_wq);
441
442         kfilnd_tn_cleanup();
443
444         lnet_unregister_lnd(&the_kfilnd);
445
446         debugfs_remove_recursive(kfilnd_debug_dir);
447 }
448
449 static int __init kfilnd_init(void)
450 {
451         int rc;
452
453         kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);
454
455         rc = kfilnd_tunables_init();
456         if (rc)
457                 goto err;
458
459         /* Do any initialization of the transaction system */
460         rc = kfilnd_tn_init();
461         if (rc) {
462                 CERROR("Cannot initialize transaction system\n");
463                 goto err;
464         }
465
466         kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS,
467                                     WQ_MAX_ACTIVE);
468         if (!kfilnd_wq) {
469                 rc = -ENOMEM;
470                 CERROR("Failed to allocated kfilnd work queue\n");
471                 goto err_tn_cleanup;
472         }
473
474         lnet_register_lnd(&the_kfilnd);
475
476         return 0;
477
478 err_tn_cleanup:
479         kfilnd_tn_cleanup();
480 err:
481         return rc;
482 }
483
484 MODULE_AUTHOR("Cray Inc.");
485 MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
486 MODULE_VERSION(KFILND_VERSION);
487 MODULE_LICENSE("GPL");
488
489 module_init(kfilnd_init);
490 module_exit(kfilnd_exit);