5 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
7 * This program is free software; you can redistribute it and/or modify
8 * it under the terms of the GNU General Public License version 2 only,
9 * as published by the Free Software Foundation.
11 * This program is distributed in the hope that it will be useful, but
12 * WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 * General Public License version 2 for more details (a copy is included
15 * in the LICENSE file that accompanied this code).
17 * You should have received a copy of the GNU General Public License
18 * version 2 along with this program; If not, see
19 * http://www.gnu.org/licenses/gpl-2.0.html
24 * Copyright 2022 Hewlett Packard Enterprise Development LP
27 * This file is part of Lustre, http://www.lustre.org/
30 * kfilnd completion queue.
32 #include <linux/idr.h>
33 #include <linux/mutex.h>
34 #include <linux/byteorder/generic.h>
36 #include "kfilnd_cq.h"
37 #include "kfilnd_tn.h"
38 #include "kfilnd_ep.h"
/*
 * Handle one error entry read from the KFI completion queue.
 *
 * @ep:    endpoint, used here for error logging
 * @error: error entry; error->flags selects the handling and
 *         error->op_context carries the object the failed operation
 *         was posted with
 *
 * Receive errors on the message path hand the immediate buffer found in
 * op_context to kfilnd_ep_imm_buffer_put().  Every other recognised flag
 * combination treats op_context as a transaction, picks a failure (or
 * cancel) event for it and passes that to kfilnd_tn_event_handler().
 *
 * NOTE(review): this listing does not show the declaration of or the
 * assignments to `status`, nor the break/return statements separating
 * the case groups - confirm those against the complete file.
 */
