4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright 2022 Hewlett Packard Enterprise Development LP
26 * This file is part of Lustre, http://www.lustre.org/
29 * kfilnd endpoint implementation.
31 #include "kfilnd_ep.h"
32 #include "kfilnd_dev.h"
33 #include "kfilnd_tn.h"
34 #include "kfilnd_cq.h"
37 * kfilnd_ep_post_recv() - Post a single receive buffer.
38 * @ep: KFI LND endpoint to have receive buffers posted on.
39 * @buf: Receive buffer to be posted.
41 * Return: On success, zero. Else, negative errno.
43 static int kfilnd_ep_post_recv(struct kfilnd_ep *ep,
44 struct kfilnd_immediate_buffer *buf)
/* Buffers flagged no-repost (set during endpoint teardown) must not be
 * posted again.
 */
51 if (buf->immed_no_repost)
/* Fault-injection hooks: force a hard error or -EAGAIN from this post. */
54 if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV))
56 else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_RECV_EAGAIN))
/* Take a buffer reference for the outstanding recv before posting. */
59 atomic_inc(&buf->immed_ref);
60 rc = kfi_recv(ep->end_rx, buf->immed_buf, buf->immed_buf_size, NULL,
61 KFI_ADDR_UNSPEC, buf);
/* NOTE(review): presumably reached only when kfi_recv() fails (the
 * intervening error check is not visible here) — drop the reference
 * taken above. Confirm against full source.
 */
63 atomic_dec(&buf->immed_ref);
/* Delay, in milliseconds, before replaying failed (-EAGAIN) buffer posts. */
68 #define KFILND_EP_REPLAY_TIMER_MSEC (100U)
71 * kfilnd_ep_imm_buffer_put() - Decrement the immediate buffer count reference
73 * @buf: Immediate buffer to have reference count decremented.
75 * If the immediate buffer's reference count reaches zero, the buffer will
76 * automatically be reposted.
78 void kfilnd_ep_imm_buffer_put(struct kfilnd_immediate_buffer *buf)
80 unsigned long expires;
/* Only the thread dropping the last reference reposts the buffer. */
86 if (atomic_sub_return(1, &buf->immed_ref) != 0)
89 rc = kfilnd_ep_post_recv(buf->immed_end, buf);
94 /* Return the buffer reference and queue the immediate buffer put to be
/* NOTE(review): this -EAGAIN replay path re-arms the buffer after
 * KFILND_EP_REPLAY_TIMER_MSEC (the branch condition is not visible here).
 */
98 expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
/* Re-take the reference dropped above while the buffer sits on the
 * replay list; the replayed put will drop it again.
 */
