Whamcloud - gitweb
d070afe59d1eeb72292008cad901630863b23edc
[fs/lustre-release.git] / lnet / klnds / kfilnd / kfilnd_cq.c
1
2 /*
3  * GPL HEADER START
4  *
5  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License version 2 only,
9  * as published by the Free Software Foundation.
10  *
11  * This program is distributed in the hope that it will be useful, but
12  * WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14  * General Public License version 2 for more details (a copy is included
15  * in the LICENSE file that accompanied this code).
16  *
17  * You should have received a copy of the GNU General Public License
18  * version 2 along with this program; If not, see
19  * http://www.gnu.org/licenses/gpl-2.0.html
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright 2022 Hewlett Packard Enterprise Development LP
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  */
29 /*
30  * kfilnd completion queue.
31  */
32 #include <linux/idr.h>
33 #include <linux/mutex.h>
34 #include <linux/byteorder/generic.h>
35
36 #include "kfilnd_cq.h"
37 #include "kfilnd_tn.h"
38 #include "kfilnd_ep.h"
39
40 void kfilnd_cq_process_error(struct kfilnd_ep *ep,
41                              struct kfi_cq_err_entry *error)
42 {
43         struct kfilnd_immediate_buffer *buf;
44         struct kfilnd_transaction *tn;
45         enum tn_events tn_event;
46         int status;
47
48         switch (error->flags) {
49         case KFI_MSG | KFI_RECV:
50                 if (error->err != ECANCELED) {
51                         KFILND_EP_ERROR(ep, "Dropping error receive event %d\n",
52                                         -error->err);
53                         return;
54                 }
55                 fallthrough;
56         case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
57                 buf = error->op_context;
58                 kfilnd_ep_imm_buffer_put(buf);
59                 return;
60
61         case KFI_TAGGED | KFI_RECV:
62         case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
63         case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
64         case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
65                 tn = error->op_context;
66                 if (error->err == ECANCELED) {
67                         tn_event = TN_EVENT_TAG_RX_CANCEL;
68                         status = 0;
69                 } else {
70                         tn_event = TN_EVENT_TAG_RX_FAIL;
71                         status = -error->err;
72                 }
73                 break;
74
75         case KFI_MSG | KFI_SEND:
76                 tn = error->op_context;
77                 tn_event = TN_EVENT_TX_FAIL;
78                 status = -error->err;
79                 break;
80
81         case KFI_TAGGED | KFI_SEND:
82         case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
83         case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
84                 tn = error->op_context;
85                 tn_event = TN_EVENT_TAG_TX_FAIL;
86                 status = -error->err;
87                 break;
88
89         default:
90                 LBUG();
91         }
92
93         kfilnd_tn_event_handler(tn, tn_event, status);
94 }
95
96 static void kfilnd_cq_process_event(struct kfi_cq_data_entry *event)
97 {
98         struct kfilnd_immediate_buffer *buf;
99         struct kfilnd_msg *rx_msg;
100         struct kfilnd_transaction *tn;
101         enum tn_events tn_event;
102         int64_t status = 0;
103
104         switch (event->flags) {
105         case KFI_MSG | KFI_RECV:
106         case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
107                 buf = event->op_context;
108                 rx_msg = event->buf;
109
110                 kfilnd_tn_process_rx_event(buf, rx_msg, event->len);
111
112                 /* If the KFI_MULTI_RECV flag is set, the buffer was
113                  * unlinked.
114                  */
115                 if (event->flags & KFI_MULTI_RECV)
116                         kfilnd_ep_imm_buffer_put(buf);
117                 return;
118
119         case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
120                 status = -1 * (int64_t)be64_to_cpu(event->data);
121                 fallthrough;
122         case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
123         case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
124                 tn_event = TN_EVENT_TAG_RX_OK;
125                 tn = event->op_context;
126                 break;
127
128         case KFI_TAGGED | KFI_SEND:
129         case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
130         case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
131                 tn = event->op_context;
132                 tn_event = TN_EVENT_TAG_TX_OK;
133                 break;
134
135         case KFI_MSG | KFI_SEND:
136                 tn = event->op_context;
137                 tn_event = TN_EVENT_TX_OK;
138                 break;
139
140         default:
141                 LBUG();
142         }
143
144         kfilnd_tn_event_handler(tn, tn_event, status);
145 }
146
147 static void kfilnd_cq_process_completion(struct work_struct *work)
148 {
149         struct kfilnd_cq_work *cq_work =
150                 container_of(work, struct kfilnd_cq_work, work);
151         struct kfilnd_cq *kfilnd_cq = cq_work->cq;
152         struct kfid_cq *cq = kfilnd_cq->cq;
153         struct kfi_cq_data_entry event;
154         struct kfi_cq_err_entry error;
155         ssize_t rc;
156         bool done = false;
157
158         /* Drain the KFI completion queue of all events and errors. */
159         while (!done) {
160                 rc = kfi_cq_read(cq, &event, 1);
161                 if (rc == -KFI_EAVAIL) {
162                         while (kfi_cq_readerr(cq, &error, 0) == 1)
163                                 kfilnd_cq_process_error(kfilnd_cq->ep, &error);
164                 } else if (rc == 1) {
165                         kfilnd_cq_process_event(&event);
166                 } else if (rc == -EAGAIN) {
167                         done = true;
168                 } else {
169                         KFILND_EP_ERROR(kfilnd_cq->ep, "Unexpected rc = %ld",
170                                         rc);
171                         done = true;
172                 }
173         }
174
175         if (kfilnd_ep_replays_pending(kfilnd_cq->ep))
176                 kfilnd_ep_flush_replay_queue(kfilnd_cq->ep);
177 }
178
179 static void kfilnd_cq_completion(struct kfid_cq *cq, void *context)
180 {
181         struct kfilnd_cq *kfilnd_cq = context;
182         struct kfilnd_cq_work *cq_work;
183         unsigned int i;
184
185         for (i = 0; i < kfilnd_cq->cq_work_count; i++) {
186                 cq_work = &kfilnd_cq->cq_works[i];
187                 queue_work_on(cq_work->work_cpu, kfilnd_wq, &cq_work->work);
188         }
189 }
190
191 #define CQ_ALLOC_SIZE(cpu_count) \
192         (sizeof(struct kfilnd_cq) + \
193          (sizeof(struct kfilnd_cq_work) * (cpu_count)))
194
195 struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep,
196                                   struct kfi_cq_attr *attr)
197 {
198         struct kfilnd_cq *cq;
199         cpumask_var_t *cpu_mask;
200         int rc;
201         unsigned int cpu_count = 0;
202         unsigned int cpu;
203         unsigned int i;
204         size_t alloc_size;
205         struct kfilnd_cq_work *cq_work;
206
207         cpu_mask = cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt);
208         for_each_cpu(cpu, *cpu_mask)
209                 cpu_count++;
210
211         alloc_size = CQ_ALLOC_SIZE(cpu_count);
212         LIBCFS_CPT_ALLOC(cq, lnet_cpt_table(), ep->end_cpt, alloc_size);
213         if (!cq) {
214                 rc = -ENOMEM;
215                 KFILND_EP_ERROR(ep, "Failed to allocate memory: rc=%d", rc);
216                 goto err;
217         }
218
219         memset(cq, 0, alloc_size);
220
221         rc = kfi_cq_open(ep->end_dev->dom->domain, attr, &cq->cq,
222                          kfilnd_cq_completion, cq);
223         if (rc) {
224                 KFILND_EP_ERROR(ep, "Failed to open KFI CQ: rc=%d", rc);
225                 goto err_free_kfilnd_cq;
226         }
227
228         i = 0;
229         for_each_cpu(cpu, *cpu_mask) {
230                 cq_work = &cq->cq_works[i];
231                 cq_work->cq = cq;
232                 cq_work->work_cpu = cpu;
233                 INIT_WORK(&cq_work->work, kfilnd_cq_process_completion);
234                 i++;
235         }
236
237         cq->ep = ep;
238         cq->cq_work_count = cpu_count;
239
240         return cq;
241
242 err_free_kfilnd_cq:
243         LIBCFS_FREE(cq, alloc_size);
244 err:
245         return ERR_PTR(rc);
246 }
247
248 void kfilnd_cq_free(struct kfilnd_cq *cq)
249 {
250         flush_workqueue(kfilnd_wq);
251         kfi_close(&cq->cq->fid);
252         LIBCFS_FREE(cq, CQ_ALLOC_SIZE(cq->cq_work_count));
253 }