40 void kfilnd_cq_process_error(struct kfilnd_ep *ep,
41 struct kfi_cq_err_entry *error)
43 struct kfilnd_immediate_buffer *buf;
44 struct kfilnd_transaction *tn;
45 enum tn_events tn_event;
48 switch (error->flags) {
/* Message receive: any error other than ECANCELED is logged. */
49 case KFI_MSG | KFI_RECV:
50 if (error->err != ECANCELED) {
51 KFILND_EP_ERROR(ep, "Dropping error receive event %d\n",
/* Multi-receive buffer: op_context is the immediate buffer. */
56 case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
57 buf = error->op_context;
58 kfilnd_ep_imm_buffer_put(buf);
/* Tagged receive side: op_context is the transaction. */
61 case KFI_TAGGED | KFI_RECV:
62 case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
63 case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
64 case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
65 tn = error->op_context;
/* A cancelled receive gets its own event, distinct from a failure. */
66 if (error->err == ECANCELED) {
67 tn_event = TN_EVENT_TAG_RX_CANCEL;
/* Presumably the else branch of the ECANCELED test - confirm. */
70 tn_event = TN_EVENT_TAG_RX_FAIL;
/* Message send failure. */
75 case KFI_MSG | KFI_SEND:
76 tn = error->op_context;
77 tn_event = TN_EVENT_TX_FAIL;
/* Tagged send side failure. */
81 case KFI_TAGGED | KFI_SEND:
82 case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
83 case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
84 tn = error->op_context;
85 tn_event = TN_EVENT_TAG_TX_FAIL;
/* Deliver the selected event and status to the transaction. */
93 kfilnd_tn_event_handler(tn, tn_event, status);
/*
 * Handle one successful completion entry read from the KFI completion
 * queue.
 *
 * @event: completion entry; event->flags selects the handling and
 *         event->op_context carries the object the operation was
 *         posted with
 *
 * Message receives (with or without KFI_MULTI_RECV) treat op_context as
 * an immediate buffer and pass it, together with rx_msg and event->len,
 * to kfilnd_tn_process_rx_event(); when KFI_MULTI_RECV is set the buffer
 * is additionally handed to kfilnd_ep_imm_buffer_put().
 *
 * For a tagged receive carrying KFI_REMOTE_CQ_DATA, status is computed
 * by converting event->data from big-endian to CPU byte order and
 * negating it.  Tagged receives map to TN_EVENT_TAG_RX_OK, tagged sends
 * to TN_EVENT_TAG_TX_OK and message sends to TN_EVENT_TX_OK; op_context
 * is then the transaction given to kfilnd_tn_event_handler().
 *
 * NOTE(review): this listing does not show the declaration of `status`,
 * the assignment of `rx_msg`, the break/return statements between case
 * groups, or the end of the comment opened ahead of the KFI_MULTI_RECV
 * check - confirm those against the complete file.
 */
96 static void kfilnd_cq_process_event(struct kfi_cq_data_entry *event)
98 struct kfilnd_immediate_buffer *buf;
99 struct kfilnd_msg *rx_msg;
100 struct kfilnd_transaction *tn;
101 enum tn_events tn_event;
104 switch (event->flags) {
/* Message receive: op_context is the immediate buffer. */
105 case KFI_MSG | KFI_RECV:
106 case KFI_MSG | KFI_RECV | KFI_MULTI_RECV:
107 buf = event->op_context;
110 kfilnd_tn_process_rx_event(buf, rx_msg, event->len);
112 /* If the KFI_MULTI_RECV flag is set, the buffer was
115 if (event->flags & KFI_MULTI_RECV)
116 kfilnd_ep_imm_buffer_put(buf);
119 case KFI_TAGGED | KFI_RECV | KFI_REMOTE_CQ_DATA:
120 status = -1 * (int64_t)be64_to_cpu(event->data);
122 case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_RECV:
123 case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_RECV:
124 tn_event = TN_EVENT_TAG_RX_OK;
125 tn = event->op_context;
128 case KFI_TAGGED | KFI_SEND:
129 case KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND:
130 case KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND:
131 tn = event->op_context;
132 tn_event = TN_EVENT_TAG_TX_OK;
135 case KFI_MSG | KFI_SEND:
136 tn = event->op_context;
137 tn_event = TN_EVENT_TX_OK;
144 kfilnd_tn_event_handler(tn, tn_event, status);
/*
 * Work function that services a KFI completion queue.
 *
 * @work: work item embedded in a struct kfilnd_cq_work; the owning
 *        kfilnd_cq and its KFI CQ are recovered from it
 *
 * Entries are read one at a time with kfi_cq_read():
 *   -KFI_EAVAIL  read error entries with kfi_cq_readerr() until it stops
 *                returning 1, passing each to kfilnd_cq_process_error()
 *   1            pass the entry to kfilnd_cq_process_event()
 *   -EAGAIN      handled separately (body not visible in this listing)
 *   other        logged as unexpected
 *
 * Afterwards, any replays pending on the endpoint are flushed.
 *
 * NOTE(review): the loop construct implied by the "drain" comment and
 * the declaration of `rc` are not visible in this listing - confirm
 * against the complete file.
 */
147 static void kfilnd_cq_process_completion(struct work_struct *work)
149 struct kfilnd_cq_work *cq_work =
150 container_of(work, struct kfilnd_cq_work, work);
151 struct kfilnd_cq *kfilnd_cq = cq_work->cq;
152 struct kfid_cq *cq = kfilnd_cq->cq;
153 struct kfi_cq_data_entry event;
154 struct kfi_cq_err_entry error;
158 /* Drain the KFI completion queue of all events and errors. */
/* Read a single entry per call. */
160 rc = kfi_cq_read(cq, &event, 1);
161 if (rc == -KFI_EAVAIL) {
/* Error entries are available: consume them one by one. */
162 while (kfi_cq_readerr(cq, &error, 0) == 1)
163 kfilnd_cq_process_error(kfilnd_cq->ep, &error);
164 } else if (rc == 1) {
165 kfilnd_cq_process_event(&event);
166 } else if (rc == -EAGAIN) {
169 KFILND_EP_ERROR(kfilnd_cq->ep, "Unexpected rc = %ld",
/* Flush the endpoint's replay queue if anything is waiting on it. */
175 if (kfilnd_ep_replays_pending(kfilnd_cq->ep))
176 kfilnd_ep_flush_replay_queue(kfilnd_cq->ep);
/*
 * Completion callback handed to kfi_cq_open() by kfilnd_cq_alloc().
 *
 * @cq:      KFI completion queue the callback is invoked for (unused in
 *           the visible lines)
 * @context: the struct kfilnd_cq that was registered with the KFI CQ
 *
 * Queues every work item of the kfilnd_cq on kfilnd_wq, each on the CPU
 * recorded in its work_cpu field.
 */
179 static void kfilnd_cq_completion(struct kfid_cq *cq, void *context)
181 struct kfilnd_cq *kfilnd_cq = context;
182 struct kfilnd_cq_work *cq_work;
185 for (i = 0; i < kfilnd_cq->cq_work_count; i++) {
186 cq_work = &kfilnd_cq->cq_works[i];
187 queue_work_on(cq_work->work_cpu, kfilnd_wq, &cq_work->work);
/*
 * Bytes needed for a struct kfilnd_cq followed by cpu_count
 * struct kfilnd_cq_work entries.  Used for both allocating and freeing
 * a kfilnd_cq, so the two sizes stay in agreement.
 */
191 #define CQ_ALLOC_SIZE(cpu_count) \
192 (sizeof(struct kfilnd_cq) + \
193 (sizeof(struct kfilnd_cq_work) * (cpu_count)))
/*
 * Allocate a kfilnd completion queue and open its backing KFI CQ.
 *
 * @ep:   endpoint the CQ belongs to; ep->end_cpt selects the CPU
 *        partition for the allocation and for the per-CPU work items
 * @attr: attributes passed through to kfi_cq_open()
 *
 * The structure is sized with CQ_ALLOC_SIZE(cpu_count) so that it can
 * hold one kfilnd_cq_work per CPU in the partition's CPU mask.  Each
 * work item records its CPU and runs kfilnd_cq_process_completion().
 * kfilnd_cq_completion() is registered as the KFI CQ callback with the
 * new kfilnd_cq as its context.
 *
 * NOTE(review): the body of the CPU-counting loop, the allocation
 * failure check, the assignments to `rc`, the return statements and the
 * error label are not visible in this listing - confirm the return
 * convention against the complete file.
 */
195 struct kfilnd_cq *kfilnd_cq_alloc(struct kfilnd_ep *ep,
196 struct kfi_cq_attr *attr)
198 struct kfilnd_cq *cq;
199 cpumask_var_t *cpu_mask;
201 unsigned int cpu_count = 0;
205 struct kfilnd_cq_work *cq_work;
/* CPU mask of the endpoint's CPU partition. */
207 cpu_mask = cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt);
/* Presumably counts the CPUs in the mask into cpu_count - confirm. */
208 for_each_cpu(cpu, *cpu_mask)
211 alloc_size = CQ_ALLOC_SIZE(cpu_count);
212 LIBCFS_CPT_ALLOC(cq, lnet_cpt_table(), ep->end_cpt, alloc_size);
215 KFILND_EP_ERROR(ep, "Failed to allocate memory: rc=%d", rc);
219 memset(cq, 0, alloc_size);
/* Open the KFI CQ; completions call back with cq as the context. */
221 rc = kfi_cq_open(ep->end_dev->dom->domain, attr, &cq->cq,
222 kfilnd_cq_completion, cq);
224 KFILND_EP_ERROR(ep, "Failed to open KFI CQ: rc=%d", rc);
225 goto err_free_kfilnd_cq;
/* One work item per CPU in the mask, bound to that CPU. */
229 for_each_cpu(cpu, *cpu_mask) {
230 cq_work = &cq->cq_works[i];
232 cq_work->work_cpu = cpu;
233 INIT_WORK(&cq_work->work, kfilnd_cq_process_completion);
/* Remembered so kfilnd_cq_free() can recompute the allocation size. */
238 cq->cq_work_count = cpu_count;
/* Presumably the err_free_kfilnd_cq unwind path - confirm. */
243 LIBCFS_FREE(cq, alloc_size);
/*
 * Tear down a completion queue created by kfilnd_cq_alloc().
 *
 * @cq: completion queue to release
 *
 * Flushes kfilnd_wq, closes the KFI CQ and then frees the structure.
 * The free size is recomputed from cq->cq_work_count, which
 * kfilnd_cq_alloc() sets to the CPU count it sized the allocation with.
 */
248 void kfilnd_cq_free(struct kfilnd_cq *cq)
250 flush_workqueue(kfilnd_wq);
251 kfi_close(&cq->cq->fid);
252 LIBCFS_FREE(cq, CQ_ALLOC_SIZE(cq->cq_work_count));