/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * kfilnd transaction and state machine processing.
 */

#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_dom.h"
#include "kfilnd_peer.h"
#include <asm/checksum.h>

static struct kmem_cache *tn_cache;
static struct kmem_cache *imm_buf_cache;

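/* Fold a 16-bit checksum over @nob bytes at @ptr, or return NO_CHECKSUM
 * when checksumming is disabled ("cksum" is assumed to be a module
 * tunable defined elsewhere in the driver).
 */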
static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
{
        if (cksum)
                return csum_fold(csum_partial(ptr, nob, 0));
        return NO_CHECKSUM;
}

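/* Return the minimum valid on-wire size for a given message type
 * (transport header plus fixed protocol payload), or -1 for an
 * unrecognized type.
 */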
static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type)
{
        const int hdr_size = offsetof(struct kfilnd_msg, proto);

        switch (type) {
        case KFILND_MSG_IMMEDIATE:
                return offsetof(struct kfilnd_msg, proto.immed.payload[0]);

        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                return hdr_size + sizeof(struct kfilnd_bulk_req_msg);

        default:
                return -1;
        }
}

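/* Pack a hello request used to negotiate the protocol version and
 * exchange session keys with a peer. Hello messages are always sent
 * with transport version zero.
 */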
static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        msg->proto.hello.version = KFILND_MSG_VERSION;
        msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
        msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

        /* TODO: Support multiple RX contexts per peer. */
        msg->proto.hello.rx_count = 1;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;

        /* Message version zero is only valid for hello requests. */
        msg->version = 0;
        msg->type = KFILND_MSG_HELLO_REQ;
        msg->nob = sizeof(struct kfilnd_hello_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        msg->proto.hello.version = tn->tn_kp->kp_version;
        msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
        msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

        /* TODO: Support multiple RX contexts per peer. */
        msg->proto.hello.rx_count = 1;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;

        /* Message version zero is only valid for hello messages. */
        msg->version = 0;
        msg->type = KFILND_MSG_HELLO_RSP;
        msg->nob = sizeof(struct kfilnd_hello_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

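/* Pack a bulk PUT/GET request carrying the LNet header, the memory
 * region key the peer will target with the RMA operation, and the RX
 * context on which the tagged bulk response is expected.
 */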
static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr);
        msg->proto.bulk_req.key = tn->tn_mr_key;
        msg->proto.bulk_req.response_rx = tn->tn_response_rx;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;
        msg->version = KFILND_MSG_VERSION;
        msg->type = tn->msg_type;
        msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

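/* Pack an immediate message: the LNet payload is copied inline into the
 * send buffer immediately following the message header.
 */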
static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);

        lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE,
                            msg,
                            offsetof(struct kfilnd_msg,
                                     proto.immed.payload),
                            tn->tn_num_iovec, tn->tn_kiov, 0,
                            tn->tn_nob);

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;
        msg->version = KFILND_MSG_VERSION;
        msg->type = tn->msg_type;
        msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

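/* Validate the transport header of a received message: magic, version,
 * length, checksum, source/destination NIDs, and the message type and
 * version pairing.
 *
 * Return: 0 on success, -EPROTO on any validation failure.
 */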
static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
                                unsigned int nob)
{
        const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);

        if (nob < hdr_size) {
                KFILND_EP_ERROR(ep, "Short message: %u", nob);
                return -EPROTO;
        }

        /* TODO: Support byte swapping on mixed endian systems. */
        if (msg->magic != KFILND_MSG_MAGIC) {
                KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
                return -EPROTO;
        }

        /* TODO: Allow for older versions. */
        if (msg->version > KFILND_MSG_VERSION) {
                KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
                return -EPROTO;
        }

        if (msg->nob > nob) {
                KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
                                msg->nob);
                return -EPROTO;
        }

        /* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
        if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
                KFILND_EP_ERROR(ep, "Bad checksum");
                return -EPROTO;
        }

        if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
                KFILND_EP_ERROR(ep, "Bad destination nid: %s",
                                libcfs_nid2str(msg->dstnid));
                return -EPROTO;
        }

        if (msg->srcnid == LNET_NID_ANY) {
                KFILND_EP_ERROR(ep, "Bad source nid: %s",
                                libcfs_nid2str(msg->srcnid));
                return -EPROTO;
        }

        if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) {
                KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
                                msg_type_to_str(msg->type),
                                msg->nob, kfilnd_tn_msgtype2size(msg->type));
                return -EPROTO;
        }

        switch ((enum kfilnd_msg_type)msg->type) {
        case KFILND_MSG_IMMEDIATE:
        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                if (msg->version == 0) {
                        KFILND_EP_ERROR(ep,
                                        "Bad message type and version: type=%s version=%u",
                                        msg_type_to_str(msg->type),
                                        msg->version);
                        return -EPROTO;
                }
                break;

        case KFILND_MSG_HELLO_REQ:
        case KFILND_MSG_HELLO_RSP:
                if (msg->version != 0) {
                        KFILND_EP_ERROR(ep,
                                        "Bad message type and version: type=%s version=%u",
                                        msg_type_to_str(msg->type),
                                        msg->version);
                        return -EPROTO;
                }
                break;

        default:
                CERROR("Unknown message type %x\n", msg->type);
                return -EPROTO;
        }
        return 0;
}

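/* Accumulate per-state latency statistics, bucketed by LNet message
 * size. Min and max are maintained with lock-free cmpxchg retry loops:
 * re-read the current extreme and attempt the swap until either the
 * sample no longer beats it or the swap succeeds.
 */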
static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
{
        unsigned int data_size_bucket =
                kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
        struct kfilnd_tn_duration_stat *stat;
        s64 time;
        s64 cur;

        if (tn->is_initiator)
                stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
        else
                stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];

        time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts));
        atomic64_add(time, &stat->accumulated_duration);
        atomic_inc(&stat->accumulated_count);

        do {
                cur = atomic64_read(&stat->max_duration);
                if (time <= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

        do {
                cur = atomic64_read(&stat->min_duration);
                if (time >= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}

static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
                                   enum tn_states new_state)
{
        KFILND_TN_DEBUG(tn, "%s -> %s state change",
                        tn_state_to_str(tn->tn_state),
                        tn_state_to_str(new_state));

        kfilnd_tn_record_state_change(tn);

        tn->tn_state = new_state;
        tn->tn_state_ts = ktime_get();
}

static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
                                    enum lnet_msg_hstatus hstatus)
{
        /* Only the first non-ok status will take. */
        if (tn->tn_status == 0) {
                KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
                                status);
                tn->tn_status = status;
        }

        if (tn->hstatus == LNET_MSG_STATUS_OK) {
                KFILND_TN_DEBUG(tn, "%d -> %d health status change",
                                tn->hstatus, hstatus);
                tn->hstatus = hstatus;
        }
}

static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
{
        return tn->tn_status != 0;
}

/**
 * kfilnd_tn_process_rx_event() - Process an immediate receive event.
 *
 * For each immediate receive, a transaction structure needs to be allocated to
 * process the receive.
 */
void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
                                struct kfilnd_msg *rx_msg, int msg_size)
{
        struct kfilnd_transaction *tn;
        bool alloc_msg = true;
        int rc;
        enum tn_events event = TN_EVENT_RX_HELLO;

        /* Increment buf ref count for this work */
        atomic_inc(&bufdesc->immed_ref);

        /* Unpack the message */
        rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size);
        if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
                kfilnd_ep_imm_buffer_put(bufdesc);
                KFILND_EP_ERROR(bufdesc->immed_end,
                                "Failed to unpack message %d", rc);
                return;
        }

        switch ((enum kfilnd_msg_type)rx_msg->type) {
        case KFILND_MSG_IMMEDIATE:
        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                event = TN_EVENT_RX_OK;
                fallthrough;
        case KFILND_MSG_HELLO_RSP:
                alloc_msg = false;
                fallthrough;
        case KFILND_MSG_HELLO_REQ:
                /* Context points to a received buffer and status is the length.
                 * Allocate a Tn structure, set its values, then launch the
                 * receive.
                 */
                tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev,
                                     bufdesc->immed_end->end_cpt,
                                     rx_msg->srcnid, alloc_msg, false,
                                     false);
                if (IS_ERR(tn)) {
                        kfilnd_ep_imm_buffer_put(bufdesc);
                        KFILND_EP_ERROR(bufdesc->immed_end,
                                        "Failed to allocate transaction struct: rc=%ld",
                                        PTR_ERR(tn));
                        return;
                }

                tn->tn_rx_msg.msg = rx_msg;
                tn->tn_rx_msg.length = msg_size;
                tn->tn_posted_buf = bufdesc;

                KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u",
                                msg_type_to_str((enum kfilnd_msg_type)rx_msg->type),
                                tn->tn_mr_key);
                break;

        default:
                KFILND_EP_ERROR(bufdesc->immed_end,
                                "Unhandled kfilnd message type: %d",
                                (enum kfilnd_msg_type)rx_msg->type);
                LBUG();
        }

        kfilnd_tn_event_handler(tn, event, 0);
}

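/* Accumulate whole-transaction latency statistics, measured from
 * transaction allocation and bucketed by LNet message size.
 */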
static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
{
        unsigned int data_size_bucket =
                kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
        struct kfilnd_tn_duration_stat *stat;
        s64 time;
        s64 cur;

        if (tn->is_initiator)
                stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
        else
                stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];

        time = ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts));
        atomic64_add(time, &stat->accumulated_duration);
        atomic_inc(&stat->accumulated_count);

        do {
                cur = atomic64_read(&stat->max_duration);
                if (time <= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->max_duration, cur, time) != cur);

        do {
                cur = atomic64_read(&stat->min_duration);
                if (time >= cur)
                        break;
        } while (atomic64_cmpxchg(&stat->min_duration, cur, time) != cur);
}

/**
 * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
 *
 * All state machine functions should call kfilnd_tn_finalize() instead of
 * kfilnd_tn_free(). Once all expected asynchronous events have been received,
 * the transaction lock is released (if it has not been already), transaction
 * resources are cleaned up, and lnet_finalize() is called.
 */
static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
{
        if (!*tn_released) {
                mutex_unlock(&tn->tn_lock);
                *tn_released = true;
        }

        /* Release the reference on the multi-receive buffer. */
        if (tn->tn_posted_buf)
                kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);

        /* Finalize LNet operation. */
        if (tn->tn_lntmsg) {
                tn->tn_lntmsg->msg_health_status = tn->hstatus;
                lnet_finalize(tn->tn_lntmsg, tn->tn_status);
        }

        if (tn->tn_getreply) {
                tn->tn_getreply->msg_health_status = tn->hstatus;
                lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
                                       tn->tn_getreply,
                                       tn->tn_status ? 0 : tn->tn_nob);
                lnet_finalize(tn->tn_getreply, tn->tn_status);
        }

        if (KFILND_TN_PEER_VALID(tn))
                kfilnd_peer_put(tn->tn_kp);

        kfilnd_tn_record_state_change(tn);
        kfilnd_tn_record_duration(tn);

        kfilnd_tn_free(tn);
}

/**
 * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
 * @tn: Transaction to have its tagged receive cancelled.
 *
 * Return: 0 on success. Else, negative errno. If an error occurs, resources may
 * be leaked.
 */
static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
{
        int rc;

        /* Issue a cancel. A return code of zero means the operation issued an
         * async cancel. A return code of -ENOENT means the tagged receive was
         * not found. The assumption here is that a tagged send landed thus
         * removing the tagged receive buffer from hardware. For both cases,
         * async events should occur.
         */
        rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
        if (rc != 0 && rc != -ENOENT) {
                KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
                return rc;
        }

        return 0;
}

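/* Bulk operation timeout handling: when the timer fires, work is queued
 * to the kfilnd workqueue, which feeds TN_EVENT_TIMEOUT into the state
 * machine. kfilnd_tn_timeout_cancel() returns true if the timer was
 * still pending, i.e. the timeout had not yet fired.
 */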
static void kfilnd_tn_timeout_work(struct work_struct *work)
{
        struct kfilnd_transaction *tn =
                container_of(work, struct kfilnd_transaction, timeout_work);

        KFILND_TN_ERROR(tn, "Bulk operation timeout");
        kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
}

static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
{
        struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);

        queue_work(kfilnd_wq, &tn->timeout_work);
}

static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
{
        return timer_delete(&tn->timeout_timer);
}

static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
{
        ktime_t remaining_time = max_t(ktime_t, 0,
                                       tn->deadline - ktime_get_seconds());
        unsigned long expires = remaining_time * HZ + jiffies;

        if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
                expires = jiffies;

        cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
                        (unsigned long)tn, 0);
        mod_timer(&tn->timeout_timer, expires);
}

/* The following are the state machine routines for the transactions. */
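/*
 * A rough sketch of the main state flows, as reconstructed from the
 * handlers below:
 *
 *   Immediate send: IDLE -> IMM_SEND -> finalize
 *   Bulk initiator: IDLE -> TAGGED_RECV_POSTED -> WAIT_COMP ->
 *                   WAIT_TAG_COMP [-> WAIT_SEND_COMP] -> finalize
 *   Bulk target:    IMM_RECV -> WAIT_TAG_RMA_COMP or WAIT_TAG_COMP ->
 *                   finalize
 *
 * Failures are funneled through TN_STATE_FAIL or the WAIT_TIMEOUT_*
 * states before the transaction is finalized.
 */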
static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
                                       enum tn_events event, int status,
                                       bool *tn_released)
{
        int rc;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_BULK:
                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async event will progress transaction. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        return 0;

                /* Need to replay TN_EVENT_INIT_BULK event while in the
                 * TN_STATE_SEND_FAILED state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay cancel tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Unexpected error during cancel tagged receive: rc=%d",
                                        rc);
                        LBUG();
                }
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }
}

static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
                                              enum tn_events event, int status,
                                              bool *tn_released)
{
        int rc;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_BULK:
                tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                kfilnd_tn_pack_bulk_req(tn);

                rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                switch (rc) {
                /* Async event will progress immediate send. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
                        return 0;

                /* Need to replay TN_EVENT_INIT_BULK event while in the
                 * TN_STATE_TAGGED_RECV_POSTED state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay post send to %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);
                        return -EAGAIN;

                /* Need to transition to the TN_STATE_SEND_FAILED state to
                 * clean up the posted tagged receive buffer.
                 */
                default:
                        KFILND_TN_ERROR(tn,
                                        "Failed to post send to %s(%#llx): rc=%d",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr, rc);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                        kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);

                        /* Propagate TN_EVENT_INIT_BULK event to
                         * TN_STATE_SEND_FAILED handler.
                         */
                        return kfilnd_tn_state_send_failed(tn, event, rc,
                                                           tn_released);
                }

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }
}

static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
                                enum tn_events event, int status,
                                bool *tn_released)
{
        struct kfilnd_msg *msg;
        int rc = 0;
        bool finalize = false;
        struct lnet_hdr hdr;
        struct lnet_nid srcnid;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        /* For new peers, send a hello request message and queue the true LNet
         * message for replay.
         */
        if (kfilnd_peer_needs_throttle(tn->tn_kp) &&
            (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) {
                if (kfilnd_peer_deleted(tn->tn_kp)) {
                        /* We'll assign a NETWORK_TIMEOUT message health status
                         * below because we don't know why this peer was marked
                         * for removal.
                         */
                        rc = -ESTALE;
                        KFILND_TN_DEBUG(tn, "Drop message to deleted peer");
                } else if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
                        /* We're throttling transactions to this peer until
                         * a handshake can be completed, but there is no HELLO
                         * currently in flight. This implies the HELLO has
                         * failed, and we should cancel this TN. Otherwise we
                         * are stuck waiting for the TN deadline.
                         *
                         * We assign NETWORK_TIMEOUT health status below because
                         * we do not know why the HELLO failed.
                         */
                        rc = -ECANCELED;
                        KFILND_TN_DEBUG(tn, "Cancel throttled TN");
                } else if (ktime_before(ktime_get_seconds(),
                                        tn->tn_replay_deadline)) {
                        /* If the transaction replay deadline has not been met,
                         * then return -EAGAIN. This will cause this transaction
                         * event to be replayed. During this time, an async
                         * hello message from the peer should occur, at which
                         * point we can resume sending new messages to this
                         * peer.
                         */
                        KFILND_TN_DEBUG(tn, "hello response pending");
                        return -EAGAIN;
                } else {
                        rc = -ETIMEDOUT;
                }

                kfilnd_tn_status_update(tn, rc,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                rc = 0;
                goto out;
        }

        if ((event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK) &&
            ktime_after(ktime_get_seconds(), tn->tn_replay_deadline)) {
                kfilnd_tn_status_update(tn, -ETIMEDOUT,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                rc = 0;
                goto out;
        }

        switch (event) {
        case TN_EVENT_INIT_IMMEDIATE:
        case TN_EVENT_TX_HELLO:
                tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                if (event == TN_EVENT_INIT_IMMEDIATE)
                        kfilnd_tn_pack_immed_msg(tn);
                else
                        kfilnd_tn_pack_hello_req(tn);

                /* Send immediate message. */
                rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                switch (rc) {
                /* Async event will progress immediate send. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
                        return 0;

                /* Need to replay TN_EVENT_INIT_IMMEDIATE event while in the
                 * TN_STATE_IDLE state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Failed to post send to %s(%#llx): rc=%d",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr, rc);
                        if (event == TN_EVENT_TX_HELLO)
                                kfilnd_peer_clear_hello_pending(tn->tn_kp);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                }
                break;

        case TN_EVENT_INIT_BULK:
                /* Post tagged receive buffer used to land bulk response. */
                rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);

                switch (rc) {
                /* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);

                        /* Propagate TN_EVENT_INIT_BULK event to
                         * TN_STATE_TAGGED_RECV_POSTED handler.
                         */
                        return kfilnd_tn_state_tagged_recv_posted(tn, event,
                                                                  rc,
                                                                  tn_released);

                /* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
                 * state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
                                        rc);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                }
                break;

        case TN_EVENT_RX_OK:
                if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
                        rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
                                                       tn->tn_ep->end_cpt,
                                                       tn->tn_kp);
                        if (rc)
                                KFILND_TN_ERROR(tn,
                                                "Failed to send hello request: rc=%d",
                                                rc);
                        rc = 0;
                }

                /* If this is a new peer then we cannot progress the transaction
                 * and must drop it.
                 */
                if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
                        KFILND_TN_ERROR(tn,
                                        "Dropping message from %s due to stale peer",
                                        libcfs_nid2str(tn->tn_kp->kp_nid));
                        kfilnd_tn_status_update(tn, -EPROTO,
                                                LNET_MSG_STATUS_LOCAL_DROPPED);
                        rc = 0;
                        goto out;
                }

                LASSERT(kfilnd_peer_is_new_peer(tn->tn_kp) == false);
                msg = tn->tn_rx_msg.msg;

                /* Update the NID address with the new preferred RX context. */
                kfilnd_peer_alive(tn->tn_kp);

                /* Pass message up to LNet.
                 * The TN will be reused in this call chain so we need to
                 * release the lock on the TN before proceeding.
                 */
                KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
                                tn_state_to_str(tn->tn_state));

                /* TODO: Do not manually update this state change. */
                tn->tn_state = TN_STATE_IMM_RECV;
                mutex_unlock(&tn->tn_lock);
                *tn_released = true;
                lnet_nid4_to_nid(msg->srcnid, &srcnid);
                if (msg->type == KFILND_MSG_IMMEDIATE) {
                        lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
                        rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
                                        &hdr, &srcnid, tn, 0);
                } else {
                        lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
                        rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
                                        &hdr, &srcnid, tn, 1);
                }

                /* If successful, transaction has been accepted by LNet and we
                 * cannot process the transaction anymore within this context.
                 */
                if (!rc)
                        return 0;

                KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
                kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
                break;

        case TN_EVENT_RX_HELLO:
                msg = tn->tn_rx_msg.msg;

                kfilnd_peer_alive(tn->tn_kp);

                switch (msg->type) {
                case KFILND_MSG_HELLO_REQ:
                        kfilnd_peer_process_hello(tn->tn_kp, msg);
                        tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                        KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);

                        kfilnd_tn_pack_hello_rsp(tn);

                        /* Send immediate message. */
                        rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                        switch (rc) {
                        case 0:
                                kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
                                return 0;

                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post send to %s(%#llx): rc=%d",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                        break;

                case KFILND_MSG_HELLO_RSP:
                        rc = 0;
                        kfilnd_peer_process_hello(tn->tn_kp, msg);
                        finalize = true;
                        break;

                default:
                        KFILND_TN_ERROR(tn, "Invalid message type: %s",
                                        msg_type_to_str(msg->type));
                        LBUG();
                }
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

out:
        if (kfilnd_tn_has_failed(tn))
                finalize = true;

        if (finalize)
                kfilnd_tn_finalize(tn, tn_released);

        return rc;
}

static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
                                    enum tn_events event, int status,
                                    bool *tn_released)
{
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_FAIL:
                if (status == -ETIMEDOUT || status == -EIO)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                /* RKEY is not involved in immediate sends, so no need to
                 * delete the peer.
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, false);
                if (tn->msg_type == KFILND_MSG_HELLO_REQ)
                        kfilnd_peer_clear_hello_pending(tn->tn_kp);
                break;

        case TN_EVENT_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

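/* Target-side handler: the request has been passed up to LNet, which
 * has either initiated the bulk RMA transfer (TN_EVENT_INIT_TAG_RMA),
 * declined it (TN_EVENT_SKIP_TAG_RMA), or consumed an immediate
 * message (TN_EVENT_RX_OK).
 */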
static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
                                    enum tn_events event, int status,
                                    bool *tn_released)
{
        int rc = 0;
        bool finalize = false;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_TAG_RMA:
        case TN_EVENT_SKIP_TAG_RMA:
                /* Release the buffer we received the request on. All relevant
                 * information to perform the RMA operation is stored in the
                 * transaction structure. This should be done before the RMA
                 * operation to prevent two contexts from potentially processing
                 * the same transaction.
                 *
                 * TODO: Prevent this from returning -EAGAIN.
                 */
                if (tn->tn_posted_buf) {
                        kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
                        tn->tn_posted_buf = NULL;
                }

                /* Update the KFI address to use the response RX context. */
                tn->tn_target_addr =
                        kfi_rx_addr(KFILND_BASE_ADDR(tn->tn_kp->kp_addr),
                                    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
                KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                /* Initiate the RMA operation to push/pull the LNet payload or
                 * send a tagged message to finalize the bulk operation if the
                 * RMA operation should be skipped.
                 */
                if (event == TN_EVENT_INIT_TAG_RMA) {
                        if (tn->sink_buffer)
                                rc = kfilnd_ep_post_read(tn->tn_ep, tn);
                        else
                                rc = kfilnd_ep_post_write(tn->tn_ep, tn);

                        switch (rc) {
                        /* Async tagged RMA event will progress transaction. */
                        case 0:
                                kfilnd_tn_state_change(tn,
                                                       TN_STATE_WAIT_TAG_RMA_COMP);
                                return 0;

                        /* Need to replay TN_EVENT_INIT_TAG_RMA event while in
                         * the TN_STATE_IMM_RECV state.
                         */
                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn,
                                                "Need to replay tagged %s to %s(%#llx)",
                                                tn->sink_buffer ? "read" : "write",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post tagged %s to %s(%#llx): rc=%d",
                                                tn->sink_buffer ? "read" : "write",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                } else {
                        kfilnd_tn_status_update(tn, status,
                                                LNET_MSG_STATUS_OK);

                        /* Since the LNet initiator has posted a unique tagged
                         * buffer specific for this LNet transaction and the
                         * LNet target has decided not to push/pull to/from the
                         * LNet initiator's tagged buffer, a no-op operation is
                         * done on this tagged buffer (i.e. the payload transfer
                         * size is zero). But immediate data, which contains the
                         * LNet target status for the transaction, is sent to
                         * the LNet initiator. Immediate data only appears in
                         * the completion event at the LNet initiator and not in
                         * the tagged buffer.
                         */
                        tn->tagged_data = cpu_to_be64(abs(tn->tn_status));

                        rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
                        switch (rc) {
                        /* Async tagged RMA event will progress transaction. */
                        case 0:
                                kfilnd_tn_state_change(tn,
                                                       TN_STATE_WAIT_TAG_COMP);
                                return 0;

                        /* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
                         * the TN_STATE_IMM_RECV state.
                         */
                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn,
                                                "Need to replay tagged send to %s(%#llx)",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post tagged send to %s(%#llx): rc=%d",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                }
                break;

        case TN_EVENT_RX_OK:
                finalize = true;
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        if (kfilnd_tn_has_failed(tn))
                finalize = true;

        if (finalize)
                kfilnd_tn_finalize(tn, tn_released);

        return rc;
}

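/* Initiator-side wait for completion of the bulk request send and the
 * tagged bulk response. The CFS_KFI_FAIL_WAIT_SEND_COMP* fail_loc
 * checks exercise unusual completion orderings on the debug/test path.
 */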
static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
                                     enum tn_events event, int status,
                                     bool *tn_released)
{
        int rc;
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_OK:
                if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
                    CFS_FAIL_CHECK_RESET(CFS_KFI_FAIL_WAIT_SEND_COMP1,
                                         CFS_KFI_FAIL_WAIT_SEND_COMP2 |
                                         CFS_FAIL_ONCE))
                        break;
                if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ ||
                             tn->msg_type == KFILND_MSG_BULK_GET_REQ) &&
                    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP3)) {
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
                        kfilnd_tn_status_update(tn, -EIO, hstatus);
                        /* Don't delete peer on debug/test path */
                        kfilnd_peer_tn_failed(tn->tn_kp, -EIO, false);
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        break;
                }
                kfilnd_peer_alive(tn->tn_kp);
                kfilnd_tn_timeout_enable(tn);
                kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
                break;

        case TN_EVENT_TAG_RX_OK:
                if (status)
                        kfilnd_tn_status_update(tn, status, LNET_MSG_STATUS_OK);

                kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
                if (unlikely(tn->msg_type == KFILND_MSG_BULK_PUT_REQ) &&
                    CFS_FAIL_CHECK(CFS_KFI_FAIL_WAIT_SEND_COMP2)) {
                        struct kfi_cq_err_entry fake_error = {
                                .op_context = tn,
                                .flags = KFI_MSG | KFI_SEND,
                                .err = EIO,
                        };

                        kfilnd_ep_gen_fake_err(tn->tn_ep, &fake_error);
                }
                break;

        case TN_EVENT_TX_FAIL:
                if (status == -ETIMEDOUT)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                /* The bulk request message failed; however, there is an edge
                 * case where the last request packet of a message is received
                 * at the target successfully, but the corresponding response
                 * packet is repeatedly dropped. This results in the target
                 * generating a success completion event but the initiator
                 * generating an error completion event. Due to this, we have to
                 * delete the peer here to protect the RKEY.
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, true);

                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async cancel event will progress transaction. */
                case 0:
                        kfilnd_tn_status_update(tn, status,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        return 0;

                /* Need to replay TN_EVENT_INIT_BULK event while in the
                 * TN_STATE_SEND_FAILED state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay cancel tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Unexpected error during cancel tagged receive: rc=%d",
                                        rc);
                        LBUG();
                }
                break;

        case TN_EVENT_TAG_RX_FAIL:
                kfilnd_tn_status_update(tn, status,
                                        LNET_MSG_STATUS_LOCAL_ERROR);
                /* The target may hold a reference to the RKEY, so we need to
                 * delete the peer to protect it.
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, true);
                kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        return 0;
}

static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
                                          enum tn_events event, int status,
                                          bool *tn_released)
{
        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;
        case TN_EVENT_TX_FAIL:
                kfilnd_tn_status_update(tn, status,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                /* The bulk request message was never queued so we do not need
                 * to delete the peer.
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, false);
                break;
        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
                                             enum tn_events event, int status,
                                             bool *tn_released)
{
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TAG_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        case TN_EVENT_TAG_TX_FAIL:
                if (status == -ETIMEDOUT)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                /* This event occurs at the target of a bulk LNetPut/Get.
                 * Since the target did not generate the RKEY, we needn't
                 * delete the peer.
                 */
                kfilnd_peer_tn_failed(tn->tn_kp, status, false);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

1259 static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
1260                                          enum tn_events event, int status,
1261                                          bool *tn_released)
1262 {
1263         int rc;
1264         enum lnet_msg_hstatus hstatus;
1265
1266         KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
1267                         status);
1268
1269         switch (event) {
1270         case TN_EVENT_TAG_RX_FAIL:
1271         case TN_EVENT_TAG_RX_OK:
1272                 /* Status can be set for both TN_EVENT_TAG_RX_FAIL and
1273                  * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set,
1274                  * LNet target returned -ENODATA.
1275                  */
1276                 if (status) {
1277                         if (event == TN_EVENT_TAG_RX_FAIL)
1278                                 kfilnd_tn_status_update(tn, status,
1279                                                         LNET_MSG_STATUS_LOCAL_ERROR);
1280                         else
1281                                 kfilnd_tn_status_update(tn, status,
1282                                                         LNET_MSG_STATUS_OK);
1283                 }
1284
1285                 if (!kfilnd_tn_timeout_cancel(tn)) {
1286                         kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
1287                         return 0;
1288                 }
1289                 break;
1290
1291         case TN_EVENT_TIMEOUT:
1292                 /* Need to cancel the tagged receive to prevent resources from
1293                  * being leaked.
1294                  */
1295                 rc = kfilnd_tn_cancel_tag_recv(tn);
1296
1297                 switch (rc) {
1298                 /* Async cancel event will progress transaction. */
1299                 case 0:
1300                         kfilnd_tn_state_change(tn,
1301                                                TN_STATE_WAIT_TIMEOUT_TAG_COMP);
1302                         return 0;
1303
1304                 /* Need to replay TN_EVENT_INIT_BULK event while in the
1305                  * TN_STATE_WAIT_TAG_COMP state.
1306                  */
1307                 case -EAGAIN:
1308                         KFILND_TN_DEBUG(tn,
1309                                         "Need to replay cancel tagged recv");
1310                         return -EAGAIN;
1311
1312                 default:
1313                         KFILND_TN_ERROR(tn,
1314                                         "Unexpected error during cancel tagged receive: rc=%d",
1315                                         rc);
1316                         LBUG();
1317                 }
1318                 break;
1319
1320         case TN_EVENT_TAG_TX_FAIL:
1321                 if (status == -ETIMEDOUT)
1322                         hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
1323                 else
1324                         hstatus = LNET_MSG_STATUS_REMOTE_ERROR;
1325
1326                 kfilnd_tn_status_update(tn, status, hstatus);
1327                 /* This event occurs at the target of a bulk LNetPut/Get.
1328                  * Since the target did not generate the RKEY, we needn't
1329                  * delete the peer.
1330                  */
1331                 kfilnd_peer_tn_failed(tn->tn_kp, status, false);
1332                 break;
1333
1334         case TN_EVENT_TAG_TX_OK:
1335                 kfilnd_peer_alive(tn->tn_kp);
1336                 break;
1337
1338         default:
1339                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1340                 LBUG();
1341         }
1342
1343         kfilnd_tn_finalize(tn, tn_released);
1344
1345         return 0;
1346 }
1347
1348 static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
1349                                 enum tn_events event, int status,
1350                                 bool *tn_released)
1351 {
1352         KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
1353                         status);
1354
1355         switch (event) {
1356         case TN_EVENT_TX_FAIL:
1357                 /* Prior TN states will have deleted the peer if necessary */
1358                 kfilnd_peer_tn_failed(tn->tn_kp, status, false);
1359                 break;
1360
1361         case TN_EVENT_TX_OK:
1362                 kfilnd_peer_alive(tn->tn_kp);
1363                 break;
1364
1365         case TN_EVENT_TAG_RX_OK:
1366                 kfilnd_peer_alive(tn->tn_kp);
1367                 if (tn->tn_status != status) {
1368                         KFILND_TN_DEBUG(tn, "%d -> %d status change",
1369                                         tn->tn_status, status);
1370                         tn->tn_status = status;
1371                 }
1372                 if (tn->hstatus != LNET_MSG_STATUS_OK) {
1373                         KFILND_TN_DEBUG(tn, "%d -> %d health status change",
1374                                         tn->hstatus, LNET_MSG_STATUS_OK);
1375                         tn->hstatus = LNET_MSG_STATUS_OK;
1376                 }
1377                 break;
1378
1379         case TN_EVENT_TAG_RX_FAIL:
1380         case TN_EVENT_TAG_RX_CANCEL:
1381                 break;
1382
1383         default:
1384                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1385                 LBUG();
1386         }
1387
1388         kfilnd_tn_finalize(tn, tn_released);
1389
1390         return 0;
1391 }
1392
1393 static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
1394                                                  enum tn_events event,
1395                                                  int status, bool *tn_released)
1396 {
1397         KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
1398                         status);
1399
1400         switch (event) {
1401         case TN_EVENT_TAG_RX_CANCEL:
1402                 kfilnd_tn_status_update(tn, -ETIMEDOUT,
1403                                         LNET_MSG_STATUS_NETWORK_TIMEOUT);
1404                 /* We've cancelled locally, but the target may still hold a ref
1405                  * on the RKEY. Delete the peer so the RKEY cannot be reused.
1406                  */
1407                 kfilnd_peer_tn_failed(tn->tn_kp, -ETIMEDOUT, true);
1408                 break;
1409
1410         case TN_EVENT_TAG_RX_FAIL:
1411                 kfilnd_tn_status_update(tn, status,
1412                                         LNET_MSG_STATUS_LOCAL_ERROR);
1413                 /* The initiator of a bulk LNetPut/Get eagerly sends the bulk
1414                  * request message to the target without ensuring the tagged
1415                  * receive buffer is posted. Thus, the target could be issuing
1416                  * kfi_write/read operations using the tagged receive buffer
1417                  * RKEY, and we need to delete this peer to protect it.
1418                  */
1419                 kfilnd_peer_tn_failed(tn->tn_kp, status, true);
1420                 break;
1421
1422         case TN_EVENT_TAG_RX_OK:
1423                 break;
1424
1425         default:
1426                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1427                 LBUG();
1428         }
1429
1430         kfilnd_tn_finalize(tn, tn_released);
1431
1432         return 0;
1433 }
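/* Editorial note (not part of the original source): the failure paths in the
 * handlers above all reduce to one rule. A minimal sketch, assuming a
 * hypothetical helper name; the real handlers pass the flag explicitly at
 * each kfilnd_peer_tn_failed() call site.
 */
#if 0
static void example_tn_failed(struct kfilnd_transaction *tn, int status,
			      bool local_generated_rkey)
{
	/* Delete the peer only when this node generated the RKEY and the
	 * remote may still reference it (e.g. after a cancelled tagged
	 * receive); otherwise the peer can be kept.
	 */
	kfilnd_peer_tn_failed(tn->tn_kp, status, local_generated_rkey);
}
#endif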
1434
1435 static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
1436                                              enum tn_events event, int status,
1437                                              bool *tn_released)
1438 {
1439         KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
1440                         status);
1441
1442         if (event == TN_EVENT_TIMEOUT) {
1443                 kfilnd_tn_finalize(tn, tn_released);
1444         } else {
1445                 KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
1446                 LBUG();
1447         }
1448
1449         return 0;
1450 }
1451
1452 static int
1453 (* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
1454                                                        enum tn_events event,
1455                                                        int status,
1456                                                        bool *tn_released) = {
1457         [TN_STATE_IDLE] = kfilnd_tn_state_idle,
1458         [TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
1459         [TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
1460         [TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
1461         [TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
1462         [TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
1463         [TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
1464         [TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
1465         [TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
1466                 kfilnd_tn_state_wait_timeout_tag_comp,
1467         [TN_STATE_FAIL] = kfilnd_tn_state_fail,
1468         [TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
1469         [TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
1470 };
1471
1472 /**
1473  * kfilnd_tn_event_handler() - Update transaction state machine with an event.
1474  * @tn: Transaction to be updated.
1475  * @event: Transaction event.
1476  * @status: Errno status associated with the event.
1477  *
1478  * When the transaction event handler is first called on a new transaction,
1479  * the transaction becomes owned by the transaction system. This means it
1480  * will be freed by the system as the transaction progresses through the
1481  * state machine. A usage sketch follows the function body.
1482  */
1483 void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
1484                              enum tn_events event, int status)
1485 {
1486         bool tn_released = false;
1487         int rc;
1488
1489         if (!tn)
1490                 return;
1491
1492         mutex_lock(&tn->tn_lock);
1493         rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
1494                                                           &tn_released);
1495         if (rc == -EAGAIN) {
1496                 tn->replay_event = event;
1497                 tn->replay_status = status;
1498                 kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
1499         }
1500
1501         if (!tn_released)
1502                 mutex_unlock(&tn->tn_lock);
1503 }
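/* Editorial sketch (not part of the original source): a minimal example of
 * how a completion path might drive the state machine above, assuming a
 * hypothetical caller example_tx_complete(); kfilnd_tn_event_handler() and
 * the TN_EVENT_* values are the real interface.
 */
#if 0
static void example_tx_complete(struct kfilnd_transaction *tn, int cq_status)
{
	/* Map the completion status onto a state machine event. */
	enum tn_events event = cq_status ? TN_EVENT_TX_FAIL : TN_EVENT_TX_OK;

	/* The state machine owns tn; it may be freed during dispatch, so tn
	 * must not be touched after this call returns.
	 */
	kfilnd_tn_event_handler(tn, event, cq_status);
}
#endif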
1504
1505 /**
1506  * kfilnd_tn_free() - Free a transaction.
1507  */
1508 void kfilnd_tn_free(struct kfilnd_transaction *tn)
1509 {
1510         spin_lock(&tn->tn_ep->tn_list_lock);
1511         list_del(&tn->tn_entry);
1512         spin_unlock(&tn->tn_ep->tn_list_lock);
1513
1514         KFILND_TN_DEBUG(tn, "Transaction freed");
1515
1516         if (tn->tn_mr_key)
1517                 kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);
1518
1519         /* Free send message buffer if needed. */
1520         if (tn->tn_tx_msg.msg)
1521                 kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
1522
1523         kmem_cache_free(tn_cache, tn);
1524 }
1525
1526 /*
1527  * Allocation logic common to kfilnd_tn_alloc() and kfilnd_tn_alloc_for_hello().
1528  * @ep: The KFI LND endpoint to associate with the transaction.
1529  * @kp: The kfilnd peer to associate with the transaction.
1530  * See kfilnd_tn_alloc() for a description of the other parameters.
1531  * Note: the caller must hold a reference on @kp.
1532  */
1533 static struct kfilnd_transaction *kfilnd_tn_alloc_common(struct kfilnd_ep *ep,
1534                                                          struct kfilnd_peer *kp,
1535                                                          bool alloc_msg,
1536                                                          bool is_initiator,
1537                                                          u16 key)
1538 {
1539         struct kfilnd_transaction *tn;
1540         int rc;
1541         ktime_t tn_alloc_ts;
1542
1543         tn_alloc_ts = ktime_get();
1544
1545         tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
1546         if (!tn) {
1547                 rc = -ENOMEM;
1548                 goto err;
1549         }
1550
1551         if (alloc_msg) {
1552                 tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
1553                 if (!tn->tn_tx_msg.msg) {
1554                         rc = -ENOMEM;
1555                         goto err_free_tn;
1556                 }
1557         }
1558
1559         tn->tn_mr_key = key;
1560
1561         tn->tn_kp = kp;
1562
1563         mutex_init(&tn->tn_lock);
1564         tn->tn_ep = ep;
1565         tn->tn_response_rx = ep->end_context_id;
1566         tn->tn_state = TN_STATE_IDLE;
1567         tn->hstatus = LNET_MSG_STATUS_OK;
1568         tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
1569         tn->tn_replay_deadline = ktime_sub(tn->deadline,
1570                                            (lnet_get_lnd_timeout() / 2));
1571         tn->is_initiator = is_initiator;
1572         INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);
1573
1574         /* Add the transaction to an endpoint.  This is like
1575          * incrementing a ref counter.
1576          */
1577         spin_lock(&ep->tn_list_lock);
1578         list_add_tail(&tn->tn_entry, &ep->tn_list);
1579         spin_unlock(&ep->tn_list_lock);
1580
1581         tn->tn_alloc_ts = tn_alloc_ts;
1582         tn->tn_state_ts = ktime_get();
1583
1584         KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key);
1585
1586         return tn;
1587
1588 err_free_tn:
1589         if (tn->tn_tx_msg.msg)
1590                 kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
1591         kmem_cache_free(tn_cache, tn);
1592 err:
1593         return ERR_PTR(rc);
1594 }
1595
1596 static struct kfilnd_ep *kfilnd_dev_to_ep(struct kfilnd_dev *dev, int cpt)
1597 {
1598         struct kfilnd_ep *ep;
1599
1600         if (!dev)
1601                 return ERR_PTR(-EINVAL);
1602
1603         ep = dev->cpt_to_endpoint[cpt];
1604         if (!ep) {
1605                 CWARN("%s used invalid cpt=%d\n",
1606                       libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
1607                 ep = dev->kfd_endpoints[0];
1608         }
1609
1610         return ep;
1611 }
1612
1613 /**
1614  * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
1615  * @dev: KFI LND device used to look up the KFI LND endpoint to associate
1616  * with the transaction.
1617  * @cpt: CPT of the transaction.
1618  * @target_nid: Target NID of the transaction.
1619  * @alloc_msg: Allocate an immediate message for the transaction.
1620  * @is_initiator: Is initiator of LNet transaction.
1621  * @need_key: Is transaction memory region key needed.
1622  *
1623  * During transaction allocation, each transaction is associated with a KFI LND
1624  * endpoint used to post data transfer operations. The CPT argument is used to
1625  * look up the KFI LND endpoint within the KFI LND device.
1626  *
1627  * Return: On success, valid pointer. Else, negative errno pointer.
1628  */
1629 struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
1630                                            lnet_nid_t target_nid,
1631                                            bool alloc_msg, bool is_initiator,
1632                                            bool need_key)
1633 {
1634         struct kfilnd_transaction *tn;
1635         struct kfilnd_ep *ep;
1636         struct kfilnd_peer *kp;
1637         int rc;
1638         u16 key = 0;
1639
1640         ep = kfilnd_dev_to_ep(dev, cpt);
1641         if (IS_ERR(ep)) {
1642                 rc = PTR_ERR(ep);
1643                 goto err;
1644         }
1645
1646         /* Consider the following:
1647          * Thread 1: Posts tagged receive with RKEY based on
1648          *           peerA::kp_local_session_key X and tn_mr_key Y
1649          * Thread 2: Fetches peerA with kp_local_session_key X
1650          * Thread 1: Cancels tagged receive, marks peerA for removal, and
1651          *           releases tn_mr_key Y
1652          * Thread 2: allocates tn_mr_key Y
1653          * At this point, thread 2 has the same RKEY used by thread 1.
1654          * Thus, we always allocate the tn_mr_key before looking up the peer,
1655          * and we always mark peers for removal before releasing tn_mr_key.
1656          */
1657         if (need_key) {
1658                 rc = kfilnd_ep_get_key(ep);
1659                 if (rc < 0)
1660                         goto err;
1661                 key = rc;
1662         }
1663
1664         kp = kfilnd_peer_get(dev, target_nid);
1665         if (IS_ERR(kp)) {
1666                 rc = PTR_ERR(kp);
1667                 goto err_put_key;
1668         }
1669
1670         tn = kfilnd_tn_alloc_common(ep, kp, alloc_msg, is_initiator, key);
1671         if (IS_ERR(tn)) {
1672                 rc = PTR_ERR(tn);
1673                 kfilnd_peer_put(kp);
1674                 goto err_put_key;
1675         }
1676
1677         return tn;
1678
1679 err_put_key:
1680         kfilnd_ep_put_key(ep, key);
1681 err:
1682         return ERR_PTR(rc);
1683 }
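/* Editorial sketch (not part of the original source): the key-before-peer
 * ordering described in kfilnd_tn_alloc() above, seen from a caller's point
 * of view. example_start_bulk() and its use of TN_EVENT_INIT_BULK here are
 * assumptions for illustration only.
 */
#if 0
static int example_start_bulk(struct kfilnd_dev *dev, int cpt,
			      lnet_nid_t target_nid)
{
	struct kfilnd_transaction *tn;

	/* need_key=true: the MR key is reserved before the peer lookup so a
	 * peer marked for removal can never be paired with a reused RKEY.
	 */
	tn = kfilnd_tn_alloc(dev, cpt, target_nid, true, true, true);
	if (IS_ERR(tn))
		return PTR_ERR(tn);

	/* Hand tn to the state machine, which owns and eventually frees it. */
	kfilnd_tn_event_handler(tn, TN_EVENT_INIT_BULK, 0);
	return 0;
}
#endif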
1684
1685 /* Like kfilnd_tn_alloc(), but caller already looked up the kfilnd_peer.
1686  * Used only to allocate a TN for a hello request.
1687  * See kfilnd_tn_alloc() and kfilnd_tn_alloc_common().
1688  * Note: the caller must hold a reference on @kp.
1689  */
1690 struct kfilnd_transaction *kfilnd_tn_alloc_for_hello(struct kfilnd_dev *dev, int cpt,
1691                                                      struct kfilnd_peer *kp)
1692 {
1693         struct kfilnd_transaction *tn;
1694         struct kfilnd_ep *ep;
1695         int rc;
1696
1697         ep = kfilnd_dev_to_ep(dev, cpt);
1698         if (IS_ERR(ep)) {
1699                 rc = PTR_ERR(ep);
1700                 goto err;
1701         }
1702
1703         tn = kfilnd_tn_alloc_common(ep, kp, true, true, 0);
1704         if (IS_ERR(tn)) {
1705                 rc = PTR_ERR(tn);
1706                 goto err;
1707         }
1708
1709         return tn;
1710
1711 err:
1712         return ERR_PTR(rc);
1713 }
1714
1715 /**
1716  * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
1717  *
1718  * This function should only be called when there are no outstanding
1719  * transactions.
1720  */
1721 void kfilnd_tn_cleanup(void)
1722 {
1723         kmem_cache_destroy(imm_buf_cache);
1724         kmem_cache_destroy(tn_cache);
1725 }
1726
1727 /**
1728  * kfilnd_tn_init() - Initialize KFI LND transaction system.
1729  *
1730  * Return: On success, zero. Else, negative errno.
1731  */
1732 int kfilnd_tn_init(void)
1733 {
1734         tn_cache = kmem_cache_create("kfilnd_tn",
1735                                      sizeof(struct kfilnd_transaction), 0,
1736                                      SLAB_HWCACHE_ALIGN, NULL);
1737         if (!tn_cache)
1738                 goto err;
1739
1740         imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
1741                                           KFILND_IMMEDIATE_MSG_SIZE, 0,
1742                                           SLAB_HWCACHE_ALIGN, NULL);
1743         if (!imm_buf_cache)
1744                 goto err_tn_cache_destroy;
1745
1746         return 0;
1747
1748 err_tn_cache_destroy:
1749         kmem_cache_destroy(tn_cache);
1750 err:
1751         return -ENOMEM;
1752 }
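/* Editorial sketch (not part of the original source): the expected pairing
 * of the cache lifecycle above. The module hooks are hypothetical;
 * kfilnd_tn_init() and kfilnd_tn_cleanup() are the real entry points.
 */
#if 0
static int __init example_init(void)
{
	/* Creates tn_cache and imm_buf_cache; -ENOMEM if either fails. */
	return kfilnd_tn_init();
}

static void __exit example_exit(void)
{
	/* Only valid once no transactions remain outstanding. */
	kfilnd_tn_cleanup();
}
#endif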
1753
1754 /**
1755  * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction.
1756  * @tn: Transaction to have buffer set.
1757  * @kiov: LNet KIOV buffer.
1758  * @num_iov: Number of IOVs.
1759  * @offset: Offset into IOVs where the buffer starts.
1760  * @len: Length of the buffer.
1761  *
1762  * This function takes the user provided IOV, offset, and len, and sets the
1763  * transaction buffer. The user provided IOV is an LNet KIOV. The user
1764  * provided offset is consumed while the transaction buffer is configured,
1765  * so the resulting transaction buffer starts at offset zero. A worked
1766  * example follows the function body.
1767  */
1768 int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn,
1769                            struct bio_vec *kiov, size_t num_iov,
1770                            size_t offset, size_t len)
1771 {
1772         size_t i;
1773         size_t cur_len = 0;
1774         size_t cur_offset = offset;
1775         size_t cur_iov = 0;
1776         size_t tmp_len;
1777         size_t tmp_offset;
1778
1779         for (i = 0; (i < num_iov) && (cur_len < len); i++) {
1780                 /* Skip KIOVs that are wholly consumed by the remaining
1781                  * offset; stop at the first KIOV extending past it.
1782                  */
1783                 if (kiov[i].bv_len <= cur_offset) {
1784                         cur_offset -= kiov[i].bv_len;
1785                         continue;
1786                 }
1787
1788                 tmp_len = kiov[i].bv_len - cur_offset;
1789                 tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;
1790
1791                 if (tmp_len + cur_len > len)
1792                         tmp_len = len - cur_len;
1793
1794                 /* tn_kiov is an array of size LNET_MAX_IOV */
1795                 if (cur_iov >= LNET_MAX_IOV)
1796                         return -EINVAL;
1797
1798                 tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
1799                 tn->tn_kiov[cur_iov].bv_len = tmp_len;
1800                 tn->tn_kiov[cur_iov].bv_offset = tmp_offset;
1801
1802                 cur_iov++;
1803                 cur_len += tmp_len;
1804                 cur_offset = 0;
1805         }
1806
1807         tn->tn_num_iovec = cur_iov;
1808         tn->tn_nob = cur_len;
1809
1810         return 0;
1811 }
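/* Editorial worked example (not part of the original source), assuming
 * three 4096-byte KIOVs each with bv_offset 0, and offset=5000, len=6000:
 *
 *   i=0: bv_len 4096 <= 5000, skipped; remaining offset becomes 904.
 *   i=1: tn_kiov[0] = { page 1, bv_offset 904, bv_len 3192 }   cur_len 3192
 *   i=2: 3192 + 4096 > 6000, so the final entry is clipped:
 *        tn_kiov[1] = { page 2, bv_offset 0,   bv_len 2808 }   cur_len 6000
 *
 * Result: tn_num_iovec = 2, tn_nob = 6000; the configured buffer starts at
 * offset zero, as noted in the function comment above.
 */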