Whamcloud - gitweb
LU-17839 kfilnd: Wait for hello response to mark peer uptodate
[fs/lustre-release.git] / lnet / klnds / kfilnd / kfilnd_ep.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright 2022 Hewlett Packard Enterprise Development LP
24  */
25 /*
26  * This file is part of Lustre, http://www.lustre.org/
27  */
28 /*
29  * kfilnd endpoint implementation.
30  */
31 #include "kfilnd_ep.h"
32 #include "kfilnd_dev.h"
33 #include "kfilnd_tn.h"
34 #include "kfilnd_cq.h"
35
36 /**
37  * kfilnd_ep_post_recv() - Post a single receive buffer.
38  * @ep: KFI LND endpoint to have receive buffers posted on.
39  * @buf: Receive buffer to be posted.
40  *
41  * Return: On succes, zero. Else, negative errno.
42  */
43 static int kfilnd_ep_post_recv(struct kfilnd_ep *ep,
44                                struct kfilnd_immediate_buffer *buf)
45 {
46         int rc;
47
48         if (!ep || !buf)
49                 return -EINVAL;
50
51         if (buf->immed_no_repost)
52                 return 0;
53
54         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV))
55                 return -EIO;
56         else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV_EAGAIN))
57                 return -EAGAIN;
58
59         atomic_inc(&buf->immed_ref);
60         rc = kfi_recv(ep->end_rx, buf->immed_buf, buf->immed_buf_size, NULL,
61                       KFI_ADDR_UNSPEC, buf);
62         if (rc)
63                 atomic_dec(&buf->immed_ref);
64
65         return rc;
66 }
67
68 #define KFILND_EP_REPLAY_TIMER_MSEC (100U)
69
70 /**
71  * kfilnd_ep_imm_buffer_put() - Decrement the immediate buffer count reference
72  * counter.
73  * @buf: Immediate buffer to have reference count decremented.
74  *
75  * If the immediate buffer's reference count reaches zero, the buffer will
76  * automatically be reposted.
77  */
78 void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf)
79 {
80         unsigned long expires;
81         int rc;
82
83         if (!buf)
84                 return;
85
86         if (atomic_sub_return(1, &buf->immed_ref) != 0)
87                 return;
88
89         rc = kfilnd_ep_post_recv(buf->immed_end, buf);
90         switch (rc) {
91         case 0:
92                 break;
93
94         /* Return the buffer reference and queue the immediate buffer put to be
95          * replayed.
96          */
97         case -EAGAIN:
98                 expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
99                         jiffies;
100                 atomic_inc(&buf->immed_ref);
101
102                 spin_lock(&buf->immed_end->replay_lock);
103                 list_add_tail(&buf->replay_entry,
104                               &buf->immed_end->imm_buffer_replay);
105                 atomic_inc(&buf->immed_end->replay_count);
106                 spin_unlock(&buf->immed_end->replay_lock);
107
108                 if (!timer_pending(&buf->immed_end->replay_timer))
109                         mod_timer(&buf->immed_end->replay_timer, expires);
110                 break;
111
112         /* Unexpected error resulting in immediate buffer not being able to be
113          * posted. Since immediate buffers are used to sink incoming messages,
114          * failure to post immediate buffers means failure to communicate.
115          *
116          * TODO: Prevent LNet NI from doing sends/recvs?
117          */
118         default:
119                 KFILND_EP_ERROR(buf->immed_end,
120                                 "Failed to post immediate receive buffer: rc=%d",
121                                 rc);
122         }
123 }
124
125 /**
126  * kfilnd_ep_post_imm_buffers() - Post all immediate receive buffers.
127  * @ep: KFI LND endpoint to have receive buffers posted on.
128  *
129  * This function should be called only during KFI LND device initialization.
130  *
131  * Return: On success, zero. Else, negative errno.
132  */
133 int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep)
134 {
135         int rc = 0;
136         int i;
137
138         if (!ep)
139                 return -EINVAL;
140
141         for (i = 0; i < immediate_rx_buf_count; i++) {
142                 rc = kfilnd_ep_post_recv(ep, &ep->end_immed_bufs[i]);
143                 if (rc)
144                         goto out;
145         }
146
147 out:
148         return rc;
149 }
150
151 /**
152  * kfilnd_ep_cancel_imm_buffers() - Cancel all immediate receive buffers.
153  * @ep: KFI LND endpoint to have receive buffers canceled.
154  */
155 void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep)
156 {
157         int i;
158
159         if (!ep)
160                 return;
161
162         for (i = 0; i < immediate_rx_buf_count; i++) {
163                 ep->end_immed_bufs[i].immed_no_repost = true;
164
165                 /* Since this is called during LNet NI teardown, no need to
166                  * pipeline retries. Just spin until -EAGAIN is not returned.
167                  */
168                 while (kfi_cancel(&ep->end_rx->fid, &ep->end_immed_bufs[i]) ==
169                        -EAGAIN)
170                         schedule();
171         }
172 }
173
174 static void kfilnd_ep_err_fail_loc_work(struct work_struct *work)
175 {
176         struct kfilnd_ep_err_fail_loc_work *err =
177                 container_of(work, struct kfilnd_ep_err_fail_loc_work, work);
178
179         kfilnd_cq_process_error(err->ep, &err->err);
180         kfree(err);
181 }
182
183 int kfilnd_ep_gen_fake_err(struct kfilnd_ep *ep,
184                            const struct kfi_cq_err_entry *err)
185 {
186         struct kfilnd_ep_err_fail_loc_work *fake_err;
187
188         fake_err = kmalloc(sizeof(*fake_err), GFP_KERNEL);
189         if (!fake_err)
190                 return -ENOMEM;
191
192         fake_err->ep = ep;
193         fake_err->err = *err;
194         INIT_WORK(&fake_err->work, kfilnd_ep_err_fail_loc_work);
195         queue_work(kfilnd_wq, &fake_err->work);
196
197         return 0;
198 }
199
200 static uint64_t gen_init_tag_bits(struct kfilnd_transaction *tn)
201 {
202         return (tn->tn_kp->kp_remote_session_key << KFILND_EP_KEY_BITS) |
203                 tn->tn_response_mr_key;
204 }
205
206 /**
207  * kfilnd_ep_post_tagged_send() - Post a tagged send operation.
208  * @ep: KFI LND endpoint used to post the tagged receivce operation.
209  * @tn: Transaction structure containing the send buffer to be posted.
210  *
211  * The tag for the post tagged send operation is the response memory region key
212  * associated with the transaction.
213  *
214  * Return: On success, zero. Else, negative errno value.
215  */
216 int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep,
217                                struct kfilnd_transaction *tn)
218 {
219         struct kfi_cq_err_entry fake_error = {
220                 .op_context = tn,
221                 .flags = KFI_TAGGED | KFI_SEND,
222                 .err = EIO,
223         };
224         int rc;
225
226         if (!ep || !tn)
227                 return -EINVAL;
228
229         /* Make sure the device is not being shut down */
230         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
231                 return -EINVAL;
232
233         /* Progress transaction to failure if send should fail. */
234         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EVENT)) {
235                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
236                 if (!rc)
237                         return 0;
238         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND)) {
239                 return -EIO;
240         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EAGAIN)) {
241                 return -EAGAIN;
242         }
243
244         rc = kfi_tsenddata(ep->end_tx, NULL, 0, NULL, tn->tagged_data,
245                            tn->tn_target_addr, gen_init_tag_bits(tn), tn);
246         switch (rc) {
247         case 0:
248         case -EAGAIN:
249                 KFILND_EP_DEBUG(ep,
250                                 "Transaction ID %p: %s tagged send of with tag 0x%x to peer 0x%llx: rc=%d",
251                                 tn, rc ? "Failed to post" : "Posted",
252                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
253                 break;
254
255         default:
256                 KFILND_EP_ERROR(ep,
257                                 "Transaction ID %p: Failed to post tagged send with tag 0x%x to peer 0x%llx: rc=%d",
258                                 tn, tn->tn_response_mr_key,
259                                 tn->tn_target_addr, rc);
260         }
261
262         return rc;
263 }
264
265 /**
266  * kfilnd_ep_cancel_tagged_recv() - Cancel a tagged recv.
267  * @ep: KFI LND endpoint used to cancel the tagged receivce operation.
268  * @tn: Transaction structure containing the receive buffer to be cancelled.
269  *
270  * The tagged receive buffer context pointer is used to cancel a tagged receive
271  * operation. The context pointer is always the transaction pointer.
272  *
273  * Return: 0 on success. -ENOENT if the tagged receive buffer is not found. The
274  * tagged receive buffer may not be found due to a tagged send operation already
275  * landing or the tagged receive buffer never being posted. Negative errno value
276  * on error.
277  */
278 int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep,
279                                  struct kfilnd_transaction *tn)
280 {
281         if (!ep || !tn)
282                 return -EINVAL;
283
284         /* Make sure the device is not being shut down */
285         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
286                 return -EINVAL;
287
288         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN))
289                 return -EAGAIN;
290
291         /* The async event count is not decremented for a cancel operation since
292          * it was incremented for the post tagged receive.
293          */
294         return kfi_cancel(&ep->end_rx->fid, tn);
295 }
296
297 static uint64_t gen_target_tag_bits(struct kfilnd_transaction *tn)
298 {
299         return (tn->tn_kp->kp_local_session_key << KFILND_EP_KEY_BITS) |
300                 tn->tn_mr_key;
301 }
302
303 /**
304  * kfilnd_ep_post_tagged_recv() - Post a tagged receive operation.
305  * @ep: KFI LND endpoint used to post the tagged receivce operation.
306  * @tn: Transaction structure containing the receive buffer to be posted.
307  *
308  * The tag for the post tagged receive operation is the memory region key
309  * associated with the transaction.
310  *
311  * Return: On success, zero. Else, negative errno value.
312  */
313 int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep,
314                                struct kfilnd_transaction *tn)
315 {
316         struct kfi_msg_tagged msg = {
317                 .tag = gen_target_tag_bits(tn),
318                 .context = tn,
319                 .addr = tn->tn_kp->kp_addr,
320         };
321         struct kfi_cq_err_entry fake_error = {
322                 .op_context = tn,
323                 .flags = KFI_TAGGED | KFI_RECV,
324                 .err = EIO,
325         };
326         int rc;
327
328         if (!ep || !tn)
329                 return -EINVAL;
330
331         /* Make sure the device is not being shut down */
332         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
333                 return -EINVAL;
334
335         /* Progress transaction to failure if send should fail. */
336         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EVENT)) {
337                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
338                 if (!rc)
339                         return 0;
340         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV)) {
341                 return -EIO;
342         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EAGAIN)) {
343                 return -EAGAIN;
344         }
345
346         msg.iov_count = tn->tn_num_iovec;
347         msg.type = KFI_BVEC;
348         msg.msg_biov = tn->tn_kiov;
349
350         rc = kfi_trecvmsg(ep->end_rx, &msg, KFI_COMPLETION);
351         switch (rc) {
352         case 0:
353         case -EAGAIN:
354                 KFILND_EP_DEBUG(ep,
355                                 "Transaction ID %p: %s tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
356                                 tn, rc ? "Failed to post" : "Posted",
357                                 tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
358                 break;
359
360         default:
361                 KFILND_EP_ERROR(ep,
362                                 "Transaction ID %p: Failed to post tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
363                                 tn, tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
364         }
365
366         return rc;
367 }
368
369 /**
370  * kfilnd_ep_post_send() - Post a send operation.
371  * @ep: KFI LND endpoint used to post the send operation.
372  * @tn: Transaction structure containing the buffer to be sent.
373  *
374  * The target of the send operation is based on the target LNet NID field within
375  * the transaction structure. A lookup of LNet NID to KFI address is performed.
376  *
377  * Return: On success, zero. Else, negative errno value.
378  */
379 int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
380 {
381         size_t len;
382         void *buf;
383         struct kfi_cq_err_entry fake_error = {
384                 .op_context = tn,
385                 .flags = KFI_MSG | KFI_SEND,
386                 .err = EIO,
387         };
388         int rc;
389
390         if (!ep || !tn)
391                 return -EINVAL;
392
393         buf = tn->tn_tx_msg.msg;
394         len = tn->tn_tx_msg.length;
395
396         /* Make sure the device is not being shut down */
397         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
398                 return -EINVAL;
399
400         /* Progress transaction to failure if send should fail. */
401         if (CFS_FAIL_CHECK_VALUE(CFS_KFI_FAIL_MSG_TYPE,
402                                  tn->tn_tx_msg.msg->type) ||
403             CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EVENT)) {
404                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
405                 if (!rc)
406                         return 0;
407         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND)) {
408                 return -EIO;
409         } else if (CFS_FAIL_CHECK_VALUE(CFS_KFI_FAIL_MSG_TYPE_EAGAIN,
410                                          tn->tn_tx_msg.msg->type) ||
411                    CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EAGAIN)) {
412                 return -EAGAIN;
413         }
414
415         rc = kfi_send(ep->end_tx, buf, len, NULL, tn->tn_target_addr, tn);
416         switch (rc) {
417         case 0:
418         case -EAGAIN:
419                 KFILND_EP_DEBUG(ep,
420                                 "Transaction ID %p: %s send of %lu bytes to peer 0x%llx: rc=%d",
421                                 tn, rc ? "Failed to post" : "Posted",
422                                 len, tn->tn_target_addr, rc);
423                 break;
424
425         default:
426                 KFILND_EP_ERROR(ep,
427                                 "Transaction ID %p: Failed to post send of %lu bytes to peer 0x%llx: rc=%d",
428                                 tn, len, tn->tn_target_addr, rc);
429         }
430
431         return rc;
432 }
433
434 /**
435  * kfilnd_ep_post_write() - Post a write operation.
436  * @ep: KFI LND endpoint used to post the write operation.
437  * @tn: Transaction structure containing the buffer to be read from.
438  *
439  * The target of the write operation is based on the target LNet NID field
440  * within the transaction structure. A lookup of LNet NID to KFI address is
441  * performed.
442  *
443  * The transaction cookie is used as the remote key for the target memory
444  * region.
445  *
446  * Return: On success, zero. Else, negative errno value.
447  */
448 int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
449 {
450         int rc;
451         struct kfi_cq_err_entry fake_error = {
452                 .op_context = tn,
453                 .flags = KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND,
454                 .err = EIO,
455         };
456         struct kfi_rma_iov rma_iov = {
457                 .len = tn->tn_nob,
458                 .key = gen_init_tag_bits(tn),
459         };
460         struct kfi_msg_rma rma = {
461                 .addr = tn->tn_target_addr,
462                 .rma_iov = &rma_iov,
463                 .rma_iov_count = 1,
464                 .context = tn,
465         };
466
467         if (!ep || !tn)
468                 return -EINVAL;
469
470         /* Make sure the device is not being shut down */
471         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
472                 return -EINVAL;
473
474         /* Progress transaction to failure if read should fail. */
475         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EVENT)) {
476                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
477                 if (!rc)
478                         return 0;
479         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE)) {
480                 return -EIO;
481         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EAGAIN)) {
482                 return -EAGAIN;
483         }
484
485         rma.iov_count = tn->tn_num_iovec;
486         rma.type = KFI_BVEC;
487         rma.msg_biov = tn->tn_kiov;
488
489         rc = kfi_writemsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
490         switch (rc) {
491         case 0:
492         case -EAGAIN:
493                 KFILND_EP_DEBUG(ep,
494                                 "Transaction ID %p: %s write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
495                                 tn, rc ? "Failed to post" : "Posted",
496                                 tn->tn_nob, tn->tn_num_iovec,
497                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
498                 break;
499
500         default:
501                 KFILND_EP_ERROR(ep,
502                                 "Transaction ID %p: Failed to post write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
503                                 tn, tn->tn_nob, tn->tn_num_iovec,
504                                 tn->tn_response_mr_key, tn->tn_target_addr,
505                                 rc);
506         }
507
508         return rc;
509 }
510
511 /**
512  * kfilnd_ep_post_read() - Post a read operation.
513  * @ep: KFI LND endpoint used to post the read operation.
514  * @tn: Transaction structure containing the buffer to be read into.
515  *
516  * The target of the read operation is based on the target LNet NID field within
517  * the transaction structure. A lookup of LNet NID to KFI address is performed.
518  *
519  * The transaction cookie is used as the remote key for the target memory
520  * region.
521  *
522  * Return: On success, zero. Else, negative errno value.
523  */
524 int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
525 {
526         int rc;
527         struct kfi_cq_err_entry fake_error = {
528                 .op_context = tn,
529                 .flags = KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND,
530                 .err = EIO,
531         };
532         struct kfi_rma_iov rma_iov = {
533                 .len = tn->tn_nob,
534                 .key = gen_init_tag_bits(tn),
535         };
536         struct kfi_msg_rma rma = {
537                 .addr = tn->tn_target_addr,
538                 .rma_iov = &rma_iov,
539                 .rma_iov_count = 1,
540                 .context = tn,
541         };
542
543         if (!ep || !tn)
544                 return -EINVAL;
545
546         /* Make sure the device is not being shut down */
547         if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
548                 return -EINVAL;
549
550         /* Progress transaction to failure if read should fail. */
551         if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EVENT)) {
552                 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
553                 if (!rc)
554                         return 0;
555         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ)) {
556                 return -EIO;
557         } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EAGAIN)) {
558                 return -EAGAIN;
559         }
560
561         rma.iov_count = tn->tn_num_iovec;
562         rma.type = KFI_BVEC;
563         rma.msg_biov = tn->tn_kiov;
564
565         rc = kfi_readmsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
566         switch (rc) {
567         case 0:
568         case -EAGAIN:
569                 KFILND_EP_DEBUG(ep,
570                                 "Transaction ID %p: %s read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
571                                 tn, rc ? "Failed to post" : "Posted",
572                                 tn->tn_nob, tn->tn_num_iovec,
573                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
574                 break;
575
576         default:
577                 KFILND_EP_ERROR(ep,
578                                 "Transaction ID %p: Failed to post read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
579                                 tn, tn->tn_nob, tn->tn_num_iovec,
580                                 tn->tn_response_mr_key, tn->tn_target_addr, rc);
581         }
582
583         return rc;
584 }
585
586 void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep,
587                                struct kfilnd_transaction *tn)
588 {
589         unsigned long expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
590                 jiffies;
591
592         spin_lock(&ep->replay_lock);
593         list_add_tail(&tn->replay_entry, &ep->tn_replay);
594         atomic_inc(&ep->replay_count);
595         spin_unlock(&ep->replay_lock);
596
597         if (!timer_pending(&ep->replay_timer))
598                 mod_timer(&ep->replay_timer, expires);
599 }
600
601 void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep)
602 {
603         LIST_HEAD(tn_replay);
604         LIST_HEAD(imm_buf_replay);
605         struct kfilnd_transaction *tn_first;
606         struct kfilnd_transaction *tn_last;
607         struct kfilnd_immediate_buffer *buf_first;
608         struct kfilnd_immediate_buffer *buf_last;
609
610         /* Since the endpoint replay lists can be manipulated while
611          * attempting to do replays, the entire replay list is moved to a
612          * temporary list.
613          */
614         spin_lock(&ep->replay_lock);
615
616         tn_first = list_first_entry_or_null(&ep->tn_replay,
617                                             struct kfilnd_transaction,
618                                             replay_entry);
619         if (tn_first) {
620                 tn_last = list_last_entry(&ep->tn_replay,
621                                           struct kfilnd_transaction,
622                                           replay_entry);
623                 list_bulk_move_tail(&tn_replay, &tn_first->replay_entry,
624                                     &tn_last->replay_entry);
625                 LASSERT(list_empty(&ep->tn_replay));
626         }
627
628         buf_first = list_first_entry_or_null(&ep->imm_buffer_replay,
629                                              struct kfilnd_immediate_buffer,
630                                              replay_entry);
631         if (buf_first) {
632                 buf_last = list_last_entry(&ep->imm_buffer_replay,
633                                            struct kfilnd_immediate_buffer,
634                                            replay_entry);
635                 list_bulk_move_tail(&imm_buf_replay, &buf_first->replay_entry,
636                                     &buf_last->replay_entry);
637                 LASSERT(list_empty(&ep->imm_buffer_replay));
638         }
639
640         spin_unlock(&ep->replay_lock);
641
642         /* Replay all queued transactions. */
643         list_for_each_entry_safe(tn_first, tn_last, &tn_replay, replay_entry) {
644                 list_del(&tn_first->replay_entry);
645                 atomic_dec(&ep->replay_count);
646                 kfilnd_tn_event_handler(tn_first, tn_first->replay_event,
647                                         tn_first->replay_status);
648         }
649
650         list_for_each_entry_safe(buf_first, buf_last, &imm_buf_replay,
651                                  replay_entry) {
652                 list_del(&buf_first->replay_entry);
653                 atomic_dec(&ep->replay_count);
654                 kfilnd_ep_imm_buffer_put(buf_first);
655         }
656 }
657
658 static void kfilnd_ep_replay_work(struct work_struct *work)
659 {
660         struct kfilnd_ep *ep =
661                 container_of(work, struct kfilnd_ep, replay_work);
662
663         kfilnd_ep_flush_replay_queue(ep);
664 }
665
666 static void kfilnd_ep_replay_timer(cfs_timer_cb_arg_t data)
667 {
668         struct kfilnd_ep *ep = cfs_from_timer(ep, data, replay_timer);
669         unsigned int cpu =
670                 cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt));
671
672         queue_work_on(cpu, kfilnd_wq, &ep->replay_work);
673 }
674
675 #define KFILND_EP_ALLOC_SIZE \
676         (sizeof(struct kfilnd_ep) + \
677          (sizeof(struct kfilnd_immediate_buffer) * immediate_rx_buf_count))
678
679 /**
680  * kfilnd_ep_free() - Free a KFI LND endpoint.
681  * @ep: KFI LND endpoint to be freed.
682  *
683  * Safe to call on NULL or error pointer.
684  */
685 void kfilnd_ep_free(struct kfilnd_ep *ep)
686 {
687         int i;
688         int k = 2;
689
690         if (IS_ERR_OR_NULL(ep))
691                 return;
692
693         while (atomic_read(&ep->replay_count)) {
694                 k++;
695                 CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
696                         "Waiting for replay count %d not zero\n",
697                         atomic_read(&ep->replay_count));
698                 schedule_timeout_uninterruptible(HZ);
699         }
700
701         /* Cancel any outstanding immediate receive buffers. */
702         kfilnd_ep_cancel_imm_buffers(ep);
703
704         /* Wait for RX buffers to no longer be used and then free them. */
705         for (i = 0; i < immediate_rx_buf_count; i++) {
706                 k = 2;
707                 while (atomic_read(&ep->end_immed_bufs[i].immed_ref)) {
708                         k++;
709                         CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
710                                "Waiting for RX buffer %d to release\n", i);
711                         schedule_timeout_uninterruptible(HZ);
712                 }
713         }
714
715         /* Wait for all transactions to complete. */
716         k = 2;
717         spin_lock(&ep->tn_list_lock);
718         while (!list_empty(&ep->tn_list)) {
719                 spin_unlock(&ep->tn_list_lock);
720                 k++;
721                 CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
722                        "Waiting for transactions to complete\n");
723                 schedule_timeout_uninterruptible(HZ);
724                 spin_lock(&ep->tn_list_lock);
725         }
726         spin_unlock(&ep->tn_list_lock);
727
728         /* Free all immediate buffers. */
729         for (i = 0; i < immediate_rx_buf_count; i++)
730                 __free_pages(ep->end_immed_bufs[i].immed_buf_page,
731                              order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
732
733         kfi_close(&ep->end_tx->fid);
734         kfi_close(&ep->end_rx->fid);
735         kfilnd_cq_free(ep->end_tx_cq);
736         kfilnd_cq_free(ep->end_rx_cq);
737         ida_destroy(&ep->keys);
738         LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
739 }
740
741 /**
742  * kfilnd_ep_alloc() - Allocate a new KFI LND endpoint.
743  * @dev: KFI LND device used to allocate endpoints.
744  * @context_id: Context ID associated with the endpoint.
745  * @cpt: CPT KFI LND endpoint should be associated with.
746  *
747  * An KFI LND endpoint consists of unique transmit/receive command queues
748  * (contexts) and completion queues. The underlying completion queue interrupt
749  * vector is associated with a core within the CPT.
750  *
751  * Return: On success, valid pointer. Else, negative errno pointer.
752  */
753 struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev,
754                                   unsigned int context_id, unsigned int cpt,
755                                   size_t nrx, size_t rx_size)
756 {
757         int rc;
758         struct kfi_cq_attr cq_attr = {};
759         struct kfi_rx_attr rx_attr = {};
760         struct kfi_tx_attr tx_attr = {};
761         int ncpts;
762         size_t min_multi_recv = KFILND_IMMEDIATE_MSG_SIZE;
763         struct kfilnd_ep *ep;
764         int i;
765         size_t rx_buf_size;
766
767         if (!dev || !nrx || !rx_size) {
768                 rc = -EINVAL;
769                 goto err;
770         }
771
772         ncpts = dev->kfd_ni->ni_ncpts;
773
774         LIBCFS_CPT_ALLOC(ep, lnet_cpt_table(), cpt, KFILND_EP_ALLOC_SIZE);
775         if (!ep) {
776                 rc = -ENOMEM;
777                 goto err;
778         }
779
780         ep->end_dev = dev;
781         ep->end_cpt = cpt;
782         ep->end_context_id = context_id;
783         INIT_LIST_HEAD(&ep->tn_list);
784         spin_lock_init(&ep->tn_list_lock);
785         INIT_LIST_HEAD(&ep->tn_replay);
786         INIT_LIST_HEAD(&ep->imm_buffer_replay);
787         spin_lock_init(&ep->replay_lock);
788         cfs_timer_setup(&ep->replay_timer, kfilnd_ep_replay_timer,
789                         (unsigned long)ep, 0);
790         INIT_WORK(&ep->replay_work, kfilnd_ep_replay_work);
791         atomic_set(&ep->replay_count, 0);
792         ida_init(&ep->keys);
793
794         /* Create a CQ for this CPT */
795         cq_attr.flags = KFI_AFFINITY;
796         cq_attr.format = KFI_CQ_FORMAT_DATA;
797         cq_attr.wait_cond = KFI_CQ_COND_NONE;
798         cq_attr.wait_obj = KFI_WAIT_NONE;
799
800         /* Vector is set to first core in the CPT */
801         cq_attr.signaling_vector =
802                 cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), cpt));
803
804         cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
805                 rx_cq_scale_factor;
806         ep->end_rx_cq = kfilnd_cq_alloc(ep, &cq_attr);
807         if (IS_ERR(ep->end_rx_cq)) {
808                 rc = PTR_ERR(ep->end_rx_cq);
809                 CERROR("Failed to allocated KFILND RX CQ: rc=%d\n", rc);
810                 goto err_free_ep;
811         }
812
813         cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
814                 tx_cq_scale_factor;
815         ep->end_tx_cq = kfilnd_cq_alloc(ep, &cq_attr);
816         if (IS_ERR(ep->end_tx_cq)) {
817                 rc = PTR_ERR(ep->end_tx_cq);
818                 CERROR("Failed to allocated KFILND TX CQ: rc=%d\n", rc);
819                 goto err_free_rx_cq;
820         }
821
822         /* Initialize the RX/TX contexts for the given CPT */
823         rx_attr.op_flags = KFI_COMPLETION | KFI_MULTI_RECV;
824         rx_attr.msg_order = KFI_ORDER_NONE;
825         rx_attr.comp_order = KFI_ORDER_NONE;
826         rx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits +
827                 immediate_rx_buf_count;
828         rx_attr.iov_limit = LNET_MAX_IOV;
829         rc = kfi_rx_context(dev->kfd_sep, context_id, &rx_attr, &ep->end_rx,
830                             ep);
831         if (rc) {
832                 CERROR("Could not create RX context on CPT %d, rc = %d\n", cpt,
833                        rc);
834                 goto err_free_tx_cq;
835         }
836
837         /* Set the lower limit for multi-receive buffers */
838         rc = kfi_setopt(&ep->end_rx->fid, KFI_OPT_ENDPOINT,
839                         KFI_OPT_MIN_MULTI_RECV, &min_multi_recv,
840                         sizeof(min_multi_recv));
841         if (rc) {
842                 CERROR("Could not set min_multi_recv on CPT %d, rc = %d\n", cpt,
843                        rc);
844                 goto err_free_rx_context;
845         }
846
847         tx_attr.op_flags = KFI_COMPLETION | KFI_TRANSMIT_COMPLETE;
848         tx_attr.msg_order = KFI_ORDER_NONE;
849         tx_attr.comp_order = KFI_ORDER_NONE;
850         tx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
851                 tx_scale_factor;
852         tx_attr.iov_limit = LNET_MAX_IOV;
853         tx_attr.rma_iov_limit = LNET_MAX_IOV;
854         tx_attr.tclass =
855                 dev->kfd_ni->ni_lnd_tunables.lnd_tun_u.lnd_kfi.lnd_traffic_class;
856         rc = kfi_tx_context(dev->kfd_sep, context_id, &tx_attr, &ep->end_tx,
857                             ep);
858         if (rc) {
859                 CERROR("Could not create TX context on CPT %d, rc = %d\n", cpt,
860                        rc);
861                 goto err_free_rx_context;
862         }
863
864         /* Bind these two contexts to the CPT's CQ */
865         rc = kfi_ep_bind(ep->end_rx, &ep->end_rx_cq->cq->fid, 0);
866         if (rc) {
867                 CERROR("Could not bind RX context on CPT %d, rc = %d\n", cpt,
868                        rc);
869                 goto err_free_tx_context;
870         }
871
872         rc = kfi_ep_bind(ep->end_tx, &ep->end_tx_cq->cq->fid, 0);
873         if (rc) {
874                 CERROR("Could not bind TX context on CPT %d, rc = %d\n", cpt,
875                        rc);
876                 goto err_free_tx_context;
877         }
878
879         /* Enable both endpoints */
880         rc = kfi_enable(ep->end_rx);
881         if (rc) {
882                 CERROR("Could not enable RX context on CPT %d, rc = %d\n", cpt,
883                        rc);
884                 goto err_free_tx_context;
885         }
886
887         rc = kfi_enable(ep->end_tx);
888         if (rc) {
889                 CERROR("Could not enable TX context on CPT %d, rc=%d\n", cpt,
890                        rc);
891                 goto err_free_tx_context;
892         }
893
894         /* The nrx value is the max number of immediate messages any one peer
895          * can send us.  Given that compute nodes are RPC-based, we should not
896          * see any more incoming messages than we are able to send.  A such, nrx
897          * is a good size for each multi-receive buffer.  However, if we are
898          * a server or LNet router, we need a multiplier of this value. For
899          * now, we will just have nrx drive the buffer size per CPT.  Then,
900          * LNet routers and servers can just define more CPTs to get a better
901          * spread of buffers to receive messages from multiple peers.  A better
902          * way should be devised in the future.
903          */
904         rx_buf_size = roundup_pow_of_two(max(nrx * rx_size, PAGE_SIZE));
905
906         for (i = 0; i < immediate_rx_buf_count; i++) {
907
908                 /* Using physically contiguous allocations can allow for
909                  * underlying kfabric providers to use untranslated addressing
910                  * instead of having to setup NIC memory mappings. This
911                  * typically leads to improved performance.
912                  */
913                 ep->end_immed_bufs[i].immed_buf_page =
914                         alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
915                                          GFP_KERNEL | __GFP_NOWARN,
916                                          order_base_2(rx_buf_size / PAGE_SIZE));
917                 if (!ep->end_immed_bufs[i].immed_buf_page) {
918                         rc = -ENOMEM;
919                         goto err_free_rx_buffers;
920                 }
921
922                 atomic_set(&ep->end_immed_bufs[i].immed_ref, 0);
923                 ep->end_immed_bufs[i].immed_buf =
924                         page_address(ep->end_immed_bufs[i].immed_buf_page);
925                 ep->end_immed_bufs[i].immed_buf_size = rx_buf_size;
926                 ep->end_immed_bufs[i].immed_end = ep;
927         }
928
929         return ep;
930
931 err_free_rx_buffers:
932         for (i = 0; i < immediate_rx_buf_count; i++) {
933                 if (ep->end_immed_bufs[i].immed_buf_page)
934                         __free_pages(ep->end_immed_bufs[i].immed_buf_page,
935                                      order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
936         }
937
938 err_free_tx_context:
939         kfi_close(&ep->end_tx->fid);
940 err_free_rx_context:
941         kfi_close(&ep->end_rx->fid);
942 err_free_tx_cq:
943         kfilnd_cq_free(ep->end_tx_cq);
944 err_free_rx_cq:
945         kfilnd_cq_free(ep->end_rx_cq);
946 err_free_ep:
947         LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
948 err:
949         return ERR_PTR(rc);
950 }
951
952 int kfilnd_ep_get_key(struct kfilnd_ep *ep)
953 {
954         return ida_simple_get(&ep->keys, 1, KFILND_EP_KEY_MAX, GFP_KERNEL);
955 }
956
957 void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key)
958 {
959         ida_simple_remove(&ep->keys, key);
960 }