/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * kfilnd completion queue.
 */

#include <linux/idr.h>
#include <linux/mutex.h>
#include <linux/byteorder/generic.h>

#include "kfilnd_cq.h"
#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
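
/* Dispatch a CQ error entry: receive-buffer errors release the immediate
 * buffer directly; all other errors are mapped to a transaction event and
 * forwarded to the transaction state machine.
 */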
void kfilnd_cq_process_error(struct kfilnd_ep *ep,
			     struct kfi_cq_err_entry *error)
{
	struct kfilnd_immediate_buffer *buf;
	struct kfilnd_transaction *tn;
	enum tn_events tn_event;
	int status;

	switch (error->flags) {
	case KFI_MSG | KFI_RECV:
		if (error->err != ECANCELED) {
			KFILND_EP_ERROR(ep, "Dropping error receive event %d",
					-error->err);
			return;
		}
		fallthrough;
	case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
		buf = error->op_context;
		kfilnd_ep_imm_buffer_put(buf);
		return;

	case KFI_TAGGED | KFI_RECV:
	case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
		tn = error->op_context;
		if (error->err == ECANCELED) {
			tn_event = TN_EVENT_TAG_RX_CANCEL;
			status = 0;
		} else {
			tn_event = TN_EVENT_TAG_RX_FAIL;
			status = -error->err;
		}
		break;

	case KFI_MSG | KFI_SEND:
		tn = error->op_context;
		tn_event = TN_EVENT_TX_FAIL;
		status = -error->err;
		KFILND_EP_ERROR(ep,
				"msg send error %d prov error %d flags %llx",
				status, -error->prov_errno, error->flags);
		break;

	case KFI_TAGGED | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
		tn = error->op_context;
		tn_event = TN_EVENT_TAG_TX_FAIL;
		status = -error->err;
		KFILND_EP_ERROR(ep,
				"tagged error %d prov error %d flags %llx",
				status, -error->prov_errno, error->flags);
		break;

	default:
		/* Assumed handling: no other flag combination is posted. */
		LBUG();
	}

	kfilnd_tn_event_handler(tn, tn_event, status);
}
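
/* Dispatch a successful CQ completion: immediate-buffer receives are
 * handed to kfilnd_tn_process_rx_event(); everything else is mapped to a
 * transaction event and forwarded to the transaction state machine.
 */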
static void kfilnd_cq_process_event(struct kfi_cq_data_entry *event)
{
	struct kfilnd_immediate_buffer *buf;
	struct kfilnd_msg *rx_msg;
	struct kfilnd_transaction *tn;
	enum tn_events tn_event;
	int64_t status = 0;

	switch (event->flags) {
	case KFI_MSG | KFI_RECV:
	case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
		buf = event->op_context;
		rx_msg = event->buf;

		kfilnd_tn_process_rx_event(buf, rx_msg, event->len);

		/* If the KFI_MULTI_RECV flag is set, the buffer was
		 * unlinked and its reference must be released.
		 */
		if (event->flags & KFI_MULTI_RECV)
			kfilnd_ep_imm_buffer_put(buf);
		return;

	case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
		/* The peer returned a status code in the remote CQ data. */
		status = -1 * (int64_t)be64_to_cpu(event->data);
		fallthrough;
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
		tn_event = TN_EVENT_TAG_RX_OK;
		tn = event->op_context;
		break;

	case KFI_TAGGED | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
	case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
		tn = event->op_context;
		tn_event = TN_EVENT_TAG_TX_OK;
		break;

	case KFI_MSG | KFI_SEND:
		tn = event->op_context;
		tn_event = TN_EVENT_TX_OK;
		break;

	default:
		/* Assumed handling: no other flag combination is posted. */
		LBUG();
	}

	kfilnd_tn_event_handler(tn, tn_event, status);
}
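
/* Work item, queued per CPU, that drains the KFI completion queue of all
 * pending events and errors.
 */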
static void kfilnd_cq_process_completion(struct work_struct *work)
{
	struct kfilnd_cq_work *cq_work =
		container_of(work, struct kfilnd_cq_work, work);
	struct kfilnd_cq *kfilnd_cq = cq_work->cq;
	struct kfid_cq *cq = kfilnd_cq->cq;
	struct kfi_cq_data_entry event;
	struct kfi_cq_err_entry error;
	ssize_t rc;
	bool done = false;

	/* Drain the KFI completion queue of all events and errors. */
	while (!done) {
		rc = kfi_cq_read(cq, &event, 1);
		if (rc == -KFI_EAVAIL) {
			while (kfi_cq_readerr(cq, &error, 0) == 1)
				kfilnd_cq_process_error(kfilnd_cq->ep, &error);
		} else if (rc == 1) {
			kfilnd_cq_process_event(&event);
		} else if (rc == -EAGAIN) {
			/* CQ is empty. */
			done = true;
		} else {
			KFILND_EP_ERROR(kfilnd_cq->ep, "Unexpected rc = %ld",
					rc);
			done = true;
		}
	}

	if (kfilnd_ep_replays_pending(kfilnd_cq->ep))
		kfilnd_ep_flush_replay_queue(kfilnd_cq->ep);
}
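
/* KFI completion callback: fan processing out to the per-CPU work items. */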
static void kfilnd_cq_completion(struct kfid_cq *cq, void *context)
{
	struct kfilnd_cq *kfilnd_cq = context;
	struct kfilnd_cq_work *cq_work;
	unsigned int i;

	for (i = 0; i < kfilnd_cq->cq_work_count; i++) {
		cq_work = &kfilnd_cq->cq_works[i];
		queue_work_on(cq_work->work_cpu, kfilnd_wq, &cq_work->work);
	}
}

#define CQ_ALLOC_SIZE(cpu_count) \
	(sizeof(struct kfilnd_cq) + \
	 (sizeof(struct kfilnd_cq_work) * (cpu_count)))
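
/* Allocate a kfilnd CQ with one work item per CPU in the endpoint's CPT
 * and open the underlying KFI completion queue on the endpoint's domain.
 */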
struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep,
				  struct kfi_cq_attr *attr)
{
	struct kfilnd_cq *cq;
	cpumask_var_t *cpu_mask;
	struct kfilnd_cq_work *cq_work;
	unsigned int cpu_count = 0;
	unsigned int cpu, i = 0;
	size_t alloc_size;
	int rc;

	cpu_mask = cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt);
	for_each_cpu(cpu, *cpu_mask)
		cpu_count++;

	alloc_size = CQ_ALLOC_SIZE(cpu_count);
	LIBCFS_CPT_ALLOC(cq, lnet_cpt_table(), ep->end_cpt, alloc_size);
	if (!cq) {
		rc = -ENOMEM;
		KFILND_EP_ERROR(ep, "Failed to allocate memory: rc=%d", rc);
		goto err;
	}
	memset(cq, 0, alloc_size);

	rc = kfi_cq_open(ep->end_dev->dom->domain, attr, &cq->cq,
			 kfilnd_cq_completion, cq);
	if (rc) {
		KFILND_EP_ERROR(ep, "Failed to open KFI CQ: rc=%d", rc);
		goto err_free_kfilnd_cq;
	}

	for_each_cpu(cpu, *cpu_mask) {
		cq_work = &cq->cq_works[i++];
		cq_work->cq = cq;
		cq_work->work_cpu = cpu;
		INIT_WORK(&cq_work->work, kfilnd_cq_process_completion);
	}
	cq->ep = ep;
	cq->cq_work_count = cpu_count;
	return cq;

err_free_kfilnd_cq:
	LIBCFS_FREE(cq, alloc_size);
err:
	return ERR_PTR(rc);
}
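
/* Flush any queued CQ work, then close the KFI CQ and free the memory. */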
void kfilnd_cq_free(struct kfilnd_cq *cq)
{
	flush_workqueue(kfilnd_wq);
	kfi_close(&cq->cq->fid);
	LIBCFS_FREE(cq, CQ_ALLOC_SIZE(cq->cq_work_count));
}