/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.gnu.org/licenses/gpl-2.0.html
 *
 * GPL HEADER END
 */
/*
 * Copyright 2022 Hewlett Packard Enterprise Development LP
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 */
/*
 * kfilnd transaction and state machine processing.
 */

#include "kfilnd_tn.h"
#include "kfilnd_ep.h"
#include "kfilnd_dev.h"
#include "kfilnd_dom.h"
#include "kfilnd_peer.h"
#include <asm/checksum.h>

static struct kmem_cache *tn_cache;
static struct kmem_cache *imm_buf_cache;

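/* Note: "cksum" used below is not declared in this file; it is presumably a
 * module-level tunable (declared in a kfilnd header) that enables
 * whole-message checksumming. When it is disabled, NO_CHECKSUM is placed in
 * the header and the receive path in kfilnd_tn_unpack_msg() skips
 * verification.
 */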
static __sum16 kfilnd_tn_cksum(void *ptr, int nob)
{
        if (cksum)
                return csum_fold(csum_partial(ptr, nob, 0));
        return NO_CHECKSUM;
}

static int kfilnd_tn_msgtype2size(enum kfilnd_msg_type type)
{
        const int hdr_size = offsetof(struct kfilnd_msg, proto);

        switch (type) {
        case KFILND_MSG_IMMEDIATE:
                return offsetof(struct kfilnd_msg, proto.immed.payload[0]);

        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                return hdr_size + sizeof(struct kfilnd_bulk_req_msg);

        default:
                return -1;
        }
}
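
/* For reference: the "transport header" packed and validated throughout this
 * file is everything before the proto union (magic, version, type, nob,
 * cksum, srcnid, dstnid), so offsetof(struct kfilnd_msg, proto) is the header
 * size and msg->nob covers the header plus the protocol payload. A sketch
 * inferred from the accesses in this file, not an authoritative definition:
 *
 *      +---------------------------------------------+
 *      | magic | version | type | nob | cksum        |
 *      | srcnid | dstnid                             |  transport header
 *      +---------------------------------------------+
 *      | proto: union { immed | hello | bulk_req }   |  protocol payload
 *      +---------------------------------------------+
 */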

static void kfilnd_tn_pack_hello_req(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        msg->proto.hello.version = KFILND_MSG_VERSION;
        msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
        msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

        /* TODO: Support multiple RX contexts per peer. */
        msg->proto.hello.rx_count = 1;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;

        /* Message version zero is only valid for hello messages. */
        msg->version = 0;
        msg->type = KFILND_MSG_HELLO_REQ;
        msg->nob = sizeof(struct kfilnd_hello_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_hello_rsp(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        msg->proto.hello.version = tn->tn_kp->kp_version;
        msg->proto.hello.rx_base = kfilnd_peer_target_rx_base(tn->tn_kp);
        msg->proto.hello.session_key = tn->tn_kp->kp_local_session_key;

        /* TODO: Support multiple RX contexts per peer. */
        msg->proto.hello.rx_count = 1;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;

        /* Message version zero is only valid for hello messages. */
        msg->version = 0;
        msg->type = KFILND_MSG_HELLO_RSP;
        msg->nob = sizeof(struct kfilnd_hello_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}
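
/* Editorial note: the two hello packers above differ only in the value stored
 * in proto.hello.version and in msg->type. A shared helper along these lines
 * (a sketch, not part of the original code) could collapse the duplication:
 *
 *      static void kfilnd_tn_pack_hello(struct kfilnd_transaction *tn,
 *                                       enum kfilnd_msg_type type,
 *                                       unsigned int version)
 *      {
 *              struct kfilnd_msg *msg = tn->tn_tx_msg.msg;
 *
 *              msg->proto.hello.version = version;
 *              ...common body...
 *              msg->type = type;
 *              ...
 *      }
 *
 * with the request path passing KFILND_MSG_VERSION and the response path
 * passing tn->tn_kp->kp_version.
 */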

static void kfilnd_tn_pack_bulk_req(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.bulk_req.hdr);
        msg->proto.bulk_req.key = tn->tn_mr_key;
        msg->proto.bulk_req.response_rx = tn->tn_response_rx;

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;
        msg->version = KFILND_MSG_VERSION;
        msg->type = tn->msg_type;
        msg->nob = sizeof(struct kfilnd_bulk_req_msg) +
                offsetof(struct kfilnd_msg, proto);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}

static void kfilnd_tn_pack_immed_msg(struct kfilnd_transaction *tn)
{
        struct kfilnd_msg *msg = tn->tn_tx_msg.msg;

        /* Pack the protocol header and payload. */
        lnet_hdr_to_nid4(&tn->tn_lntmsg->msg_hdr, &msg->proto.immed.hdr);

        lnet_copy_kiov2flat(KFILND_IMMEDIATE_MSG_SIZE,
                            msg,
                            offsetof(struct kfilnd_msg,
                                     proto.immed.payload),
                            tn->tn_num_iovec, tn->tn_kiov, 0,
                            tn->tn_nob);

        /* Pack the transport header. */
        msg->magic = KFILND_MSG_MAGIC;
        msg->version = KFILND_MSG_VERSION;
        msg->type = tn->msg_type;
        msg->nob = offsetof(struct kfilnd_msg, proto.immed.payload[tn->tn_nob]);
        msg->cksum = NO_CHECKSUM;
        msg->srcnid = lnet_nid_to_nid4(&tn->tn_ep->end_dev->kfd_ni->ni_nid);
        msg->dstnid = tn->tn_kp->kp_nid;

        /* Checksum entire message. */
        msg->cksum = kfilnd_tn_cksum(msg, msg->nob);

        tn->tn_tx_msg.length = msg->nob;
}
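
/* Implied bound: lnet_copy_kiov2flat() above is given
 * KFILND_IMMEDIATE_MSG_SIZE as the size of the flat destination buffer, so an
 * immediate payload must satisfy
 *
 *      offsetof(struct kfilnd_msg, proto.immed.payload) + tn->tn_nob
 *              <= KFILND_IMMEDIATE_MSG_SIZE
 *
 * Anything larger is presumably expected to take the bulk
 * (KFILND_MSG_BULK_PUT_REQ/KFILND_MSG_BULK_GET_REQ) path instead.
 */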

static int kfilnd_tn_unpack_msg(struct kfilnd_ep *ep, struct kfilnd_msg *msg,
                                unsigned int nob)
{
        const unsigned int hdr_size = offsetof(struct kfilnd_msg, proto);

        if (nob < hdr_size) {
                KFILND_EP_ERROR(ep, "Short message: %u", nob);
                return -EPROTO;
        }

        /* TODO: Support byte swapping on mixed endian systems. */
        if (msg->magic != KFILND_MSG_MAGIC) {
                KFILND_EP_ERROR(ep, "Bad magic: %#x", msg->magic);
                return -EPROTO;
        }

        /* TODO: Allow for older versions. */
        if (msg->version > KFILND_MSG_VERSION) {
                KFILND_EP_ERROR(ep, "Bad version: %#x", msg->version);
                return -EPROTO;
        }

        if (msg->nob > nob) {
                KFILND_EP_ERROR(ep, "Short message: got=%u, expected=%u", nob,
                                msg->nob);
                return -EPROTO;
        }

        /* If kfilnd_tn_cksum() returns a non-zero value, checksum is bad. */
        if (msg->cksum != NO_CHECKSUM && kfilnd_tn_cksum(msg, msg->nob)) {
                KFILND_EP_ERROR(ep, "Bad checksum");
                return -EPROTO;
        }

        if (msg->dstnid != lnet_nid_to_nid4(&ep->end_dev->kfd_ni->ni_nid)) {
                KFILND_EP_ERROR(ep, "Bad destination nid: %s",
                                libcfs_nid2str(msg->dstnid));
                return -EPROTO;
        }

        if (msg->srcnid == LNET_NID_ANY) {
                KFILND_EP_ERROR(ep, "Bad source nid: %s",
                                libcfs_nid2str(msg->srcnid));
                return -EPROTO;
        }

        if (msg->nob < kfilnd_tn_msgtype2size(msg->type)) {
                KFILND_EP_ERROR(ep, "Short %s: %d(%d)\n",
                                msg_type_to_str(msg->type),
                                msg->nob, kfilnd_tn_msgtype2size(msg->type));
                return -EPROTO;
        }

        switch ((enum kfilnd_msg_type)msg->type) {
        case KFILND_MSG_IMMEDIATE:
        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                if (msg->version == 0) {
                        KFILND_EP_ERROR(ep,
                                        "Bad message type and version: type=%s version=%u",
                                        msg_type_to_str(msg->type),
                                        msg->version);
                        return -EPROTO;
                }
                break;

        case KFILND_MSG_HELLO_REQ:
        case KFILND_MSG_HELLO_RSP:
                if (msg->version != 0) {
                        KFILND_EP_ERROR(ep,
                                        "Bad message type and version: type=%s version=%u",
                                        msg_type_to_str(msg->type),
                                        msg->version);
                        return -EPROTO;
                }
                break;

        default:
                CERROR("Unknown message type %x\n", msg->type);
                return -EPROTO;
        }
        return 0;
}
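
/* Note on the byte-swapping TODO above: since no swabbing is attempted, a
 * message from an opposite-endian peer would present a byte-reversed magic
 * and fail the KFILND_MSG_MAGIC check, so mixed-endian peers are effectively
 * rejected here rather than translated.
 */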

static void kfilnd_tn_record_state_change(struct kfilnd_transaction *tn)
{
        unsigned int data_size_bucket =
                kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
        struct kfilnd_tn_duration_stat *stat;

        if (tn->is_initiator)
                stat = &tn->tn_ep->end_dev->initiator_state_stats.state[tn->tn_state].data_size[data_size_bucket];
        else
                stat = &tn->tn_ep->end_dev->target_state_stats.state[tn->tn_state].data_size[data_size_bucket];

        atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_state_ts)),
                     &stat->accumulated_duration);
        atomic_inc(&stat->accumulated_count);
}

static void kfilnd_tn_state_change(struct kfilnd_transaction *tn,
                                   enum tn_states new_state)
{
        KFILND_TN_DEBUG(tn, "%s -> %s state change",
                        tn_state_to_str(tn->tn_state),
                        tn_state_to_str(new_state));

        kfilnd_tn_record_state_change(tn);

        tn->tn_state = new_state;
        tn->tn_state_ts = ktime_get();
}

static void kfilnd_tn_status_update(struct kfilnd_transaction *tn, int status,
                                    enum lnet_msg_hstatus hstatus)
{
        /* Only the first non-zero status is recorded. */
        if (tn->tn_status == 0) {
                KFILND_TN_DEBUG(tn, "%d -> %d status change", tn->tn_status,
                                status);
                tn->tn_status = status;
        }

        if (tn->hstatus == LNET_MSG_STATUS_OK) {
                KFILND_TN_DEBUG(tn, "%d -> %d health status change",
                                tn->hstatus, hstatus);
                tn->hstatus = hstatus;
        }
}

static bool kfilnd_tn_has_failed(struct kfilnd_transaction *tn)
{
        return tn->tn_status != 0;
}

/**
 * kfilnd_tn_process_rx_event() - Process an immediate receive event.
 *
 * For each immediate receive, a transaction structure needs to be allocated to
 * process the receive.
 */
void kfilnd_tn_process_rx_event(struct kfilnd_immediate_buffer *bufdesc,
                                struct kfilnd_msg *rx_msg, int msg_size)
{
        struct kfilnd_transaction *tn;
        bool alloc_msg = true;
        int rc;
        enum tn_events event = TN_EVENT_RX_HELLO;

        /* Increment buf ref count for this work */
        atomic_inc(&bufdesc->immed_ref);

        /* Unpack the message */
        rc = kfilnd_tn_unpack_msg(bufdesc->immed_end, rx_msg, msg_size);
        if (rc || CFS_FAIL_CHECK(CFS_KFI_FAIL_MSG_UNPACK)) {
                kfilnd_ep_imm_buffer_put(bufdesc);
                KFILND_EP_ERROR(bufdesc->immed_end,
                                "Failed to unpack message %d", rc);
                return;
        }

        switch ((enum kfilnd_msg_type)rx_msg->type) {
        case KFILND_MSG_IMMEDIATE:
        case KFILND_MSG_BULK_PUT_REQ:
        case KFILND_MSG_BULK_GET_REQ:
                event = TN_EVENT_RX_OK;
                fallthrough;
        case KFILND_MSG_HELLO_RSP:
                alloc_msg = false;
                fallthrough;
        case KFILND_MSG_HELLO_REQ:
                /* Context points to a received buffer and status is the length.
                 * Allocate a Tn structure, set its values, then launch the
                 * receive.
                 */
                tn = kfilnd_tn_alloc(bufdesc->immed_end->end_dev,
                                     bufdesc->immed_end->end_cpt,
                                     rx_msg->srcnid, alloc_msg, false,
                                     false);
                if (IS_ERR(tn)) {
                        kfilnd_ep_imm_buffer_put(bufdesc);
                        KFILND_EP_ERROR(bufdesc->immed_end,
                                        "Failed to allocate transaction struct: rc=%ld",
                                        PTR_ERR(tn));
                        return;
                }

                tn->tn_rx_msg.msg = rx_msg;
                tn->tn_rx_msg.length = msg_size;
                tn->tn_posted_buf = bufdesc;

                KFILND_EP_DEBUG(bufdesc->immed_end, "%s transaction ID %u",
                                msg_type_to_str((enum kfilnd_msg_type)rx_msg->type),
                                tn->tn_mr_key);
                break;

        default:
                KFILND_EP_ERROR(bufdesc->immed_end,
                                "Unhandled kfilnd message type: %d",
                                (enum kfilnd_msg_type)rx_msg->type);
                LBUG();
        }

        kfilnd_tn_event_handler(tn, event, 0);
}

static void kfilnd_tn_record_duration(struct kfilnd_transaction *tn)
{
        unsigned int data_size_bucket =
                kfilnd_msg_len_to_data_size_bucket(tn->lnet_msg_len);
        struct kfilnd_tn_duration_stat *stat;

        if (tn->is_initiator)
                stat = &tn->tn_ep->end_dev->initiator_stats.data_size[data_size_bucket];
        else
                stat = &tn->tn_ep->end_dev->target_stats.data_size[data_size_bucket];

        atomic64_add(ktime_to_ns(ktime_sub(ktime_get(), tn->tn_alloc_ts)),
                     &stat->accumulated_duration);
        atomic_inc(&stat->accumulated_count);
}
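
/* The stats recorded above (and in kfilnd_tn_record_state_change()) are plain
 * accumulators; a mean duration per data-size bucket can be derived by
 * whatever reporting code consumes them, e.g.:
 *
 *      u64 mean_ns = atomic64_read(&stat->accumulated_duration) /
 *                    atomic_read(&stat->accumulated_count);
 *
 * (guarding against a zero count).
 */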

/**
 * kfilnd_tn_finalize() - Cleanup resources and finalize LNet operation.
 *
 * All state machine functions should call kfilnd_tn_finalize() instead of
 * kfilnd_tn_free(). Once all expected asynchronous events have been received,
 * the transaction lock is released (if it has not been released already),
 * transaction resources are cleaned up, and lnet_finalize() is called.
 */
static void kfilnd_tn_finalize(struct kfilnd_transaction *tn, bool *tn_released)
{
        if (!*tn_released) {
                mutex_unlock(&tn->tn_lock);
                *tn_released = true;
        }

        /* Release the reference on the multi-receive buffer. */
        if (tn->tn_posted_buf)
                kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);

        /* Finalize LNet operation. */
        if (tn->tn_lntmsg) {
                tn->tn_lntmsg->msg_health_status = tn->hstatus;
                lnet_finalize(tn->tn_lntmsg, tn->tn_status);
        }

        if (tn->tn_getreply) {
                tn->tn_getreply->msg_health_status = tn->hstatus;
                lnet_set_reply_msg_len(tn->tn_ep->end_dev->kfd_ni,
                                       tn->tn_getreply,
                                       tn->tn_status ? 0 : tn->tn_nob);
                lnet_finalize(tn->tn_getreply, tn->tn_status);
        }

        if (KFILND_TN_PEER_VALID(tn))
                kfilnd_peer_put(tn->tn_kp);

        kfilnd_tn_record_state_change(tn);
        kfilnd_tn_record_duration(tn);

        kfilnd_tn_free(tn);
}

/**
 * kfilnd_tn_cancel_tag_recv() - Attempt to cancel a tagged receive.
 * @tn: Transaction to have its tagged receive cancelled.
 *
 * Return: 0 on success. Else, negative errno. If an error occurs, resources may
 * be leaked.
 */
static int kfilnd_tn_cancel_tag_recv(struct kfilnd_transaction *tn)
{
        int rc;

        /* Issue a cancel. A return code of zero means the operation issued an
         * async cancel. A return code of -ENOENT means the tagged receive was
         * not found. The assumption here is that a tagged send landed thus
         * removing the tagged receive buffer from hardware. For both cases,
         * async events should occur.
         */
        rc = kfilnd_ep_cancel_tagged_recv(tn->tn_ep, tn);
        if (rc != 0 && rc != -ENOENT) {
                KFILND_TN_ERROR(tn, "Failed to cancel tag receive. Resources may leak.");
                return rc;
        }

        return 0;
}

static void kfilnd_tn_timeout_work(struct work_struct *work)
{
        struct kfilnd_transaction *tn =
                container_of(work, struct kfilnd_transaction, timeout_work);

        KFILND_TN_ERROR(tn, "Bulk operation timeout");
        kfilnd_tn_event_handler(tn, TN_EVENT_TIMEOUT, 0);
}

static void kfilnd_tn_timeout(cfs_timer_cb_arg_t data)
{
        struct kfilnd_transaction *tn = cfs_from_timer(tn, data, timeout_timer);

        queue_work(kfilnd_wq, &tn->timeout_work);
}

static bool kfilnd_tn_timeout_cancel(struct kfilnd_transaction *tn)
{
        return timer_delete(&tn->timeout_timer);
}

static void kfilnd_tn_timeout_enable(struct kfilnd_transaction *tn)
{
        ktime_t remaining_time = max_t(ktime_t, 0,
                                       tn->deadline - ktime_get_seconds());
        unsigned long expires = remaining_time * HZ + jiffies;

        if (CFS_FAIL_CHECK(CFS_KFI_FAIL_BULK_TIMEOUT))
                expires = jiffies;

        cfs_timer_setup(&tn->timeout_timer, kfilnd_tn_timeout,
                        (unsigned long)tn, 0);
        mod_timer(&tn->timeout_timer, expires);
}
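
/* Timeout flow implemented above: the timer callback runs in (soft)irq
 * context, so it only queues timeout_work on kfilnd_wq; the work item then
 * delivers TN_EVENT_TIMEOUT through kfilnd_tn_event_handler() in process
 * context, where the transaction mutex can safely be taken.
 */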

/* The following are the state machine routines for the transactions. */
static int kfilnd_tn_state_send_failed(struct kfilnd_transaction *tn,
                                       enum tn_events event, int status,
                                       bool *tn_released)
{
        int rc;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_BULK:
                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async event will progress transaction. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        return 0;

                /* Need to replay TN_EVENT_INIT_BULK event while in the
                 * TN_STATE_SEND_FAILED state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay cancel tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Unexpected error during cancel tagged receive: rc=%d",
                                        rc);
                        LBUG();
                }
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }
}

static int kfilnd_tn_state_tagged_recv_posted(struct kfilnd_transaction *tn,
                                              enum tn_events event, int status,
                                              bool *tn_released)
{
        int rc;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_BULK:
                tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                kfilnd_tn_pack_bulk_req(tn);

                rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                switch (rc) {
                /* Async event will progress immediate send. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_WAIT_COMP);
                        return 0;

                /* Need to replay TN_EVENT_INIT_BULK event while in the
                 * TN_STATE_TAGGED_RECV_POSTED state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay post send to %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);
                        return -EAGAIN;

                /* Need to transition to the TN_STATE_SEND_FAILED state to
                 * clean up the posted tagged receive buffer.
                 */
                default:
                        KFILND_TN_ERROR(tn,
                                        "Failed to post send to %s(%#llx): rc=%d",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr, rc);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                        kfilnd_tn_state_change(tn, TN_STATE_SEND_FAILED);

                        /* Propagate TN_EVENT_INIT_BULK event to
                         * TN_STATE_SEND_FAILED handler.
                         */
                        return kfilnd_tn_state_send_failed(tn, event, rc,
                                                           tn_released);
                }

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }
}

static int kfilnd_tn_state_idle(struct kfilnd_transaction *tn,
                                enum tn_events event, int status,
                                bool *tn_released)
{
        struct kfilnd_msg *msg;
        int rc = 0;
        bool finalize = false;
        struct lnet_hdr hdr;
        struct lnet_nid srcnid;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        /* For new peers, send a hello request message and queue the true LNet
         * message for replay.
         */
        if (kfilnd_peer_needs_throttle(tn->tn_kp) &&
            (event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK)) {
                if (kfilnd_peer_deleted(tn->tn_kp)) {
                        /* We'll assign a NETWORK_TIMEOUT message health status
                         * below because we don't know why this peer was marked
                         * for removal.
                         */
                        rc = -ESTALE;
                        KFILND_TN_DEBUG(tn, "Drop message to deleted peer");
                } else if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
                        /* We're throttling transactions to this peer until
                         * a handshake can be completed, but there is no HELLO
                         * currently in flight. This implies the HELLO has
                         * failed, and we should cancel this TN. Otherwise we
                         * are stuck waiting for the TN deadline.
                         *
                         * We assign NETWORK_TIMEOUT health status below because
                         * we do not know why the HELLO failed.
                         */
                        rc = -ECANCELED;
                        KFILND_TN_DEBUG(tn, "Cancel throttled TN");
                } else if (ktime_before(ktime_get_seconds(),
                                        tn->tn_replay_deadline)) {
                        /* If the transaction replay deadline has not been met,
                         * then return -EAGAIN. This will cause this transaction
                         * event to be replayed. During this time, an async
                         * hello message from the peer should occur, at which
                         * point we can resume sending new messages to this
                         * peer.
                         */
                        KFILND_TN_DEBUG(tn, "hello response pending");
                        return -EAGAIN;
                } else {
                        rc = -ETIMEDOUT;
                }

                kfilnd_tn_status_update(tn, rc,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                rc = 0;
                goto out;
        }

        if ((event == TN_EVENT_INIT_IMMEDIATE || event == TN_EVENT_INIT_BULK) &&
            ktime_after(ktime_get_seconds(), tn->tn_replay_deadline)) {
                kfilnd_tn_status_update(tn, -ETIMEDOUT,
                                        LNET_MSG_STATUS_NETWORK_TIMEOUT);
                rc = 0;
                goto out;
        }

        switch (event) {
        case TN_EVENT_INIT_IMMEDIATE:
        case TN_EVENT_TX_HELLO:
                tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                if (event == TN_EVENT_INIT_IMMEDIATE)
                        kfilnd_tn_pack_immed_msg(tn);
                else
                        kfilnd_tn_pack_hello_req(tn);

                /* Send immediate message. */
                rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                switch (rc) {
                /* Async event will progress immediate send. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
                        return 0;

                /* Need to replay TN_EVENT_INIT_IMMEDIATE event while in the
                 * TN_STATE_IDLE state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Failed to post send to %s(%#llx): rc=%d",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr, rc);
                        if (event == TN_EVENT_TX_HELLO)
                                kfilnd_peer_clear_hello_pending(tn->tn_kp);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                }
                break;

        case TN_EVENT_INIT_BULK:
                /* Post tagged receive buffer used to land bulk response. */
                rc = kfilnd_ep_post_tagged_recv(tn->tn_ep, tn);

                switch (rc) {
                /* Transition to TN_STATE_TAGGED_RECV_POSTED on success. */
                case 0:
                        kfilnd_tn_state_change(tn, TN_STATE_TAGGED_RECV_POSTED);

                        /* Propagate TN_EVENT_INIT_BULK event to
                         * TN_STATE_TAGGED_RECV_POSTED handler.
                         */
                        return kfilnd_tn_state_tagged_recv_posted(tn, event,
                                                                  rc,
                                                                  tn_released);

                /* Need to replay TN_EVENT_INIT_BULK event in the TN_STATE_IDLE
                 * state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn, "Need to replay tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn, "Failed to post tagged recv %d",
                                        rc);
                        kfilnd_tn_status_update(tn, rc,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                }
                break;

        case TN_EVENT_RX_OK:
                if (kfilnd_peer_needs_hello(tn->tn_kp, false)) {
                        rc = kfilnd_send_hello_request(tn->tn_ep->end_dev,
                                                       tn->tn_ep->end_cpt,
                                                       tn->tn_kp);
                        if (rc)
                                KFILND_TN_ERROR(tn,
                                                "Failed to send hello request: rc=%d",
                                                rc);
                        rc = 0;
                }

                /* If this is a new peer then we cannot progress the transaction
                 * and must drop it.
                 */
                if (kfilnd_peer_is_new_peer(tn->tn_kp)) {
                        KFILND_TN_ERROR(tn,
                                        "Dropping message from %s due to stale peer",
                                        libcfs_nid2str(tn->tn_kp->kp_nid));
                        kfilnd_tn_status_update(tn, -EPROTO,
                                                LNET_MSG_STATUS_LOCAL_DROPPED);
                        rc = 0;
                        goto out;
                }

                LASSERT(kfilnd_peer_is_new_peer(tn->tn_kp) == false);
                msg = tn->tn_rx_msg.msg;

                /* Update the NID address with the new preferred RX context. */
                kfilnd_peer_alive(tn->tn_kp);

                /* Pass message up to LNet.
                 * The TN will be reused in this call chain so we need to
                 * release the lock on the TN before proceeding.
                 */
                KFILND_TN_DEBUG(tn, "%s -> TN_STATE_IMM_RECV state change",
                                tn_state_to_str(tn->tn_state));

                /* TODO: Do not manually update this state change. */
                tn->tn_state = TN_STATE_IMM_RECV;
                mutex_unlock(&tn->tn_lock);
                *tn_released = true;
                lnet_nid4_to_nid(msg->srcnid, &srcnid);
                if (msg->type == KFILND_MSG_IMMEDIATE) {
                        lnet_hdr_from_nid4(&hdr, &msg->proto.immed.hdr);
                        rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
                                        &hdr, &srcnid, tn, 0);
                } else {
                        lnet_hdr_from_nid4(&hdr, &msg->proto.bulk_req.hdr);
                        rc = lnet_parse(tn->tn_ep->end_dev->kfd_ni,
                                        &hdr, &srcnid, tn, 1);
                }

                /* If successful, transaction has been accepted by LNet and we
                 * cannot process the transaction anymore within this context.
                 */
                if (!rc)
                        return 0;

                KFILND_TN_ERROR(tn, "Failed to parse LNet message: rc=%d", rc);
                kfilnd_tn_status_update(tn, rc, LNET_MSG_STATUS_LOCAL_ERROR);
                break;

        case TN_EVENT_RX_HELLO:
                msg = tn->tn_rx_msg.msg;

                kfilnd_peer_alive(tn->tn_kp);

                switch (msg->type) {
                case KFILND_MSG_HELLO_REQ:
                        kfilnd_peer_process_hello(tn->tn_kp, msg);
                        tn->tn_target_addr = kfilnd_peer_get_kfi_addr(tn->tn_kp);
                        KFILND_TN_DEBUG(tn, "Using peer %s(%#llx)",
                                        libcfs_nid2str(tn->tn_kp->kp_nid),
                                        tn->tn_target_addr);

                        kfilnd_tn_pack_hello_rsp(tn);

                        /* Send immediate message. */
                        rc = kfilnd_ep_post_send(tn->tn_ep, tn);
                        switch (rc) {
                        case 0:
                                kfilnd_tn_state_change(tn, TN_STATE_IMM_SEND);
                                return 0;

                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn, "Need to replay send to %s(%#llx)",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post send to %s(%#llx): rc=%d",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                        break;

                case KFILND_MSG_HELLO_RSP:
                        rc = 0;
                        kfilnd_peer_process_hello(tn->tn_kp, msg);
                        finalize = true;
                        break;

                default:
                        KFILND_TN_ERROR(tn, "Invalid message type: %s",
                                        msg_type_to_str(msg->type));
                        LBUG();
                }
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

out:
        if (kfilnd_tn_has_failed(tn))
                finalize = true;

        if (finalize)
                kfilnd_tn_finalize(tn, tn_released);

        return rc;
}

static int kfilnd_tn_state_imm_send(struct kfilnd_transaction *tn,
                                    enum tn_events event, int status,
                                    bool *tn_released)
{
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_FAIL:
                if (status == -ETIMEDOUT || status == -EIO)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                kfilnd_peer_tn_failed(tn->tn_kp, status);
                if (tn->msg_type == KFILND_MSG_HELLO_REQ)
                        kfilnd_peer_clear_hello_pending(tn->tn_kp);
                break;

        case TN_EVENT_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_imm_recv(struct kfilnd_transaction *tn,
                                    enum tn_events event, int status,
                                    bool *tn_released)
{
        int rc = 0;
        bool finalize = false;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_INIT_TAG_RMA:
        case TN_EVENT_SKIP_TAG_RMA:
                /* Release the buffer we received the request on. All relevant
                 * information to perform the RMA operation is stored in the
                 * transaction structure. This should be done before the RMA
                 * operation to prevent two contexts from potentially processing
                 * the same transaction.
                 *
                 * TODO: Prevent this from returning -EAGAIN.
                 */
                if (tn->tn_posted_buf) {
                        kfilnd_ep_imm_buffer_put(tn->tn_posted_buf);
                        tn->tn_posted_buf = NULL;
                }

                /* Update the KFI address to use the response RX context. */
                tn->tn_target_addr =
                        kfi_rx_addr(KFILND_BASE_ADDR(tn->tn_kp->kp_addr),
                                    tn->tn_response_rx, KFILND_FAB_RX_CTX_BITS);
                KFILND_TN_DEBUG(tn, "Using peer %s(0x%llx)",
                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                tn->tn_target_addr);

                /* Initiate the RMA operation to push/pull the LNet payload or
                 * send a tagged message to finalize the bulk operation if the
                 * RMA operation should be skipped.
                 */
                if (event == TN_EVENT_INIT_TAG_RMA) {
                        if (tn->sink_buffer)
                                rc = kfilnd_ep_post_read(tn->tn_ep, tn);
                        else
                                rc = kfilnd_ep_post_write(tn->tn_ep, tn);

                        switch (rc) {
                        /* Async tagged RMA event will progress transaction. */
                        case 0:
                                kfilnd_tn_state_change(tn,
                                                       TN_STATE_WAIT_TAG_RMA_COMP);
                                return 0;

                        /* Need to replay TN_EVENT_INIT_TAG_RMA event while in
                         * the TN_STATE_IMM_RECV state.
                         */
                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn,
                                                "Need to replay tagged %s to %s(%#llx)",
                                                tn->sink_buffer ? "read" : "write",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post tagged %s to %s(%#llx): rc=%d",
                                                tn->sink_buffer ? "read" : "write",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                } else {
                        kfilnd_tn_status_update(tn, status,
                                                LNET_MSG_STATUS_OK);

                        /* Since the LNet initiator has posted a unique tagged
                         * buffer specific to this LNet transaction and the
                         * LNet target has decided not to push/pull to/from the
                         * LNet initiator's tagged buffer, a no-op operation is
                         * done on this tagged buffer (i.e. the payload transfer
                         * size is zero). But immediate data, which contains the
                         * LNet target status for the transaction, is sent to
                         * the LNet initiator. Immediate data only appears in
                         * the completion event at the LNet initiator and not in
                         * the tagged buffer.
                         */
                        tn->tagged_data = cpu_to_be64(abs(tn->tn_status));

                        rc = kfilnd_ep_post_tagged_send(tn->tn_ep, tn);
                        switch (rc) {
                        /* Async tagged RMA event will progress transaction. */
                        case 0:
                                kfilnd_tn_state_change(tn,
                                                       TN_STATE_WAIT_TAG_COMP);
                                return 0;

                        /* Need to replay TN_EVENT_SKIP_TAG_RMA event while in
                         * the TN_STATE_IMM_RECV state.
                         */
                        case -EAGAIN:
                                KFILND_TN_DEBUG(tn,
                                                "Need to replay tagged send to %s(%#llx)",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr);
                                return -EAGAIN;

                        default:
                                KFILND_TN_ERROR(tn,
                                                "Failed to post tagged send to %s(%#llx): rc=%d",
                                                libcfs_nid2str(tn->tn_kp->kp_nid),
                                                tn->tn_target_addr, rc);
                                kfilnd_tn_status_update(tn, rc,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        }
                }
                break;

        case TN_EVENT_RX_OK:
                finalize = true;
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        if (kfilnd_tn_has_failed(tn))
                finalize = true;

        if (finalize)
                kfilnd_tn_finalize(tn, tn_released);

        return rc;
}

static int kfilnd_tn_state_wait_comp(struct kfilnd_transaction *tn,
                                     enum tn_events event, int status,
                                     bool *tn_released)
{
        int rc;
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                kfilnd_tn_timeout_enable(tn);
                kfilnd_tn_state_change(tn, TN_STATE_WAIT_TAG_COMP);
                break;

        case TN_EVENT_TAG_RX_OK:
                kfilnd_tn_state_change(tn, TN_STATE_WAIT_SEND_COMP);
                break;

        case TN_EVENT_TX_FAIL:
                if (status == -ETIMEDOUT)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                kfilnd_peer_tn_failed(tn->tn_kp, status);

                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async cancel event will progress transaction. */
                case 0:
                        kfilnd_tn_status_update(tn, status,
                                                LNET_MSG_STATUS_LOCAL_ERROR);
                        kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                        return 0;

                /* Need to replay TN_EVENT_TX_FAIL event while in the
                 * TN_STATE_WAIT_COMP state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay cancel tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Unexpected error during cancel tagged receive: rc=%d",
                                        rc);
                        LBUG();
                }
                break;

        case TN_EVENT_TAG_RX_FAIL:
                kfilnd_tn_status_update(tn, status,
                                        LNET_MSG_STATUS_LOCAL_ERROR);
                kfilnd_tn_state_change(tn, TN_STATE_FAIL);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        return 0;
}

static int kfilnd_tn_state_wait_send_comp(struct kfilnd_transaction *tn,
                                          enum tn_events event, int status,
                                          bool *tn_released)
{
        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        if (event == TN_EVENT_TX_OK) {
                kfilnd_peer_alive(tn->tn_kp);
                kfilnd_tn_finalize(tn, tn_released);
        } else {
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        return 0;
}

static int kfilnd_tn_state_wait_tag_rma_comp(struct kfilnd_transaction *tn,
                                             enum tn_events event, int status,
                                             bool *tn_released)
{
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TAG_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        case TN_EVENT_TAG_TX_FAIL:
                if (status == -ETIMEDOUT)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                kfilnd_peer_tn_failed(tn->tn_kp, status);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_wait_tag_comp(struct kfilnd_transaction *tn,
                                         enum tn_events event, int status,
                                         bool *tn_released)
{
        int rc;
        enum lnet_msg_hstatus hstatus;

        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TAG_RX_FAIL:
        case TN_EVENT_TAG_RX_OK:
                /* Status can be set for both TN_EVENT_TAG_RX_FAIL and
                 * TN_EVENT_TAG_RX_OK. For TN_EVENT_TAG_RX_OK, if status is set,
                 * LNet target returned -ENODATA.
                 */
                if (status) {
                        if (event == TN_EVENT_TAG_RX_FAIL)
                                kfilnd_tn_status_update(tn, status,
                                                        LNET_MSG_STATUS_LOCAL_ERROR);
                        else
                                kfilnd_tn_status_update(tn, status,
                                                        LNET_MSG_STATUS_OK);
                }

                if (!kfilnd_tn_timeout_cancel(tn)) {
                        kfilnd_tn_state_change(tn, TN_STATE_WAIT_TIMEOUT_COMP);
                        return 0;
                }
                break;

        case TN_EVENT_TIMEOUT:
                /* Need to cancel the tagged receive to prevent resources from
                 * being leaked.
                 */
                rc = kfilnd_tn_cancel_tag_recv(tn);

                switch (rc) {
                /* Async cancel event will progress transaction. */
                case 0:
                        kfilnd_tn_state_change(tn,
                                               TN_STATE_WAIT_TIMEOUT_TAG_COMP);
                        return 0;

                /* Need to replay TN_EVENT_TIMEOUT event while in the
                 * TN_STATE_WAIT_TAG_COMP state.
                 */
                case -EAGAIN:
                        KFILND_TN_DEBUG(tn,
                                        "Need to replay cancel tagged recv");
                        return -EAGAIN;

                default:
                        KFILND_TN_ERROR(tn,
                                        "Unexpected error during cancel tagged receive: rc=%d",
                                        rc);
                        LBUG();
                }
                break;

        case TN_EVENT_TAG_TX_FAIL:
                if (status == -ETIMEDOUT)
                        hstatus = LNET_MSG_STATUS_NETWORK_TIMEOUT;
                else
                        hstatus = LNET_MSG_STATUS_REMOTE_ERROR;

                kfilnd_tn_status_update(tn, status, hstatus);
                kfilnd_peer_tn_failed(tn->tn_kp, status);
                break;

        case TN_EVENT_TAG_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_fail(struct kfilnd_transaction *tn,
                                enum tn_events event, int status,
                                bool *tn_released)
{
        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TX_FAIL:
                kfilnd_peer_tn_failed(tn->tn_kp, status);
                break;

        case TN_EVENT_TX_OK:
                kfilnd_peer_alive(tn->tn_kp);
                break;

        case TN_EVENT_TAG_RX_FAIL:
        case TN_EVENT_TAG_RX_CANCEL:
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_wait_timeout_tag_comp(struct kfilnd_transaction *tn,
                                                 enum tn_events event,
                                                 int status, bool *tn_released)
{
        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        switch (event) {
        case TN_EVENT_TAG_RX_CANCEL:
                kfilnd_tn_status_update(tn, -ETIMEDOUT,
                                        LNET_MSG_STATUS_REMOTE_TIMEOUT);
                kfilnd_peer_tn_failed(tn->tn_kp, -ETIMEDOUT);
                break;

        case TN_EVENT_TAG_RX_FAIL:
                kfilnd_tn_status_update(tn, status,
                                        LNET_MSG_STATUS_LOCAL_ERROR);
                break;

        case TN_EVENT_TAG_RX_OK:
                break;

        default:
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        kfilnd_tn_finalize(tn, tn_released);

        return 0;
}

static int kfilnd_tn_state_wait_timeout_comp(struct kfilnd_transaction *tn,
                                             enum tn_events event, int status,
                                             bool *tn_released)
{
        KFILND_TN_DEBUG(tn, "%s event status %d", tn_event_to_str(event),
                        status);

        if (event == TN_EVENT_TIMEOUT) {
                kfilnd_tn_finalize(tn, tn_released);
        } else {
                KFILND_TN_ERROR(tn, "Invalid %s event", tn_event_to_str(event));
                LBUG();
        }

        return 0;
}

static int
(* const kfilnd_tn_state_dispatch_table[TN_STATE_MAX])(struct kfilnd_transaction *tn,
						       enum tn_events event,
						       int status,
						       bool *tn_released) = {
	[TN_STATE_IDLE] = kfilnd_tn_state_idle,
	[TN_STATE_WAIT_TAG_COMP] = kfilnd_tn_state_wait_tag_comp,
	[TN_STATE_IMM_SEND] = kfilnd_tn_state_imm_send,
	[TN_STATE_TAGGED_RECV_POSTED] = kfilnd_tn_state_tagged_recv_posted,
	[TN_STATE_SEND_FAILED] = kfilnd_tn_state_send_failed,
	[TN_STATE_WAIT_COMP] = kfilnd_tn_state_wait_comp,
	[TN_STATE_WAIT_TIMEOUT_COMP] = kfilnd_tn_state_wait_timeout_comp,
	[TN_STATE_WAIT_SEND_COMP] = kfilnd_tn_state_wait_send_comp,
	[TN_STATE_WAIT_TIMEOUT_TAG_COMP] =
		kfilnd_tn_state_wait_timeout_tag_comp,
	[TN_STATE_FAIL] = kfilnd_tn_state_fail,
	[TN_STATE_IMM_RECV] = kfilnd_tn_state_imm_recv,
	[TN_STATE_WAIT_TAG_RMA_COMP] = kfilnd_tn_state_wait_tag_rma_comp,
};
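
/* Illustrative only, not part of the driver: a minimal sketch of how a
 * state-indexed dispatch table like the one above is invoked. The real
 * caller is kfilnd_tn_event_handler() below; the bounds/NULL checks here
 * are an assumption added for the example.
 */
static inline int
kfilnd_tn_dispatch_example(struct kfilnd_transaction *tn, enum tn_events event,
			   int status, bool *tn_released)
{
	/* Reject states outside the table or states with no handler. */
	if (tn->tn_state >= TN_STATE_MAX ||
	    !kfilnd_tn_state_dispatch_table[tn->tn_state])
		return -EINVAL;

	return kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
							    tn_released);
}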

/**
 * kfilnd_tn_event_handler() - Update transaction state machine with an event.
 * @tn: Transaction to be updated.
 * @event: Transaction event.
 * @status: Errno status associated with the event.
 *
 * When the transaction event handler is first called on a new transaction, the
 * transaction is owned by the transaction system. This means it will be freed
 * by the system as the transaction progresses through the state machine.
 */
void kfilnd_tn_event_handler(struct kfilnd_transaction *tn,
			     enum tn_events event, int status)
{
	bool tn_released = false;
	int rc;

	if (!tn)
		return;

	mutex_lock(&tn->tn_lock);
	rc = kfilnd_tn_state_dispatch_table[tn->tn_state](tn, event, status,
							  &tn_released);
	if (rc == -EAGAIN) {
		tn->replay_event = event;
		tn->replay_status = status;
		kfilnd_ep_queue_tn_replay(tn->tn_ep, tn);
	}

	if (!tn_released)
		mutex_unlock(&tn->tn_lock);
}
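
/* Hypothetical usage sketch: how a completion path might drive the state
 * machine. The TN_EVENT_* names are real; the surrounding function and the
 * way the status is derived are assumptions for illustration only.
 */
static void kfilnd_tn_example_tx_complete(struct kfilnd_transaction *tn,
					  int cq_status)
{
	/* A failed send is reported with its errno; success carries 0. */
	if (cq_status)
		kfilnd_tn_event_handler(tn, TN_EVENT_TX_FAIL, cq_status);
	else
		kfilnd_tn_event_handler(tn, TN_EVENT_TX_OK, 0);

	/* The state machine may have freed tn; it must not be touched here. */
}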

/**
 * kfilnd_tn_free() - Free a transaction.
 */
void kfilnd_tn_free(struct kfilnd_transaction *tn)
{
	spin_lock(&tn->tn_ep->tn_list_lock);
	list_del(&tn->tn_entry);
	spin_unlock(&tn->tn_ep->tn_list_lock);

	KFILND_TN_DEBUG(tn, "Transaction freed");

	if (tn->tn_mr_key)
		kfilnd_ep_put_key(tn->tn_ep, tn->tn_mr_key);

	/* Free send message buffer if needed. */
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);

	kmem_cache_free(tn_cache, tn);
}

/**
 * kfilnd_tn_alloc() - Allocate a new KFI LND transaction.
 * @dev: KFI LND device used to look up the KFI LND endpoint to associate with
 * the transaction.
 * @cpt: CPT of the transaction.
 * @target_nid: Target NID of the transaction.
 * @alloc_msg: Allocate an immediate message for the transaction.
 * @is_initiator: Whether this is the initiator of the LNet transaction.
 * @key: Whether a transaction memory region key is needed.
 *
 * During transaction allocation, each transaction is associated with a KFI LND
 * endpoint used to post data transfer operations. The CPT argument is used to
 * look up the KFI LND endpoint within the KFI LND device.
 *
 * Return: On success, valid pointer. Else, negative errno pointer.
 */
struct kfilnd_transaction *kfilnd_tn_alloc(struct kfilnd_dev *dev, int cpt,
					   lnet_nid_t target_nid,
					   bool alloc_msg, bool is_initiator,
					   bool key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_peer *kp;
	int rc;

	if (!dev) {
		rc = -EINVAL;
		goto err;
	}

	kp = kfilnd_peer_get(dev, target_nid);
	if (IS_ERR(kp)) {
		rc = PTR_ERR(kp);
		goto err;
	}

	tn = kfilnd_tn_alloc_for_peer(dev, cpt, kp, alloc_msg, is_initiator,
				      key);
	if (IS_ERR(tn)) {
		rc = PTR_ERR(tn);
		kfilnd_peer_put(kp);
		goto err;
	}

	return tn;

err:
	return ERR_PTR(rc);
}
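
/* Hypothetical usage sketch for kfilnd_tn_alloc(): allocate a transaction
 * with an immediate message buffer and an MR key, checking the ERR_PTR-style
 * return. The surrounding function is an assumption for illustration.
 */
static int kfilnd_tn_alloc_example(struct kfilnd_dev *dev, int cpt,
				   lnet_nid_t target_nid)
{
	struct kfilnd_transaction *tn;

	/* alloc_msg, is_initiator, and key all requested here. */
	tn = kfilnd_tn_alloc(dev, cpt, target_nid, true, true, true);
	if (IS_ERR(tn))
		return PTR_ERR(tn);

	/* ... configure buffers, then drive the state machine ... */

	return 0;
}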

/* See kfilnd_tn_alloc()
 * Note: Caller must have a reference on @kp
 */
struct kfilnd_transaction *kfilnd_tn_alloc_for_peer(struct kfilnd_dev *dev,
						    int cpt,
						    struct kfilnd_peer *kp,
						    bool alloc_msg,
						    bool is_initiator,
						    bool key)
{
	struct kfilnd_transaction *tn;
	struct kfilnd_ep *ep;
	int rc;
	ktime_t tn_alloc_ts;

	if (!dev) {
		rc = -EINVAL;
		goto err;
	}

	tn_alloc_ts = ktime_get();

	/* If the CPT does not map to a valid endpoint (i.e. it falls outside
	 * the LNet NI CPT range), fall back to the first endpoint. This
	 * should never happen.
	 */
	ep = dev->cpt_to_endpoint[cpt];
	if (!ep) {
		CWARN("%s used invalid cpt=%d\n",
		      libcfs_nidstr(&dev->kfd_ni->ni_nid), cpt);
		ep = dev->kfd_endpoints[0];
	}

	tn = kmem_cache_zalloc(tn_cache, GFP_KERNEL);
	if (!tn) {
		rc = -ENOMEM;
		goto err;
	}

	if (alloc_msg) {
		tn->tn_tx_msg.msg = kmem_cache_alloc(imm_buf_cache, GFP_KERNEL);
		if (!tn->tn_tx_msg.msg) {
			rc = -ENOMEM;
			goto err_free_tn;
		}
	}

	if (key) {
		rc = kfilnd_ep_get_key(ep);
		if (rc < 0)
			goto err_free_tn;
		tn->tn_mr_key = rc;
	}

	tn->tn_kp = kp;

	mutex_init(&tn->tn_lock);
	tn->tn_ep = ep;
	tn->tn_response_rx = ep->end_context_id;
	tn->tn_state = TN_STATE_IDLE;
	tn->hstatus = LNET_MSG_STATUS_OK;
	tn->deadline = ktime_get_seconds() + lnet_get_lnd_timeout();
	tn->tn_replay_deadline = ktime_sub(tn->deadline,
					   (lnet_get_lnd_timeout() / 2));
	tn->is_initiator = is_initiator;
	INIT_WORK(&tn->timeout_work, kfilnd_tn_timeout_work);

	/* Add the transaction to an endpoint.  This is like
	 * incrementing a ref counter.
	 */
	spin_lock(&ep->tn_list_lock);
	list_add_tail(&tn->tn_entry, &ep->tn_list);
	spin_unlock(&ep->tn_list_lock);

	tn->tn_alloc_ts = tn_alloc_ts;
	tn->tn_state_ts = ktime_get();

	KFILND_EP_DEBUG(ep, "Transaction ID %u allocated", tn->tn_mr_key);

	return tn;

err_free_tn:
	if (tn->tn_tx_msg.msg)
		kmem_cache_free(imm_buf_cache, tn->tn_tx_msg.msg);
	kmem_cache_free(tn_cache, tn);
err:
	return ERR_PTR(rc);
}

/**
 * kfilnd_tn_cleanup() - Cleanup KFI LND transaction system.
 *
 * This function should only be called when there are no outstanding
 * transactions.
 */
void kfilnd_tn_cleanup(void)
{
	kmem_cache_destroy(imm_buf_cache);
	kmem_cache_destroy(tn_cache);
}

/**
 * kfilnd_tn_init() - Initialize KFI LND transaction system.
 *
 * Return: On success, zero. Else, negative errno.
 */
int kfilnd_tn_init(void)
{
	tn_cache = kmem_cache_create("kfilnd_tn",
				     sizeof(struct kfilnd_transaction), 0,
				     SLAB_HWCACHE_ALIGN, NULL);
	if (!tn_cache)
		goto err;

	imm_buf_cache = kmem_cache_create("kfilnd_imm_buf",
					  KFILND_IMMEDIATE_MSG_SIZE, 0,
					  SLAB_HWCACHE_ALIGN, NULL);
	if (!imm_buf_cache)
		goto err_tn_cache_destroy;

	return 0;

err_tn_cache_destroy:
	kmem_cache_destroy(tn_cache);
err:
	return -ENOMEM;
}
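
/* Hypothetical init-ordering sketch: kfilnd_tn_init() must succeed before any
 * transactions are allocated, and kfilnd_tn_cleanup() undoes it once no
 * transactions remain. The surrounding function is an assumption for
 * illustration, not the module's real init path.
 */
static int kfilnd_tn_example_startup(void)
{
	int rc;

	rc = kfilnd_tn_init();
	if (rc)
		return rc;

	/* ... bring up devices and endpoints; if a later step fails, unwind
	 * with kfilnd_tn_cleanup().
	 */
	return 0;
}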

/**
 * kfilnd_tn_set_kiov_buf() - Set the buffer used for a transaction.
 * @tn: Transaction to have buffer set.
 * @kiov: LNet KIOV buffer.
 * @num_iov: Number of IOVs.
 * @offset: Offset into IOVs where the buffer starts.
 * @len: Length of the buffer.
 *
 * This function takes the user-provided KIOV, offset, and length and sets the
 * transaction buffer. The user-provided offset is consumed while the
 * transaction buffer is configured (i.e. the resulting transaction buffer
 * offset is zero).
 *
 * Return: On success, zero. Else, -EINVAL if the mapped buffer would exceed
 * LNET_MAX_IOV entries.
 */
int kfilnd_tn_set_kiov_buf(struct kfilnd_transaction *tn,
			   struct bio_vec *kiov, size_t num_iov,
			   size_t offset, size_t len)
{
	size_t i;
	size_t cur_len = 0;
	size_t cur_offset = offset;
	size_t cur_iov = 0;
	size_t tmp_len;
	size_t tmp_offset;

	for (i = 0; (i < num_iov) && (cur_len < len); i++) {
		/* Skip whole KIOV entries that are consumed by the remaining
		 * offset, i.e. entries whose length is less than or equal to
		 * the current offset.
		 */
		if (kiov[i].bv_len <= cur_offset) {
			cur_offset -= kiov[i].bv_len;
			continue;
		}

		tmp_len = kiov[i].bv_len - cur_offset;
		tmp_offset = kiov[i].bv_len - tmp_len + kiov[i].bv_offset;

		if (tmp_len + cur_len > len)
			tmp_len = len - cur_len;

		/* tn_kiov is an array of size LNET_MAX_IOV */
		if (cur_iov >= LNET_MAX_IOV)
			return -EINVAL;

		tn->tn_kiov[cur_iov].bv_page = kiov[i].bv_page;
		tn->tn_kiov[cur_iov].bv_len = tmp_len;
		tn->tn_kiov[cur_iov].bv_offset = tmp_offset;

		cur_iov++;
		cur_len += tmp_len;
		cur_offset = 0;
	}

	tn->tn_num_iovec = cur_iov;
	tn->tn_nob = cur_len;

	return 0;
}
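
/* Hypothetical usage sketch for kfilnd_tn_set_kiov_buf(): map a byte range
 * spanning two pages into a transaction. The pages and the 512/4096 values
 * are assumptions for illustration.
 */
static int kfilnd_tn_set_kiov_buf_example(struct kfilnd_transaction *tn,
					  struct page *p0, struct page *p1)
{
	struct bio_vec kiov[] = {
		{ .bv_page = p0, .bv_len = PAGE_SIZE, .bv_offset = 0 },
		{ .bv_page = p1, .bv_len = PAGE_SIZE, .bv_offset = 0 },
	};

	/* Skip the first 512 bytes, then map the next 4096 bytes. The skipped
	 * offset is folded into tn_kiov, so the transaction buffer itself
	 * starts at offset zero.
	 */
	return kfilnd_tn_set_kiov_buf(tn, kiov, ARRAY_SIZE(kiov), 512, 4096);
}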