/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * kfilnd transaction and state machine processing.
 */

#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_dom.h"
#include "kfilnd_peer.h"
#include <asm/checksum.h>

static struct kmem_cache *tn_cache;
static struct kmem_cache *imm_buf_cache;

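/* Fold a 32-bit partial checksum of the message bytes down to the 16-bit
 * on-wire checksum using the kernel's Internet checksum helpers.
 */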
static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
{
	return csum_fold(csum_partial(ptr, nob, 0));
}

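/* Return the expected size, in bytes, of a message of the given type: the
 * transport header plus the type-specific protocol payload. Used by
 * kfilnd_tn_unpack_msg() to reject short messages.
 */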
static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type)
{
	const int hdr_size = offsetof(struct kfilnd_msg, proto);

	switch (type) {
	case KFILND_MSG_IMMEDIATE:
		return offsetof(struct kfilnd_msg, proto.immed.payload[0]);

	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		return hdr_size + sizeof(struct kfilnd_bulk_req_msg);

	default:
		return -1;
	}
}

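/* The kfilnd_tn_pack_*() helpers below share one recipe: fill in the
 * type-specific protocol payload first, then the common transport header.
 * The checksum field is set to NO_CHECKSUM while the sum over the entire
 * message is computed, and the result then overwrites that field. The
 * receiver verifies by re-checksumming the whole message and expecting zero
 * (see kfilnd_tn_unpack_msg()).
 */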
static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = KFILND_MSG_VERSION;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Message version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_REQ;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = tn->tn_kp->kp_version;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Message version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_RSP;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr);
	msg->proto.bulk_req.key = tn->tn_mr_key;
	msg->proto.bulk_req.response_rx = tn->tn_response_rx;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = KFILND_MSG_VERSION;
	msg->type = tn->msg_type;
	msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);

	lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE,
			    msg,
			    offsetof(struct kfilnd_msg,
				     proto.immed.payload),
			    tn->tn_num_iovec, tn->tn_kiov, 0,
			    tn->tn_nob);

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = KFILND_MSG_VERSION;
	msg->type = tn->msg_type;
	msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

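/* Validate a received message before it reaches the state machine: length,
 * magic, version, checksum, source and destination NIDs, and the
 * type/version pairing are all checked. Returns 0 for a well-formed message
 * and a negative errno otherwise.
 */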
static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
				unsigned int nob)
{
	const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);

	if (nob < hdr_size) {
		KFILND_EP_ERROR(ep, "Short message: %u", nob);
		return -EPROTO;
	}

	/* TODO: Support byte swapping on mixed endian systems. */
	if (msg->magic != KFILND_MSG_MAGIC) {
		KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
		return -EPROTO;
	}

	/* TODO: Allow for older versions. */
	if (msg->version > KFILND_MSG_VERSION) {
		KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
		return -EPROTO;
	}

	if (msg->nob > nob) {
		KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
				msg->nob);
		return -EPROTO;
	}

	/* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
	if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
		KFILND_EP_ERROR(ep, "Bad checksum");
		return -EPROTO;
	}

	if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
		KFILND_EP_ERROR(ep, "Bad destination nid: %s",
				libcfs_nid2str(msg->dstnid));
		return -EPROTO;
	}

	if (msg->srcnid == LNET_NID_ANY) {
		KFILND_EP_ERROR(ep, "Bad source nid: %s",
				libcfs_nid2str(msg->srcnid));
		return -EPROTO;
	}

	if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) {
		KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
				msg_type_to_str(msg->type),
				msg->nob, kfilnd_tn_msgtype2size(msg->type));
		return -EPROTO;
	}

	switch ((enum kfilnd_msg_type)msg->type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		if (msg->version == 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	case KFILND_MSG_HELLO_REQ:
	case KFILND_MSG_HELLO_RSP:
		if (msg->version != 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	default:
		CERROR("Unknown message type %x\n", msg->type);
		return -EPROTO;
	}

	return 0;
}

static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];

	atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts)),
		     &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);
}

static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
				   enum tn_states new_state)
{
	KFILND_TN_DEBUG(tn, "%s -> %s state change",
			tn_state_to_str(tn->tn_state),
			tn_state_to_str(new_state));

	kfilnd_tn_record_state_change(tn);

	tn->tn_state = new_state;
	tn->tn_state_ts = ktime_get();
}

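/* Record the first failure a transaction sees. Later failures do not
 * overwrite the original status or health status, preserving the root cause
 * reported to lnet_finalize().
 */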
static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
				    enum lnet_msg_hstatus hstatus)
{
	/* Only the first non-ok status will take. */
	if (tn->tn_status == 0) {
		KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
				status);
		tn->tn_status = status;
	}

	if (tn->hstatus == LNET_MSG_STATUS_OK) {
		KFILND_TN_DEBUG(tn, "%d -> %d health status change",
				tn->hstatus, hstatus);
		tn->hstatus = hstatus;
	}
}

static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
{
	return tn->tn_status != 0;
}

/**
 * kfilnd_tn_process_rx_event() - Process an immediate receive event.
 *
 * For each immediate receive, a transaction structure needs to be allocated to
 * process the receive.
 */
void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
				struct kfilnd_msg *rx_msg, int msg_size)
{
	struct kfilnd_transaction *tn;
	bool alloc_msg = true;
	int rc;
	enum tn_events event = TN_EVENT_RX_HELLO;

	/* Increment buf ref count for this work */
	atomic_inc(&bufdesc->immed_ref);

	/* Unpack the message */
	rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size);
	if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
		kfilnd_ep_imm_buffer_put(bufdesc);
		KFILND_EP_ERROR(bufdesc->immed_end,
				"Failed to unpack message %d", rc);
		return;
	}

	switch ((enum kfilnd_msg_type)rx_msg->type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		event = TN_EVENT_RX_OK;
		fallthrough;
	case KFILND_MSG_HELLO_RSP:
		alloc_msg = false;
		fallthrough;
	case KFILND_MSG_HELLO_REQ:
		/* Context points to a received buffer and status is the length.
		 * Allocate a Tn structure, set its values, then launch the
		 * receive.
		 */
		tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev,
				     bufdesc->immed_end->end_cpt,
				     rx_msg->srcnid, alloc_msg, false,
				     false);
		if (IS_ERR(tn)) {
			kfilnd_ep_imm_buffer_put(bufdesc);
			KFILND_EP_ERROR(bufdesc->immed_end,
					"Failed to allocate transaction struct: rc=%ld",
					PTR_ERR(tn));
			return;
		}

		tn->tn_rx_msg.msg = rx_msg;
		tn->tn_rx_msg.length = msg_size;
		tn->tn_posted_buf = bufdesc;

		KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u",
				msg_type_to_str((enum kfilnd_msg_type)rx_msg->type),
				tn->tn_mr_key);
		break;

	default:
		KFILND_EP_ERROR(bufdesc->immed_end,
				"Unhandled kfilnd message type: %d",
				(enum kfilnd_msg_type)rx_msg->type);
		LBUG();
	}

	kfilnd_tn_event_handler(tn, event, 0);
}

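/* Unlike kfilnd_tn_record_state_change(), which accumulates time spent per
 * state, this records the end-to-end duration of the transaction from
 * allocation to completion.
 */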
static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];

	atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts)),
		     &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);
}

/**
 * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
 *
 * All state machine functions should call kfilnd_tn_finalize() instead of
 * kfilnd_tn_free(). Once all expected asynchronous events have been received,
 * the transaction lock is released (if it has not been already), transaction
 * resources are cleaned up, and lnet_finalize() is called.
 */
static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
{
	if (!*tn_released) {
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
	}

	/* Release the reference on the multi-receive buffer. */
	if (tn->tn_posted_buf)
		kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);

	/* Finalize LNet operation. */
	if (tn->tn_lntmsg) {
		tn->tn_lntmsg->msg_health_status = tn->hstatus;
		lnet_finalize(tn->tn_lntmsg, tn->tn_status);
	}

	if (tn->tn_getreply) {
		tn->tn_getreply->msg_health_status = tn->hstatus;
		lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
				       tn->tn_getreply,
				       tn->tn_status ? 0 : tn->tn_nob);
		lnet_finalize(tn->tn_getreply, tn->tn_status);
	}

	if (KFILND_TN_PEER_VALID(tn))
		kfilnd_peer_put(tn->tn_kp);

	kfilnd_tn_record_state_change(tn);
	kfilnd_tn_record_duration(tn);

	kfilnd_tn_free(tn);
}

/**
 * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
 * @tn: Transaction to have its tagged receive cancelled.
 *
 * Return: 0 on success. Else, negative errno. If an error occurs, resources may
 * leak.
 */
static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
{
	int rc;

	/* Issue a cancel. A return code of zero means the operation issued an
	 * async cancel. A return code of -ENOENT means the tagged receive was
	 * not found. The assumption here is that a tagged send landed thus
	 * removing the tagged receive buffer from hardware. For both cases,
	 * async events should occur.
	 */
	rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
	if (rc != 0 && rc != -ENOENT) {
		KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
		return rc;
	}

	return 0;
}

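/* Bulk transactions are guarded by a timeout timer. kfilnd_tn_timeout()
 * runs in timer (interrupt) context and defers to kfilnd_wq, where the work
 * item feeds a TN_EVENT_TIMEOUT event into the state machine.
 */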
static void kfilnd_tn_timeout_work(struct work_struct *work)
{
	struct kfilnd_transaction *tn =
		container_of(work, struct kfilnd_transaction, timeout_work);

	KFILND_TN_ERROR(tn, "Bulk operation timeout");
	kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
}

static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
{
	struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);

	queue_work(kfilnd_wq, &tn->timeout_work);
}

static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
{
	return timer_delete(&tn->timeout_timer);
}

static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
{
	ktime_t remaining_time = max_t(ktime_t, 0,
				       tn->deadline - ktime_get_seconds());
	unsigned long expires = remaining_time * HZ + jiffies;

	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
		expires = jiffies;

	cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
			(unsigned long)tn, 0);
	mod_timer(&tn->timeout_timer, expires);
}

/* The following are the state machine routines for the transactions. */
static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
				       enum tn_events event, int status,
				       bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_SEND_FAILED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
					      enum tn_events event, int status,
					      bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		kfilnd_tn_pack_bulk_req(tn);

		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
			break;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_TAGGED_RECV_POSTED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay post send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		/* Need to transition to the TN_STATE_SEND_FAILED to cleanup
		 * posted tagged receive buffer.
		 */
		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);

			/* Propagate TN_EVENT_INIT_BULK event to
			 * TN_STATE_SEND_FAILED handler.
			 */
			return kfilnd_tn_state_send_failed(tn, event, rc,
							   tn_released);
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	struct kfilnd_msg *msg;
	int rc = 0;
	bool finalize = false;
	struct lnet_hdr hdr;
	struct lnet_nid srcnid;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	/* For new peers, send a hello request message and queue the true LNet
	 * message for replay.
	 */
	if (kfilnd_peer_needs_throttle(tn->tn_kp) &&
	    (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) {
		if (kfilnd_peer_deleted(tn->tn_kp)) {
			/* We'll assign a NETWORK_TIMEOUT message health status
			 * below because we don't know why this peer was marked
			 * for removal.
			 */
			rc = -ESTALE;
			KFILND_TN_DEBUG(tn, "Drop message to deleted peer");
		} else if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
			/* We're throttling transactions to this peer until
			 * a handshake can be completed, but there is no HELLO
			 * currently in flight. This implies the HELLO has
			 * failed, and we should cancel this TN. Otherwise we
			 * are stuck waiting for the TN deadline.
			 *
			 * We assign NETWORK_TIMEOUT health status below because
			 * we do not know why the HELLO failed.
			 */
			rc = -ECANCELED;
			KFILND_TN_DEBUG(tn, "Cancel throttled TN");
		} else if (ktime_before(ktime_get_seconds(),
					tn->tn_replay_deadline)) {
			/* If the transaction replay deadline has not been met,
			 * then return -EAGAIN. This will cause this transaction
			 * event to be replayed. During this time, an async
			 * hello message from the peer should occur at which
			 * point we can resume sending new messages to this
			 * peer.
			 */
			KFILND_TN_DEBUG(tn, "hello response pending");
			return -EAGAIN;
		} else {
			rc = -ETIMEDOUT;
		}

		kfilnd_tn_status_update(tn, rc,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		rc = 0;
		goto out;
	}

	if ((event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK) &&
	    ktime_after(ktime_get_seconds(), tn->tn_replay_deadline)) {
		kfilnd_tn_status_update(tn, -ETIMEDOUT,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		rc = 0;
		goto out;
	}

	switch (event) {
	case TN_EVENT_INIT_IMMEDIATE:
	case TN_EVENT_TX_HELLO:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		if (event == TN_EVENT_INIT_IMMEDIATE)
			kfilnd_tn_pack_immed_msg(tn);
		else
			kfilnd_tn_pack_hello_req(tn);

		/* Send immediate message. */
		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
			break;

		/* Need to replay TN_EVENT_INIT_IMMEDIATE event while in the
		 * TN_STATE_IDLE state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			if (event == TN_EVENT_TX_HELLO)
				kfilnd_peer_clear_hello_pending(tn->tn_kp);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_INIT_BULK:
		/* Post tagged receive buffer used to land bulk response. */
		rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);

		switch (rc) {
		/* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);

			/* Propagate TN_EVENT_INIT_BULK event to
			 * TN_STATE_TAGGED_RECV_POSTED handler.
			 */
			return kfilnd_tn_state_tagged_recv_posted(tn, event,
								  rc,
								  tn_released);

		/* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
		 * state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
					rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_RX_OK:
		if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
			rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
						       tn->tn_ep->end_cpt,
						       tn->tn_kp);
			if (rc)
				KFILND_TN_ERROR(tn,
						"Failed to send hello request: rc=%d",
						rc);
			rc = 0;
		}

		/* If this is a new peer then we cannot progress the transaction
		 * and must drop it.
		 */
		if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
			KFILND_TN_ERROR(tn,
					"Dropping message from %s due to stale peer",
					libcfs_nid2str(tn->tn_kp->kp_nid));
			kfilnd_tn_status_update(tn, -EPROTO,
						LNET_MSG_STATUS_LOCAL_DROPPED);
			rc = 0;
			goto out;
		}

		LASSERT(kfilnd_peer_is_new_peer(tn->tn_kp) == false);
		msg = tn->tn_rx_msg.msg;

		/* Update the NID address with the new preferred RX context. */
		kfilnd_peer_alive(tn->tn_kp);

		/* Pass message up to LNet
		 * The TN will be reused in this call chain so we need to
		 * release the lock on the TN before proceeding.
		 */
		KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
				tn_state_to_str(tn->tn_state));

		/* TODO: Do not manually update this state change. */
		tn->tn_state = TN_STATE_IMM_RECV;
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
		lnet_nid4_to_nid(msg->srcnid, &srcnid);
		if (msg->type == KFILND_MSG_IMMEDIATE) {
			lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 0);
		} else {
			lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 1);
		}

		/* If successful, transaction has been accepted by LNet and we
		 * cannot process the transaction anymore within this context.
		 */
		if (!rc)
			return 0;

		KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
		kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
		break;

	case TN_EVENT_RX_HELLO:
		msg = tn->tn_rx_msg.msg;

		kfilnd_peer_alive(tn->tn_kp);

		switch (msg->type) {
		case KFILND_MSG_HELLO_REQ:
			kfilnd_peer_process_hello(tn->tn_kp, msg);
			tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
			KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);

			kfilnd_tn_pack_hello_rsp(tn);

			/* Send immediate message. */
			rc = kfilnd_ep_post_send(tn->tn_ep, tn);
			switch (rc) {
			case 0:
				kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
				break;

			case -EAGAIN:
				KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
			break;

		case KFILND_MSG_HELLO_RSP:
			rc = 0;
			kfilnd_peer_process_hello(tn->tn_kp, msg);
			finalize = true;
			break;

		default:
			KFILND_TN_ERROR(tn, "Invalid message type: %s",
					msg_type_to_str(msg->type));
			LBUG();
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

out:
	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}

static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT || status == -EIO)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_tn_failed(tn->tn_kp, status);
		if (tn->msg_type == KFILND_MSG_HELLO_REQ)
			kfilnd_peer_clear_hello_pending(tn->tn_kp);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

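/* Target-side handling of a bulk request that arrived as an immediate
 * receive. Once LNet has attached the data buffer, the payload is moved
 * with a tagged RMA read or write against the initiator's tagged buffer, or
 * a zero-byte tagged send is issued if the RMA operation is to be skipped.
 */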
static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	int rc = 0;
	bool finalize = false;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_TAG_RMA:
	case TN_EVENT_SKIP_TAG_RMA:
		/* Release the buffer we received the request on. All relevant
		 * information to perform the RMA operation is stored in the
		 * transaction structure. This should be done before the RMA
		 * operation to prevent two contexts from potentially processing
		 * the same transaction.
		 *
		 * TODO: Prevent this from returning -EAGAIN.
		 */
		if (tn->tn_posted_buf) {
			kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
			tn->tn_posted_buf = NULL;
		}

		/* Update the KFI address to use the response RX context. */
		tn->tn_target_addr =
			kfi_rx_addr(KFILND_BASE_ADDR(tn->tn_kp->kp_addr),
				    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
		KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		/* Initiate the RMA operation to push/pull the LNet payload or
		 * send a tagged message to finalize the bulk operation if the
		 * RMA operation should be skipped.
		 */
		if (event == TN_EVENT_INIT_TAG_RMA) {
			if (tn->sink_buffer)
				rc = kfilnd_ep_post_read(tn->tn_ep, tn);
			else
				rc = kfilnd_ep_post_write(tn->tn_ep, tn);

			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_RMA_COMP);
				break;

			/* Need to replay TN_EVENT_INIT_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged %s to %s(%#llx)",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged %s to %s(%#llx): rc=%d",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		} else {
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_OK);

			/* Since the LNet initiator has posted a unique tagged
			 * buffer specific for this LNet transaction and the
			 * LNet target has decided not to push/pull to/from the
			 * LNet initiator tagged buffer, a noop operation is
			 * done to this tagged buffer (i.e. payload transfer
			 * size is zero). But, immediate data, which contains
			 * the LNet target status for the transaction, is sent
			 * to the LNet initiator. Immediate data only appears in
			 * the completion event at the LNet initiator and not in
			 * the tagged buffer.
			 */
			tn->tagged_data = cpu_to_be64(abs(tn->tn_status));

			rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_COMP);
				break;

			/* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		}
		break;

	case TN_EVENT_RX_OK:
		finalize = true;
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}

static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
				     enum tn_events event, int status,
				     bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		kfilnd_tn_timeout_enable(tn);
		kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
		break;

	case TN_EVENT_TAG_RX_OK:
		kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
		break;

	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_tn_failed(tn->tn_kp, status);

		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;

		/* Need to replay TN_EVENT_TX_FAIL event while in the
		 * TN_STATE_WAIT_COMP state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		kfilnd_tn_state_change(tn, TN_STATE_FAIL);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
					  enum tn_events event, int status,
					  bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	if (event == TN_EVENT_TX_OK) {
		kfilnd_peer_alive(tn->tn_kp);
		kfilnd_tn_finalize(tn, tn_released);
	} else {
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_tn_failed(tn->tn_kp, status);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
					 enum tn_events event, int status,
					 bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_OK:
		/* Status can be set for both TN_EVENT_TAG_RX_FAIL and
		 * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set,
		 * LNet target returned -ENODATA.
		 */
		if (status) {
			if (event == TN_EVENT_TAG_RX_FAIL)
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_LOCAL_ERROR);
			else
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_OK);
		}

		if (!kfilnd_tn_timeout_cancel(tn)) {
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
			return 0;
		}
		break;

	case TN_EVENT_TIMEOUT:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn,
					       TN_STATE_WAIT_TIMEOUT_TAG_COMP);
			return 0;

		/* Need to replay TN_EVENT_TIMEOUT event while in the
		 * TN_STATE_WAIT_TAG_COMP state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_tn_failed(tn->tn_kp, status);
		break;

	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		kfilnd_peer_tn_failed(tn->tn_kp, status);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_CANCEL:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
						 enum tn_events event,
						 int status, bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_CANCEL:
		kfilnd_tn_status_update(tn, -ETIMEDOUT,
					LNET_MSG_STATUS_REMOTE_TIMEOUT);
		kfilnd_peer_tn_failed(tn->tn_kp, -ETIMEDOUT);
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		break;

	case TN_EVENT_TAG_RX_OK:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	if (event == TN_EVENT_TIMEOUT) {
		kfilnd_tn_finalize(tn, tn_released);
	} else {
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

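/* Events are dispatched to a handler based on the transaction's current
 * state. A handler returns -EAGAIN to request that the event be replayed;
 * kfilnd_tn_event_handler() implements this by queueing the transaction on
 * its endpoint's replay list.
 */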
static int
(* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
						       enum tn_events event,
						       int status,
						       bool *tn_released) = {
	[TN_STATE_IDLE] = kfilnd_tn_state_idle,
	[TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
	[TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
	[TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
	[TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
	[TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
	[TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
	[TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
	[TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
		kfilnd_tn_state_wait_timeout_tag_comp,
	[TN_STATE_FAIL] = kfilnd_tn_state_fail,
	[TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
	[TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
};

/**
 * kfilnd_tn_event_handler() - Update transaction state machine with an event.
 * @tn: Transaction to be updated.
 * @event: Transaction event.
 * @status: Errno status associated with the event.
 *
 * When the transaction event handler is first called on a new transaction, the
 * transaction is now owned by the transaction system. This means it will be
 * freed by the system as the transaction is progressed through the state
 * machine.
 */
void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
			     enum tn_events event, int status)
{
	bool tn_released = false;
	int rc;

	if (!tn)
		return;

	mutex_lock(&tn->tn_lock);
	rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
							  &tn_released);
	if (rc == -EAGAIN) {
		tn->replay_event = event;
		tn->replay_status = status;
		kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
	}

	if (!tn_released)
		mutex_unlock(&tn->tn_lock);
}

/**
 * kfilnd_tn_free() - Free a transaction.
 */
void kfilnd_tn_free(struct kfilnd_transaction *tn)
{
	spin_lock(&tn->tn_ep->tn_list_lock);
	list_del(&tn->tn_entry);
	spin_unlock(&tn->tn_ep->tn_list_lock);

	KFILND_TN_DEBUG(tn, "Transaction freed");

	if (tn->tn_mr_key)
		kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);

	/* Free send message buffer if needed. */
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);

	kmem_cache_free(tn_cache, tn);
}

/**
 * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
 * @dev: KFI LND device used to look up the KFI LND endpoint to associate with
 * the transaction.
 * @cpt: CPT of the transaction.
 * @target_nid: Target NID of the transaction.
 * @alloc_msg: Allocate an immediate message for the transaction.
 * @is_initiator: Is initiator of LNet transaction.
 * @key: True if the transaction needs a memory region key.
 *
 * During transaction allocation, each transaction is associated with a KFI LND
 * endpoint used to post data transfer operations. The CPT argument is used to
 * look up the KFI LND endpoint within the KFI LND device.
 *
 * Return: On success, valid pointer. Else, negative errno pointer.
 */
struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
					   lnet_nid_t target_nid,
					   bool alloc_msg, bool is_initiator,
					   bool key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_peer *kp;
	int rc;

	if (!dev) {
		rc = -EINVAL;
		goto err;
	}

	kp = kfilnd_peer_get(dev, target_nid);
	if (IS_ERR(kp)) {
		rc = PTR_ERR(kp);
		goto err;
	}

	tn = kfilnd_tn_alloc_for_peer(dev, cpt, kp, alloc_msg, is_initiator,
				      key);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		kfilnd_peer_put(kp);
		goto err;
	}

	return tn;

err:
	return ERR_PTR(rc);
}

/* See kfilnd_tn_alloc()
 * Note: Caller must have a reference on @kp
 */
struct kfilnd_transaction *kfilnd_tn_alloc_for_peer(struct kfilnd_dev *dev,
						    int cpt,
						    struct kfilnd_peer *kp,
						    bool alloc_msg,
						    bool is_initiator,
						    bool key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep;
	int rc;
	ktime_t tn_alloc_ts;

	if (!dev) {
		rc = -EINVAL;
		goto err;
	}

	tn_alloc_ts = ktime_get();

	/* If the CPT does not fall into the LNet NI CPT range, force the CPT
	 * into the LNet NI CPT range. This should never happen.
	 */
	ep = dev->cpt_to_endpoint[cpt];
	if (!ep) {
		CWARN("%s used invalid cpt=%d\n",
		      libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
		ep = dev->kfd_endpoints[0];
	}

	tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
	if (!tn) {
		rc = -ENOMEM;
		goto err;
	}

	if (alloc_msg) {
		tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
		if (!tn->tn_tx_msg.msg) {
			rc = -ENOMEM;
			goto err_free_tn;
		}
	}

	if (key) {
		rc = kfilnd_ep_get_key(ep);
		if (rc < 0)
			goto err_free_tn;
		tn->tn_mr_key = rc;
	}

	tn->tn_kp = kp;

	mutex_init(&tn->tn_lock);
	tn->tn_ep = ep;
	tn->tn_response_rx = ep->end_context_id;
	tn->tn_state = TN_STATE_IDLE;
	tn->hstatus = LNET_MSG_STATUS_OK;
	tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
	tn->tn_replay_deadline = ktime_sub(tn->deadline,
					   (lnet_get_lnd_timeout() / 2));
	tn->is_initiator = is_initiator;
	INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);

	/* Add the transaction to an endpoint. This is like
	 * incrementing a ref counter.
	 */
	spin_lock(&ep->tn_list_lock);
	list_add_tail(&tn->tn_entry, &ep->tn_list);
	spin_unlock(&ep->tn_list_lock);

	tn->tn_alloc_ts = tn_alloc_ts;
	tn->tn_state_ts = ktime_get();

	KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key);

	return tn;

err_free_tn:
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
	kmem_cache_free(tn_cache, tn);
err:
	return ERR_PTR(rc);
}

/**
 * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
 *
 * This function should only be called when there are no outstanding
 * transactions.
 */
void kfilnd_tn_cleanup(void)
{
	kmem_cache_destroy(imm_buf_cache);
	kmem_cache_destroy(tn_cache);
}

/**
 * kfilnd_tn_init() - Initialize KFI LND transaction system.
 *
 * Return: On success, zero. Else, negative errno.
 */
int kfilnd_tn_init(void)
{
	tn_cache = kmem_cache_create("kfilnd_tn",
				     sizeof(struct kfilnd_transaction), 0,
				     SLAB_HWCACHE_ALIGN, NULL);
	if (!tn_cache)
		goto err;

	imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
					  KFILND_IMMEDIATE_MSG_SIZE, 0,
					  SLAB_HWCACHE_ALIGN, NULL);
	if (!imm_buf_cache)
		goto err_tn_cache_destroy;

	return 0;

err_tn_cache_destroy:
	kmem_cache_destroy(tn_cache);
err:
	return -ENOMEM;
}

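/* Worked example for kfilnd_tn_set_kiov_buf() below, with hypothetical
 * values: given two 4096-byte KIOV entries, offset=5000, and len=2000, the
 * first entry is consumed entirely by the offset (904 bytes of it remain to
 * skip), and the second entry is trimmed to a 2000-byte fragment starting
 * 904 bytes in. The result is tn_num_iovec=1 and tn_nob=2000.
 */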
/**
 * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction.
 * @tn: Transaction to have buffer set.
 * @kiov: LNet KIOV buffer.
 * @num_iov: Number of IOVs.
 * @offset: Offset into IOVs where the buffer starts.
 * @len: Length of the buffer.
 *
 * This function takes the user provided IOV, offset, and len, and sets the
 * transaction buffer. The user provided IOV is an LNet KIOV. The user provided
 * offset is applied when the transaction buffer is configured (i.e. the
 * transaction buffer points to the base of the IOV plus the offset).
 *
 * Return: 0 on success. Else, negative errno.
 */
int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn,
			   struct bio_vec *kiov, size_t num_iov,
			   size_t offset, size_t len)
{
	size_t i;
	size_t cur_len = 0;
	size_t cur_offset = offset;
	size_t cur_iov = 0;
	size_t tmp_len;
	size_t tmp_offset;

	for (i = 0; (i < num_iov) && (cur_len < len); i++) {
		/* Skip KIOVs that are entirely consumed by the remaining
		 * offset.
		 */
		if (kiov[i].bv_len <= cur_offset) {
			cur_offset -= kiov[i].bv_len;
			continue;
		}

		tmp_len = kiov[i].bv_len - cur_offset;
		tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;

		if (tmp_len + cur_len > len)
			tmp_len = len - cur_len;

		/* tn_kiov is an array of size LNET_MAX_IOV */
		if (cur_iov >= LNET_MAX_IOV)
			return -EINVAL;

		tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
		tn->tn_kiov[cur_iov].bv_len = tmp_len;
		tn->tn_kiov[cur_iov].bv_offset = tmp_offset;
		cur_iov++;

		cur_len += tmp_len;
		cur_offset = 0;
	}

	tn->tn_num_iovec = cur_iov;
	tn->tn_nob = cur_len;

	return 0;
}