Whamcloud - gitweb
LU-16035 kfilnd: Initial kfilnd implementation
[fs/lustre-release.git] / lnet / klnds / kfilnd / kfilnd_ep.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright 2022 Hewlett Packard Enterprise Development LP
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * kfilnd endpoint implementation.
30  */
31 #include "kfilnd_ep.h"
32 #include "kfilnd_dev.h"
33 #include "kfilnd_tn.h"
34 #include "kfilnd_cq.h"
35
36 /**
37  * kfilnd_ep_post_recv() - Post a single receive buffer.
38  * @ep: KFI LND endpoint to have receive buffers posted on.
39  * @buf: Receive buffer to be posted.
40  *
41  * Return: On succes, zero. Else, negative errno.
42  */
43 static int kfilnd_ep_post_recv(struct kfilnd_ep *ep,
44                                struct kfilnd_immediate_buffer *buf)
45 {
46         int rc;
47
48         if (!ep || !buf)
49                 return -EINVAL;
50
51         if (buf->immed_no_repost)
52                 return 0;
53
54         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV))
55                 return -EIO;
56         else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV_EAGAIN))
57                 return -EAGAIN;
58
59         atomic_inc(&buf->immed_ref);
60         rc = kfi_recv(ep->end_rx, buf->immed_buf, buf->immed_buf_size, NULL,
61                       KFI_ADDR_UNSPEC, buf);
62         if (rc)
63                 atomic_dec(&buf->immed_ref);
64
65         return rc;
66 }
67
68 #define KFILND_EP_REPLAY_TIMER_MSEC (100U)
69
70 /**
71  * kfilnd_ep_imm_buffer_put() - Decrement the immediate buffer count reference
72  * counter.
73  * @buf: Immediate buffer to have reference count decremented.
74  *
75  * If the immediate buffer's reference count reaches zero, the buffer will
76  * automatically be reposted.
77  */
78 void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf)
79 {
80         unsigned long expires;
81         int rc;
82
83         if (!buf)
84                 return;
85
86         if (atomic_sub_return(1, &buf->immed_ref) != 0)
87                 return;
88
89         rc = kfilnd_ep_post_recv(buf->immed_end, buf);
90         switch (rc) {
91         case 0:
92                 break;
93
94         /* Return the buffer reference and queue the immediate buffer put to be
95          * replayed.
96          */
97         case -EAGAIN:
98                 expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
99                         jiffies;
100                 atomic_inc(&buf->immed_ref);
101
102                 spin_lock(&buf->immed_end->replay_lock);
103                 list_add_tail(&buf->replay_entry,
104                               &buf->immed_end->imm_buffer_replay);
105                 atomic_inc(&buf->immed_end->replay_count);
106                 spin_unlock(&buf->immed_end->replay_lock);
107
108                 if (!timer_pending(&buf->immed_end->replay_timer))
109                         mod_timer(&buf->immed_end->replay_timer, expires);
110                 break;
111
112         /* Unexpected error resulting in immediate buffer not being able to be
113          * posted. Since immediate buffers are used to sink incoming messages,
114          * failure to post immediate buffers means failure to communicate.
115          *
116          * TODO: Prevent LNet NI from doing sends/recvs?
117          */
118         default:
119                 KFILND_EP_ERROR(buf->immed_end,
120                                 "Failed to post immediate receive buffer: rc=%d",
121                                 rc);
122         }
123 }
124
125 /**
126  * kfilnd_ep_post_imm_buffers() - Post all immediate receive buffers.
127  * @ep: KFI LND endpoint to have receive buffers posted on.
128  *
129  * This function should be called only during KFI LND device initialization.
130  *
131  * Return: On success, zero. Else, negative errno.
132  */
133 int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep)
134 {
135         int rc = 0;
136         int i;
137
138         if (!ep)
139                 return -EINVAL;
140
141         for (i = 0; i < immediate_rx_buf_count; i++) {
142                 rc = kfilnd_ep_post_recv(ep, &ep->end_immed_bufs[i]);
143                 if (rc)
144                         goto out;
145         }
146
147 out:
148         return rc;
149 }
150
151 /**
152  * kfilnd_ep_cancel_imm_buffers() - Cancel all immediate receive buffers.
153  * @ep: KFI LND endpoint to have receive buffers canceled.
154  */
155 void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep)
156 {
157         int i;
158
159         if (!ep)
160                 return;
161
162         for (i = 0; i < immediate_rx_buf_count; i++) {
163                 ep->end_immed_bufs[i].immed_no_repost = true;
164
165                 /* Since this is called during LNet NI teardown, no need to
166                  * pipeline retries. Just spin until -EAGAIN is not returned.
167                  */
168                 while (kfi_cancel(&ep->end_rx->fid, &ep->end_immed_bufs[i]) ==
169                        -EAGAIN)
170                         schedule();
171         }
172 }
173
174 static void kfilnd_ep_err_fail_loc_work(struct work_struct *work)
175 {
176         struct kfilnd_ep_err_fail_loc_work *err =
177                 container_of(work, struct kfilnd_ep_err_fail_loc_work, work);
178
179         kfilnd_cq_process_error(err->ep, &err->err);
180         kfree(err);
181 }
182
183 static int kfilnd_ep_gen_fake_err(struct kfilnd_ep *ep,
184                                   const struct kfi_cq_err_entry *err)
185 {
186         struct kfilnd_ep_err_fail_loc_work *fake_err;
187
188         fake_err = kmalloc(sizeof(*fake_err), GFP_KERNEL);
189         if (!fake_err)
190                 return -ENOMEM;
191
192         fake_err->ep = ep;
193         fake_err->err = *err;
194         INIT_WORK(&fake_err->work, kfilnd_ep_err_fail_loc_work);
195         queue_work(kfilnd_wq, &fake_err->work);
196
197         return 0;
198 }
199
200 static uint64_t gen_init_tag_bits(struct kfilnd_transaction *tn)
201 {
202         return (tn->peer->remote_session_key << KFILND_EP_KEY_BITS) |
203                 tn->tn_response_mr_key;
204 }
205
206 /**
207  * kfilnd_ep_post_tagged_send() - Post a tagged send operation.
208  * @ep: KFI LND endpoint used to post the tagged receivce operation.
209  * @tn: Transaction structure containing the send buffer to be posted.
210  *
211  * The tag for the post tagged send operation is the response memory region key
212  * associated with the transaction.
213  *
214  * Return: On success, zero. Else, negative errno value.
215  */
216 int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep,
217                                struct kfilnd_transaction *tn)
218 {
219         struct kfi_cq_err_entry fake_error = {
220                 .op_context = tn,
221                 .flags = KFI_TAGGED | KFI_SEND,
222                 .err = EIO,
223         };
224         int rc;
225
226         if (!ep || !tn)
227                 return -EINVAL;
228
229         /* Make sure the device is not being shut down */
230         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
231                 return -EINVAL;
232
233         /* Progress transaction to failure if send should fail. */
234         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EVENT)) {
235                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
236                 if (!rc)
237                         return 0;
238         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND)) {
239                 return -EIO;
240         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EAGAIN)) {
241                 return -EAGAIN;
242         }
243
244         rc = kfi_tsenddata(ep->end_tx, NULL, 0, NULL, tn->tagged_data,
245                            tn->tn_target_addr, gen_init_tag_bits(tn), tn);
246         switch (rc) {
247         case 0:
248         case -EAGAIN:
249                 KFILND_EP_DEBUG(ep,
250                                 "Transaction ID %p: %s tagged send of with tag 0x%x to peer 0x%llx: rc=%d",
251                                 tn, rc ? "Failed to post" : "Posted",
252                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
253                 break;
254
255         default:
256                 KFILND_EP_ERROR(ep,
257                                 "Transaction ID %p: Failed to post tagged send with tag 0x%x to peer 0x%llx: rc=%d",
258                                 tn, tn->tn_response_mr_key,
259                                 tn->tn_target_addr, rc);
260         }
261
262         return rc;
263 }
264
265 /**
266  * kfilnd_ep_cancel_tagged_recv() - Cancel a tagged recv.
267  * @ep: KFI LND endpoint used to cancel the tagged receivce operation.
268  * @tn: Transaction structure containing the receive buffer to be cancelled.
269  *
270  * The tagged receive buffer context pointer is used to cancel a tagged receive
271  * operation. The context pointer is always the transaction pointer.
272  *
273  * Return: 0 on success. -ENOENT if the tagged receive buffer is not found. The
274  * tagged receive buffer may not be found due to a tagged send operation already
275  * landing or the tagged receive buffer never being posted. Negative errno value
276  * on error.
277  */
278 int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep,
279                                  struct kfilnd_transaction *tn)
280 {
281         if (!ep || !tn)
282                 return -EINVAL;
283
284         /* Make sure the device is not being shut down */
285         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
286                 return -EINVAL;
287
288         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN))
289                 return -EAGAIN;
290
291         /* The async event count is not decremented for a cancel operation since
292          * it was incremented for the post tagged receive.
293          */
294         return kfi_cancel(&ep->end_rx->fid, tn);
295 }
296
297 static uint64_t gen_target_tag_bits(struct kfilnd_transaction *tn)
298 {
299         return (tn->peer->local_session_key << KFILND_EP_KEY_BITS) |
300                 tn->tn_mr_key;
301 }
302
303 /**
304  * kfilnd_ep_post_tagged_recv() - Post a tagged receive operation.
305  * @ep: KFI LND endpoint used to post the tagged receivce operation.
306  * @tn: Transaction structure containing the receive buffer to be posted.
307  *
308  * The tag for the post tagged receive operation is the memory region key
309  * associated with the transaction.
310  *
311  * Return: On success, zero. Else, negative errno value.
312  */
313 int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep,
314                                struct kfilnd_transaction *tn)
315 {
316         struct kfi_msg_tagged msg = {
317                 .tag = gen_target_tag_bits(tn),
318                 .context = tn,
319                 .addr = tn->peer->addr,
320         };
321         struct kfi_cq_err_entry fake_error = {
322                 .op_context = tn,
323                 .flags = KFI_TAGGED | KFI_RECV,
324                 .err = EIO,
325         };
326         int rc;
327
328         if (!ep || !tn)
329                 return -EINVAL;
330
331         /* Make sure the device is not being shut down */
332         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
333                 return -EINVAL;
334
335         /* Progress transaction to failure if send should fail. */
336         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EVENT)) {
337                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
338                 if (!rc)
339                         return 0;
340         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV)) {
341                 return -EIO;
342         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EAGAIN)) {
343                 return -EAGAIN;
344         }
345
346         msg.iov_count = tn->tn_num_iovec;
347         msg.type = KFI_BVEC;
348         msg.msg_biov = tn->tn_kiov;
349
350         rc = kfi_trecvmsg(ep->end_rx, &msg, KFI_COMPLETION);
351         switch (rc) {
352         case 0:
353         case -EAGAIN:
354                 KFILND_EP_DEBUG(ep,
355                                 "Transaction ID %p: %s tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
356                                 tn, rc ? "Failed to post" : "Posted",
357                                 tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
358                 break;
359
360         default:
361                 KFILND_EP_ERROR(ep,
362                                 "Transaction ID %p: Failed to post tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
363                                 tn, tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
364         }
365
366         return rc;
367 }
368
369 /**
370  * kfilnd_ep_post_send() - Post a send operation.
371  * @ep: KFI LND endpoint used to post the send operation.
372  * @tn: Transaction structure containing the buffer to be sent.
373  *
374  * The target of the send operation is based on the target LNet NID field within
375  * the transaction structure. A lookup of LNet NID to KFI address is performed.
376  *
377  * Return: On success, zero. Else, negative errno value.
378  */
379 int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
380 {
381         size_t len;
382         void *buf;
383         struct kfi_cq_err_entry fake_error = {
384                 .op_context = tn,
385                 .flags = KFI_MSG | KFI_SEND,
386                 .err = EIO,
387         };
388         int rc;
389
390         if (!ep || !tn)
391                 return -EINVAL;
392
393         buf = tn->tn_tx_msg.msg;
394         len = tn->tn_tx_msg.length;
395
396         /* Make sure the device is not being shut down */
397         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
398                 return -EINVAL;
399
400         /* Progress transaction to failure if send should fail. */
401         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EVENT)) {
402                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
403                 if (!rc)
404                         return 0;
405         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND)) {
406                 return -EIO;
407         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EAGAIN)) {
408                 return -EAGAIN;
409         }
410
411         rc = kfi_send(ep->end_tx, buf, len, NULL, tn->tn_target_addr, tn);
412         switch (rc) {
413         case 0:
414         case -EAGAIN:
415                 KFILND_EP_DEBUG(ep,
416                                 "Transaction ID %p: %s send of %lu bytes to peer 0x%llx: rc=%d",
417                                 tn, rc ? "Failed to post" : "Posted",
418                                 len, tn->tn_target_addr, rc);
419                 break;
420
421         default:
422                 KFILND_EP_ERROR(ep,
423                                 "Transaction ID %p: Failed to post send of %lu bytes to peer 0x%llx: rc=%d",
424                                 tn, len, tn->tn_target_addr, rc);
425         }
426
427         return rc;
428 }
429
430 /**
431  * kfilnd_ep_post_write() - Post a write operation.
432  * @ep: KFI LND endpoint used to post the write operation.
433  * @tn: Transaction structure containing the buffer to be read from.
434  *
435  * The target of the write operation is based on the target LNet NID field
436  * within the transaction structure. A lookup of LNet NID to KFI address is
437  * performed.
438  *
439  * The transaction cookie is used as the remote key for the target memory
440  * region.
441  *
442  * Return: On success, zero. Else, negative errno value.
443  */
444 int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
445 {
446         int rc;
447         struct kfi_cq_err_entry fake_error = {
448                 .op_context = tn,
449                 .flags = KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND,
450                 .err = EIO,
451         };
452         struct kfi_rma_iov rma_iov = {
453                 .len = tn->tn_nob,
454                 .key = gen_init_tag_bits(tn),
455         };
456         struct kfi_msg_rma rma = {
457                 .addr = tn->tn_target_addr,
458                 .rma_iov = &rma_iov,
459                 .rma_iov_count = 1,
460                 .context = tn,
461         };
462
463         if (!ep || !tn)
464                 return -EINVAL;
465
466         /* Make sure the device is not being shut down */
467         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
468                 return -EINVAL;
469
470         /* Progress transaction to failure if read should fail. */
471         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EVENT)) {
472                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
473                 if (!rc)
474                         return 0;
475         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE)) {
476                 return -EIO;
477         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EAGAIN)) {
478                 return -EAGAIN;
479         }
480
481         rma.iov_count = tn->tn_num_iovec;
482         rma.type = KFI_BVEC;
483         rma.msg_biov = tn->tn_kiov;
484
485         rc = kfi_writemsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
486         switch (rc) {
487         case 0:
488         case -EAGAIN:
489                 KFILND_EP_DEBUG(ep,
490                                 "Transaction ID %p: %s write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
491                                 tn, rc ? "Failed to post" : "Posted",
492                                 tn->tn_nob, tn->tn_num_iovec,
493                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
494                 break;
495
496         default:
497                 KFILND_EP_ERROR(ep,
498                                 "Transaction ID %p: Failed to post write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
499                                 tn, tn->tn_nob, tn->tn_num_iovec,
500                                 tn->tn_response_mr_key, tn->tn_target_addr,
501                                 rc);
502         }
503
504         return rc;
505 }
506
507 /**
508  * kfilnd_ep_post_read() - Post a read operation.
509  * @ep: KFI LND endpoint used to post the read operation.
510  * @tn: Transaction structure containing the buffer to be read into.
511  *
512  * The target of the read operation is based on the target LNet NID field within
513  * the transaction structure. A lookup of LNet NID to KFI address is performed.
514  *
515  * The transaction cookie is used as the remote key for the target memory
516  * region.
517  *
518  * Return: On success, zero. Else, negative errno value.
519  */
520 int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
521 {
522         int rc;
523         struct kfi_cq_err_entry fake_error = {
524                 .op_context = tn,
525                 .flags = KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND,
526                 .err = EIO,
527         };
528         struct kfi_rma_iov rma_iov = {
529                 .len = tn->tn_nob,
530                 .key = gen_init_tag_bits(tn),
531         };
532         struct kfi_msg_rma rma = {
533                 .addr = tn->tn_target_addr,
534                 .rma_iov = &rma_iov,
535                 .rma_iov_count = 1,
536                 .context = tn,
537         };
538
539         if (!ep || !tn)
540                 return -EINVAL;
541
542         /* Make sure the device is not being shut down */
543         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
544                 return -EINVAL;
545
546         /* Progress transaction to failure if read should fail. */
547         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EVENT)) {
548                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
549                 if (!rc)
550                         return 0;
551         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ)) {
552                 return -EIO;
553         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EAGAIN)) {
554                 return -EAGAIN;
555         }
556
557         rma.iov_count = tn->tn_num_iovec;
558         rma.type = KFI_BVEC;
559         rma.msg_biov = tn->tn_kiov;
560
561         rc = kfi_readmsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
562         switch (rc) {
563         case 0:
564         case -EAGAIN:
565                 KFILND_EP_DEBUG(ep,
566                                 "Transaction ID %p: %s read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
567                                 tn, rc ? "Failed to post" : "Posted",
568                                 tn->tn_nob, tn->tn_num_iovec,
569                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
570                 break;
571
572         default:
573                 KFILND_EP_ERROR(ep,
574                                 "Transaction ID %p: Failed to post read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
575                                 tn, tn->tn_nob, tn->tn_num_iovec,
576                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
577         }
578
579         return rc;
580 }
581
582 void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep,
583                                struct kfilnd_transaction *tn)
584 {
585         unsigned long expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
586                 jiffies;
587
588         spin_lock(&ep->replay_lock);
589         list_add_tail(&tn->replay_entry, &ep->tn_replay);
590         atomic_inc(&ep->replay_count);
591         spin_unlock(&ep->replay_lock);
592
593         if (!timer_pending(&ep->replay_timer))
594                 mod_timer(&ep->replay_timer, expires);
595 }
596
597 void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep)
598 {
599         LIST_HEAD(tn_replay);
600         LIST_HEAD(imm_buf_replay);
601         struct kfilnd_transaction *tn_first;
602         struct kfilnd_transaction *tn_last;
603         struct kfilnd_immediate_buffer *buf_first;
604         struct kfilnd_immediate_buffer *buf_last;
605
606         /* Since the endpoint replay lists can be manipulated while
607          * attempting to do replays, the entire replay list is moved to a
608          * temporary list.
609          */
610         spin_lock(&ep->replay_lock);
611
612         tn_first = list_first_entry_or_null(&ep->tn_replay,
613                                             struct kfilnd_transaction,
614                                             replay_entry);
615         if (tn_first) {
616                 tn_last = list_last_entry(&ep->tn_replay,
617                                           struct kfilnd_transaction,
618                                           replay_entry);
619                 list_bulk_move_tail(&tn_replay, &tn_first->replay_entry,
620                                     &tn_last->replay_entry);
621                 LASSERT(list_empty(&ep->tn_replay));
622         }
623
624         buf_first = list_first_entry_or_null(&ep->imm_buffer_replay,
625                                              struct kfilnd_immediate_buffer,
626                                              replay_entry);
627         if (buf_first) {
628                 buf_last = list_last_entry(&ep->imm_buffer_replay,
629                                            struct kfilnd_immediate_buffer,
630                                            replay_entry);
631                 list_bulk_move_tail(&imm_buf_replay, &buf_first->replay_entry,
632                                     &buf_last->replay_entry);
633                 LASSERT(list_empty(&ep->imm_buffer_replay));
634         }
635
636         spin_unlock(&ep->replay_lock);
637
638         /* Replay all queued transactions. */
639         list_for_each_entry_safe(tn_first, tn_last, &tn_replay, replay_entry) {
640                 list_del(&tn_first->replay_entry);
641                 atomic_dec(&ep->replay_count);
642                 kfilnd_tn_event_handler(tn_first, tn_first->replay_event,
643                                         tn_first->replay_status);
644         }
645
646         list_for_each_entry_safe(buf_first, buf_last, &imm_buf_replay,
647                                  replay_entry) {
648                 list_del(&buf_first->replay_entry);
649                 atomic_dec(&ep->replay_count);
650                 kfilnd_ep_imm_buffer_put(buf_first);
651         }
652 }
653
654 static void kfilnd_ep_replay_work(struct work_struct *work)
655 {
656         struct kfilnd_ep *ep =
657                 container_of(work, struct kfilnd_ep, replay_work);
658
659         kfilnd_ep_flush_replay_queue(ep);
660 }
661
662 static void kfilnd_ep_replay_timer(cfs_timer_cb_arg_t data)
663 {
664         struct kfilnd_ep *ep = cfs_from_timer(ep, data, replay_timer);
665         unsigned int cpu =
666                 cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt));
667
668         queue_work_on(cpu, kfilnd_wq, &ep->replay_work);
669 }
670
671 #define KFILND_EP_ALLOC_SIZE \
672         (sizeof(struct kfilnd_ep) + \
673          (sizeof(struct kfilnd_immediate_buffer) * immediate_rx_buf_count))
674
675 /**
676  * kfilnd_ep_free() - Free a KFI LND endpoint.
677  * @ep: KFI LND endpoint to be freed.
678  *
679  * Safe to call on NULL or error pointer.
680  */
681 void kfilnd_ep_free(struct kfilnd_ep *ep)
682 {
683         int i;
684         int k = 2;
685
686         if (IS_ERR_OR_NULL(ep))
687                 return;
688
689         while (atomic_read(&ep->replay_count)) {
690                 k++;
691                 CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
692                         "Waiting for replay count %d not zero\n",
693                         atomic_read(&ep->replay_count));
694                 schedule_timeout_uninterruptible(HZ);
695         }
696
697         /* Cancel any outstanding immediate receive buffers. */
698         kfilnd_ep_cancel_imm_buffers(ep);
699
700         /* Wait for RX buffers to no longer be used and then free them. */
701         for (i = 0; i < immediate_rx_buf_count; i++) {
702                 k = 2;
703                 while (atomic_read(&ep->end_immed_bufs[i].immed_ref)) {
704                         k++;
705                         CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
706                                "Waiting for RX buffer %d to release\n", i);
707                         schedule_timeout_uninterruptible(HZ);
708                 }
709         }
710
711         /* Wait for all transactions to complete. */
712         k = 2;
713         spin_lock(&ep->tn_list_lock);
714         while (!list_empty(&ep->tn_list)) {
715                 spin_unlock(&ep->tn_list_lock);
716                 k++;
717                 CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
718                        "Waiting for transactions to complete\n");
719                 schedule_timeout_uninterruptible(HZ);
720                 spin_lock(&ep->tn_list_lock);
721         }
722         spin_unlock(&ep->tn_list_lock);
723
724         /* Free all immediate buffers. */
725         for (i = 0; i < immediate_rx_buf_count; i++)
726                 __free_pages(ep->end_immed_bufs[i].immed_buf_page,
727                              order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
728
729         kfi_close(&ep->end_tx->fid);
730         kfi_close(&ep->end_rx->fid);
731         kfilnd_cq_free(ep->end_tx_cq);
732         kfilnd_cq_free(ep->end_rx_cq);
733         ida_destroy(&ep->keys);
734         LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
735 }
736
737 /**
738  * kfilnd_ep_alloc() - Allocate a new KFI LND endpoint.
739  * @dev: KFI LND device used to allocate endpoints.
740  * @context_id: Context ID associated with the endpoint.
741  * @cpt: CPT KFI LND endpoint should be associated with.
742  *
743  * An KFI LND endpoint consists of unique transmit/receive command queues
744  * (contexts) and completion queues. The underlying completion queue interrupt
745  * vector is associated with a core within the CPT.
746  *
747  * Return: On success, valid pointer. Else, negative errno pointer.
748  */
749 struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev,
750                                   unsigned int context_id, unsigned int cpt,
751                                   size_t nrx, size_t rx_size)
752 {
753         int rc;
754         struct kfi_cq_attr cq_attr = {};
755         struct kfi_rx_attr rx_attr = {};
756         struct kfi_tx_attr tx_attr = {};
757         int ncpts;
758         size_t min_multi_recv = KFILND_IMMEDIATE_MSG_SIZE;
759         struct kfilnd_ep *ep;
760         int i;
761         size_t rx_buf_size;
762
763         if (!dev || !nrx || !rx_size) {
764                 rc = -EINVAL;
765                 goto err;
766         }
767
768         ncpts = dev->kfd_ni->ni_ncpts;
769
770         LIBCFS_CPT_ALLOC(ep, lnet_cpt_table(), cpt, KFILND_EP_ALLOC_SIZE);
771         if (!ep) {
772                 rc = -ENOMEM;
773                 goto err;
774         }
775
776         ep->end_dev = dev;
777         ep->end_cpt = cpt;
778         ep->end_context_id = context_id;
779         INIT_LIST_HEAD(&ep->tn_list);
780         spin_lock_init(&ep->tn_list_lock);
781         INIT_LIST_HEAD(&ep->tn_replay);
782         INIT_LIST_HEAD(&ep->imm_buffer_replay);
783         spin_lock_init(&ep->replay_lock);
784         cfs_timer_setup(&ep->replay_timer, kfilnd_ep_replay_timer,
785                         (unsigned long)ep, 0);
786         INIT_WORK(&ep->replay_work, kfilnd_ep_replay_work);
787         atomic_set(&ep->replay_count, 0);
788         ida_init(&ep->keys);
789
790         /* Create a CQ for this CPT */
791         cq_attr.flags = KFI_AFFINITY;
792         cq_attr.format = KFI_CQ_FORMAT_DATA;
793         cq_attr.wait_cond = KFI_CQ_COND_NONE;
794         cq_attr.wait_obj = KFI_WAIT_NONE;
795
796         /* Vector is set to first core in the CPT */
797         cq_attr.signaling_vector =
798                 cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), cpt));
799
800         cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
801                 rx_cq_scale_factor;
802         ep->end_rx_cq = kfilnd_cq_alloc(ep, &cq_attr);
803         if (IS_ERR(ep->end_rx_cq)) {
804                 rc = PTR_ERR(ep->end_rx_cq);
805                 CERROR("Failed to allocated KFILND RX CQ: rc=%d\n", rc);
806                 goto err_free_ep;
807         }
808
809         cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
810                 tx_cq_scale_factor;
811         ep->end_tx_cq = kfilnd_cq_alloc(ep, &cq_attr);
812         if (IS_ERR(ep->end_tx_cq)) {
813                 rc = PTR_ERR(ep->end_tx_cq);
814                 CERROR("Failed to allocated KFILND TX CQ: rc=%d\n", rc);
815                 goto err_free_rx_cq;
816         }
817
818         /* Initialize the RX/TX contexts for the given CPT */
819         rx_attr.op_flags = KFI_COMPLETION | KFI_MULTI_RECV;
820         rx_attr.msg_order = KFI_ORDER_NONE;
821         rx_attr.comp_order = KFI_ORDER_NONE;
822         rx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits +
823                 immediate_rx_buf_count;
824         rx_attr.iov_limit = LNET_MAX_IOV;
825         rc = kfi_rx_context(dev->kfd_sep, context_id, &rx_attr, &ep->end_rx,
826                             ep);
827         if (rc) {
828                 CERROR("Could not create RX context on CPT %d, rc = %d\n", cpt,
829                        rc);
830                 goto err_free_tx_cq;
831         }
832
833         /* Set the lower limit for multi-receive buffers */
834         rc = kfi_setopt(&ep->end_rx->fid, KFI_OPT_ENDPOINT,
835                         KFI_OPT_MIN_MULTI_RECV, &min_multi_recv,
836                         sizeof(min_multi_recv));
837         if (rc) {
838                 CERROR("Could not set min_multi_recv on CPT %d, rc = %d\n", cpt,
839                        rc);
840                 goto err_free_rx_context;
841         }
842
843         tx_attr.op_flags = KFI_COMPLETION | KFI_TRANSMIT_COMPLETE;
844         tx_attr.msg_order = KFI_ORDER_NONE;
845         tx_attr.comp_order = KFI_ORDER_NONE;
846         tx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
847                 tx_scale_factor;
848         tx_attr.iov_limit = LNET_MAX_IOV;
849         tx_attr.rma_iov_limit = LNET_MAX_IOV;
850         rc = kfi_tx_context(dev->kfd_sep, context_id, &tx_attr, &ep->end_tx,
851                             ep);
852         if (rc) {
853                 CERROR("Could not create TX context on CPT %d, rc = %d\n", cpt,
854                        rc);
855                 goto err_free_rx_context;
856         }
857
858         /* Bind these two contexts to the CPT's CQ */
859         rc = kfi_ep_bind(ep->end_rx, &ep->end_rx_cq->cq->fid, 0);
860         if (rc) {
861                 CERROR("Could not bind RX context on CPT %d, rc = %d\n", cpt,
862                        rc);
863                 goto err_free_tx_context;
864         }
865
866         rc = kfi_ep_bind(ep->end_tx, &ep->end_tx_cq->cq->fid, 0);
867         if (rc) {
868                 CERROR("Could not bind TX context on CPT %d, rc = %d\n", cpt,
869                        rc);
870                 goto err_free_tx_context;
871         }
872
873         /* Enable both endpoints */
874         rc = kfi_enable(ep->end_rx);
875         if (rc) {
876                 CERROR("Could not enable RX context on CPT %d, rc = %d\n", cpt,
877                        rc);
878                 goto err_free_tx_context;
879         }
880
881         rc = kfi_enable(ep->end_tx);
882         if (rc) {
883                 CERROR("Could not enable TX context on CPT %d, rc=%d\n", cpt,
884                        rc);
885                 goto err_free_tx_context;
886         }
887
888         /* The nrx value is the max number of immediate messages any one peer
889          * can send us.  Given that compute nodes are RPC-based, we should not
890          * see any more incoming messages than we are able to send.  A such, nrx
891          * is a good size for each multi-receive buffer.  However, if we are
892          * a server or LNet router, we need a multiplier of this value. For
893          * now, we will just have nrx drive the buffer size per CPT.  Then,
894          * LNet routers and servers can just define more CPTs to get a better
895          * spread of buffers to receive messages from multiple peers.  A better
896          * way should be devised in the future.
897          */
898         rx_buf_size = roundup_pow_of_two(max(nrx * rx_size, PAGE_SIZE));
899
900         for (i = 0; i < immediate_rx_buf_count; i++) {
901
902                 /* Using physically contiguous allocations can allow for
903                  * underlying kfabric providers to use untranslated addressing
904                  * instead of having to setup NIC memory mappings. This
905                  * typically leads to improved performance.
906                  */
907                 ep->end_immed_bufs[i].immed_buf_page =
908                         alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
909                                          GFP_KERNEL | __GFP_NOWARN,
910                                          order_base_2(rx_buf_size / PAGE_SIZE));
911                 if (!ep->end_immed_bufs[i].immed_buf_page) {
912                         rc = -ENOMEM;
913                         goto err_free_rx_buffers;
914                 }
915
916                 atomic_set(&ep->end_immed_bufs[i].immed_ref, 0);
917                 ep->end_immed_bufs[i].immed_buf =
918                         page_address(ep->end_immed_bufs[i].immed_buf_page);
919                 ep->end_immed_bufs[i].immed_buf_size = rx_buf_size;
920                 ep->end_immed_bufs[i].immed_end = ep;
921         }
922
923         return ep;
924
925 err_free_rx_buffers:
926         for (i = 0; i < immediate_rx_buf_count; i++) {
927                 if (ep->end_immed_bufs[i].immed_buf_page)
928                         __free_pages(ep->end_immed_bufs[i].immed_buf_page,
929                                      order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
930         }
931
932 err_free_tx_context:
933         kfi_close(&ep->end_tx->fid);
934 err_free_rx_context:
935         kfi_close(&ep->end_rx->fid);
936 err_free_tx_cq:
937         kfilnd_cq_free(ep->end_tx_cq);
938 err_free_rx_cq:
939         kfilnd_cq_free(ep->end_rx_cq);
940 err_free_ep:
941         LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
942 err:
943         return ERR_PTR(rc);
944 }
945
946 int kfilnd_ep_get_key(struct kfilnd_ep *ep)
947 {
948         return ida_simple_get(&ep->keys, 1, KFILND_EP_KEY_MAX, GFP_KERNEL);
949 }
950
951 void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key)
952 {
953         ida_simple_remove(&ep->keys, key);
954 }