100 atomic_inc(&buf->immed_ref);
102 spin_lock(&buf->immed_end->replay_lock);
103 list_add_tail(&buf->replay_entry,
104 &buf->immed_end->imm_buffer_replay);
105 atomic_inc(&buf->immed_end->replay_count);
106 spin_unlock(&buf->immed_end->replay_lock);
/* Arm the replay timer only if it is not already counting down. */
108 if (!timer_pending(&buf->immed_end->replay_timer))
109 mod_timer(&buf->immed_end->replay_timer, expires);
112 /* Unexpected error resulting in immediate buffer not being able to be
113 * posted. Since immediate buffers are used to sink incoming messages,
114 * failure to post immediate buffers means failure to communicate.
116 * TODO: Prevent LNet NI from doing sends/recvs?
119 KFILND_EP_ERROR(buf->immed_end,
120 "Failed to post immediate receive buffer: rc=%d",
126 * kfilnd_ep_post_imm_buffers() - Post all immediate receive buffers.
127 * @ep: KFI LND endpoint to have receive buffers posted on.
129 * This function should be called only during KFI LND device initialization.
131 * Return: On success, zero. Else, negative errno.
133 int kfilnd_ep_post_imm_buffers(struct kfilnd_ep *ep)
/* Post every pre-allocated multi-receive buffer on the RX context. */
141 for (i = 0; i < immediate_rx_buf_count; i++) {
142 rc = kfilnd_ep_post_recv(ep, &ep->end_immed_bufs[i]);
152 * kfilnd_ep_cancel_imm_buffers() - Cancel all immediate receive buffers.
153 * @ep: KFI LND endpoint to have receive buffers canceled.
155 void kfilnd_ep_cancel_imm_buffers(struct kfilnd_ep *ep)
162 for (i = 0; i < immediate_rx_buf_count; i++) {
/* Setting no-repost first prevents the completion path from
 * re-posting the buffer while we are cancelling it.
 */
163 ep->end_immed_bufs[i].immed_no_repost = true;
165 /* Since this is called during LNet NI teardown, no need to
166 * pipeline retries. Just spin until -EAGAIN is not returned.
168 while (kfi_cancel(&ep->end_rx->fid, &ep->end_immed_bufs[i]) ==
/* Workqueue handler that delivers an injected (fake) CQ error to the CQ
 * error-processing path. NOTE(review): the work item is presumably freed
 * after processing — the kfree is not visible here; confirm.
 */
174 static void kfilnd_ep_err_fail_loc_work(struct work_struct *work)
176 struct kfilnd_ep_err_fail_loc_work *err =
177 container_of(work, struct kfilnd_ep_err_fail_loc_work, work);
179 kfilnd_cq_process_error(err->ep, &err->err);
/* Allocate and queue a work item carrying a copy of @err so the CQ error
 * path sees a synthetic completion error (fault-injection support).
 */
183 static int kfilnd_ep_gen_fake_err(struct kfilnd_ep *ep,
184 const struct kfi_cq_err_entry *err)
186 struct kfilnd_ep_err_fail_loc_work *fake_err;
/* NOTE(review): allocation-failure check and fake_err->ep assignment are
 * not visible in this view — confirm against full source.
 */
188 fake_err = kmalloc(sizeof(*fake_err), GFP_KERNEL);
193 fake_err->err = *err;
194 INIT_WORK(&fake_err->work, kfilnd_ep_err_fail_loc_work);
195 queue_work(kfilnd_wq, &fake_err->work);
/* Build the initiator-side tag: the peer's remote session key in the upper
 * bits, with the transaction's response MR key in the low
 * KFILND_EP_KEY_BITS bits.
 */
200 static uint64_t gen_init_tag_bits(struct kfilnd_transaction *tn)
202 return (tn->peer->remote_session_key << KFILND_EP_KEY_BITS) |
203 tn->tn_response_mr_key;
207 * kfilnd_ep_post_tagged_send() - Post a tagged send operation.
208 * @ep: KFI LND endpoint used to post the tagged send operation.
209 * @tn: Transaction structure containing the send buffer to be posted.
211 * The tag for the post tagged send operation is the response memory region key
212 * associated with the transaction.
214 * Return: On success, zero. Else, negative errno value.
216 int kfilnd_ep_post_tagged_send(struct kfilnd_ep *ep,
217 struct kfilnd_transaction *tn)
/* Pre-built error entry handed to kfilnd_ep_gen_fake_err() when the
 * fault-injection path fires.
 */
219 struct kfi_cq_err_entry fake_error = {
221 .flags = KFI_TAGGED | KFI_SEND,
229 /* Make sure the device is not being shut down */
230 if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
233 /* Progress transaction to failure if send should fail. */
234 if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EVENT)) {
235 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
238 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND)) {
240 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_SEND_EAGAIN)) {
/* Zero-byte tagged send: the payload is carried in the remote CQ data
 * (tn->tagged_data); the tag routes it to the matching tagged recv.
 */
244 rc = kfi_tsenddata(ep->end_tx, NULL, 0, NULL, tn->tagged_data,
245 tn->tn_target_addr, gen_init_tag_bits(tn), tn);
/* NOTE(review): message text reads "send of with tag" — looks like a
 * stray word, but runtime strings are left untouched in this pass.
 */
250 "Transaction ID %p: %s tagged send of with tag 0x%x to peer 0x%llx: rc=%d",
251 tn, rc ? "Failed to post" : "Posted",
252 tn->tn_response_mr_key, tn->tn_target_addr, rc);
257 "Transaction ID %p: Failed to post tagged send with tag 0x%x to peer 0x%llx: rc=%d",
258 tn, tn->tn_response_mr_key,
259 tn->tn_target_addr, rc);
266 * kfilnd_ep_cancel_tagged_recv() - Cancel a tagged recv.
267 * @ep: KFI LND endpoint used to cancel the tagged receive operation.
268 * @tn: Transaction structure containing the receive buffer to be cancelled.
270 * The tagged receive buffer context pointer is used to cancel a tagged receive
271 * operation. The context pointer is always the transaction pointer.
273 * Return: 0 on success. -ENOENT if the tagged receive buffer is not found. The
274 * tagged receive buffer may not be found due to a tagged send operation already
275 * landing or the tagged receive buffer never being posted. Negative errno value
278 int kfilnd_ep_cancel_tagged_recv(struct kfilnd_ep *ep,
279 struct kfilnd_transaction *tn)
284 /* Make sure the device is not being shut down */
285 if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
/* Fault injection: simulate the provider returning -EAGAIN on cancel. */
288 if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_CANCEL_EAGAIN))
291 /* The async event count is not decremented for a cancel operation since
292 * it was incremented for the post tagged receive.
294 return kfi_cancel(&ep->end_rx->fid, tn);
/* Build the target-side tag: the peer's local session key in the upper bits.
 * NOTE(review): the low-order operand of the OR is not visible in this view
 * — presumably the transaction's MR key, mirroring gen_init_tag_bits();
 * confirm against full source.
 */
