Whamcloud - gitweb
LU-16035 kfilnd: Initial kfilnd implementation
[fs/lustre-release.git] / lnet / klnds / kfilnd / kfilnd.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright 2022 Hewlett Packard Enterprise Development LP
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * kfilnd main interface.
30  */
31
32 #include <linux/delay.h>
33 #include "kfilnd.h"
34 #include "kfilnd_tn.h"
35 #include "kfilnd_dev.h"
36
37 /* These are temp constants to get stuff to compile */
38 #define KFILND_DEFAULT_DEVICE "eth0"
39
40 /* Some constants which should be turned into tunables */
41 #define KFILND_MAX_WORKER_THREADS 4
42 #define KFILND_MAX_EVENT_QUEUE 100
43
44 #define KFILND_WQ_FLAGS (WQ_MEM_RECLAIM | WQ_HIGHPRI | WQ_SYSFS)
45 struct workqueue_struct *kfilnd_wq;
46 struct dentry *kfilnd_debug_dir;
47
48 static void kfilnd_shutdown(struct lnet_ni *ni)
49 {
50         struct kfilnd_dev *dev = ni->ni_data;
51
52         kfilnd_dev_free(dev);
53 }
54
55 static int kfilnd_send_cpt(struct kfilnd_dev *dev, lnet_nid_t nid)
56 {
57         int cpt;
58
59         /* If the current CPT has is within the LNet NI CPTs, use that CPT. */
60         cpt = lnet_cpt_current();
61         if (dev->cpt_to_endpoint[cpt])
62                 return cpt;
63
64         /* Hash to a LNet NI CPT based on target NID. */
65         return  dev->kfd_endpoints[nid % dev->kfd_ni->ni_ncpts]->end_cpt;
66 }
67
68 int kfilnd_send_hello_request(struct kfilnd_dev *dev, int cpt, lnet_nid_t nid)
69 {
70         struct kfilnd_transaction *tn;
71         int rc;
72
73         tn = kfilnd_tn_alloc(dev, cpt, nid, true, true, false);
74         if (IS_ERR(tn)) {
75                 rc = PTR_ERR(tn);
76                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
77                 return rc;
78         }
79
80         kfilnd_tn_event_handler(tn, TN_EVENT_TX_HELLO, 0);
81
82         return 0;
83 }
84
85 static int kfilnd_send(struct lnet_ni *ni, void *private, struct lnet_msg *msg)
86 {
87         int type = msg->msg_type;
88         struct lnet_processid *target = &msg->msg_target;
89         struct kfilnd_transaction *tn;
90         int nob;
91         struct kfilnd_dev *dev = ni->ni_data;
92         enum kfilnd_msg_type lnd_msg_type;
93         int cpt;
94         enum tn_events event = TN_EVENT_INVALID;
95         int rc;
96         bool tn_key = false;
97         lnet_nid_t tgt_nid4;
98
99         switch (type) {
100         default:
101                 return -EIO;
102
103         case LNET_MSG_ACK:
104                 if (msg->msg_len != 0)
105                         return -EINVAL;
106                 lnd_msg_type = KFILND_MSG_IMMEDIATE;
107                 break;
108
109         case LNET_MSG_GET:
110                 nob = offsetof(struct kfilnd_msg,
111                                proto.immed.payload[msg->msg_md->md_length]);
112                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
113                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
114                         break;
115                 }
116
117                 lnd_msg_type = KFILND_MSG_BULK_GET_REQ;
118                 tn_key = true;
119                 break;
120
121         case LNET_MSG_REPLY:
122         case LNET_MSG_PUT:
123                 nob = offsetof(struct kfilnd_msg,
124                                proto.immed.payload[msg->msg_len]);
125                 if (nob <= KFILND_IMMEDIATE_MSG_SIZE) {
126                         lnd_msg_type = KFILND_MSG_IMMEDIATE;
127                         break;
128                 }
129
130                 lnd_msg_type = KFILND_MSG_BULK_PUT_REQ;
131                 tn_key = true;
132                 break;
133         }
134
135         tgt_nid4 = lnet_nid_to_nid4(&target->nid);
136
137         cpt = kfilnd_send_cpt(dev, tgt_nid4);
138         tn = kfilnd_tn_alloc(dev, cpt, tgt_nid4, true, true, tn_key);
139         if (IS_ERR(tn)) {
140                 rc = PTR_ERR(tn);
141                 CERROR("Failed to allocate transaction struct: rc=%d\n", rc);
142                 return rc;
143         }
144
145         /* Need to fire off special transaction if this is a new peer. */
146         if (kfilnd_peer_is_new_peer(tn->peer)) {
147                 rc = kfilnd_send_hello_request(dev, cpt, tgt_nid4);
148                 if (rc) {
149                         kfilnd_tn_free(tn);
150                         return 0;
151                 }
152         }
153
154         switch (lnd_msg_type) {
155         case KFILND_MSG_IMMEDIATE:
156                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
157                                             msg->msg_offset, msg->msg_len);
158                 if (rc) {
159                         CERROR("Failed to setup immediate buffer rc %d\n", rc);
160                         kfilnd_tn_free(tn);
161                         return rc;
162                 }
163
164                 event = TN_EVENT_INIT_IMMEDIATE;
165                 break;
166
167         case KFILND_MSG_BULK_PUT_REQ:
168                 tn->sink_buffer = false;
169                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov, msg->msg_niov,
170                                             msg->msg_offset, msg->msg_len);
171                 if (rc) {
172                         CERROR("Failed to setup PUT source buffer rc %d\n", rc);
173                         kfilnd_tn_free(tn);
174                         return rc;
175                 }
176
177                 event = TN_EVENT_INIT_BULK;
178                 break;
179
180         case KFILND_MSG_BULK_GET_REQ:
181                 /* We need to create a reply message to inform LNet our
182                  * optimized GET is done.
183                  */
184                 tn->tn_getreply = lnet_create_reply_msg(ni, msg);
185                 if (!tn->tn_getreply) {
186                         CERROR("Can't create reply for GET -> %s\n",
187                                libcfs_nidstr(&target->nid));
188                         kfilnd_tn_free(tn);
189                         return -EIO;
190                 }
191
192                 tn->sink_buffer = true;
193                 rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_md->md_kiov,
194                                             msg->msg_md->md_niov,
195                                             msg->msg_md->md_offset,
196                                             msg->msg_md->md_length);
197                 if (rc) {
198                         CERROR("Failed to setup GET sink buffer rc %d\n", rc);
199                         kfilnd_tn_free(tn);
200                         return rc;
201                 }
202                 event = TN_EVENT_INIT_BULK;
203                 break;
204
205         default:
206                 kfilnd_tn_free(tn);
207                 return -EIO;
208         }
209
210         tn->msg_type = lnd_msg_type;
211         tn->tn_lntmsg = msg;    /* finalise msg on completion */
212         tn->lnet_msg_len = tn->tn_nob;
213
214         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
215                         msg_type_to_str(lnd_msg_type), tn->tn_nob,
216                         tn->tn_num_iovec);
217
218         /* Start the state machine processing this transaction */
219         kfilnd_tn_event_handler(tn, event, 0);
220
221         return 0;
222 }
223
224 static int kfilnd_recv(struct lnet_ni *ni, void *private, struct lnet_msg *msg,
225                        int delayed, unsigned int niov,
226                        struct bio_vec *kiov,
227                        unsigned int offset, unsigned int mlen,
228                        unsigned int rlen)
229 {
230         struct kfilnd_transaction *tn = private;
231         struct kfilnd_msg *rxmsg = tn->tn_rx_msg.msg;
232         int nob;
233         int rc = 0;
234         int status = 0;
235         enum tn_events event;
236
237         if (mlen > rlen)
238                 return -EINVAL;
239
240         /* Transaction must be in receive state */
241         if (tn->tn_state != TN_STATE_IMM_RECV)
242                 return -EINVAL;
243
244         tn->tn_lntmsg = msg;
245         tn->lnet_msg_len = rlen;
246
247         switch (rxmsg->type) {
248         case KFILND_MSG_IMMEDIATE:
249                 nob = offsetof(struct kfilnd_msg, proto.immed.payload[rlen]);
250                 if (nob > tn->tn_rx_msg.length) {
251                         CERROR("Immediate message from %s too big: %d(%lu)\n",
252                                libcfs_nidstr(&msg->msg_hdr.src_nid),
253                                nob, tn->tn_rx_msg.length);
254                         return -EPROTO;
255                 }
256                 tn->tn_nob = nob;
257
258                 lnet_copy_flat2kiov(niov, kiov, offset,
259                                     KFILND_IMMEDIATE_MSG_SIZE, rxmsg,
260                                     offsetof(struct kfilnd_msg,
261                                              proto.immed.payload),
262                                     mlen);
263
264                 kfilnd_tn_event_handler(tn, TN_EVENT_RX_OK, 0);
265                 return 0;
266
267         case KFILND_MSG_BULK_PUT_REQ:
268                 if (mlen == 0) {
269                         event = TN_EVENT_SKIP_TAG_RMA;
270                 } else {
271                         /* Post the buffer given us as a sink  */
272                         tn->sink_buffer = true;
273                         rc = kfilnd_tn_set_kiov_buf(tn, kiov, niov, offset,
274                                                     mlen);
275                         if (rc) {
276                                 CERROR("Failed to setup PUT sink buffer rc %d\n", rc);
277                                 kfilnd_tn_free(tn);
278                                 return rc;
279                         }
280                         event = TN_EVENT_INIT_TAG_RMA;
281                 }
282                 break;
283
284         case KFILND_MSG_BULK_GET_REQ:
285                 if (!msg) {
286                         event = TN_EVENT_SKIP_TAG_RMA;
287                         status = -ENODATA;
288                 } else {
289                         /* Post the buffer given to us as a source  */
290                         tn->sink_buffer = false;
291                         rc = kfilnd_tn_set_kiov_buf(tn, msg->msg_kiov,
292                                                     msg->msg_niov,
293                                                     msg->msg_offset,
294                                                     msg->msg_len);
295                         if (rc) {
296                                 CERROR("Failed to setup GET source buffer rc %d\n", rc);
297                                 kfilnd_tn_free(tn);
298                                 return rc;
299                         }
300                         event = TN_EVENT_INIT_TAG_RMA;
301                 }
302                 break;
303
304         default:
305                 /* TODO: TN leaks here. */
306                 CERROR("Invalid message type = %d\n", rxmsg->type);
307                 return -EINVAL;
308         }
309
310         /* Store relevant fields to generate a bulk response. */
311         tn->tn_response_mr_key = rxmsg->proto.bulk_req.key;
312         tn->tn_response_rx = rxmsg->proto.bulk_req.response_rx;
313
314 #if 0
315         tn->tn_tx_msg.length = kfilnd_init_proto(tn->tn_tx_msg.msg,
316                                                  KFILND_MSG_BULK_RSP,
317                                                  sizeof(struct kfilnd_bulk_rsp),
318                                                  ni);
319 #endif
320
321         KFILND_TN_DEBUG(tn, "%s in %u bytes in %u frags",
322                         msg_type_to_str(rxmsg->type), tn->tn_nob,
323                         tn->tn_num_iovec);
324
325         kfilnd_tn_event_handler(tn, event, status);
326
327         return rc;
328 }
329
330 static int kfilnd_startup(struct lnet_ni *ni);
331
332 static const struct lnet_lnd the_kfilnd = {
333         .lnd_type       = KFILND,
334         .lnd_startup    = kfilnd_startup,
335         .lnd_shutdown   = kfilnd_shutdown,
336         .lnd_send       = kfilnd_send,
337         .lnd_recv       = kfilnd_recv,
338 };
339
340 static int kfilnd_startup(struct lnet_ni *ni)
341 {
342         const char *node;
343         int rc;
344         struct kfilnd_dev *kfdev;
345
346         if (!ni)
347                 return -EINVAL;
348
349         if (ni->ni_net->net_lnd != &the_kfilnd) {
350                 CERROR("Wrong lnd type\n");
351                 return -EINVAL;
352         }
353
354         kfilnd_tunables_setup(ni);
355
356         /* Only a single interface is supported. */
357         if (!ni->ni_interface) {
358                 rc = -ENODEV;
359                 CERROR("No LNet network interface address defined\n");
360                 goto err;
361         }
362
363         node = ni->ni_interface;
364
365         kfdev = kfilnd_dev_alloc(ni, node);
366         if (IS_ERR(kfdev)) {
367                 rc = PTR_ERR(kfdev);
368                 CERROR("Failed to allocate KFILND device for %s: rc=%d\n", node,
369                        rc);
370                 goto err;
371         }
372
373         ni->ni_data = kfdev;
374         ni->ni_nid.nid_addr[0] = cpu_to_be32(LNET_NIDADDR(kfdev->nic_addr));
375
376         /* Post a series of immediate receive buffers */
377         rc = kfilnd_dev_post_imm_buffers(kfdev);
378         if (rc) {
379                 CERROR("Can't post buffers, rc = %d\n", rc);
380                 goto err_free_dev;
381         }
382
383         return 0;
384
385 err_free_dev:
386         kfilnd_dev_free(kfdev);
387 err:
388         return rc;
389 }
390
391 static void __exit kfilnd_exit(void)
392 {
393         destroy_workqueue(kfilnd_wq);
394
395         kfilnd_tn_cleanup();
396
397         lnet_unregister_lnd(&the_kfilnd);
398
399         debugfs_remove_recursive(kfilnd_debug_dir);
400 }
401
402 static int __init kfilnd_init(void)
403 {
404         int rc;
405
406         kfilnd_debug_dir = debugfs_create_dir("kfilnd", NULL);
407
408         rc = kfilnd_tunables_init();
409         if (rc)
410                 goto err;
411
412         /* Do any initialization of the transaction system */
413         rc = kfilnd_tn_init();
414         if (rc) {
415                 CERROR("Cannot initialize transaction system\n");
416                 goto err;
417         }
418
419         kfilnd_wq = alloc_workqueue("kfilnd_wq", KFILND_WQ_FLAGS,
420                                     WQ_MAX_ACTIVE);
421         if (!kfilnd_wq) {
422                 rc = -ENOMEM;
423                 CERROR("Failed to allocated kfilnd work queue\n");
424                 goto err_tn_cleanup;
425         }
426
427         lnet_register_lnd(&the_kfilnd);
428
429         return 0;
430
431 err_tn_cleanup:
432         kfilnd_tn_cleanup();
433 err:
434         return rc;
435 }
436
437 MODULE_AUTHOR("Cray Inc.");
438 MODULE_DESCRIPTION("Kfabric Lustre Network Driver");
439 MODULE_VERSION(KFILND_VERSION);
440 MODULE_LICENSE("GPL");
441
442 module_init(kfilnd_init);
443 module_exit(kfilnd_exit);