/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 */
/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */
/* This file is part of Lustre, http://www.lustre.org/ */
/*
 * kfilnd transaction and state machine processing.
 */
#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_dom.h"
#include "kfilnd_peer.h"
#include <asm/checksum.h>

static struct kmem_cache *tn_cache;
static struct kmem_cache *imm_buf_cache;
static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
{
	return csum_fold(csum_partial(ptr, nob, 0));
}
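/* kfilnd uses the standard ones' complement (Internet) checksum. Senders
 * compute the checksum with the cksum field set to NO_CHECKSUM and store the
 * folded result in the message. Re-summing a valid message *including* the
 * stored checksum therefore folds to zero, which is why kfilnd_tn_unpack_msg()
 * below can treat any non-zero return from kfilnd_tn_cksum() as corruption.
 * Illustrative sketch of the pack/verify pairing:
 *
 *	msg->cksum = NO_CHECKSUM;
 *	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);	// pack
 *	...
 *	bad = kfilnd_tn_cksum(msg, msg->nob) != 0;	// verify
 */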
static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type)
{
	const int hdr_size = offsetof(struct kfilnd_msg, proto);

	switch (type) {
	case KFILND_MSG_IMMEDIATE:
		return offsetof(struct kfilnd_msg, proto.immed.payload[0]);

	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		return hdr_size + sizeof(struct kfilnd_bulk_req_msg);

	case KFILND_MSG_HELLO_REQ:
	case KFILND_MSG_HELLO_RSP:
		return hdr_size + sizeof(struct kfilnd_hello_msg);

	default:
		return -1;
	}
}
static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = KFILND_MSG_VERSION;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Message version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_REQ;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}
static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = tn->tn_kp->kp_version;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Message version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_RSP;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}
static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr);
	msg->proto.bulk_req.key = tn->tn_mr_key;
	msg->proto.bulk_req.response_rx = tn->tn_response_rx;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = KFILND_MSG_VERSION;
	msg->type = tn->msg_type;
	msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}
static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);

	lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE, msg,
			    offsetof(struct kfilnd_msg,
				     proto.immed.payload),
			    tn->tn_num_iovec, tn->tn_kiov, 0,
			    tn->tn_nob);

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = KFILND_MSG_VERSION;
	msg->type = tn->msg_type;
	msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}
static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
				unsigned int nob)
{
	const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);

	if (nob < hdr_size) {
		KFILND_EP_ERROR(ep, "Short message: %u", nob);
		return -EPROTO;
	}

	/* TODO: Support byte swapping on mixed endian systems. */
	if (msg->magic != KFILND_MSG_MAGIC) {
		KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
		return -EPROTO;
	}

	/* TODO: Allow for older versions. */
	if (msg->version > KFILND_MSG_VERSION) {
		KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
		return -EPROTO;
	}

	if (msg->nob > nob) {
		KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
				msg->nob);
		return -EPROTO;
	}

	/* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
	if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
		KFILND_EP_ERROR(ep, "Bad checksum");
		return -EPROTO;
	}

	if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
		KFILND_EP_ERROR(ep, "Bad destination nid: %s",
				libcfs_nid2str(msg->dstnid));
		return -EPROTO;
	}

	if (msg->srcnid == LNET_NID_ANY) {
		KFILND_EP_ERROR(ep, "Bad source nid: %s",
				libcfs_nid2str(msg->srcnid));
		return -EPROTO;
	}

	if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) {
		KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
				msg_type_to_str(msg->type),
				msg->nob, kfilnd_tn_msgtype2size(msg->type));
		return -EPROTO;
	}

	switch ((enum kfilnd_msg_type)msg->type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		if (msg->version == 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	case KFILND_MSG_HELLO_REQ:
	case KFILND_MSG_HELLO_RSP:
		if (msg->version != 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	default:
		CERROR("Unknown message type %x\n", msg->type);
		return -EPROTO;
	}

	return 0;
}
static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;
	s64 time;
	s64 cur;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];

	time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts));
	atomic64_add(time, &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);

	do {
		cur = atomic64_read(&stat->max_duration);
		if (time <= cur)
			break;
	} while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

	do {
		cur = atomic64_read(&stat->min_duration);
		if (time >= cur)
			break;
	} while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}
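/* The max/min updates above use a standard lock-free read/compare/cmpxchg
 * retry loop: read the current extreme, stop if the new sample does not beat
 * it, and otherwise try to publish the sample, retrying if another CPU raced
 * in first. Generic form of the pattern (illustrative only):
 *
 *	do {
 *		cur = atomic64_read(v);
 *		if (sample <= cur)
 *			break;
 *	} while (atomic64_cmpxchg(v, cur, sample) != cur);
 */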
static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
				   enum tn_states new_state)
{
	KFILND_TN_DEBUG(tn, "%s -> %s state change",
			tn_state_to_str(tn->tn_state),
			tn_state_to_str(new_state));

	kfilnd_tn_record_state_change(tn);

	tn->tn_state = new_state;
	tn->tn_state_ts = ktime_get();
}
static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
				    enum lnet_msg_hstatus hstatus)
{
	/* Only the first non-ok status will take. */
	if (tn->tn_status == 0) {
		KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
				status);
		tn->tn_status = status;
	}

	if (tn->hstatus == LNET_MSG_STATUS_OK) {
		KFILND_TN_DEBUG(tn, "%d -> %d health status change",
				tn->hstatus, hstatus);
		tn->hstatus = hstatus;
	}
}
static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
{
	return tn->tn_status != 0;
}
/**
 * kfilnd_tn_process_rx_event() - Process an immediate receive event.
 *
 * For each immediate receive, a transaction structure needs to be allocated to
 * process the receive.
 */
void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
				struct kfilnd_msg *rx_msg, int msg_size)
{
	struct kfilnd_transaction *tn;
	bool alloc_msg = true;
	int rc;
	enum tn_events event = TN_EVENT_RX_HELLO;

	/* Increment buf ref count for this work */
	atomic_inc(&bufdesc->immed_ref);

	/* Unpack the message */
	rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size);
	if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
		kfilnd_ep_imm_buffer_put(bufdesc);
		KFILND_EP_ERROR(bufdesc->immed_end,
				"Failed to unpack message %d", rc);
		return;
	}

	switch ((enum kfilnd_msg_type)rx_msg->type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		event = TN_EVENT_RX_OK;
		fallthrough;
	case KFILND_MSG_HELLO_RSP:
		alloc_msg = false;
		fallthrough;
	case KFILND_MSG_HELLO_REQ:
		/* Context points to a received buffer and status is the length.
		 * Allocate a Tn structure, set its values, then launch the
		 * receive.
		 */
		tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev,
				     bufdesc->immed_end->end_cpt,
				     rx_msg->srcnid, alloc_msg, false,
				     false);
		if (IS_ERR(tn)) {
			kfilnd_ep_imm_buffer_put(bufdesc);
			KFILND_EP_ERROR(bufdesc->immed_end,
					"Failed to allocate transaction struct: rc=%ld",
					PTR_ERR(tn));
			return;
		}

		tn->tn_rx_msg.msg = rx_msg;
		tn->tn_rx_msg.length = msg_size;
		tn->tn_posted_buf = bufdesc;

		KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u",
				msg_type_to_str((enum kfilnd_msg_type)rx_msg->type),
				tn->tn_mr_key);
		break;

	default:
		KFILND_EP_ERROR(bufdesc->immed_end,
				"Unhandled kfilnd message type: %d",
				(enum kfilnd_msg_type)rx_msg->type);
		LBUG();
	}

	kfilnd_tn_event_handler(tn, event, 0);
}
static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;
	s64 time;
	s64 cur;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];

	time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts));
	atomic64_add(time, &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);

	do {
		cur = atomic64_read(&stat->max_duration);
		if (time <= cur)
			break;
	} while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

	do {
		cur = atomic64_read(&stat->min_duration);
		if (time >= cur)
			break;
	} while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}
/**
 * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
 *
 * All state machine functions should call kfilnd_tn_finalize() instead of
 * kfilnd_tn_free(). Once all expected asynchronous events have been received,
 * the transaction lock is released if it has not been already, transaction
 * resources are cleaned up, and lnet_finalize() is called.
 */
static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
{
	if (!*tn_released) {
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
	}

	/* Release the reference on the multi-receive buffer. */
	if (tn->tn_posted_buf)
		kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);

	/* Finalize LNet operation. */
	if (tn->tn_lntmsg) {
		tn->tn_lntmsg->msg_health_status = tn->hstatus;
		lnet_finalize(tn->tn_lntmsg, tn->tn_status);
	}

	if (tn->tn_getreply) {
		tn->tn_getreply->msg_health_status = tn->hstatus;
		lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
				       tn->tn_getreply,
				       tn->tn_status ? 0 : tn->tn_nob);
		lnet_finalize(tn->tn_getreply, tn->tn_status);
	}

	if (KFILND_TN_PEER_VALID(tn))
		kfilnd_peer_put(tn->tn_kp);

	kfilnd_tn_record_state_change(tn);
	kfilnd_tn_record_duration(tn);

	kfilnd_tn_free(tn);
}
/**
 * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
 * @tn: Transaction to have its tagged receive cancelled.
 *
 * Return: 0 on success. Else, negative errno. If an error occurs, resources
 * may be leaked.
 */
static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
{
	int rc;

	/* Issue a cancel. A return code of zero means the operation issued an
	 * async cancel. A return code of -ENOENT means the tagged receive was
	 * not found. The assumption here is that a tagged send landed thus
	 * removing the tagged receive buffer from hardware. For both cases,
	 * async events should occur.
	 */
	rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
	if (rc != 0 && rc != -ENOENT) {
		KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
		return rc;
	}

	return 0;
}
static void kfilnd_tn_timeout_work(struct work_struct *work)
{
	struct kfilnd_transaction *tn =
		container_of(work, struct kfilnd_transaction, timeout_work);

	KFILND_TN_ERROR(tn, "Bulk operation timeout");
	kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
}

static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
{
	struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);

	queue_work(kfilnd_wq, &tn->timeout_work);
}

static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
{
	return timer_delete(&tn->timeout_timer);
}

static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
{
	ktime_t remaining_time = max_t(ktime_t, 0,
				       tn->deadline - ktime_get_seconds());
	unsigned long expires = remaining_time * HZ + jiffies;

	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
		expires = jiffies;

	cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
			(unsigned long)tn, 0);
	mod_timer(&tn->timeout_timer, expires);
}
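/* Worked example of the timeout arming above (illustrative): with a deadline
 * 10 seconds in the future and HZ=250, remaining_time is 10 and the timer is
 * armed at jiffies + 2500. A deadline already in the past clamps to zero
 * remaining time, so the timer fires on the next tick.
 */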
/* The following are the state machine routines for the transactions. */
static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
				       enum tn_events event, int status,
				       bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_SEND_FAILED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}
static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
					      enum tn_events event, int status,
					      bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		kfilnd_tn_pack_bulk_req(tn);

		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
			break;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_TAGGED_RECV_POSTED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay post send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		/* Need to transition to the TN_STATE_SEND_FAILED state to
		 * clean up the posted tagged receive buffer.
		 */
		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);

			/* Propagate TN_EVENT_INIT_BULK event to
			 * TN_STATE_SEND_FAILED handler.
			 */
			return kfilnd_tn_state_send_failed(tn, event, rc,
							   tn_released);
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}
static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	struct kfilnd_msg *msg;
	int rc = 0;
	bool finalize = false;
	struct lnet_hdr hdr;
	struct lnet_nid srcnid;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	/* For new peers, send a hello request message and queue the true LNet
	 * message for replay.
	 */
	if (kfilnd_peer_needs_throttle(tn->tn_kp) &&
	    (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) {
		if (kfilnd_peer_deleted(tn->tn_kp)) {
			/* We'll assign a NETWORK_TIMEOUT message health status
			 * below because we don't know why this peer was marked
			 * for removal.
			 */
			rc = -ESTALE;
			KFILND_TN_DEBUG(tn, "Drop message to deleted peer");
		} else if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
			/* We're throttling transactions to this peer until
			 * a handshake can be completed, but there is no HELLO
			 * currently in flight. This implies the HELLO has
			 * failed, and we should cancel this TN. Otherwise we
			 * are stuck waiting for the TN deadline.
			 *
			 * We assign NETWORK_TIMEOUT health status below because
			 * we do not know why the HELLO failed.
			 */
			rc = -ECANCELED;
			KFILND_TN_DEBUG(tn, "Cancel throttled TN");
		} else if (ktime_before(ktime_get_seconds(),
					tn->tn_replay_deadline)) {
			/* If the transaction replay deadline has not been met,
			 * then return -EAGAIN. This will cause this transaction
			 * event to be replayed. During this time, an async
			 * hello message from the peer should occur at which
			 * point we can resume sending new messages to this
			 * peer.
			 */
			KFILND_TN_DEBUG(tn, "hello response pending");
			return -EAGAIN;
		} else {
			rc = -ETIMEDOUT;
		}

		kfilnd_tn_status_update(tn, rc,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		rc = 0;
		goto out;
	}

	if ((event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK) &&
	    ktime_after(ktime_get_seconds(), tn->tn_replay_deadline)) {
		kfilnd_tn_status_update(tn, -ETIMEDOUT,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		rc = 0;
		goto out;
	}

	if (CFS_FAIL_CHECK_VALUE(CFS_KFI_REPLAY_IDLE_EVENT, event))
		return -EAGAIN;

	switch (event) {
	case TN_EVENT_INIT_IMMEDIATE:
	case TN_EVENT_TX_HELLO:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		if (event == TN_EVENT_INIT_IMMEDIATE)
			kfilnd_tn_pack_immed_msg(tn);
		else
			kfilnd_tn_pack_hello_req(tn);

		/* Send immediate message. */
		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
			return 0;

		/* Need to replay the TN_EVENT_INIT_IMMEDIATE event while in
		 * the TN_STATE_IDLE state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			if (event == TN_EVENT_TX_HELLO)
				kfilnd_peer_clear_hello_state(tn->tn_kp);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_INIT_BULK:
		/* Post tagged receive buffer used to land bulk response. */
		rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);

		switch (rc) {
		/* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);

			/* Propagate TN_EVENT_INIT_BULK event to
			 * TN_STATE_TAGGED_RECV_POSTED handler.
			 */
			return kfilnd_tn_state_tagged_recv_posted(tn, event,
								  rc,
								  tn_released);

		/* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
		 * state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
					rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_RX_OK:
		if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
			rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
						       tn->tn_ep->end_cpt,
						       tn->tn_kp);
			if (rc)
				KFILND_TN_ERROR(tn,
						"Failed to send hello request: rc=%d",
						rc);
			rc = 0;
		}

		/* If this is a new peer then we cannot progress the transaction
		 * and must drop it.
		 */
		if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
			KFILND_TN_ERROR(tn,
					"Dropping message from %s due to stale peer",
					libcfs_nid2str(tn->tn_kp->kp_nid));
			kfilnd_tn_status_update(tn, -EPROTO,
						LNET_MSG_STATUS_LOCAL_DROPPED);
			rc = 0;
			goto out;
		}

		LASSERT(kfilnd_peer_is_new_peer(tn->tn_kp) == false);
		msg = tn->tn_rx_msg.msg;

		/* Update the NID address with the new preferred RX context. */
		kfilnd_peer_alive(tn->tn_kp);

		/* Pass message up to LNet.
		 * The TN will be reused in this call chain so we need to
		 * release the lock on the TN before proceeding.
		 */
		KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
				tn_state_to_str(tn->tn_state));

		/* TODO: Do not manually update this state change. */
		tn->tn_state = TN_STATE_IMM_RECV;
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
		lnet_nid4_to_nid(msg->srcnid, &srcnid);
		if (msg->type == KFILND_MSG_IMMEDIATE) {
			lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 0);
		} else {
			lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 1);
		}

		/* If successful, transaction has been accepted by LNet and we
		 * cannot process the transaction anymore within this context.
		 */
		if (!rc)
			return 0;

		KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
		kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
		break;

	case TN_EVENT_RX_HELLO:
		msg = tn->tn_rx_msg.msg;

		kfilnd_peer_alive(tn->tn_kp);

		switch (msg->type) {
		case KFILND_MSG_HELLO_REQ:
			kfilnd_peer_process_hello(tn->tn_kp, msg);
			tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
			KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);

			kfilnd_tn_pack_hello_rsp(tn);

			/* Send immediate message. */
			rc = kfilnd_ep_post_send(tn->tn_ep, tn);
			switch (rc) {
			case 0:
				kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
				return 0;

			case -EAGAIN:
				KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
			break;

		case KFILND_MSG_HELLO_RSP:
			rc = 0;
			kfilnd_peer_process_hello(tn->tn_kp, msg);
			finalize = true;
			break;

		default:
			KFILND_TN_ERROR(tn, "Invalid message type: %s",
					msg_type_to_str(msg->type));
			LBUG();
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

out:
	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}
static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT || status == -EIO)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* RKEY is not involved in immediate sends, so no need to
		 * delete the peer.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		if (tn->msg_type == KFILND_MSG_HELLO_REQ)
			kfilnd_peer_clear_hello_state(tn->tn_kp);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}
static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	int rc = 0;
	bool finalize = false;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_TAG_RMA:
	case TN_EVENT_SKIP_TAG_RMA:
		/* Release the buffer we received the request on. All relevant
		 * information to perform the RMA operation is stored in the
		 * transaction structure. This should be done before the RMA
		 * operation to prevent two contexts from potentially processing
		 * the same transaction.
		 *
		 * TODO: Prevent this from returning -EAGAIN.
		 */
		if (tn->tn_posted_buf) {
			kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
			tn->tn_posted_buf = NULL;
		}

		/* Update the KFI address to use the response RX context. */
		tn->tn_target_addr =
			kfi_rx_addr(KFILND_BASE_ADDR(tn->tn_kp->kp_addr),
				    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
		KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		/* Initiate the RMA operation to push/pull the LNet payload or
		 * send a tagged message to finalize the bulk operation if the
		 * RMA operation should be skipped.
		 */
		if (event == TN_EVENT_INIT_TAG_RMA) {
			if (tn->sink_buffer)
				rc = kfilnd_ep_post_read(tn->tn_ep, tn);
			else
				rc = kfilnd_ep_post_write(tn->tn_ep, tn);

			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_RMA_COMP);
				return 0;

			/* Need to replay TN_EVENT_INIT_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged %s to %s(%#llx)",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged %s to %s(%#llx): rc=%d",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		} else {
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_OK);

			/* Since the LNet initiator has posted a unique tagged
			 * buffer specific for this LNet transaction and the
			 * LNet target has decided not to push/pull to/from the
			 * LNet initiator tagged buffer, a noop operation is
			 * done to this tagged buffer (i.e. payload transfer
			 * size is zero). But, immediate data, which contains
			 * the LNet target status for the transaction, is sent
			 * to the LNet initiator. Immediate data only appears
			 * in the completion event at the LNet initiator and
			 * not in the tagged buffer.
			 */
			tn->tagged_data = cpu_to_be64(abs(tn->tn_status));

			rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_COMP);
				return 0;

			/* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		}
		break;

	case TN_EVENT_RX_OK:
		finalize = true;
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}
static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
				     enum tn_events event, int status,
				     bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_OK:
		if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
		    CFS_FAIL_CHECK_RESET(CFS_KFI_FAIL_WAIT_SEND_COMP1,
					 CFS_KFI_FAIL_WAIT_SEND_COMP2 |
					 CFS_FAIL_ONCE))
			break;

		if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ ||
			     tn->msg_type == KFILND_MSG_BULK_GET_REQ) &&
		    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP3)) {
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
			kfilnd_tn_status_update(tn, -EIO, hstatus);
			/* Don't delete peer on debug/test path */
			kfilnd_peer_tn_failed(tn->tn_kp, -EIO, false);
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;
		}

		kfilnd_peer_alive(tn->tn_kp);
		kfilnd_tn_timeout_enable(tn);
		kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
		break;

	case TN_EVENT_TAG_RX_OK:
		if (status)
			kfilnd_tn_status_update(tn, status, LNET_MSG_STATUS_OK);

		kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
		if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
		    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP2)) {
			struct kfi_cq_err_entry fake_error = {
				.op_context = tn,
				.flags = KFI_MSG | KFI_SEND,
				.err = EIO,
			};

			kfilnd_ep_gen_fake_err(tn->tn_ep, &fake_error);
		}
		break;

	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* The bulk request message failed, however, there is an edge
		 * case where the last request packet of a message is received
		 * at the target successfully, but the corresponding response
		 * packet is repeatedly dropped. This results in the target
		 * generating a success completion event but the initiator
		 * generating an error completion event. Due to this, we have to
		 * delete the peer here to protect the RKEY.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, true);

		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;

		/* Need to replay the TN_EVENT_TX_FAIL event while in the
		 * TN_STATE_WAIT_COMP state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		/* The target may hold a reference to the RKEY, so we need to
		 * delete the peer to protect it.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, true);
		kfilnd_tn_state_change(tn, TN_STATE_FAIL);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}
static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
					  enum tn_events event, int status,
					  bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		/* The bulk request message was never queued so we do not need
		 * to delete the peer.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}
static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* This event occurs at the target of a bulk LNetPut/Get.
		 * Since the target did not generate the RKEY, we needn't
		 * delete the peer to protect it.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}
static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
					 enum tn_events event, int status,
					 bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_OK:
		/* Status can be set for both TN_EVENT_TAG_RX_FAIL and
		 * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set,
		 * LNet target returned -ENODATA.
		 */
		if (status) {
			if (event == TN_EVENT_TAG_RX_FAIL)
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_LOCAL_ERROR);
			else
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_OK);
		}

		if (!kfilnd_tn_timeout_cancel(tn)) {
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
			return 0;
		}
		break;

	case TN_EVENT_TIMEOUT:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn,
					       TN_STATE_WAIT_TIMEOUT_TAG_COMP);
			return 0;

		/* Need to replay the TN_EVENT_TIMEOUT event while in the
		 * TN_STATE_WAIT_TAG_COMP state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		/* This event occurs at the target of a bulk LNetPut/Get.
		 * Since the target did not generate the RKEY, we needn't
		 * delete the peer to protect it.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;

	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}
static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		/* Prior TN states will have deleted the peer if necessary */
		kfilnd_peer_tn_failed(tn->tn_kp, status, false);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_RX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		if (tn->tn_status != status) {
			KFILND_TN_DEBUG(tn, "%d -> %d status change",
					tn->tn_status, status);
			tn->tn_status = status;
		}
		if (tn->hstatus != LNET_MSG_STATUS_OK) {
			KFILND_TN_DEBUG(tn, "%d -> %d health status change",
					tn->hstatus, LNET_MSG_STATUS_OK);
			tn->hstatus = LNET_MSG_STATUS_OK;
		}
		break;

	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_CANCEL:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}
static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
						 enum tn_events event,
						 int status, bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_CANCEL:
		kfilnd_tn_status_update(tn, -ETIMEDOUT,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		/* We've cancelled locally, but the target may still have a ref
		 * on the RKEY. Delete the peer to protect it.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, -ETIMEDOUT, true);
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		/* The initiator of a bulk LNetPut/Get eagerly sends the bulk
		 * request message to the target without ensuring the tagged
		 * receive buffer is posted. Thus, the target could be issuing
		 * kfi_write/read operations using the tagged receive buffer
		 * RKEY, and we need to delete this peer to protect it.
		 */
		kfilnd_peer_tn_failed(tn->tn_kp, status, true);
		break;

	case TN_EVENT_TAG_RX_OK:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}
static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	if (event == TN_EVENT_TIMEOUT) {
		kfilnd_tn_finalize(tn, tn_released);
	} else {
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}
static int
(* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
						       enum tn_events event,
						       int status,
						       bool *tn_released) = {
	[TN_STATE_IDLE] = kfilnd_tn_state_idle,
	[TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
	[TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
	[TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
	[TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
	[TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
	[TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
	[TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
	[TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
		kfilnd_tn_state_wait_timeout_tag_comp,
	[TN_STATE_FAIL] = kfilnd_tn_state_fail,
	[TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
	[TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
};
/**
 * kfilnd_tn_event_handler() - Update transaction state machine with an event.
 * @tn: Transaction to be updated.
 * @event: Transaction event.
 * @status: Errno status associated with the event.
 *
 * When the transaction event handler is first called on a new transaction, the
 * transaction is now owned by the transaction system. This means that it will
 * be freed by the system as the transaction is progressed through the state
 * machine.
 */
void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
			     enum tn_events event, int status)
{
	bool tn_released = false;
	int rc;

	if (!tn)
		return;

	mutex_lock(&tn->tn_lock);
	rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
							  &tn_released);
	if (rc == -EAGAIN) {
		tn->replay_event = event;
		tn->replay_status = status;
		kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
	}

	if (!tn_released)
		mutex_unlock(&tn->tn_lock);
}
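/* The event handler is the single entry point into the state machine: every
 * completion, failure, cancel, and timeout is funneled through it, and the
 * dispatch table above selects the routine for the transaction's current
 * state. Illustrative caller (hypothetical function name; the real callers
 * live in the endpoint completion-queue processing code):
 *
 *	static void example_tx_complete(struct kfilnd_transaction *tn, int err)
 *	{
 *		kfilnd_tn_event_handler(tn, err ? TN_EVENT_TX_FAIL :
 *					TN_EVENT_TX_OK, err);
 *	}
 */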
/**
 * kfilnd_tn_free() - Free a transaction.
 */
void kfilnd_tn_free(struct kfilnd_transaction *tn)
{
	spin_lock(&tn->tn_ep->tn_list_lock);
	list_del(&tn->tn_entry);
	spin_unlock(&tn->tn_ep->tn_list_lock);

	KFILND_TN_DEBUG(tn, "Transaction freed");

	if (tn->tn_mr_key)
		kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);

	/* Free send message buffer if needed. */
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);

	kmem_cache_free(tn_cache, tn);
}
/**
 * Allocation logic common to kfilnd_tn_alloc() and kfilnd_tn_alloc_for_hello().
 * @ep: The KFI LND endpoint to associate with the transaction.
 * @kp: The kfilnd peer to associate with the transaction.
 * See kfilnd_tn_alloc() for a description of the other fields.
 * Note: Caller must have a reference on @kp.
 */
static struct kfilnd_transaction *kfilnd_tn_alloc_common(struct kfilnd_ep *ep,
							 struct kfilnd_peer *kp,
							 bool alloc_msg,
							 bool is_initiator,
							 u16 key)
{
	struct kfilnd_transaction *tn;
	int rc;
	ktime_t tn_alloc_ts;

	tn_alloc_ts = ktime_get();

	tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
	if (!tn) {
		rc = -ENOMEM;
		goto err;
	}

	if (alloc_msg) {
		tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
		if (!tn->tn_tx_msg.msg) {
			rc = -ENOMEM;
			goto err_free_tn;
		}
	}

	tn->tn_mr_key = key;

	tn->tn_kp = kp;

	mutex_init(&tn->tn_lock);
	tn->tn_ep = ep;
	tn->tn_response_rx = ep->end_context_id;
	tn->tn_state = TN_STATE_IDLE;
	tn->hstatus = LNET_MSG_STATUS_OK;
	tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
	tn->tn_replay_deadline = ktime_sub(tn->deadline,
					   (lnet_get_lnd_timeout() / 2));
	tn->is_initiator = is_initiator;
	INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);

	/* Add the transaction to an endpoint. This is like
	 * incrementing a ref counter.
	 */
	spin_lock(&ep->tn_list_lock);
	list_add_tail(&tn->tn_entry, &ep->tn_list);
	spin_unlock(&ep->tn_list_lock);

	tn->tn_alloc_ts = tn_alloc_ts;
	tn->tn_state_ts = ktime_get();

	KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key);

	return tn;

err_free_tn:
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
	kmem_cache_free(tn_cache, tn);
err:
	return ERR_PTR(rc);
}
static struct kfilnd_ep *kfilnd_dev_to_ep(struct kfilnd_dev *dev, int cpt)
{
	struct kfilnd_ep *ep;

	if (!dev)
		return ERR_PTR(-EINVAL);

	ep = dev->cpt_to_endpoint[cpt];
	if (!ep) {
		CWARN("%s used invalid cpt=%d\n",
		      libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
		ep = dev->kfd_endpoints[0];
	}

	return ep;
}
/**
 * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
 * @dev: KFI LND device used to look up the KFI LND endpoint to associate with
 * the transaction.
 * @cpt: CPT of the transaction.
 * @target_nid: Target NID of the transaction.
 * @alloc_msg: Allocate an immediate message for the transaction.
 * @is_initiator: Is initiator of LNet transaction.
 * @need_key: Is transaction memory region key needed.
 *
 * During transaction allocation, each transaction is associated with a KFI LND
 * endpoint used to post data transfer operations. The CPT argument is used to
 * lookup the KFI LND endpoint within the KFI LND device.
 *
 * Return: On success, valid pointer. Else, negative errno pointer.
 */
struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
					   lnet_nid_t target_nid,
					   bool alloc_msg, bool is_initiator,
					   bool need_key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep;
	struct kfilnd_peer *kp;
	int rc;
	u16 key = 0;

	ep = kfilnd_dev_to_ep(dev, cpt);
	if (IS_ERR(ep)) {
		rc = PTR_ERR(ep);
		goto err;
	}

	/* Consider the following:
	 * Thread 1: Posts tagged receive with RKEY based on
	 *	     peerA::kp_local_session_key X and tn_mr_key Y
	 * Thread 2: Fetches peerA with kp_local_session_key X
	 * Thread 1: Cancels tagged receive, marks peerA for removal, and
	 *	     releases tn_mr_key Y
	 * Thread 2: allocates tn_mr_key Y
	 * At this point, thread 2 has the same RKEY used by thread 1.
	 * Thus, we always allocate the tn_mr_key before looking up the peer,
	 * and we always mark peers for removal before releasing tn_mr_key.
	 */
	if (need_key) {
		rc = kfilnd_ep_get_key(ep);
		if (rc < 0)
			goto err;
		key = rc;
	}

	kp = kfilnd_peer_get(dev, target_nid);
	if (IS_ERR(kp)) {
		rc = PTR_ERR(kp);
		goto err_put_key;
	}

	tn = kfilnd_tn_alloc_common(ep, kp, alloc_msg, is_initiator, key);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		kfilnd_peer_put(kp);
		goto err_put_key;
	}

	return tn;

err_put_key:
	if (need_key)
		kfilnd_ep_put_key(ep, key);
err:
	return ERR_PTR(rc);
}
/* Like kfilnd_tn_alloc(), but the caller has already looked up the
 * kfilnd_peer. Used only to allocate a TN for a hello request.
 * See kfilnd_tn_alloc()/kfilnd_tn_alloc_common().
 * Note: Caller must have a reference on @kp.
 */
struct kfilnd_transaction *kfilnd_tn_alloc_for_hello(struct kfilnd_dev *dev, int cpt,
						     struct kfilnd_peer *kp)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep;
	int rc;

	ep = kfilnd_dev_to_ep(dev, cpt);
	if (IS_ERR(ep)) {
		rc = PTR_ERR(ep);
		goto err;
	}

	tn = kfilnd_tn_alloc_common(ep, kp, true, true, 0);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		goto err;
	}

	return tn;

err:
	return ERR_PTR(rc);
}
/**
 * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
 *
 * This function should only be called when there are no outstanding
 * transactions.
 */
void kfilnd_tn_cleanup(void)
{
	kmem_cache_destroy(imm_buf_cache);
	kmem_cache_destroy(tn_cache);
}
/**
 * kfilnd_tn_init() - Initialize KFI LND transaction system.
 *
 * Return: On success, zero. Else, negative errno.
 */
int kfilnd_tn_init(void)
{
	tn_cache = kmem_cache_create("kfilnd_tn",
				     sizeof(struct kfilnd_transaction), 0,
				     SLAB_HWCACHE_ALIGN, NULL);
	if (!tn_cache)
		goto err;

	imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
					  KFILND_IMMEDIATE_MSG_SIZE, 0,
					  SLAB_HWCACHE_ALIGN, NULL);
	if (!imm_buf_cache)
		goto err_tn_cache_destroy;

	return 0;

err_tn_cache_destroy:
	kmem_cache_destroy(tn_cache);
err:
	return -ENOMEM;
}
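/* Illustrative lifecycle pairing (sketch; the actual call sites are in the
 * LND module startup and shutdown paths):
 *
 *	rc = kfilnd_tn_init();
 *	if (rc)
 *		return rc;
 *	...
 *	kfilnd_tn_cleanup();
 */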
/**
 * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction.
 * @tn: Transaction to have buffer set.
 * @kiov: LNet KIOV buffer.
 * @num_iov: Number of IOVs.
 * @offset: Offset into IOVs where the buffer starts.
 * @len: Length of the buffer.
 *
 * This function takes the user provided IOV, offset, and len, and sets the
 * transaction buffer. The user provided IOV is an LNet KIOV. The user provided
 * offset is applied when the transaction buffer is configured (i.e. the
 * transaction buffer starts at the offset into the IOVs).
 */
int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn,
			   struct bio_vec *kiov, size_t num_iov,
			   size_t offset, size_t len)
{
	size_t i;
	size_t cur_len = 0;
	size_t cur_offset = offset;
	size_t cur_iov = 0;
	size_t tmp_len;
	size_t tmp_offset;

	for (i = 0; (i < num_iov) && (cur_len < len); i++) {
		/* Skip KIOVs until we reach the KIOV containing the current
		 * offset.
		 */
		if (kiov[i].bv_len <= cur_offset) {
			cur_offset -= kiov[i].bv_len;
			continue;
		}

		tmp_len = kiov[i].bv_len - cur_offset;
		tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;

		if (tmp_len + cur_len > len)
			tmp_len = len - cur_len;

		/* tn_kiov is an array of size LNET_MAX_IOV */
		if (cur_iov >= LNET_MAX_IOV)
			return -EINVAL;

		tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
		tn->tn_kiov[cur_iov].bv_len = tmp_len;
		tn->tn_kiov[cur_iov].bv_offset = tmp_offset;

		cur_iov++;
		cur_len += tmp_len;
		cur_offset = 0;
	}

	tn->tn_num_iovec = cur_iov;
	tn->tn_nob = cur_len;

	return 0;
}
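/* Worked example (illustrative): with two 4096-byte KIOV entries, offset=4196,
 * and len=1000, the loop above skips the first entry (cur_offset becomes 100)
 * and trims the second to bv_offset + 100 with bv_len = 1000, so the
 * transaction maps exactly bytes [4196, 5196) of the original buffer.
 */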