297 static uint64_t gen_target_tag_bits(struct kfilnd_transaction *tn)
299 return (tn->peer->local_session_key << KFILND_EP_KEY_BITS) |
304 * kfilnd_ep_post_tagged_recv() - Post a tagged receive operation.
305 * @ep: KFI LND endpoint used to post the tagged receive operation.
306 * @tn: Transaction structure containing the receive buffer to be posted.
308 * The tag for the post tagged receive operation is the memory region key
309 * associated with the transaction.
311 * Return: On success, zero. Else, negative errno value.
313 int kfilnd_ep_post_tagged_recv(struct kfilnd_ep *ep,
314 struct kfilnd_transaction *tn)
316 struct kfi_msg_tagged msg = {
317 .tag = gen_target_tag_bits(tn),
319 .addr = tn->peer->addr,
/* Pre-built error entry used by the fault-injection path below. */
321 struct kfi_cq_err_entry fake_error = {
323 .flags = KFI_TAGGED | KFI_RECV,
331 /* Make sure the device is not being shut down */
332 if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
335 /* Progress transaction to failure if receive should fail. */
336 if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EVENT)) {
337 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
340 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV)) {
342 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_TAGGED_RECV_EAGAIN)) {
/* Receive lands directly into the transaction's bio-vec fragments. */
346 msg.iov_count = tn->tn_num_iovec;
348 msg.msg_biov = tn->tn_kiov;
350 rc = kfi_trecvmsg(ep->end_rx, &msg, KFI_COMPLETION);
355 "Transaction ID %p: %s tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
356 tn, rc ? "Failed to post" : "Posted",
357 tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
362 "Transaction ID %p: Failed to post tagged recv of %u bytes (%u frags) with tag 0x%llx: rc=%d",
363 tn, tn->tn_nob, tn->tn_num_iovec, msg.tag, rc);
370 * kfilnd_ep_post_send() - Post a send operation.
371 * @ep: KFI LND endpoint used to post the send operation.
372 * @tn: Transaction structure containing the buffer to be sent.
374 * The target of the send operation is based on the target LNet NID field within
375 * the transaction structure. A lookup of LNet NID to KFI address is performed.
377 * Return: On success, zero. Else, negative errno value.
379 int kfilnd_ep_post_send(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
/* Pre-built error entry used by the fault-injection path below. */
383 struct kfi_cq_err_entry fake_error = {
385 .flags = KFI_MSG | KFI_SEND,
/* Send payload is the transaction's pre-built immediate message. */
393 buf = tn->tn_tx_msg.msg;
394 len = tn->tn_tx_msg.length;
396 /* Make sure the device is not being shut down */
397 if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
400 /* Progress transaction to failure if send should fail. */
401 if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EVENT)) {
402 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
405 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND)) {
407 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_SEND_EAGAIN)) {
/* Untagged message send; tn is the operation context returned in the
 * completion event.
 */
411 rc = kfi_send(ep->end_tx, buf, len, NULL, tn->tn_target_addr, tn);
416 "Transaction ID %p: %s send of %lu bytes to peer 0x%llx: rc=%d",
417 tn, rc ? "Failed to post" : "Posted",
418 len, tn->tn_target_addr, rc);
423 "Transaction ID %p: Failed to post send of %lu bytes to peer 0x%llx: rc=%d",
424 tn, len, tn->tn_target_addr, rc);
431 * kfilnd_ep_post_write() - Post a write operation.
432 * @ep: KFI LND endpoint used to post the write operation.
433 * @tn: Transaction structure containing the buffer to be read from.
435 * The target of the write operation is based on the target LNet NID field
436 * within the transaction structure. A lookup of LNet NID to KFI address is
439 * The transaction cookie is used as the remote key for the target memory
442 * Return: On success, zero. Else, negative errno value.
444 int kfilnd_ep_post_write(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
/* Pre-built error entry used by the fault-injection path below. */
447 struct kfi_cq_err_entry fake_error = {
449 .flags = KFI_TAGGED | KFI_RMA | KFI_WRITE | KFI_SEND,
/* Remote memory is addressed by the initiator tag bits (session key +
 * response MR key) acting as the RMA key.
 */
452 struct kfi_rma_iov rma_iov = {
454 .key = gen_init_tag_bits(tn),
456 struct kfi_msg_rma rma = {
457 .addr = tn->tn_target_addr,
466 /* Make sure the device is not being shut down */
467 if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
470 /* Progress transaction to failure if read should fail. */
471 if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EVENT)) {
472 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
475 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE)) {
477 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_WRITE_EAGAIN)) {
/* Local source buffer is the transaction's bio-vec fragment list. */
481 rma.iov_count = tn->tn_num_iovec;
483 rma.msg_biov = tn->tn_kiov;
485 rc = kfi_writemsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
490 "Transaction ID %p: %s write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
491 tn, rc ? "Failed to post" : "Posted",
492 tn->tn_nob, tn->tn_num_iovec,
493 tn->tn_response_mr_key, tn->tn_target_addr, rc);
498 "Transaction ID %p: Failed to post write of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
499 tn, tn->tn_nob, tn->tn_num_iovec,
500 tn->tn_response_mr_key, tn->tn_target_addr,
508 * kfilnd_ep_post_read() - Post a read operation.
509 * @ep: KFI LND endpoint used to post the read operation.
510 * @tn: Transaction structure containing the buffer to be read into.
512 * The target of the read operation is based on the target LNet NID field within
513 * the transaction structure. A lookup of LNet NID to KFI address is performed.
515 * The transaction cookie is used as the remote key for the target memory
518 * Return: On success, zero. Else, negative errno value.
520 int kfilnd_ep_post_read(struct kfilnd_ep *ep, struct kfilnd_transaction *tn)
/* Pre-built error entry used by the fault-injection path below. */
523 struct kfi_cq_err_entry fake_error = {
525 .flags = KFI_TAGGED | KFI_RMA | KFI_READ | KFI_SEND,
/* Remote memory is addressed by the initiator tag bits acting as the
 * RMA key, mirroring kfilnd_ep_post_write().
 */
528 struct kfi_rma_iov rma_iov = {
530 .key = gen_init_tag_bits(tn),
532 struct kfi_msg_rma rma = {
533 .addr = tn->tn_target_addr,
542 /* Make sure the device is not being shut down */
543 if (ep->end_dev->kfd_state != KFILND_STATE_INITIALIZED)
546 /* Progress transaction to failure if read should fail. */
547 if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EVENT)) {
548 rc = kfilnd_ep_gen_fake_err(ep, &fake_error);
551 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ)) {
553 } else if (CFS_FAIL_CHECK(CFS_KFI_FAIL_READ_EAGAIN)) {
/* Local destination buffer is the transaction's bio-vec fragments. */
557 rma.iov_count = tn->tn_num_iovec;
559 rma.msg_biov = tn->tn_kiov;
561 rc = kfi_readmsg(ep->end_tx, &rma, KFI_TAGGED | KFI_COMPLETION);
566 "Transaction ID %p: %s read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
567 tn, rc ? "Failed to post" : "Posted",
568 tn->tn_nob, tn->tn_num_iovec,
569 tn->tn_response_mr_key, tn->tn_target_addr, rc);
574 "Transaction ID %p: Failed to post read of %u bytes in %u frags with key 0x%x to peer 0x%llx: rc=%d",
575 tn, tn->tn_nob, tn->tn_num_iovec,
576 tn->tn_response_mr_key, tn->tn_target_addr, rc);
/* Queue a transaction for delayed replay on the endpoint's replay list and
 * arm the replay timer if it is not already pending. Mirrors the immediate
 * buffer replay path in kfilnd_ep_imm_buffer_put().
 */
