/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * kfilnd transaction and state machine processing.
 */
#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_dom.h"
#include "kfilnd_peer.h"
#include <asm/checksum.h>
static struct kmem_cache *tn_cache;
static struct kmem_cache *imm_buf_cache;
static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
{
	return csum_fold(csum_partial(ptr, nob, 0));
}
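
/* Note on verification (a sketch of the invariant, assuming NO_CHECKSUM is
 * zero): the sender computes msg->cksum with the field preset to NO_CHECKSUM
 * and then stores the folded one's complement. Re-summing the whole message
 * on receive, checksum field included, therefore folds to zero for an intact
 * message, which is why the unpack path below treats any non-zero result
 * from kfilnd_tn_cksum() as corruption:
 *
 *	msg->cksum = NO_CHECKSUM;
 *	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);
 *	// On receive: kfilnd_tn_cksum(msg, msg->nob) == 0 iff intact.
 */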

static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type)
{
	const int hdr_size = offsetof(struct kfilnd_msg, proto);

	switch (type) {
	case KFILND_MSG_IMMEDIATE:
		return offsetof(struct kfilnd_msg, proto.immed.payload[0]);

	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		return hdr_size + sizeof(struct kfilnd_bulk_req_msg);

	default:
		return -1;
	}
}

static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = KFILND_MSG_VERSION;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Message version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_REQ;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	msg->proto.hello.version = tn->tn_kp->kp_version;
	msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
	msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

	/* TODO: Support multiple RX contexts per peer. */
	msg->proto.hello.rx_count = 1;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;

	/* Message version zero is only valid for hello requests. */
	msg->version = 0;
	msg->type = KFILND_MSG_HELLO_RSP;
	msg->nob = sizeof(struct kfilnd_hello_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr);
	msg->proto.bulk_req.key = tn->tn_mr_key;
	msg->proto.bulk_req.response_rx = tn->tn_response_rx;

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = KFILND_MSG_VERSION;
	msg->type = tn->msg_type;
	msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
		offsetof(struct kfilnd_msg, proto);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
{
	struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

	/* Pack the protocol header and payload. */
	lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);

	lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE, msg,
			    offsetof(struct kfilnd_msg,
				     proto.immed.payload),
			    tn->tn_num_iovec, tn->tn_kiov, 0,
			    tn->tn_nob);

	/* Pack the transport header. */
	msg->magic = KFILND_MSG_MAGIC;
	msg->version = KFILND_MSG_VERSION;
	msg->type = tn->msg_type;
	msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
	msg->cksum = NO_CHECKSUM;
	msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
	msg->dstnid = tn->tn_kp->kp_nid;

	/* Checksum entire message. */
	msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

	tn->tn_tx_msg.length = msg->nob;
}

static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
				unsigned int nob)
{
	const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);

	if (nob < hdr_size) {
		KFILND_EP_ERROR(ep, "Short message: %u", nob);
		return -EPROTO;
	}

	/* TODO: Support byte swapping on mixed endian systems. */
	if (msg->magic != KFILND_MSG_MAGIC) {
		KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
		return -EPROTO;
	}

	/* TODO: Allow for older versions. */
	if (msg->version > KFILND_MSG_VERSION) {
		KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
		return -EPROTO;
	}

	if (msg->nob > nob) {
		KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
				msg->nob);
		return -EPROTO;
	}

	/* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
	if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
		KFILND_EP_ERROR(ep, "Bad checksum");
		return -EPROTO;
	}

	if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
		KFILND_EP_ERROR(ep, "Bad destination nid: %s",
				libcfs_nid2str(msg->dstnid));
		return -EPROTO;
	}

	if (msg->srcnid == LNET_NID_ANY) {
		KFILND_EP_ERROR(ep, "Bad source nid: %s",
				libcfs_nid2str(msg->srcnid));
		return -EPROTO;
	}

	if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) {
		KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
				msg_type_to_str(msg->type),
				msg->nob, kfilnd_tn_msgtype2size(msg->type));
		return -EPROTO;
	}

	switch ((enum kfilnd_msg_type)msg->type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		if (msg->version == 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	case KFILND_MSG_HELLO_REQ:
	case KFILND_MSG_HELLO_RSP:
		if (msg->version != 0) {
			KFILND_EP_ERROR(ep,
					"Bad message type and version: type=%s version=%u",
					msg_type_to_str(msg->type),
					msg->version);
			return -EPROTO;
		}
		break;

	default:
		CERROR("Unknown message type %x\n", msg->type);
		return -EPROTO;
	}

	return 0;
}
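
/* Summary of the type/version rules enforced above: HELLO_REQ and HELLO_RSP
 * must carry wire version 0 (they run before version negotiation), while
 * IMMEDIATE and BULK_PUT_REQ/BULK_GET_REQ must carry a non-zero, negotiated
 * version.
 */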

static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];

	atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts)),
		     &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);
}

static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
				   enum tn_states new_state)
{
	KFILND_TN_DEBUG(tn, "%s -> %s state change",
			tn_state_to_str(tn->tn_state),
			tn_state_to_str(new_state));

	kfilnd_tn_record_state_change(tn);

	tn->tn_state = new_state;
	tn->tn_state_ts = ktime_get();
}

static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
				    enum lnet_msg_hstatus hstatus)
{
	/* Only the first non-ok status will take. */
	if (tn->tn_status == 0) {
		KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
				status);
		tn->tn_status = status;
	}

	if (tn->hstatus == LNET_MSG_STATUS_OK) {
		KFILND_TN_DEBUG(tn, "%d -> %d health status change",
				tn->hstatus, hstatus);
		tn->hstatus = hstatus;
	}
}

static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
{
	return tn->tn_status != 0;
}

/**
 * kfilnd_tn_process_rx_event() - Process an immediate receive event.
 *
 * For each immediate receive, a transaction structure needs to be allocated to
 * process the receive.
 */
void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
				struct kfilnd_msg *rx_msg, int msg_size)
{
	struct kfilnd_transaction *tn;
	bool alloc_msg = true;
	int rc;
	enum tn_events event = TN_EVENT_RX_HELLO;

	/* Increment buf ref count for this work */
	atomic_inc(&bufdesc->immed_ref);

	/* Unpack the message */
	rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size);
	if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
		kfilnd_ep_imm_buffer_put(bufdesc);
		KFILND_EP_ERROR(bufdesc->immed_end,
				"Failed to unpack message %d", rc);
		return;
	}

	switch ((enum kfilnd_msg_type)rx_msg->type) {
	case KFILND_MSG_IMMEDIATE:
	case KFILND_MSG_BULK_PUT_REQ:
	case KFILND_MSG_BULK_GET_REQ:
		event = TN_EVENT_RX_OK;
		fallthrough;
	case KFILND_MSG_HELLO_RSP:
		alloc_msg = false;
		fallthrough;
	case KFILND_MSG_HELLO_REQ:
		/* Context points to a received buffer and status is the length.
		 * Allocate a Tn structure, set its values, then launch the
		 * receive.
		 */
		tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev,
				     bufdesc->immed_end->end_cpt,
				     rx_msg->srcnid, alloc_msg, false, false);
		if (IS_ERR(tn)) {
			kfilnd_ep_imm_buffer_put(bufdesc);
			KFILND_EP_ERROR(bufdesc->immed_end,
					"Failed to allocate transaction struct: rc=%ld",
					PTR_ERR(tn));
			return;
		}

		tn->tn_rx_msg.msg = rx_msg;
		tn->tn_rx_msg.length = msg_size;
		tn->tn_posted_buf = bufdesc;

		KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u",
				msg_type_to_str((enum kfilnd_msg_type)rx_msg->type),
				tn->tn_mr_key);
		break;

	default:
		KFILND_EP_ERROR(bufdesc->immed_end,
				"Unhandled kfilnd message type: %d",
				(enum kfilnd_msg_type)rx_msg->type);
		LBUG();
	}

	kfilnd_tn_event_handler(tn, event, 0);
}
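
/* How received message types map onto transaction events above (the
 * fallthroughs are intentional):
 *
 *	IMMEDIATE, BULK_PUT_REQ, BULK_GET_REQ -> TN_EVENT_RX_OK, no TX buffer
 *	HELLO_RSP                             -> TN_EVENT_RX_HELLO, no TX buffer
 *	HELLO_REQ                             -> TN_EVENT_RX_HELLO, TX buffer
 *	                                         allocated for the response
 */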

static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
{
	unsigned int data_size_bucket =
		kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
	struct kfilnd_tn_duration_stat *stat;

	if (tn->is_initiator)
		stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
	else
		stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];

	atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts)),
		     &stat->accumulated_duration);
	atomic_inc(&stat->accumulated_count);
}

/**
 * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
 *
 * All state machine functions should call kfilnd_tn_finalize() instead of
 * kfilnd_tn_free(). Once all expected asynchronous events have been received,
 * if the transaction lock has not been released, it will now be released,
 * transaction resources cleaned up, and lnet_finalize() called.
 */
static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
{
	if (!*tn_released) {
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
	}

	/* Release the reference on the multi-receive buffer. */
	if (tn->tn_posted_buf)
		kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);

	/* Finalize LNet operation. */
	if (tn->tn_lntmsg) {
		tn->tn_lntmsg->msg_health_status = tn->hstatus;
		lnet_finalize(tn->tn_lntmsg, tn->tn_status);
	}

	if (tn->tn_getreply) {
		tn->tn_getreply->msg_health_status = tn->hstatus;
		lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
				       tn->tn_getreply,
				       tn->tn_status ? 0 : tn->tn_nob);
		lnet_finalize(tn->tn_getreply, tn->tn_status);
	}

	if (KFILND_TN_PEER_VALID(tn))
		kfilnd_peer_put(tn->tn_kp);

	kfilnd_tn_record_state_change(tn);
	kfilnd_tn_record_duration(tn);

	kfilnd_tn_free(tn);
}

/**
 * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
 * @tn: Transaction to have tagged receive cancelled.
 *
 * Return: 0 on success. Else, negative errno. If an error occurs, resources
 * may be leaked.
 */
static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
{
	int rc;

	/* Issue a cancel. A return code of zero means the operation issued an
	 * async cancel. A return code of -ENOENT means the tagged receive was
	 * not found. The assumption here is that a tagged send landed thus
	 * removing the tagged receive buffer from hardware. For both cases,
	 * async events should occur.
	 */
	rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
	if (rc != 0 && rc != -ENOENT) {
		KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
		return rc;
	}

	return 0;
}

static void kfilnd_tn_timeout_work(struct work_struct *work)
{
	struct kfilnd_transaction *tn =
		container_of(work, struct kfilnd_transaction, timeout_work);

	KFILND_TN_ERROR(tn, "Bulk operation timeout");
	kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
}

static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
{
	struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);

	queue_work(kfilnd_wq, &tn->timeout_work);
}

static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
{
	return del_timer(&tn->timeout_timer);
}

static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
{
	ktime_t remaining_time = max_t(ktime_t, 0,
				       tn->deadline - ktime_get_seconds());
	unsigned long expires = remaining_time * HZ + jiffies;

	if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
		expires = jiffies;

	cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
			(unsigned long)tn, 0);
	mod_timer(&tn->timeout_timer, expires);
}
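
/* A quick worked example of the expiry arithmetic above (values assumed):
 * with 10 seconds remaining until tn->deadline and HZ=250, the timer is
 * armed for jiffies + 10 * 250 = jiffies + 2500 ticks. The
 * CFS_KFI_FAIL_BULK_TIMEOUT fault injection forces an immediate expiry
 * instead, simulating a bulk operation timeout.
 */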

/* The following are the state machine routines for the transactions. */
static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
				       enum tn_events event, int status,
				       bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_SEND_FAILED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
					      enum tn_events event, int status,
					      bool *tn_released)
{
	int rc;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_BULK:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		kfilnd_tn_pack_bulk_req(tn);

		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
			break;

		/* Need to replay TN_EVENT_INIT_BULK event while in the
		 * TN_STATE_TAGGED_RECV_POSTED state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay post send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		/* Need to transition to the TN_STATE_SEND_FAILED to cleanup
		 * posted tagged receive buffer.
		 */
		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);

			/* Propagate TN_EVENT_INIT_BULK event to
			 * TN_STATE_SEND_FAILED handler.
			 */
			return kfilnd_tn_state_send_failed(tn, event, rc,
							   tn_released);
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	struct kfilnd_msg *msg;
	int rc = 0;
	bool finalize = false;
	struct lnet_hdr hdr;
	struct lnet_nid srcnid;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	/* For new peers, send a hello request message and queue the true LNet
	 * message for replay.
	 */
	if (kfilnd_peer_is_new_peer(tn->tn_kp) &&
	    (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) {
		if (kfilnd_peer_deleted(tn->tn_kp)) {
			/* We'll assign a NETWORK_TIMEOUT message health status
			 * below because we don't know why this peer was marked
			 * for removal.
			 */
			rc = -ESTALE;
			KFILND_TN_DEBUG(tn,
					"Dropping message to stale peer %s\n",
					libcfs_nid2str(tn->tn_kp->kp_nid));
		} else if (ktime_after(tn->deadline, ktime_get_seconds())) {
			/* If transaction deadline has not been met, return
			 * -EAGAIN. This will cause this transaction event to be
			 * replayed. During this time, an async message from the
			 * peer should occur at which point the kfilnd version
			 * should be negotiated.
			 */
			KFILND_TN_DEBUG(tn, "%s hello response pending",
					libcfs_nid2str(tn->tn_kp->kp_nid));
			return -EAGAIN;
		} else {
			rc = -ETIMEDOUT;
		}

		kfilnd_tn_status_update(tn, rc,
					LNET_MSG_STATUS_NETWORK_TIMEOUT);
		rc = 0;
		goto out;
	}

	switch (event) {
	case TN_EVENT_INIT_IMMEDIATE:
	case TN_EVENT_TX_HELLO:
		tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
		KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		if (event == TN_EVENT_INIT_IMMEDIATE)
			kfilnd_tn_pack_immed_msg(tn);
		else
			kfilnd_tn_pack_hello_req(tn);

		/* Send immediate message. */
		rc = kfilnd_ep_post_send(tn->tn_ep, tn);
		switch (rc) {
		/* Async event will progress immediate send. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
			break;

		/* Need to replay TN_EVENT_INIT_IMMEDIATE event while in the
		 * TN_STATE_IDLE state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Failed to post send to %s(%#llx): rc=%d",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr, rc);
			if (event == TN_EVENT_TX_HELLO)
				kfilnd_peer_clear_hello_pending(tn->tn_kp);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_INIT_BULK:
		/* Post tagged receive buffer used to land bulk response. */
		rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);

		switch (rc) {
		/* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
		case 0:
			kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);

			/* Propagate TN_EVENT_INIT_BULK event to
			 * TN_STATE_TAGGED_RECV_POSTED handler.
			 */
			return kfilnd_tn_state_tagged_recv_posted(tn, event,
								  rc,
								  tn_released);

		/* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
		 * state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
					rc);
			kfilnd_tn_status_update(tn, rc,
						LNET_MSG_STATUS_LOCAL_ERROR);
		}
		break;

	case TN_EVENT_RX_OK:
		if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
			rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
						       tn->tn_ep->end_cpt,
						       tn->tn_kp);
			if (rc)
				KFILND_TN_ERROR(tn,
						"Failed to send hello request: rc=%d",
						rc);
			rc = 0;
		}

		/* If this is a new peer then we cannot progress the transaction
		 * and must drop it.
		 */
		if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
			KFILND_TN_ERROR(tn,
					"Dropping message from %s due to stale peer",
					libcfs_nid2str(tn->tn_kp->kp_nid));
			kfilnd_tn_status_update(tn, -EPROTO,
						LNET_MSG_STATUS_LOCAL_DROPPED);
			rc = 0;
			goto out;
		}

		LASSERT(kfilnd_peer_is_new_peer(tn->tn_kp) == false);
		msg = tn->tn_rx_msg.msg;

		/* Update the NID address with the new preferred RX context. */
		kfilnd_peer_alive(tn->tn_kp);

		/* Pass message up to LNet
		 * The TN will be reused in this call chain so we need to
		 * release the lock on the TN before proceeding.
		 */
		KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
				tn_state_to_str(tn->tn_state));

		/* TODO: Do not manually update this state change. */
		tn->tn_state = TN_STATE_IMM_RECV;
		mutex_unlock(&tn->tn_lock);
		*tn_released = true;
		lnet_nid4_to_nid(msg->srcnid, &srcnid);
		if (msg->type == KFILND_MSG_IMMEDIATE) {
			lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 0);
		} else {
			lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
			rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
					&hdr, &srcnid, tn, 1);
		}

		/* If successful, transaction has been accepted by LNet and we
		 * cannot process the transaction anymore within this context.
		 */
		if (rc == 0)
			return 0;

		KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
		kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
		break;

	case TN_EVENT_RX_HELLO:
		msg = tn->tn_rx_msg.msg;

		kfilnd_peer_alive(tn->tn_kp);

		switch (msg->type) {
		case KFILND_MSG_HELLO_REQ:
			kfilnd_peer_process_hello(tn->tn_kp, msg);
			tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
			KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
					libcfs_nid2str(tn->tn_kp->kp_nid),
					tn->tn_target_addr);

			kfilnd_tn_pack_hello_rsp(tn);

			/* Send immediate message. */
			rc = kfilnd_ep_post_send(tn->tn_ep, tn);
			switch (rc) {
			case 0:
				kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
				break;

			case -EAGAIN:
				KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
			break;

		case KFILND_MSG_HELLO_RSP:
			rc = 0;
			kfilnd_peer_process_hello(tn->tn_kp, msg);
			finalize = true;
			break;

		default:
			KFILND_TN_ERROR(tn, "Invalid message type: %s",
					msg_type_to_str(msg->type));
			LBUG();
		}
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

out:
	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}

static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT || status == -EIO)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_stale(tn->tn_kp);
		if (tn->msg_type == KFILND_MSG_HELLO_REQ)
			kfilnd_peer_clear_hello_pending(tn->tn_kp);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
				    enum tn_events event, int status,
				    bool *tn_released)
{
	int rc = 0;
	bool finalize = false;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_INIT_TAG_RMA:
	case TN_EVENT_SKIP_TAG_RMA:
		/* Release the buffer we received the request on. All relevant
		 * information to perform the RMA operation is stored in the
		 * transaction structure. This should be done before the RMA
		 * operation to prevent two contexts from potentially processing
		 * the same transaction.
		 *
		 * TODO: Prevent this from returning -EAGAIN.
		 */
		if (tn->tn_posted_buf) {
			kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
			tn->tn_posted_buf = NULL;
		}

		/* Update the KFI address to use the response RX context. */
		tn->tn_target_addr =
			kfi_rx_addr(KFILND_BASE_ADDR(tn->tn_kp->kp_addr),
				    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
		KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
				libcfs_nid2str(tn->tn_kp->kp_nid),
				tn->tn_target_addr);

		/* Initiate the RMA operation to push/pull the LNet payload or
		 * send a tagged message to finalize the bulk operation if the
		 * RMA operation should be skipped.
		 */
		if (event == TN_EVENT_INIT_TAG_RMA) {
			if (tn->sink_buffer)
				rc = kfilnd_ep_post_read(tn->tn_ep, tn);
			else
				rc = kfilnd_ep_post_write(tn->tn_ep, tn);

			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_RMA_COMP);
				break;

			/* Need to replay TN_EVENT_INIT_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged %s to %s(%#llx)",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged %s to %s(%#llx): rc=%d",
						tn->sink_buffer ? "read" : "write",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		} else {
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_OK);

			/* Since the LNet initiator has posted a unique tagged
			 * buffer specific for this LNet transaction and the
			 * LNet target has decided not to push/pull to/for the
			 * LNet initiator tagged buffer, a noop operation is
			 * done to this tagged buffer (i.e. payload transfer size
			 * is zero). But, immediate data, which contains the
			 * LNet target status for the transaction, is sent to
			 * the LNet initiator. Immediate data only appears in
			 * the completion event at the LNet initiator and not in
			 * the tagged buffer.
			 */
			tn->tagged_data = cpu_to_be64(abs(tn->tn_status));

			rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
			switch (rc) {
			/* Async tagged RMA event will progress transaction. */
			case 0:
				kfilnd_tn_state_change(tn,
						       TN_STATE_WAIT_TAG_COMP);
				break;

			/* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
			 * the TN_STATE_IMM_RECV state.
			 */
			case -EAGAIN:
				KFILND_TN_DEBUG(tn,
						"Need to replay tagged send to %s(%#llx)",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr);
				return -EAGAIN;

			default:
				KFILND_TN_ERROR(tn,
						"Failed to post tagged send to %s(%#llx): rc=%d",
						libcfs_nid2str(tn->tn_kp->kp_nid),
						tn->tn_target_addr, rc);
				kfilnd_tn_status_update(tn, rc,
							LNET_MSG_STATUS_LOCAL_ERROR);
			}
		}
		break;

	case TN_EVENT_RX_OK:
		finalize = true;
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	if (kfilnd_tn_has_failed(tn))
		finalize = true;

	if (finalize)
		kfilnd_tn_finalize(tn, tn_released);

	return rc;
}

static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
				     enum tn_events event, int status,
				     bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		kfilnd_tn_timeout_enable(tn);
		kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
		break;

	case TN_EVENT_TAG_RX_OK:
		kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
		break;

	case TN_EVENT_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_stale(tn->tn_kp);

		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_status_update(tn, status,
						LNET_MSG_STATUS_LOCAL_ERROR);
			kfilnd_tn_state_change(tn, TN_STATE_FAIL);
			break;

		/* Need to replay TN_EVENT_TX_FAIL event while in the
		 * TN_STATE_WAIT_COMP state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		kfilnd_tn_state_change(tn, TN_STATE_FAIL);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
					  enum tn_events event, int status,
					  bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	if (event == TN_EVENT_TX_OK) {
		kfilnd_peer_alive(tn->tn_kp);
		kfilnd_tn_finalize(tn, tn_released);
	} else {
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_stale(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
					 enum tn_events event, int status,
					 bool *tn_released)
{
	int rc;
	enum lnet_msg_hstatus hstatus;

	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_OK:
		/* Status can be set for both TN_EVENT_TAG_RX_FAIL and
		 * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set,
		 * LNet target returned -ENODATA.
		 */
		if (status) {
			if (event == TN_EVENT_TAG_RX_FAIL)
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_LOCAL_ERROR);
			else
				kfilnd_tn_status_update(tn, status,
							LNET_MSG_STATUS_OK);
		}

		if (!kfilnd_tn_timeout_cancel(tn)) {
			kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
			return 0;
		}
		break;

	case TN_EVENT_TIMEOUT:
		/* Need to cancel the tagged receive to prevent resources from
		 * being leaked.
		 */
		rc = kfilnd_tn_cancel_tag_recv(tn);

		switch (rc) {
		/* Async cancel event will progress transaction. */
		case 0:
			kfilnd_tn_state_change(tn,
					       TN_STATE_WAIT_TIMEOUT_TAG_COMP);
			return 0;

		/* Need to replay TN_EVENT_TIMEOUT event while in the
		 * TN_STATE_WAIT_TAG_COMP state.
		 */
		case -EAGAIN:
			KFILND_TN_DEBUG(tn,
					"Need to replay cancel tagged recv");
			return -EAGAIN;

		default:
			KFILND_TN_ERROR(tn,
					"Unexpected error during cancel tagged receive: rc=%d",
					rc);
			LBUG();
		}
		break;

	case TN_EVENT_TAG_TX_FAIL:
		if (status == -ETIMEDOUT)
			hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
		else
			hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

		kfilnd_tn_status_update(tn, status, hstatus);
		kfilnd_peer_stale(tn->tn_kp);
		break;

	case TN_EVENT_TAG_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
				enum tn_events event, int status,
				bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TX_FAIL:
		kfilnd_peer_stale(tn->tn_kp);
		break;

	case TN_EVENT_TX_OK:
		kfilnd_peer_alive(tn->tn_kp);
		break;

	case TN_EVENT_TAG_RX_FAIL:
	case TN_EVENT_TAG_RX_CANCEL:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
						 enum tn_events event,
						 int status, bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	switch (event) {
	case TN_EVENT_TAG_RX_CANCEL:
		kfilnd_tn_status_update(tn, -ETIMEDOUT,
					LNET_MSG_STATUS_REMOTE_TIMEOUT);
		kfilnd_peer_stale(tn->tn_kp);
		break;

	case TN_EVENT_TAG_RX_FAIL:
		kfilnd_tn_status_update(tn, status,
					LNET_MSG_STATUS_LOCAL_ERROR);
		break;

	case TN_EVENT_TAG_RX_OK:
		break;

	default:
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	kfilnd_tn_finalize(tn, tn_released);

	return 0;
}

static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
					     enum tn_events event, int status,
					     bool *tn_released)
{
	KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
			status);

	if (event == TN_EVENT_TIMEOUT) {
		kfilnd_tn_finalize(tn, tn_released);
	} else {
		KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
		LBUG();
	}

	return 0;
}

static int
(* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
						       enum tn_events event,
						       int status,
						       bool *tn_released) = {
	[TN_STATE_IDLE] = kfilnd_tn_state_idle,
	[TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
	[TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
	[TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
	[TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
	[TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
	[TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
	[TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
	[TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
		kfilnd_tn_state_wait_timeout_tag_comp,
	[TN_STATE_FAIL] = kfilnd_tn_state_fail,
	[TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
	[TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
};
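
/* A minimal sketch of how the table is used (mirroring the handler below):
 * the transaction's current state selects the handler, and a handler
 * returning -EAGAIN asks for the same event to be replayed later.
 *
 *	rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
 *							  &tn_released);
 */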

/**
 * kfilnd_tn_event_handler() - Update transaction state machine with an event.
 * @tn: Transaction to be updated.
 * @event: Transaction event.
 * @status: Errno status associated with the event.
 *
 * When the transaction event handler is first called on a new transaction, the
 * transaction is now owned by the transaction system. This means that it will
 * be freed by the system as the transaction is progressed through the state
 * machine.
 */
void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
			     enum tn_events event, int status)
{
	bool tn_released = false;
	int rc;

	if (!tn)
		return;

	mutex_lock(&tn->tn_lock);
	rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
							  &tn_released);
	if (rc == -EAGAIN) {
		tn->replay_event = event;
		tn->replay_status = status;
		kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
	}

	if (!tn_released)
		mutex_unlock(&tn->tn_lock);
}

/**
 * kfilnd_tn_free() - Free a transaction.
 */
void kfilnd_tn_free(struct kfilnd_transaction *tn)
{
	spin_lock(&tn->tn_ep->tn_list_lock);
	list_del(&tn->tn_entry);
	spin_unlock(&tn->tn_ep->tn_list_lock);

	KFILND_TN_DEBUG(tn, "Transaction freed");

	if (tn->tn_mr_key)
		kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);

	/* Free send message buffer if needed. */
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);

	kmem_cache_free(tn_cache, tn);
}

/**
 * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
 * @dev: KFI LND device used to look up the KFI LND endpoint to associate with
 * the transaction.
 * @cpt: CPT of the transaction.
 * @target_nid: Target NID of the transaction.
 * @alloc_msg: Allocate an immediate message for the transaction.
 * @is_initiator: Is initiator of LNet transaction.
 * @key: True if a transaction memory region key is needed.
 *
 * During transaction allocation, each transaction is associated with a KFI LND
 * endpoint used to post data transfer operations. The CPT argument is used to
 * look up the KFI LND endpoint within the KFI LND device.
 *
 * Return: On success, valid pointer. Else, negative errno pointer.
 */
struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
					   lnet_nid_t target_nid,
					   bool alloc_msg, bool is_initiator,
					   bool key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_peer *kp;
	int rc;

	if (!dev) {
		rc = -EINVAL;
		goto err;
	}

	kp = kfilnd_peer_get(dev, target_nid);
	if (IS_ERR(kp)) {
		rc = PTR_ERR(kp);
		goto err;
	}

	tn = kfilnd_tn_alloc_for_peer(dev, cpt, kp, alloc_msg, is_initiator,
				      key);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		kfilnd_peer_put(kp);
		goto err;
	}

	return tn;

err:
	return ERR_PTR(rc);
}
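
/* Sketch of a typical initiator-side call (argument values assumed): allocate
 * a transaction with a TX message buffer and an MR key for a bulk operation,
 * then kick the state machine from TN_STATE_IDLE:
 *
 *	tn = kfilnd_tn_alloc(dev, cpt, target_nid, true, true, true);
 *	if (IS_ERR(tn))
 *		return PTR_ERR(tn);
 *	kfilnd_tn_event_handler(tn, TN_EVENT_INIT_BULK, 0);
 */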

/* See kfilnd_tn_alloc()
 * Note: Caller must have a reference on @kp
 */
struct kfilnd_transaction *kfilnd_tn_alloc_for_peer(struct kfilnd_dev *dev,
						    int cpt,
						    struct kfilnd_peer *kp,
						    bool alloc_msg,
						    bool is_initiator,
						    bool key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep;
	int rc;
	ktime_t tn_alloc_ts;

	if (!dev) {
		rc = -EINVAL;
		goto err;
	}

	tn_alloc_ts = ktime_get();

	/* If the CPT does not fall into the LNet NI CPT range, force the CPT
	 * into the LNet NI CPT range. This should never happen.
	 */
	ep = dev->cpt_to_endpoint[cpt];
	if (!ep) {
		CWARN("%s used invalid cpt=%d\n",
		      libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
		ep = dev->kfd_endpoints[0];
	}

	tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
	if (!tn) {
		rc = -ENOMEM;
		goto err;
	}

	if (alloc_msg) {
		tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
		if (!tn->tn_tx_msg.msg) {
			rc = -ENOMEM;
			goto err_free_tn;
		}
	}

	if (key) {
		rc = kfilnd_ep_get_key(ep);
		if (rc < 0)
			goto err_free_tn;
		tn->tn_mr_key = rc;
	}

	tn->tn_kp = kp;
	refcount_inc(&kp->kp_cnt);

	mutex_init(&tn->tn_lock);
	tn->tn_ep = ep;
	tn->tn_response_rx = ep->end_context_id;
	tn->tn_state = TN_STATE_IDLE;
	tn->hstatus = LNET_MSG_STATUS_OK;
	tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
	tn->is_initiator = is_initiator;
	INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);

	/* Add the transaction to an endpoint. This is like
	 * incrementing a ref counter.
	 */
	spin_lock(&ep->tn_list_lock);
	list_add_tail(&tn->tn_entry, &ep->tn_list);
	spin_unlock(&ep->tn_list_lock);

	tn->tn_alloc_ts = tn_alloc_ts;
	tn->tn_state_ts = ktime_get();

	KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key);

	return tn;

err_free_tn:
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
	kmem_cache_free(tn_cache, tn);
err:
	return ERR_PTR(rc);
}

/**
 * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
 *
 * This function should only be called when there are no outstanding
 * transactions.
 */
void kfilnd_tn_cleanup(void)
{
	kmem_cache_destroy(imm_buf_cache);
	kmem_cache_destroy(tn_cache);
}

/**
 * kfilnd_tn_init() - Initialize KFI LND transaction system.
 *
 * Return: On success, zero. Else, negative errno.
 */
int kfilnd_tn_init(void)
{
	tn_cache = kmem_cache_create("kfilnd_tn",
				     sizeof(struct kfilnd_transaction), 0,
				     SLAB_HWCACHE_ALIGN, NULL);
	if (!tn_cache)
		goto err;

	imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
					  KFILND_IMMEDIATE_MSG_SIZE, 0,
					  SLAB_HWCACHE_ALIGN, NULL);
	if (!imm_buf_cache)
		goto err_tn_cache_destroy;

	return 0;

err_tn_cache_destroy:
	kmem_cache_destroy(tn_cache);
err:
	return -ENOMEM;
}
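
/* Module lifecycle sketch (caller assumed, e.g. the LND startup path):
 * kfilnd_tn_init() must succeed before any transaction is allocated, and
 * kfilnd_tn_cleanup() must only run once every transaction has been freed.
 *
 *	if (kfilnd_tn_init())
 *		return -ENOMEM;
 *	...
 *	kfilnd_tn_cleanup();
 */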

/**
 * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction.
 * @tn: Transaction to have buffer set.
 * @kiov: LNet KIOV buffer.
 * @num_iov: Number of IOVs.
 * @offset: Offset into IOVs where the buffer starts.
 * @len: Length of the buffer.
 *
 * This function takes the user provided IOV, offset, and len, and sets the
 * transaction buffer. The user provided IOV is an LNet KIOV. When the
 * transaction buffer is configured, the user provided offset is applied
 * (i.e. the transaction buffer starts at the offset into the IOV).
 *
 * Return: On success, zero. Else, negative errno.
 */
int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn,
			   struct bio_vec *kiov, size_t num_iov,
			   size_t offset, size_t len)
{
	size_t i;
	size_t cur_len = 0;
	size_t cur_offset = offset;
	size_t cur_iov = 0;
	size_t tmp_len;
	size_t tmp_offset;

	for (i = 0; (i < num_iov) && (cur_len < len); i++) {
		/* Skip KIOVs until the one containing the current offset is
		 * found.
		 */
		if (kiov[i].bv_len <= cur_offset) {
			cur_offset -= kiov[i].bv_len;
			continue;
		}

		tmp_len = kiov[i].bv_len - cur_offset;
		tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;

		if (tmp_len + cur_len > len)
			tmp_len = len - cur_len;

		/* tn_kiov is an array of size LNET_MAX_IOV */
		if (cur_iov >= LNET_MAX_IOV)
			return -EINVAL;

		tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
		tn->tn_kiov[cur_iov].bv_len = tmp_len;
		tn->tn_kiov[cur_iov].bv_offset = tmp_offset;

		cur_iov++;
		cur_len += tmp_len;
		cur_offset = 0;
	}

	tn->tn_num_iovec = cur_iov;
	tn->tn_nob = cur_len;

	return 0;
}
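
/* Worked example (values assumed): given two full-page KIOVs of 4096 bytes
 * each, offset=6000 and len=1000, the first KIOV is skipped (leaving 6000 -
 * 4096 = 1904 bytes of offset), and a single entry is produced pointing at
 * the second page with bv_offset advanced by 1904 and bv_len clamped to
 * 1000, so tn_num_iovec = 1 and tn_nob = 1000.
 */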