/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * kfilnd transaction and state machine processing.
 */

#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_dom.h"
#include "kfilnd_peer.h"
#include <asm/checksum.h>

static struct kmem_cache *tn_cache;
static struct kmem_cache *imm_buf_cache;

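/* Compute the folded 16-bit checksum over @nob bytes at @ptr, or return
 * NO_CHECKSUM when checksumming is disabled via the "cksum" tunable.
 */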
static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
{
        if (cksum)
                return csum_fold(csum_partial(ptr, nob, 0));
        return NO_CHECKSUM;
}

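/* Return the minimum valid wire size for a message of @type (the transport
 * header plus the fixed protocol payload), or -1 for an unknown type.
 */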
static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type)
{
        const int hdr_size = offsetof(struct kfilnd_msg, proto);

        switch (type) {
        case KFILND_MSG_IMMEDIATE:
                return offsetof(struct kfilnd_msg, proto.immed.payload[0]);

        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                return hdr_size + sizeof(struct kfilnd_bulk_req_msg);

        default:
                return -1;
        }
}

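/* The pack helpers below all follow the same layout: fill in the protocol
 * payload, then the transport header, and finally checksum the entire message
 * with the cksum field preset to NO_CHECKSUM so the receiver can verify it.
 */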
static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        msg->proto.hello.version = KFILND_MSG_VERSION;
        msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
        msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

        /* TODO: Support multiple RX contexts per peer. */
        msg->proto.hello.rx_count = 1;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;

        /* Message version zero is only valid for hello requests. */
        msg->version = 0;
        msg->type = KFILND_MSG_HELLO_REQ;
        msg->nob = sizeof(struct kfilnd_hello_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        msg->proto.hello.version = tn->tn_kp->kp_version;
        msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
        msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

        /* TODO: Support multiple RX contexts per peer. */
        msg->proto.hello.rx_count = 1;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;

        /* Message version zero is only valid for hello messages. */
        msg->version = 0;
        msg->type = KFILND_MSG_HELLO_RSP;
        msg->nob = sizeof(struct kfilnd_hello_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr);
        msg->proto.bulk_req.key = tn->tn_mr_key;
        msg->proto.bulk_req.response_rx = tn->tn_response_rx;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;
        msg->version = KFILND_MSG_VERSION;
        msg->type = tn->msg_type;
        msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);

        lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE,
                            msg,
                            offsetof(struct kfilnd_msg,
                                     proto.immed.payload),
                            tn->tn_num_iovec, tn->tn_kiov, 0,
                            tn->tn_nob);

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;
        msg->version = KFILND_MSG_VERSION;
        msg->type = tn->msg_type;
        msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

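/* Validate the transport header of a received message: minimum length, magic,
 * protocol version, advertised size, checksum, destination/source NIDs, and
 * that the message type is consistent with the version (version zero is
 * reserved for the hello handshake). Returns 0 on success or -EPROTO.
 */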
static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
                                unsigned int nob)
{
        const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);

        if (nob < hdr_size) {
                KFILND_EP_ERROR(ep, "Short message: %u", nob);
                return -EPROTO;
        }

        /* TODO: Support byte swapping on mixed endian systems. */
        if (msg->magic != KFILND_MSG_MAGIC) {
                KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
                return -EPROTO;
        }

        /* TODO: Allow for older versions. */
        if (msg->version > KFILND_MSG_VERSION) {
                KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
                return -EPROTO;
        }

        if (msg->nob > nob) {
                KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
                                msg->nob);
                return -EPROTO;
        }

        /* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
        if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
                KFILND_EP_ERROR(ep, "Bad checksum");
                return -EPROTO;
        }

        if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
                KFILND_EP_ERROR(ep, "Bad destination nid: %s",
                                libcfs_nid2str(msg->dstnid));
                return -EPROTO;
        }

        if (msg->srcnid == LNET_NID_ANY) {
                KFILND_EP_ERROR(ep, "Bad source nid: %s",
                                libcfs_nid2str(msg->srcnid));
                return -EPROTO;
        }

        if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) {
                KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
                                msg_type_to_str(msg->type),
                                msg->nob, kfilnd_tn_msgtype2size(msg->type));
                return -EPROTO;
        }

        switch ((enum kfilnd_msg_type)msg->type) {
        case KFILND_MSG_IMMEDIATE:
        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                if (msg->version == 0) {
                        KFILND_EP_ERROR(ep,
                                        "Bad message type and version: type=%s version=%u",
                                        msg_type_to_str(msg->type),
                                        msg->version);
                        return -EPROTO;
                }
                break;

        case KFILND_MSG_HELLO_REQ:
        case KFILND_MSG_HELLO_RSP:
                if (msg->version != 0) {
                        KFILND_EP_ERROR(ep,
                                        "Bad message type and version: type=%s version=%u",
                                        msg_type_to_str(msg->type),
                                        msg->version);
                        return -EPROTO;
                }
                break;

        default:
                CERROR("Unknown message type %x\n", msg->type);
                return -EPROTO;
        }
        return 0;
}

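/* Accumulate the time spent in the current state into the per-state,
 * per-data-size-bucket duration statistics. Min/max durations are maintained
 * without locks via atomic64_cmpxchg() retry loops.
 */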
static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
{
        unsigned int data_size_bucket =
                kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
        struct kfilnd_tn_duration_stat *stat;
        s64 time;
        s64 cur;

        if (tn->is_initiator)
                stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
        else
                stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];

        time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts));
        atomic64_add(time, &stat->accumulated_duration);
        atomic_inc(&stat->accumulated_count);

        do {
                cur = atomic64_read(&stat->max_duration);
                if (time <= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

        do {
                cur = atomic64_read(&stat->min_duration);
                if (time >= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}

static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
                                   enum tn_states new_state)
{
        KFILND_TN_DEBUG(tn, "%s -> %s state change",
                        tn_state_to_str(tn->tn_state),
                        tn_state_to_str(new_state));

        kfilnd_tn_record_state_change(tn);

        tn->tn_state = new_state;
        tn->tn_state_ts = ktime_get();
}

static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
                                    enum lnet_msg_hstatus hstatus)
{
        /* Only the first non-ok status will take. */
        if (tn->tn_status == 0) {
                KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
                                status);
                tn->tn_status = status;
        }

        if (tn->hstatus == LNET_MSG_STATUS_OK) {
                KFILND_TN_DEBUG(tn, "%d -> %d health status change",
                                tn->hstatus, hstatus);
                tn->hstatus = hstatus;
        }
}

static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
{
        return tn->tn_status != 0;
}

/**
 * kfilnd_tn_process_rx_event() - Process an immediate receive event.
 *
 * For each immediate receive, a transaction structure needs to be allocated to
 * process the receive.
 */
void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
                                struct kfilnd_msg *rx_msg, int msg_size)
{
        struct kfilnd_transaction *tn;
        bool alloc_msg = true;
        int rc;
        enum tn_events event = TN_EVENT_RX_HELLO;

        /* Increment buf ref count for this work */
        atomic_inc(&bufdesc->immed_ref);

        /* Unpack the message */
        rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size);
        if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
                kfilnd_ep_imm_buffer_put(bufdesc);
                KFILND_EP_ERROR(bufdesc->immed_end,
                                "Failed to unpack message %d", rc);
                return;
        }

        switch ((enum kfilnd_msg_type)rx_msg->type) {
        case KFILND_MSG_IMMEDIATE:
        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                event = TN_EVENT_RX_OK;
                fallthrough;
        case KFILND_MSG_HELLO_RSP:
                alloc_msg = false;
                fallthrough;
        case KFILND_MSG_HELLO_REQ:
                /* Context points to a received buffer and status is the length.
                 * Allocate a Tn structure, set its values, then launch the
                 * receive.
                 */
                tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev,
                                     bufdesc->immed_end->end_cpt,
                                     rx_msg->srcnid, alloc_msg, false,
                                     false);
                if (IS_ERR(tn)) {
                        kfilnd_ep_imm_buffer_put(bufdesc);
                        KFILND_EP_ERROR(bufdesc->immed_end,
                                        "Failed to allocate transaction struct: rc=%ld",
                                        PTR_ERR(tn));
                        return;
                }

                tn->tn_rx_msg.msg = rx_msg;
                tn->tn_rx_msg.length = msg_size;
                tn->tn_posted_buf = bufdesc;

                KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u",
                                msg_type_to_str((enum kfilnd_msg_type)rx_msg->type),
                                tn->tn_mr_key);
                break;

        default:
                KFILND_EP_ERROR(bufdesc->immed_end,
                                "Unhandled kfilnd message type: %d",
                                (enum kfilnd_msg_type)rx_msg->type);
                LBUG();
        }

        kfilnd_tn_event_handler(tn, event, 0);
}

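/* Accumulate the total transaction lifetime (measured from allocation) into
 * the per-direction duration statistics, using the same lock-free min/max
 * scheme as kfilnd_tn_record_state_change().
 */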
static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
{
        unsigned int data_size_bucket =
                kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
        struct kfilnd_tn_duration_stat *stat;
        s64 time;
        s64 cur;

        if (tn->is_initiator)
                stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
        else
                stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];

        time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts));
        atomic64_add(time, &stat->accumulated_duration);
        atomic_inc(&stat->accumulated_count);

        do {
                cur = atomic64_read(&stat->max_duration);
                if (time <= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

        do {
                cur = atomic64_read(&stat->min_duration);
                if (time >= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}

/**
 * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
 *
 * All state machine functions should call kfilnd_tn_finalize() instead of
 * kfilnd_tn_free(). Once all expected asynchronous events have been received,
 * the transaction lock is released (if it has not been already), transaction
 * resources are cleaned up, and lnet_finalize() is called.
 */
static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
{
        if (!*tn_released) {
                mutex_unlock(&tn->tn_lock);
                *tn_released = true;
        }

        /* Release the reference on the multi-receive buffer. */
        if (tn->tn_posted_buf)
                kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);

        /* Finalize LNet operation. */
        if (tn->tn_lntmsg) {
                tn->tn_lntmsg->msg_health_status = tn->hstatus;
                lnet_finalize(tn->tn_lntmsg, tn->tn_status);
        }

        if (tn->tn_getreply) {
                tn->tn_getreply->msg_health_status = tn->hstatus;
                lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
                                       tn->tn_getreply,
                                       tn->tn_status ? 0 : tn->tn_nob);
                lnet_finalize(tn->tn_getreply, tn->tn_status);
        }

        if (KFILND_TN_PEER_VALID(tn))
                kfilnd_peer_put(tn->tn_kp);

        kfilnd_tn_record_state_change(tn);
        kfilnd_tn_record_duration(tn);

        kfilnd_tn_free(tn);
}

/**
 * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
 * @tn: Transaction to have its tagged receive cancelled.
 *
 * Return: 0 on success. Else, negative errno. If an error occurs, resources
 * may be leaked.
 */
static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
{
        int rc;

        /* Issue a cancel. A return code of zero means the operation issued an
         * async cancel. A return code of -ENOENT means the tagged receive was
         * not found. The assumption here is that a tagged send landed thus
         * removing the tagged receive buffer from hardware. For both cases,
         * async events should occur.
         */
        rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
        if (rc != 0 && rc != -ENOENT) {
                KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
                return rc;
        }

        return 0;
}

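/* Bulk transaction timeout handling. The timer callback runs in interrupt
 * context, so it only queues timeout_work on kfilnd_wq; the work item then
 * injects TN_EVENT_TIMEOUT into the state machine.
 */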
static void kfilnd_tn_timeout_work(struct work_struct *work)
{
        struct kfilnd_transaction *tn =
                container_of(work, struct kfilnd_transaction, timeout_work);

        KFILND_TN_ERROR(tn, "Bulk operation timeout");
        kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
}

static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
{
        struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);

        queue_work(kfilnd_wq, &tn->timeout_work);
}

static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
{
        return timer_delete(&tn->timeout_timer);
}

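/* Arm the bulk timeout timer to fire at the transaction deadline (expressed
 * in seconds), treating an already-expired deadline as "fire immediately".
 */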
static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
{
        ktime_t remaining_time = max_t(ktime_t, 0,
                                       tn->deadline - ktime_get_seconds());
        unsigned long expires = remaining_time * HZ + jiffies;

        if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
                expires = jiffies;

        cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
                        (unsigned long)tn, 0);
        mod_timer(&tn->timeout_timer, expires);
}

/* The following are the state machine routines for the transactions. */
static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
                                       enum tn_events event, int status,
                                       bool *tn_released)
{
        int rc;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_BULK:
                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async event will progress transaction. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        return 0;

                /* Need to replay TN_EVENT_INIT_BULK event while in the
                 * TN_STATE_SEND_FAILED state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay cancel tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Unexpected error during cancel tagged receive: rc=%d",
                                        rc);
                        LBUG();
                }
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }
}

static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
                                              enum tn_events event, int status,
                                              bool *tn_released)
{
        int rc;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_BULK:
                tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                kfilnd_tn_pack_bulk_req(tn);

                rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                switch (rc) {
                /* Async event will progress immediate send. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
                        return 0;

                /* Need to replay TN_EVENT_INIT_BULK event while in the
                 * TN_STATE_TAGGED_RECV_POSTED state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay post send to %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);
                        return -EAGAIN;

                /* Need to transition to the TN_STATE_SEND_FAILED to cleanup
                 * posted tagged receive buffer.
                 */
                default:
                        KFILND_TN_ERROR(tn,
                                        "Failed to post send to %s(%#llx): rc=%d",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr, rc);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                        kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);

                        /* Propagate TN_EVENT_INIT_BULK event to
                         * TN_STATE_SEND_FAILED handler.
                         */
                        return kfilnd_tn_state_send_failed(tn, event, rc,
                                                           tn_released);
                }

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }
}

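/* Idle is the initial state for a transaction. New transactions to a peer may
 * be throttled, cancelled, or replayed here until a hello handshake with that
 * peer completes; otherwise the event selects an immediate send, a bulk
 * request, or processing of a received message.
 */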
static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
                                enum tn_events event, int status,
                                bool *tn_released)
{
        struct kfilnd_msg *msg;
        int rc = 0;
        bool finalize = false;
        struct lnet_hdr hdr;
        struct lnet_nid srcnid;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        /* For new peers, send a hello request message and queue the true LNet
         * message for replay.
         */
        if (kfilnd_peer_needs_throttle(tn->tn_kp) &&
            (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) {
                if (kfilnd_peer_deleted(tn->tn_kp)) {
                        /* We'll assign a NETWORK_TIMEOUT message health status
                         * below because we don't know why this peer was marked
                         * for removal
                         */
                        rc = -ESTALE;
                        KFILND_TN_DEBUG(tn, "Drop message to deleted peer");
                } else if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
                        /* We're throttling transactions to this peer until
                         * a handshake can be completed, but there is no HELLO
                         * currently in flight. This implies the HELLO has
                         * failed, and we should cancel this TN. Otherwise we
                         * are stuck waiting for the TN deadline.
                         *
                         * We assign NETWORK_TIMEOUT health status below because
                         * we do not know why the HELLO failed.
                         */
                        rc = -ECANCELED;
                        KFILND_TN_DEBUG(tn, "Cancel throttled TN");
                } else if (ktime_before(ktime_get_seconds(),
                                        tn->tn_replay_deadline)) {
                        /* If the transaction replay deadline has not been met,
                         * then return -EAGAIN. This will cause this transaction
                         * event to be replayed. During this time, an async
                         * hello message from the peer should occur at which
                         * point we can resume sending new messages to this peer
                         */
                        KFILND_TN_DEBUG(tn, "hello response pending");
                        return -EAGAIN;
                } else {
                        rc = -ETIMEDOUT;
                }

                kfilnd_tn_status_update(tn, rc,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                rc = 0;
                goto out;
        }

        if ((event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK) &&
            ktime_after(ktime_get_seconds(), tn->tn_replay_deadline)) {
                kfilnd_tn_status_update(tn, -ETIMEDOUT,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                rc = 0;
                goto out;
        }

        if (CFS_FAIL_CHECK_VALUE(CFS_KFI_REPLAY_IDLE_EVENT, event))
                return -EAGAIN;

        switch (event) {
        case TN_EVENT_INIT_IMMEDIATE:
        case TN_EVENT_TX_HELLO:
                tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                if (event == TN_EVENT_INIT_IMMEDIATE)
                        kfilnd_tn_pack_immed_msg(tn);
                else
                        kfilnd_tn_pack_hello_req(tn);

                /* Send immediate message. */
                rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                switch (rc) {
                /* Async event will progress immediate send. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
                        return 0;

                /* Need to replay TN_EVENT_INIT_IMMEDIATE event while in the
                 * TN_STATE_IDLE state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Failed to post send to %s(%#llx): rc=%d",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr, rc);
                        if (event == TN_EVENT_TX_HELLO)
                                kfilnd_peer_clear_hello_state(tn->tn_kp);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                }
                break;

        case TN_EVENT_INIT_BULK:
                /* Post tagged receive buffer used to land bulk response. */
                rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);

                switch (rc) {
                /* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);

                        /* Propagate TN_EVENT_INIT_BULK event to
                         * TN_STATE_TAGGED_RECV_POSTED handler.
                         */
                        return kfilnd_tn_state_tagged_recv_posted(tn, event,
                                                                  rc,
                                                                  tn_released);

                /* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
                 * state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
                                        rc);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                }
                break;

        case TN_EVENT_RX_OK:
                if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
                        rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
                                                       tn->tn_ep->end_cpt,
                                                       tn->tn_kp);
                        if (rc)
                                KFILND_TN_ERROR(tn,
                                                "Failed to send hello request: rc=%d",
                                                rc);
                        rc = 0;
                }

                /* If this is a new peer then we cannot progress the transaction
                 * and must drop it
                 */
                if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
                        KFILND_TN_ERROR(tn,
                                        "Dropping message from %s due to stale peer",
                                        libcfs_nid2str(tn->tn_kp->kp_nid));
                        kfilnd_tn_status_update(tn, -EPROTO,
                                                LNET_MSG_STATUS_LOCAL_DROPPED);
                        rc = 0;
                        goto out;
                }

                LASSERT(kfilnd_peer_is_new_peer(tn->tn_kp) == false);
                msg = tn->tn_rx_msg.msg;

                /* Update the NID address with the new preferred RX context. */
                kfilnd_peer_alive(tn->tn_kp);

                /* Pass message up to LNet
                 * The TN will be reused in this call chain so we need to
                 * release the lock on the TN before proceeding.
                 */
                KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
                                tn_state_to_str(tn->tn_state));

                /* TODO: Do not manually update this state change. */
                tn->tn_state = TN_STATE_IMM_RECV;
                mutex_unlock(&tn->tn_lock);
                *tn_released = true;
                lnet_nid4_to_nid(msg->srcnid, &srcnid);
                if (msg->type == KFILND_MSG_IMMEDIATE) {
                        lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
                        rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
                                        &hdr, &srcnid, tn, 0);
                } else {
                        lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
                        rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
                                        &hdr, &srcnid, tn, 1);
                }

                /* If successful, transaction has been accepted by LNet and we
                 * cannot process the transaction anymore within this context.
                 */
                if (!rc)
                        return 0;

                KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
                kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
                break;

        case TN_EVENT_RX_HELLO:
                msg = tn->tn_rx_msg.msg;

                kfilnd_peer_alive(tn->tn_kp);

                switch (msg->type) {
                case KFILND_MSG_HELLO_REQ:
                        kfilnd_peer_process_hello(tn->tn_kp, msg);
                        tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                        KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);

                        kfilnd_tn_pack_hello_rsp(tn);

                        /* Send immediate message. */
                        rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                        switch (rc) {
                        case 0:
                                kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
                                return 0;

                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post send to %s(%#llx): rc=%d",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                        break;

                case KFILND_MSG_HELLO_RSP:
                        rc = 0;
                        kfilnd_peer_process_hello(tn->tn_kp, msg);
                        finalize = true;
                        break;

                default:
                        KFILND_TN_ERROR(tn, "Invalid message type: %s",
                                        msg_type_to_str(msg->type));
                        LBUG();
                }
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

out:
        if (kfilnd_tn_has_failed(tn))
                finalize = true;

        if (finalize)
                kfilnd_tn_finalize(tn, tn_released);

        return rc;
}

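/* An immediate (or hello) message send has completed; map a failed completion
 * to an LNet health status before finalizing the transaction.
 */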
static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
                                    enum tn_events event, int status,
                                    bool *tn_released)
{
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_FAIL:
                if (status == -ETIMEDOUT || status == -EIO)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                /* RKEY is not involved in immediate sends, so no need to
                 * delete peer
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, false);
                if (tn->msg_type == KFILND_MSG_HELLO_REQ)
                        kfilnd_peer_clear_hello_state(tn->tn_kp);
                break;

        case TN_EVENT_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
                                    enum tn_events event, int status,
                                    bool *tn_released)
{
        int rc = 0;
        bool finalize = false;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_TAG_RMA:
        case TN_EVENT_SKIP_TAG_RMA:
                /* Release the buffer we received the request on. All relevant
                 * information to perform the RMA operation is stored in the
                 * transaction structure. This should be done before the RMA
                 * operation to prevent two contexts from potentially processing
                 * the same transaction.
                 *
                 * TODO: Prevent this from returning -EAGAIN.
                 */
                if (tn->tn_posted_buf) {
                        kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
                        tn->tn_posted_buf = NULL;
                }

                /* Update the KFI address to use the response RX context. */
                tn->tn_target_addr =
                        kfi_rx_addr(KFILND_BASE_ADDR(tn->tn_kp->kp_addr),
                                    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
                KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                /* Initiate the RMA operation to push/pull the LNet payload or
                 * send a tagged message to finalize the bulk operation if the
                 * RMA operation should be skipped.
                 */
                if (event == TN_EVENT_INIT_TAG_RMA) {
                        if (tn->sink_buffer)
                                rc = kfilnd_ep_post_read(tn->tn_ep, tn);
                        else
                                rc = kfilnd_ep_post_write(tn->tn_ep, tn);

                        switch (rc) {
                        /* Async tagged RMA event will progress transaction. */
                        case 0:
                                kfilnd_tn_state_change(tn,
                                                       TN_STATE_WAIT_TAG_RMA_COMP);
                                return 0;

                        /* Need to replay TN_EVENT_INIT_TAG_RMA event while in
                         * the TN_STATE_IMM_RECV state.
                         */
                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn,
                                                "Need to replay tagged %s to %s(%#llx)",
                                                tn->sink_buffer ? "read" : "write",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post tagged %s to %s(%#llx): rc=%d",
                                                tn->sink_buffer ? "read" : "write",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                } else {
                        kfilnd_tn_status_update(tn, status,
                                                LNET_MSG_STATUS_OK);

                        /* Since the LNet initiator has posted a unique tagged
                         * buffer specific to this LNet transaction and the
                         * LNet target has decided not to push/pull to/from the
                         * LNet initiator's tagged buffer, a noop operation is
                         * done to this tagged buffer (i.e. the payload transfer
                         * size is zero). But immediate data, which contains the
                         * LNet target status for the transaction, is sent to
                         * the LNet initiator. Immediate data only appears in
                         * the completion event at the LNet initiator and not in
                         * the tagged buffer.
                         */
                        tn->tagged_data = cpu_to_be64(abs(tn->tn_status));

                        rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
                        switch (rc) {
                        /* Async tagged RMA event will progress transaction. */
                        case 0:
                                kfilnd_tn_state_change(tn,
                                                       TN_STATE_WAIT_TAG_COMP);
                                return 0;

                        /* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
                         * the TN_STATE_IMM_RECV state.
                         */
                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn,
                                                "Need to replay tagged send to %s(%#llx)",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post tagged send to %s(%#llx): rc=%d",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                }
                break;

        case TN_EVENT_RX_OK:
                finalize = true;
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        if (kfilnd_tn_has_failed(tn))
                finalize = true;

        if (finalize)
                kfilnd_tn_finalize(tn, tn_released);

        return rc;
}

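/* Initiator side of a bulk transaction: the bulk request send and the matching
 * tagged response are both outstanding. The CFS_KFI_FAIL_* checks simulate
 * lost or failed completions for fault-injection testing.
 */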
static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
                                     enum tn_events event, int status,
                                     bool *tn_released)
{
        int rc;
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_OK:
                if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
                    CFS_FAIL_CHECK_RESET(CFS_KFI_FAIL_WAIT_SEND_COMP1,
                                         CFS_KFI_FAIL_WAIT_SEND_COMP2 |
                                         CFS_FAIL_ONCE))
                        break;
                if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ ||
                             tn->msg_type == KFILND_MSG_BULK_GET_REQ) &&
                    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP3)) {
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
                        kfilnd_tn_status_update(tn, -EIO, hstatus);
                        /* Don't delete peer on debug/test path */
                        kfilnd_peer_tn_failed(tn->tn_kp, -EIO, false);
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        break;
                }
                kfilnd_peer_alive(tn->tn_kp);
                kfilnd_tn_timeout_enable(tn);
                kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
                break;

        case TN_EVENT_TAG_RX_OK:
                if (status)
                        kfilnd_tn_status_update(tn, status, LNET_MSG_STATUS_OK);

                kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
                if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
                    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP2)) {
                        struct kfi_cq_err_entry fake_error = {
                                .op_context = tn,
                                .flags = KFI_MSG | KFI_SEND,
                                .err = EIO,
                        };

                        kfilnd_ep_gen_fake_err(tn->tn_ep, &fake_error);
                }
                break;

        case TN_EVENT_TX_FAIL:
                if (status == -ETIMEDOUT)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                /* The bulk request message failed, however, there is an edge
                 * case where the last request packet of a message is received
                 * at the target successfully, but the corresponding response
                 * packet is repeatedly dropped. This results in the target
                 * generating a success completion event but the initiator
                 * generating an error completion event. Due to this, we have to
                 * delete the peer here to protect the RKEY.
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, true);

                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async cancel event will progress transaction. */
                case 0:
                        kfilnd_tn_status_update(tn, status,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        return 0;

                /* Need to replay TN_EVENT_TX_FAIL event while in the
                 * TN_STATE_WAIT_COMP state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay cancel tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Unexpected error during cancel tagged receive: rc=%d",
                                        rc);
                        LBUG();
                }
                break;

        case TN_EVENT_TAG_RX_FAIL:
                kfilnd_tn_status_update(tn, status,
                                        LNET_MSG_STATUS_LOCAL_ERROR);
                /* The target may hold a reference to the RKEY, so we need to
                 * delete the peer to protect it
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, true);
                kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        return 0;
}

static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
                                          enum tn_events event, int status,
                                          bool *tn_released)
{
        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;
        case TN_EVENT_TX_FAIL:
                kfilnd_tn_status_update(tn, status,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                /* The bulk request message was never queued so we do not need
                 * to delete the peer
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, false);
                break;
        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
                                             enum tn_events event, int status,
                                             bool *tn_released)
{
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TAG_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        case TN_EVENT_TAG_TX_FAIL:
                if (status == -ETIMEDOUT)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                /* This event occurs at the target of a bulk LNetPut/Get.
                 * Since the target did not generate the RKEY, we needn't
                 * delete the peer.
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, false);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
                                         enum tn_events event, int status,
                                         bool *tn_released)
{
        int rc;
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TAG_RX_FAIL:
        case TN_EVENT_TAG_RX_OK:
                /* Status can be set for both TN_EVENT_TAG_RX_FAIL and
                 * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set,
                 * LNet target returned -ENODATA.
                 */
                if (status) {
                        if (event == TN_EVENT_TAG_RX_FAIL)
                                kfilnd_tn_status_update(tn, status,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        else
                                kfilnd_tn_status_update(tn, status,
                                                        LNET_MSG_STATUS_OK);
                }

                if (!kfilnd_tn_timeout_cancel(tn)) {
                        kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
                        return 0;
                }
                break;

        case TN_EVENT_TIMEOUT:
                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async cancel event will progress transaction. */
                case 0:
                        kfilnd_tn_state_change(tn,
                                               TN_STATE_WAIT_TIMEOUT_TAG_COMP);
                        return 0;

1307                 /* Need to replay the TN_EVENT_TIMEOUT event while in the
1308                  * TN_STATE_WAIT_TAG_COMP state.
1309                  */
1310                 case -EAGAIN:
1311                         KFILND_TN_DEBUG(tn,
1312                                         "Need to replay cancel tagged recv");
1313                         return -EAGAIN;
1314
1315                 default:
1316                         KFILND_TN_ERROR(tn,
1317                                         "Unexpected error during cancel tagged receive: rc=%d",
1318                                         rc);
1319                         LBUG();
1320                 }
1321                 break;
1322
1323         case TN_EVENT_TAG_TX_FAIL:
1324                 if (status == -ETIMEDOUT)
1325                         hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
1326                 else
1327                         hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
1328
1329                 kfilnd_tn_status_update(tn, status, hstatus);
1330                 /* This event occurs at the target of a bulk LNetPut/Get.
1331                  * Since the target did not generate the RKEY, we needn't
1332                  * delete the peer.
1333                  */
1334                 kfilnd_peer_tn_failed(tn->tn_kp, status, false);
1335                 break;
1336
1337         case TN_EVENT_TAG_TX_OK:
1338                 kfilnd_peer_alive(tn->tn_kp);
1339                 break;
1340
1341         default:
1342                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1343                 LBUG();
1344         }
1345
1346         kfilnd_tn_finalize(tn, tn_released);
1347
1348         return 0;
1349 }
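/* Summary of TN_STATE_WAIT_TAG_COMP handling (derived from the handler
 * above): TAG_RX_OK/TAG_RX_FAIL finalize the transaction, or move it to
 * TN_STATE_WAIT_TIMEOUT_COMP when the timeout work could not be
 * cancelled; TIMEOUT moves to TN_STATE_WAIT_TIMEOUT_TAG_COMP once the
 * tagged receive cancel is posted, or replays on -EAGAIN; TAG_TX_OK and
 * TAG_TX_FAIL finalize.
 */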
1350
1351 static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
1352                                 enum tn_events event, int status,
1353                                 bool *tn_released)
1354 {
1355         KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
1356                         status);
1357
1358         switch (event) {
1359         case TN_EVENT_TX_FAIL:
1360                 /* Prior TN states will have deleted the peer if necessary */
1361                 kfilnd_peer_tn_failed(tn->tn_kp, status, false);
1362                 break;
1363
1364         case TN_EVENT_TX_OK:
1365                 kfilnd_peer_alive(tn->tn_kp);
1366                 break;
1367
1368         case TN_EVENT_TAG_RX_OK:
1369                 kfilnd_peer_alive(tn->tn_kp);
1370                 if (tn->tn_status != status) {
1371                         KFILND_TN_DEBUG(tn, "%d -> %d status change",
1372                                         tn->tn_status, status);
1373                         tn->tn_status = status;
1374                 }
1375                 if (tn->hstatus != LNET_MSG_STATUS_OK) {
1376                         KFILND_TN_DEBUG(tn, "%d -> %d health status change",
1377                                         tn->hstatus, LNET_MSG_STATUS_OK);
1378                         tn->hstatus = LNET_MSG_STATUS_OK;
1379                 }
1380                 break;
1381
1382         case TN_EVENT_TAG_RX_FAIL:
1383         case TN_EVENT_TAG_RX_CANCEL:
1384                 break;
1385
1386         default:
1387                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1388                 LBUG();
1389         }
1390
1391         kfilnd_tn_finalize(tn, tn_released);
1392
1393         return 0;
1394 }
1395
1396 static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
1397                                                  enum tn_events event,
1398                                                  int status, bool *tn_released)
1399 {
1400         KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
1401                         status);
1402
1403         switch (event) {
1404         case TN_EVENT_TAG_RX_CANCEL:
1405                 kfilnd_tn_status_update(tn, -ETIMEDOUT,
1406                                         LNET_MSG_STATUS_NETWORK_TIMEOUT);
1407                 /* We've cancelled locally, but the target may still have a ref
1408                  * on the RKEY. Delete the peer to guard against its reuse.
1409                  */
1410                 kfilnd_peer_tn_failed(tn->tn_kp, -ETIMEDOUT, true);
1411                 break;
1412
1413         case TN_EVENT_TAG_RX_FAIL:
1414                 kfilnd_tn_status_update(tn, status,
1415                                         LNET_MSG_STATUS_LOCAL_ERROR);
1416                 /* The initiator of a bulk LNetPut/Get eagerly sends the bulk
1417                  * request message to the target without ensuring the tagged
1418                  * receive buffer is posted. Thus, the target could be issuing
1419                  * kfi_write/read operations using the tagged receive buffer
1420          * RKEY, and we need to delete this peer to protect it.
1421                  */
1422                 kfilnd_peer_tn_failed(tn->tn_kp, status, true);
1423                 break;
1424
1425         case TN_EVENT_TAG_RX_OK:
1426                 break;
1427
1428         default:
1429                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1430                 LBUG();
1431         }
1432
1433         kfilnd_tn_finalize(tn, tn_released);
1434
1435         return 0;
1436 }
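/* Note on the kfilnd_peer_tn_failed() "delete" argument used by the
 * handlers in this file: the peer is only marked for deletion (true)
 * when the remote side may still hold a locally generated RKEY, as with
 * the cancelled or failed tagged receives above; plain send and RMA
 * failures pass false because no locally generated RKEY is at risk.
 */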
1437
1438 static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
1439                                              enum tn_events event, int status,
1440                                              bool *tn_released)
1441 {
1442         KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
1443                         status);
1444
1445         if (event == TN_EVENT_TIMEOUT) {
1446                 kfilnd_tn_finalize(tn, tn_released);
1447         } else {
1448                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1449                 LBUG();
1450         }
1451
1452         return 0;
1453 }
1454
1455 static int
1456 (* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
1457                                                        enum tn_events event,
1458                                                        int status,
1459                                                        bool *tn_released) = {
1460         [TN_STATE_IDLE] = kfilnd_tn_state_idle,
1461         [TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
1462         [TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
1463         [TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
1464         [TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
1465         [TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
1466         [TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
1467         [TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
1468         [TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
1469                 kfilnd_tn_state_wait_timeout_tag_comp,
1470         [TN_STATE_FAIL] = kfilnd_tn_state_fail,
1471         [TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
1472         [TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
1473 };
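/* kfilnd_tn_event_handler() below indexes this table by tn_state and
 * calls through it without a NULL check, so every reachable TN_STATE_*
 * value must have an entry above.
 */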
1474
1475 /**
1476  * kfilnd_tn_event_handler() - Update transaction state machine with an event.
1477  * @tn: Transaction to be updated.
1478  * @event: Transaction event.
1479  * @status: Errno status associated with the event.
1480  *
1481  * When the transaction event handler is first called on a new transaction, the
1482  * transaction becomes owned by the transaction system. This means it will be
1483  * freed by the system as the transaction progresses through the state
1484  * machine.
1485  */
1486 void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
1487                              enum tn_events event, int status)
1488 {
1489         bool tn_released = false;
1490         int rc;
1491
1492         if (!tn)
1493                 return;
1494
1495         mutex_lock(&tn->tn_lock);
1496         rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
1497                                                           &tn_released);
1498         if (rc == -EAGAIN) {
1499                 tn->replay_event = event;
1500                 tn->replay_status = status;
1501                 kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
1502         }
1503
1504         if (!tn_released)
1505                 mutex_unlock(&tn->tn_lock);
1506 }
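/* Minimal sketch (not part of this file) of how a replay consumer could
 * re-drive the state machine with the event and status saved above. The
 * helper kfilnd_ep_pop_tn_replay() is a hypothetical name; the real
 * dequeue logic lives in kfilnd_ep.c.
 */
#if 0
static void kfilnd_tn_replay_sketch(struct kfilnd_ep *ep)
{
        struct kfilnd_transaction *tn;

        while ((tn = kfilnd_ep_pop_tn_replay(ep)) != NULL)
                kfilnd_tn_event_handler(tn, tn->replay_event,
                                        tn->replay_status);
}
#endif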
1507
1508 /**
1509  * kfilnd_tn_free() - Free a transaction.
1510  */
1511 void kfilnd_tn_free(struct kfilnd_transaction *tn)
1512 {
1513         spin_lock(&tn->tn_ep->tn_list_lock);
1514         list_del(&tn->tn_entry);
1515         spin_unlock(&tn->tn_ep->tn_list_lock);
1516
1517         KFILND_TN_DEBUG(tn, "Transaction freed");
1518
1519         if (tn->tn_mr_key)
1520                 kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);
1521
1522         /* Free send message buffer if needed. */
1523         if (tn->tn_tx_msg.msg)
1524                 kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
1525
1526         kmem_cache_free(tn_cache, tn);
1527 }
1528
1529 /*
1530  * Allocation logic common to kfilnd_tn_alloc() and kfilnd_tn_alloc_for_hello().
1531  * @ep: The KFI LND endpoint to associate with the transaction.
1532  * @kp: The kfilnd peer to associate with the transaction.
1533  * See kfilnd_tn_alloc() for a description of the other arguments.
1534  * Note: Caller must have a reference on @kp.
1535  */
1536 static struct kfilnd_transaction *kfilnd_tn_alloc_common(struct kfilnd_ep *ep,
1537                                                          struct kfilnd_peer *kp,
1538                                                          bool alloc_msg,
1539                                                          bool is_initiator,
1540                                                          u16 key)
1541 {
1542         struct kfilnd_transaction *tn;
1543         int rc;
1544         ktime_t tn_alloc_ts;
1545
1546         tn_alloc_ts = ktime_get();
1547
1548         tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
1549         if (!tn) {
1550                 rc = -ENOMEM;
1551                 goto err;
1552         }
1553
1554         if (alloc_msg) {
1555                 tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
1556                 if (!tn->tn_tx_msg.msg) {
1557                         rc = -ENOMEM;
1558                         goto err_free_tn;
1559                 }
1560         }
1561
1562         tn->tn_mr_key = key;
1563
1564         tn->tn_kp = kp;
1565
1566         mutex_init(&tn->tn_lock);
1567         tn->tn_ep = ep;
1568         tn->tn_response_rx = ep->end_context_id;
1569         tn->tn_state = TN_STATE_IDLE;
1570         tn->hstatus = LNET_MSG_STATUS_OK;
1571         tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
1572         tn->tn_replay_deadline = ktime_sub(tn->deadline,
1573                                            (lnet_get_lnd_timeout() / 2));
1574         tn->is_initiator = is_initiator;
1575         INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);
1576
1577         /* Add the transaction to an endpoint.  This is like
1578          * incrementing a ref counter.
1579          */
1580         spin_lock(&ep->tn_list_lock);
1581         list_add_tail(&tn->tn_entry, &ep->tn_list);
1582         spin_unlock(&ep->tn_list_lock);
1583
1584         tn->tn_alloc_ts = tn_alloc_ts;
1585         tn->tn_state_ts = ktime_get();
1586
1587         KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key);
1588
1589         return tn;
1590
1591 err_free_tn:
1592         if (tn->tn_tx_msg.msg)
1593                 kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
1594         kmem_cache_free(tn_cache, tn);
1595 err:
1596         return ERR_PTR(rc);
1597 }
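/* Worked example of the deadline math above, assuming an LND timeout of
 * 50 seconds: deadline = now + 50 and tn_replay_deadline = now + 25,
 * i.e. -EAGAIN replays are only attempted during the first half of the
 * transaction's lifetime.
 */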
1598
1599 static struct kfilnd_ep *kfilnd_dev_to_ep(struct kfilnd_dev *dev, int cpt)
1600 {
1601         struct kfilnd_ep *ep;
1602
1603         if (!dev)
1604                 return ERR_PTR(-EINVAL);
1605
1606         ep = dev->cpt_to_endpoint[cpt];
1607         if (!ep) {
1608                 CWARN("%s used invalid cpt=%d\n",
1609                       libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
1610                 ep = dev->kfd_endpoints[0];
1611         }
1612
1613         return ep;
1614 }
1615
1616 /**
1617  * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
1618  * @dev: KFI LND device used to look up the KFI LND endpoint to associate
1619  * with the transaction.
1620  * @cpt: CPT of the transaction.
1621  * @target_nid: Target NID of the transaction.
1622  * @alloc_msg: Allocate an immediate message for the transaction.
1623  * @is_initiator: Is initiator of LNet transaction.
1624  * @need_key: Is transaction memory region key needed.
1625  *
1626  * During transaction allocation, each transaction is associated with a KFI LND
1627  * endpoint used to post data transfer operations. The CPT argument is used to
1628  * look up the KFI LND endpoint within the KFI LND device.
1629  *
1630  * Return: On success, valid pointer. Else, negative errno pointer.
1631  */
1632 struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
1633                                            lnet_nid_t target_nid,
1634                                            bool alloc_msg, bool is_initiator,
1635                                            bool need_key)
1636 {
1637         struct kfilnd_transaction *tn;
1638         struct kfilnd_ep *ep;
1639         struct kfilnd_peer *kp;
1640         int rc;
1641         u16 key = 0;
1642
1643         ep = kfilnd_dev_to_ep(dev, cpt);
1644         if (IS_ERR(ep)) {
1645                 rc = PTR_ERR(ep);
1646                 goto err;
1647         }
1648
1649         /* Consider the following:
1650          * Thread 1: Posts tagged receive with RKEY based on
1651          *           peerA::kp_local_session_key X and tn_mr_key Y
1652          * Thread 2: Fetches peerA with kp_local_session_key X
1653          * Thread 1: Cancels tagged receive, marks peerA for removal, and
1654          *           releases tn_mr_key Y
1655          * Thread 2: Allocates tn_mr_key Y
1656          * At this point, thread 2 has the same RKEY used by thread 1.
1657          * Thus, we always allocate the tn_mr_key before looking up the peer,
1658          * and we always mark peers for removal before releasing tn_mr_key.
1659          */
1660         if (need_key) {
1661                 rc = kfilnd_ep_get_key(ep);
1662                 if (rc < 0)
1663                         goto err;
1664                 key = rc;
1665         }
1666
1667         kp = kfilnd_peer_get(dev, target_nid);
1668         if (IS_ERR(kp)) {
1669                 rc = PTR_ERR(kp);
1670                 goto err_put_key;
1671         }
1672
1673         tn = kfilnd_tn_alloc_common(ep, kp, alloc_msg, is_initiator, key);
1674         if (IS_ERR(tn)) {
1675                 rc = PTR_ERR(tn);
1676                 kfilnd_peer_put(kp);
1677                 goto err_put_key;
1678         }
1679
1680         return tn;
1681
1682 err_put_key:
1683         kfilnd_ep_put_key(ep, key);
1684 err:
1685         return ERR_PTR(rc);
1686 }
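/* Illustrative caller sketch (not part of this file), assuming an
 * initiator-side bulk operation: allocate a transaction with an MR key,
 * attach the LNet KIOV, and kick the state machine. The function name
 * and error path below are hypothetical; real call sites live in the
 * LND's send/receive paths.
 */
#if 0
static int kfilnd_send_bulk_sketch(struct kfilnd_dev *dev, int cpt,
                                   lnet_nid_t nid, struct bio_vec *kiov,
                                   size_t num_iov, size_t offset, size_t len)
{
        struct kfilnd_transaction *tn;
        int rc;

        tn = kfilnd_tn_alloc(dev, cpt, nid, true, true, true);
        if (IS_ERR(tn))
                return PTR_ERR(tn);

        rc = kfilnd_tn_set_kiov_buf(tn, kiov, num_iov, offset, len);
        if (rc) {
                /* NB: a real caller would also drop the peer reference
                 * taken by kfilnd_tn_alloc().
                 */
                kfilnd_tn_free(tn);
                return rc;
        }

        /* Hand the transaction to the state machine. */
        kfilnd_tn_event_handler(tn, TN_EVENT_INIT_BULK, 0);
        return 0;
}
#endif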
1687
1688 /* Like kfilnd_tn_alloc(), but caller already looked up the kfilnd_peer.
1689  * Used only to allocate a TN for a hello request.
1690  * See kfilnd_tn_alloc()/kfilnd_tn_alloc_common().
1691  * Note: Caller must have a reference on @kp.
1692  */
1693 struct kfilnd_transaction *kfilnd_tn_alloc_for_hello(struct kfilnd_dev *dev, int cpt,
1694                                                      struct kfilnd_peer *kp)
1695 {
1696         struct kfilnd_transaction *tn;
1697         struct kfilnd_ep *ep;
1698         int rc;
1699
1700         ep = kfilnd_dev_to_ep(dev, cpt);
1701         if (IS_ERR(ep)) {
1702                 rc = PTR_ERR(ep);
1703                 goto err;
1704         }
1705
1706         tn = kfilnd_tn_alloc_common(ep, kp, true, true, 0);
1707         if (IS_ERR(tn)) {
1708                 rc = PTR_ERR(tn);
1709                 goto err;
1710         }
1711
1712         return tn;
1713
1714 err:
1715         return ERR_PTR(rc);
1716 }
1717
1718 /**
1719  * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
1720  *
1721  * This function should only be called when there are no outstanding
1722  * transactions.
1723  */
1724 void kfilnd_tn_cleanup(void)
1725 {
1726         kmem_cache_destroy(imm_buf_cache);
1727         kmem_cache_destroy(tn_cache);
1728 }
1729
1730 /**
1731  * kfilnd_tn_init() - Initialize KFI LND transaction system.
1732  *
1733  * Return: On success, zero. Else, negative errno.
1734  */
1735 int kfilnd_tn_init(void)
1736 {
1737         tn_cache = kmem_cache_create("kfilnd_tn",
1738                                      sizeof(struct kfilnd_transaction), 0,
1739                                      SLAB_HWCACHE_ALIGN, NULL);
1740         if (!tn_cache)
1741                 goto err;
1742
1743         imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
1744                                           KFILND_IMMEDIATE_MSG_SIZE, 0,
1745                                           SLAB_HWCACHE_ALIGN, NULL);
1746         if (!imm_buf_cache)
1747                 goto err_tn_cache_destroy;
1748
1749         return 0;
1750
1751 err_tn_cache_destroy:
1752         kmem_cache_destroy(tn_cache);
1753 err:
1754         return -ENOMEM;
1755 }
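/* Usage sketch: kfilnd_tn_init() pairs with kfilnd_tn_cleanup() across
 * the module lifetime. The function names below are hypothetical; the
 * real call sites are in the kfilnd module init/exit paths.
 */
#if 0
static int __init kfilnd_init_sketch(void)
{
        int rc;

        rc = kfilnd_tn_init();
        if (rc)
                return rc;

        /* ... register the LND with LNet ... */
        return 0;
}

static void __exit kfilnd_exit_sketch(void)
{
        /* Only safe once no transactions are outstanding. */
        kfilnd_tn_cleanup();
}
#endif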
1756
1757 /**
1758  * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction.
1759  * @tn: Transaction to have buffer set.
1760  * @kiov: LNet KIOV buffer.
1761  * @num_iov: Number of IOVs.
1762  * @offset: Offset into IOVs where the buffer starts.
1763  * @len: Length of the buffer.
1764  *
1765  * This function takes the user-provided IOV, offset, and len, and sets the
1766  * transaction buffer. The user-provided IOV is an LNet KIOV. The offset is
1767  * consumed when the transaction buffer is configured (i.e. the resulting
1768  * transaction buffer offset is zero). Returns zero on success, or -EINVAL
1769  * if the clipped buffer would need more than LNET_MAX_IOV entries.
1770  */
1771 int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn,
1772                            struct bio_vec *kiov, size_t num_iov,
1773                            size_t offset, size_t len)
1774 {
1775         size_t i;
1776         size_t cur_len = 0;
1777         size_t cur_offset = offset;
1778         size_t cur_iov = 0;
1779         size_t tmp_len;
1780         size_t tmp_offset;
1781
1782         for (i = 0; (i < num_iov) && (cur_len < len); i++) {
1783                 /* Skip whole KIOVs while the remaining offset still covers
1784                  * them; stop at the KIOV containing the offset.
1785                  */
1786                 if (kiov[i].bv_len <= cur_offset) {
1787                         cur_offset -= kiov[i].bv_len;
1788                         continue;
1789                 }
1790
1791                 tmp_len = kiov[i].bv_len - cur_offset;
1792                 tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;
1793
1794                 if (tmp_len + cur_len > len)
1795                         tmp_len = len - cur_len;
1796
1797                 /* tn_kiov is an array of size LNET_MAX_IOV */
1798                 if (cur_iov >= LNET_MAX_IOV)
1799                         return -EINVAL;
1800
1801                 tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
1802                 tn->tn_kiov[cur_iov].bv_len = tmp_len;
1803                 tn->tn_kiov[cur_iov].bv_offset = tmp_offset;
1804
1805                 cur_iov++;
1806                 cur_len += tmp_len;
1807                 cur_offset = 0;
1808         }
1809
1810         tn->tn_num_iovec = cur_iov;
1811         tn->tn_nob = cur_len;
1812
1813         return 0;
1814 }
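/* Worked example of the clipping arithmetic above: with three 4096-byte
 * KIOV fragments (bv_offset 0), offset = 5000, and len = 6000:
 *      - fragment 0: bv_len (4096) <= cur_offset (5000), so it is
 *        skipped and cur_offset becomes 904
 *      - fragment 1: tmp_len = 4096 - 904 = 3192, tmp_offset = 904
 *      - fragment 2: tmp_len clipped to 6000 - 3192 = 2808, tmp_offset = 0
 * Result: tn_num_iovec = 2 and tn_nob = 6000.
 */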