582 void kfilnd_ep_queue_tn_replay(struct kfilnd_ep *ep,
583 struct kfilnd_transaction *tn)
585 unsigned long expires = msecs_to_jiffies(KFILND_EP_REPLAY_TIMER_MSEC) +
588 spin_lock(&ep->replay_lock);
589 list_add_tail(&tn->replay_entry, &ep->tn_replay);
590 atomic_inc(&ep->replay_count);
591 spin_unlock(&ep->replay_lock);
593 if (!timer_pending(&ep->replay_timer))
594 mod_timer(&ep->replay_timer, expires);
/* Drain both endpoint replay lists (transactions and immediate buffers)
 * and replay each queued entry.
 */
597 void kfilnd_ep_flush_replay_queue(struct kfilnd_ep *ep)
599 LIST_HEAD(tn_replay);
600 LIST_HEAD(imm_buf_replay);
601 struct kfilnd_transaction *tn_first;
602 struct kfilnd_transaction *tn_last;
603 struct kfilnd_immediate_buffer *buf_first;
604 struct kfilnd_immediate_buffer *buf_last;
606 /* Since the endpoint replay lists can be manipulated while
607 * attempting to do replays, the entire replay list is moved to a
610 spin_lock(&ep->replay_lock);
/* NOTE(review): the bulk moves below are presumably guarded by a
 * first-entry NULL check that is not visible in this view — confirm.
 */
612 tn_first = list_first_entry_or_null(&ep->tn_replay,
613 struct kfilnd_transaction,
616 tn_last = list_last_entry(&ep->tn_replay,
617 struct kfilnd_transaction,
619 list_bulk_move_tail(&tn_replay, &tn_first->replay_entry,
620 &tn_last->replay_entry);
621 LASSERT(list_empty(&ep->tn_replay));
624 buf_first = list_first_entry_or_null(&ep->imm_buffer_replay,
625 struct kfilnd_immediate_buffer,
628 buf_last = list_last_entry(&ep->imm_buffer_replay,
629 struct kfilnd_immediate_buffer,
631 list_bulk_move_tail(&imm_buf_replay, &buf_first->replay_entry,
632 &buf_last->replay_entry);
633 LASSERT(list_empty(&ep->imm_buffer_replay));
636 spin_unlock(&ep->replay_lock);
638 /* Replay all queued transactions. */
/* tn_last is reused here as the list_for_each_entry_safe() scratch
 * cursor; likewise buf_last below.
 */
639 list_for_each_entry_safe(tn_first, tn_last, &tn_replay, replay_entry) {
640 list_del(&tn_first->replay_entry);
641 atomic_dec(&ep->replay_count);
642 kfilnd_tn_event_handler(tn_first, tn_first->replay_event,
643 tn_first->replay_status);
/* Replay (repost) all queued immediate buffers. */
646 list_for_each_entry_safe(buf_first, buf_last, &imm_buf_replay,
648 list_del(&buf_first->replay_entry);
649 atomic_dec(&ep->replay_count);
650 kfilnd_ep_imm_buffer_put(buf_first);
/* Workqueue handler: flush the endpoint's replay queues in process
 * context (queued from the replay timer).
 */
654 static void kfilnd_ep_replay_work(struct work_struct *work)
656 struct kfilnd_ep *ep =
657 container_of(work, struct kfilnd_ep, replay_work);
659 kfilnd_ep_flush_replay_queue(ep);
/* Timer callback: punt replay processing to kfilnd_wq on a CPU belonging
 * to the endpoint's CPT (timer context cannot block).
 */
662 static void kfilnd_ep_replay_timer(cfs_timer_cb_arg_t data)
664 struct kfilnd_ep *ep = cfs_from_timer(ep, data, replay_timer);
/* NOTE(review): the assignment target (presumably "cpu =") is not
 * visible in this view — confirm against full source.
 */
666 cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), ep->end_cpt));
668 queue_work_on(cpu, kfilnd_wq, &ep->replay_work);
/* Endpoint allocation size: the struct plus its trailing array of
 * immediate receive buffers.
 */
671 #define KFILND_EP_ALLOC_SIZE \
672 (sizeof(struct kfilnd_ep) + \
673 (sizeof(struct kfilnd_immediate_buffer) * immediate_rx_buf_count))
676 * kfilnd_ep_free() - Free a KFI LND endpoint.
677 * @ep: KFI LND endpoint to be freed.
679 * Safe to call on NULL or error pointer.
681 void kfilnd_ep_free(struct kfilnd_ep *ep)
686 if (IS_ERR_OR_NULL(ep))
/* Wait for all queued replays to drain before tearing down. The
 * (k & -k) == k test logs at D_WARNING only on power-of-two waits to
 * throttle log noise.
 */
689 while (atomic_read(&ep->replay_count)) {
691 CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
692 "Waiting for replay count %d not zero\n",
693 atomic_read(&ep->replay_count));
694 schedule_timeout_uninterruptible(HZ);
697 /* Cancel any outstanding immediate receive buffers. */
698 kfilnd_ep_cancel_imm_buffers(ep);
700 /* Wait for RX buffers to no longer be used and then free them. */
701 for (i = 0; i < immediate_rx_buf_count; i++) {
703 while (atomic_read(&ep->end_immed_bufs[i].immed_ref)) {
705 CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
706 "Waiting for RX buffer %d to release\n", i);
707 schedule_timeout_uninterruptible(HZ);
711 /* Wait for all transactions to complete. */
/* Drop the lock while sleeping so completing transactions can remove
 * themselves from tn_list.
 */
713 spin_lock(&ep->tn_list_lock);
714 while (!list_empty(&ep->tn_list)) {
715 spin_unlock(&ep->tn_list_lock);
717 CDEBUG(((k & (-k)) == k) ? D_WARNING : D_NET,
718 "Waiting for transactions to complete\n");
719 schedule_timeout_uninterruptible(HZ);
720 spin_lock(&ep->tn_list_lock);
722 spin_unlock(&ep->tn_list_lock);
724 /* Free all immediate buffers. */
725 for (i = 0; i < immediate_rx_buf_count; i++)
726 __free_pages(ep->end_immed_bufs[i].immed_buf_page,
727 order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
/* Close contexts before freeing the CQs they are bound to. */
729 kfi_close(&ep->end_tx->fid);
730 kfi_close(&ep->end_rx->fid);
731 kfilnd_cq_free(ep->end_tx_cq);
732 kfilnd_cq_free(ep->end_rx_cq);
733 ida_destroy(&ep->keys);
734 LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
738 * kfilnd_ep_alloc() - Allocate a new KFI LND endpoint.
739 * @dev: KFI LND device used to allocate endpoints.
740 * @context_id: Context ID associated with the endpoint.
741 * @cpt: CPT KFI LND endpoint should be associated with.
743 * A KFI LND endpoint consists of unique transmit/receive command queues
744 * (contexts) and completion queues. The underlying completion queue interrupt
745 * vector is associated with a core within the CPT.
747 * Return: On success, valid pointer. Else, negative errno pointer.
749 struct kfilnd_ep *kfilnd_ep_alloc(struct kfilnd_dev *dev,
750 unsigned int context_id, unsigned int cpt,
751 size_t nrx, size_t rx_size)
754 struct kfi_cq_attr cq_attr = {};
755 struct kfi_rx_attr rx_attr = {};
756 struct kfi_tx_attr tx_attr = {};
758 size_t min_multi_recv = KFILND_IMMEDIATE_MSG_SIZE;
759 struct kfilnd_ep *ep;
763 if (!dev || !nrx || !rx_size) {
768 ncpts = dev->kfd_ni->ni_ncpts;
/* Allocation is CPT-local and includes the trailing immediate buffer
 * array (see KFILND_EP_ALLOC_SIZE).
 */
770 LIBCFS_CPT_ALLOC(ep, lnet_cpt_table(), cpt, KFILND_EP_ALLOC_SIZE);
778 ep->end_context_id = context_id;
779 INIT_LIST_HEAD(&ep->tn_list);
780 spin_lock_init(&ep->tn_list_lock);
781 INIT_LIST_HEAD(&ep->tn_replay);
782 INIT_LIST_HEAD(&ep->imm_buffer_replay);
783 spin_lock_init(&ep->replay_lock);
784 cfs_timer_setup(&ep->replay_timer, kfilnd_ep_replay_timer,
785 (unsigned long)ep, 0);
786 INIT_WORK(&ep->replay_work, kfilnd_ep_replay_work);
787 atomic_set(&ep->replay_count, 0);
790 /* Create a CQ for this CPT */
791 cq_attr.flags = KFI_AFFINITY;
792 cq_attr.format = KFI_CQ_FORMAT_DATA;
793 cq_attr.wait_cond = KFI_CQ_COND_NONE;
794 cq_attr.wait_obj = KFI_WAIT_NONE;
796 /* Vector is set to first core in the CPT */
797 cq_attr.signaling_vector =
798 cpumask_first(*cfs_cpt_cpumask(lnet_cpt_table(), cpt));
/* NOTE(review): CQ sizing multiplier operand (likely a credits/ncpts
 * factor) is not visible here — confirm. Also note the CERROR strings
 * below read "Failed to allocated"; runtime strings left untouched.
 */
800 cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
802 ep->end_rx_cq = kfilnd_cq_alloc(ep, &cq_attr);
803 if (IS_ERR(ep->end_rx_cq)) {
804 rc = PTR_ERR(ep->end_rx_cq);
805 CERROR("Failed to allocated KFILND RX CQ: rc=%d\n", rc);
809 cq_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
811 ep->end_tx_cq = kfilnd_cq_alloc(ep, &cq_attr);
812 if (IS_ERR(ep->end_tx_cq)) {
813 rc = PTR_ERR(ep->end_tx_cq);
814 CERROR("Failed to allocated KFILND TX CQ: rc=%d\n", rc);
818 /* Initialize the RX/TX contexts for the given CPT */
819 rx_attr.op_flags = KFI_COMPLETION | KFI_MULTI_RECV;
820 rx_attr.msg_order = KFI_ORDER_NONE;
821 rx_attr.comp_order = KFI_ORDER_NONE;
822 rx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits +
823 immediate_rx_buf_count;
824 rx_attr.iov_limit = LNET_MAX_IOV;
825 rc = kfi_rx_context(dev->kfd_sep, context_id, &rx_attr, &ep->end_rx,
828 CERROR("Could not create RX context on CPT %d, rc = %d\n", cpt,
833 /* Set the lower limit for multi-receive buffers */
834 rc = kfi_setopt(&ep->end_rx->fid, KFI_OPT_ENDPOINT,
835 KFI_OPT_MIN_MULTI_RECV, &min_multi_recv,
836 sizeof(min_multi_recv));
838 CERROR("Could not set min_multi_recv on CPT %d, rc = %d\n", cpt,
840 goto err_free_rx_context;
843 tx_attr.op_flags = KFI_COMPLETION | KFI_TRANSMIT_COMPLETE;
844 tx_attr.msg_order = KFI_ORDER_NONE;
845 tx_attr.comp_order = KFI_ORDER_NONE;
846 tx_attr.size = dev->kfd_ni->ni_net->net_tunables.lct_max_tx_credits *
848 tx_attr.iov_limit = LNET_MAX_IOV;
849 tx_attr.rma_iov_limit = LNET_MAX_IOV;
850 rc = kfi_tx_context(dev->kfd_sep, context_id, &tx_attr, &ep->end_tx,
853 CERROR("Could not create TX context on CPT %d, rc = %d\n", cpt,
855 goto err_free_rx_context;
858 /* Bind these two contexts to the CPT's CQ */
859 rc = kfi_ep_bind(ep->end_rx, &ep->end_rx_cq->cq->fid, 0);
861 CERROR("Could not bind RX context on CPT %d, rc = %d\n", cpt,
863 goto err_free_tx_context;
866 rc = kfi_ep_bind(ep->end_tx, &ep->end_tx_cq->cq->fid, 0);
868 CERROR("Could not bind TX context on CPT %d, rc = %d\n", cpt,
870 goto err_free_tx_context;
873 /* Enable both endpoints */
874 rc = kfi_enable(ep->end_rx);
876 CERROR("Could not enable RX context on CPT %d, rc = %d\n", cpt,
878 goto err_free_tx_context;
881 rc = kfi_enable(ep->end_tx);
883 CERROR("Could not enable TX context on CPT %d, rc=%d\n", cpt,
885 goto err_free_tx_context;
888 /* The nrx value is the max number of immediate messages any one peer
889 * can send us. Given that compute nodes are RPC-based, we should not
890 * see any more incoming messages than we are able to send. As such, nrx
891 * is a good size for each multi-receive buffer. However, if we are
892 * a server or LNet router, we need a multiplier of this value. For
893 * now, we will just have nrx drive the buffer size per CPT. Then,
894 * LNet routers and servers can just define more CPTs to get a better
895 * spread of buffers to receive messages from multiple peers. A better
896 * way should be devised in the future.
898 rx_buf_size = roundup_pow_of_two(max(nrx * rx_size, PAGE_SIZE));
900 for (i = 0; i < immediate_rx_buf_count; i++) {
902 /* Using physically contiguous allocations can allow for
903 * underlying kfabric providers to use untranslated addressing
904 * instead of having to setup NIC memory mappings. This
905 * typically leads to improved performance.
907 ep->end_immed_bufs[i].immed_buf_page =
908 alloc_pages_node(cfs_cpt_spread_node(lnet_cpt_table(), cpt),
909 GFP_KERNEL | __GFP_NOWARN,
910 order_base_2(rx_buf_size / PAGE_SIZE));
911 if (!ep->end_immed_bufs[i].immed_buf_page) {
913 goto err_free_rx_buffers;
916 atomic_set(&ep->end_immed_bufs[i].immed_ref, 0);
917 ep->end_immed_bufs[i].immed_buf =
918 page_address(ep->end_immed_bufs[i].immed_buf_page);
919 ep->end_immed_bufs[i].immed_buf_size = rx_buf_size;
920 ep->end_immed_bufs[i].immed_end = ep;
/* Error unwind: labels (err_free_rx_buffers, err_free_tx_context,
 * err_free_rx_context, ...) are elided in this view; resources are
 * released in reverse order of acquisition.
 */
926 for (i = 0; i < immediate_rx_buf_count; i++) {
927 if (ep->end_immed_bufs[i].immed_buf_page)
928 __free_pages(ep->end_immed_bufs[i].immed_buf_page,
929 order_base_2(ep->end_immed_bufs[i].immed_buf_size / PAGE_SIZE));
933 kfi_close(&ep->end_tx->fid);
935 kfi_close(&ep->end_rx->fid);
937 kfilnd_cq_free(ep->end_tx_cq);
939 kfilnd_cq_free(ep->end_rx_cq);
941 LIBCFS_FREE(ep, KFILND_EP_ALLOC_SIZE);
/* Allocate a unique MR key for this endpoint from its IDA, in the range
 * [1, KFILND_EP_KEY_MAX). Returns the key, or a negative errno on failure.
 */
946 int kfilnd_ep_get_key(struct kfilnd_ep *ep)
948 return ida_simple_get(&ep->keys, 1, KFILND_EP_KEY_MAX, GFP_KERNEL);
/* Return an MR key previously obtained via kfilnd_ep_get_key() to the
 * endpoint's IDA.
 */
951 void kfilnd_ep_put_key(struct kfilnd_ep *ep, unsigned int key)
953 ida_simple_remove(&ep->keys, key);