1 // SPDX-License-Identifier: GPL-2.0
2
3 /* Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
4  * Use is subject to license terms.
5  *
6  * Copyright (c) 2012, 2017, Intel Corporation.
7  */
8
9 /* This file is part of Lustre, http://www.lustre.org/
10  *
11  * Message decoding, parsing and finalizing routines
12  */
13
14 #define DEBUG_SUBSYSTEM S_LNET
15
16 #include <lnet/lib-lnet.h>
17
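/* Build a synthetic LNET_EVENT_UNLINK event in @ev describing @md. This lets
 * the MD's owner be notified when the MD is unlinked without an operation
 * completing on it. */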
18 void
19 lnet_build_unlink_event(struct lnet_libmd *md, struct lnet_event *ev)
20 {
21         ENTRY;
22
23         memset(ev, 0, sizeof(*ev));
24
25         ev->status   = 0;
26         ev->unlinked = 1;
27         ev->type     = LNET_EVENT_UNLINK;
28         lnet_md_deconstruct(md, ev);
29         lnet_md2handle(&ev->md_handle, md);
30         EXIT;
31 }
32
33 /*
34  * No locking needed; must be called after lnet_msg_commit()
35  */
36 void
37 lnet_build_msg_event(struct lnet_msg *msg, enum lnet_event_kind ev_type)
38 {
39         struct lnet_hdr *hdr = &msg->msg_hdr;
40         struct lnet_event *ev = &msg->msg_ev;
41
42         LASSERT(!msg->msg_routing);
43
44         ev->type = ev_type;
45         ev->msg_type = msg->msg_type;
46
47         if (ev_type == LNET_EVENT_SEND) {
48                 /* event for active message */
49                 ev->target.nid    = hdr->dest_nid;
50                 ev->target.pid    = hdr->dest_pid;
51                 ev->initiator.nid = LNET_ANY_NID;
52                 ev->initiator.pid = the_lnet.ln_pid;
53                 ev->source.nid    = LNET_ANY_NID;
54                 ev->source.pid    = the_lnet.ln_pid;
55                 ev->sender        = LNET_ANY_NID;
56         } else {
57                 /* event for passive message */
58                 ev->target.pid    = hdr->dest_pid;
59                 ev->target.nid    = hdr->dest_nid;
60                 ev->initiator.pid = hdr->src_pid;
61                 /* Multi-Rail: resolve src_nid to "primary" peer NID */
62                 ev->initiator.nid = msg->msg_initiator;
63                 /* Multi-Rail: track source NID. */
64                 ev->source.pid    = hdr->src_pid;
65                 ev->source.nid    = hdr->src_nid;
66                 ev->rlength       = hdr->payload_length;
67                 ev->sender        = msg->msg_from;
68                 ev->mlength       = msg->msg_wanted;
69                 ev->offset        = msg->msg_offset;
70         }
71
72         switch (ev_type) {
73         default:
74                 LBUG();
75
76         case LNET_EVENT_PUT: /* passive PUT */
77                 ev->pt_index   = hdr->msg.put.ptl_index;
78                 ev->match_bits = hdr->msg.put.match_bits;
79                 ev->hdr_data   = hdr->msg.put.hdr_data;
80                 return;
81
82         case LNET_EVENT_GET: /* passive GET */
83                 ev->pt_index   = hdr->msg.get.ptl_index;
84                 ev->match_bits = hdr->msg.get.match_bits;
85                 ev->hdr_data   = 0;
86                 return;
87
88         case LNET_EVENT_ACK: /* ACK */
89                 ev->match_bits = hdr->msg.ack.match_bits;
90                 ev->mlength    = hdr->msg.ack.mlength;
91                 return;
92
93         case LNET_EVENT_REPLY: /* REPLY */
94                 return;
95
96         case LNET_EVENT_SEND: /* active message */
97                 if (msg->msg_type == LNET_MSG_PUT) {
98                         ev->pt_index   = le32_to_cpu(hdr->msg.put.ptl_index);
99                         ev->match_bits = le64_to_cpu(hdr->msg.put.match_bits);
100                         ev->offset     = le32_to_cpu(hdr->msg.put.offset);
101                         ev->mlength    =
102                         ev->rlength    = le32_to_cpu(hdr->payload_length);
103                         ev->hdr_data   = le64_to_cpu(hdr->msg.put.hdr_data);
104
105                 } else {
106                         LASSERT(msg->msg_type == LNET_MSG_GET);
107                         ev->pt_index   = le32_to_cpu(hdr->msg.get.ptl_index);
108                         ev->match_bits = le64_to_cpu(hdr->msg.get.match_bits);
109                         ev->mlength    =
110                         ev->rlength    = le32_to_cpu(hdr->msg.get.sink_length);
111                         ev->offset     = le32_to_cpu(hdr->msg.get.src_offset);
112                         ev->hdr_data   = 0;
113                 }
114                 return;
115         }
116 }
117
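/* Commit @msg to the per-CPT message container: stamp its transaction
 * deadline, record the CPT it is committed on for tx and/or rx, add it to
 * the container's active list and update the message allocation counters.
 * A routed message may end up committed for both rx (when received) and tx
 * (when forwarded). */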
118 void
119 lnet_msg_commit(struct lnet_msg *msg, int cpt)
120 {
121         struct lnet_msg_container *container = the_lnet.ln_msg_containers[cpt];
122         struct lnet_counters_common *common;
123         s64 timeout_ns;
124
125         /* set the message deadline */
126         timeout_ns = lnet_transaction_timeout * NSEC_PER_SEC;
127         msg->msg_deadline = ktime_add_ns(ktime_get(), timeout_ns);
128
129         /* routed message can be committed for both receiving and sending */
130         LASSERT(!msg->msg_tx_committed);
131
132         if (msg->msg_sending) {
133                 LASSERT(!msg->msg_receiving);
134                 msg->msg_tx_cpt = cpt;
135                 msg->msg_tx_committed = 1;
136                 if (msg->msg_rx_committed) { /* routed message REPLY */
137                         LASSERT(msg->msg_onactivelist);
138                         return;
139                 }
140         } else {
141                 LASSERT(!msg->msg_sending);
142                 msg->msg_rx_cpt = cpt;
143                 msg->msg_rx_committed = 1;
144         }
145
146         LASSERT(!msg->msg_onactivelist);
147
148         msg->msg_onactivelist = 1;
149         list_add_tail(&msg->msg_activelist, &container->msc_active);
150
151         common = &the_lnet.ln_counters[cpt]->lct_common;
152         common->lcc_msgs_alloc++;
153         if (common->lcc_msgs_alloc > common->lcc_msgs_max)
154                 common->lcc_msgs_max = common->lcc_msgs_alloc;
155 }
156
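/* Undo the tx side of lnet_msg_commit(): on success, update the send/route
 * counters and the per-NI and per-peer send statistics; in all cases return
 * the tx credits this message holds and clear msg_tx_committed. */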
157 static void
158 lnet_msg_decommit_tx(struct lnet_msg *msg, int status)
159 {
160         struct lnet_counters_common *common;
161         struct lnet_event *ev = &msg->msg_ev;
162
163         LASSERT(msg->msg_tx_committed);
164         if (status != 0)
165                 goto out;
166
167         common = &(the_lnet.ln_counters[msg->msg_tx_cpt]->lct_common);
168         switch (ev->type) {
169         default: /* routed message */
170                 LASSERT(msg->msg_routing);
171                 LASSERT(msg->msg_rx_committed);
172                 LASSERT(ev->type == 0);
173
174                 common->lcc_route_length += msg->msg_len;
175                 common->lcc_route_count++;
176                 goto incr_stats;
177
178         case LNET_EVENT_PUT:
179                 /* should have been decommitted */
180                 LASSERT(!msg->msg_rx_committed);
181                 /* overwritten while sending ACK */
182                 LASSERT(msg->msg_type == LNET_MSG_ACK);
183                 msg->msg_type = LNET_MSG_PUT; /* fix type */
184                 break;
185
186         case LNET_EVENT_SEND:
187                 LASSERT(!msg->msg_rx_committed);
188                 if (msg->msg_type == LNET_MSG_PUT)
189                         common->lcc_send_length += msg->msg_len;
190                 break;
191
192         case LNET_EVENT_GET:
193                 LASSERT(msg->msg_rx_committed);
194                 /* overwritten while sending reply, we should never be
195                  * here for optimized GET */
196                 LASSERT(msg->msg_type == LNET_MSG_REPLY);
197                 msg->msg_type = LNET_MSG_GET; /* fix type */
198                 break;
199         }
200
201         common->lcc_send_count++;
202
203 incr_stats:
204         if (msg->msg_txpeer)
205                 lnet_incr_stats(&msg->msg_txpeer->lpni_stats,
206                                 msg->msg_type,
207                                 LNET_STATS_TYPE_SEND);
208         if (msg->msg_txni)
209                 lnet_incr_stats(&msg->msg_txni->ni_stats,
210                                 msg->msg_type,
211                                 LNET_STATS_TYPE_SEND);
212  out:
213         lnet_return_tx_credits_locked(msg);
214         msg->msg_tx_committed = 0;
215 }
216
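/* Undo the rx side of lnet_msg_commit(): on success, update the receive
 * counters and the per-NI and per-peer receive statistics; in all cases
 * return the rx credits and clear msg_rx_committed. The tx side must
 * already be decommitted (or never have been committed). */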
217 static void
218 lnet_msg_decommit_rx(struct lnet_msg *msg, int status)
219 {
220         struct lnet_counters_common *common;
221         struct lnet_event *ev = &msg->msg_ev;
222
223         LASSERT(!msg->msg_tx_committed); /* decommitted or never committed */
224         LASSERT(msg->msg_rx_committed);
225
226         if (status != 0)
227                 goto out;
228
229         common = &(the_lnet.ln_counters[msg->msg_rx_cpt]->lct_common);
230         switch (ev->type) {
231         default:
232                 LASSERT(ev->type == 0);
233                 LASSERT(msg->msg_routing);
234                 goto incr_stats;
235
236         case LNET_EVENT_ACK:
237                 LASSERT(msg->msg_type == LNET_MSG_ACK);
238                 break;
239
240         case LNET_EVENT_GET:
241                 /* type is "REPLY" if it's an optimized GET on passive side,
242                  * because optimized GET will never be committed for sending,
243                  * so message type wouldn't be changed back to "GET" by
244                  * lnet_msg_decommit_tx(), see details in lnet_parse_get() */
245                 LASSERT(msg->msg_type == LNET_MSG_REPLY ||
246                         msg->msg_type == LNET_MSG_GET);
247                 common->lcc_send_length += msg->msg_wanted;
248                 break;
249
250         case LNET_EVENT_PUT:
251                 LASSERT(msg->msg_type == LNET_MSG_PUT);
252                 break;
253
254         case LNET_EVENT_REPLY:
255                 /* type is "GET" if it's an optimized GET on active side,
256                  * see details in lnet_create_reply_msg() */
257                 LASSERT(msg->msg_type == LNET_MSG_GET ||
258                         msg->msg_type == LNET_MSG_REPLY);
259                 break;
260         }
261
262         common->lcc_recv_count++;
263
264 incr_stats:
265         if (msg->msg_rxpeer)
266                 lnet_incr_stats(&msg->msg_rxpeer->lpni_stats,
267                                 msg->msg_type,
268                                 LNET_STATS_TYPE_RECV);
269         if (msg->msg_rxni)
270                 lnet_incr_stats(&msg->msg_rxni->ni_stats,
271                                 msg->msg_type,
272                                 LNET_STATS_TYPE_RECV);
273         if (ev->type == LNET_EVENT_PUT || ev->type == LNET_EVENT_REPLY)
274                 common->lcc_recv_length += msg->msg_wanted;
275
276  out:
277         lnet_return_rx_credits_locked(msg);
278         msg->msg_rx_committed = 0;
279 }
280
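/* Fully decommit @msg. Called with the net lock for @cpt held; if the rx
 * CPT differs from @cpt, the lock is switched to the rx CPT for the rx
 * decommit and restored before returning. */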
281 void
282 lnet_msg_decommit(struct lnet_msg *msg, int cpt, int status)
283 {
284         int     cpt2 = cpt;
285
286         LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
287         LASSERT(msg->msg_onactivelist);
288
289         if (msg->msg_tx_committed) { /* always decommit for sending first */
290                 LASSERT(cpt == msg->msg_tx_cpt);
291                 lnet_msg_decommit_tx(msg, status);
292         }
293
294         if (msg->msg_rx_committed) {
295                 /* forwarding msg committed for both receiving and sending */
296                 if (cpt != msg->msg_rx_cpt) {
297                         lnet_net_unlock(cpt);
298                         cpt2 = msg->msg_rx_cpt;
299                         lnet_net_lock(cpt2);
300                 }
301                 lnet_msg_decommit_rx(msg, status);
302         }
303
304         list_del(&msg->msg_activelist);
305         msg->msg_onactivelist = 0;
306
307         the_lnet.ln_counters[cpt2]->lct_common.lcc_msgs_alloc--;
308
309         if (cpt2 != cpt) {
310                 lnet_net_unlock(cpt2);
311                 lnet_net_lock(cpt);
312         }
313 }
314
315 void
316 lnet_msg_attach_md(struct lnet_msg *msg, struct lnet_libmd *md,
317                    unsigned int offset, unsigned int mlen)
318 {
319         /* NB: @offset and @mlen are only useful for receiving */
320         /* Here, we attach the MD to the lnet_msg, mark it busy and
321          * decrement its threshold. Come what may, the lnet_msg "owns"
322          * the MD until a call to lnet_msg_detach_md() or lnet_finalize()
323          * signals completion. */
324         LASSERT(!msg->msg_routing);
325
326         msg->msg_md = md;
327         if (msg->msg_receiving) { /* committed for receiving */
328                 msg->msg_offset = offset;
329                 msg->msg_wanted = mlen;
330         }
331
332         md->md_refcount++;
333         if (md->md_threshold != LNET_MD_THRESH_INF) {
334                 LASSERT(md->md_threshold > 0);
335                 md->md_threshold--;
336         }
337
338         /* build umd in event */
339         lnet_md2handle(&msg->msg_ev.md_handle, md);
340         lnet_md_deconstruct(md, &msg->msg_ev);
341 }
342
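/* Final step of completion, called under the net lock for @cpt: send an ACK
 * for a successfully received PUT, forward a routed message that has not
 * been sent yet, or simply decommit and free the message. A non-zero return
 * means a send failed before reaching the LND, and lnet_finalize() must
 * retry the completion on the correct CPT. */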
343 static int
344 lnet_complete_msg_locked(struct lnet_msg *msg, int cpt)
345 {
346         struct lnet_handle_wire ack_wmd;
347         int                rc;
348         int                status = msg->msg_ev.status;
349
350         LASSERT(msg->msg_onactivelist);
351
352         if (status == 0 && msg->msg_ack) {
353                 /* Only send an ACK if the PUT completed successfully */
354
355                 lnet_msg_decommit(msg, cpt, 0);
356
357                 msg->msg_ack = 0;
358                 lnet_net_unlock(cpt);
359
360                 LASSERT(msg->msg_ev.type == LNET_EVENT_PUT);
361                 LASSERT(!msg->msg_routing);
362
363                 ack_wmd = msg->msg_hdr.msg.put.ack_wmd;
364
365                 lnet_prep_send(msg, LNET_MSG_ACK, &msg->msg_ev.source, 0, 0);
366
367                 msg->msg_hdr.msg.ack.dst_wmd = ack_wmd;
368                 msg->msg_hdr.msg.ack.match_bits = msg->msg_ev.match_bits;
369                 msg->msg_hdr.msg.ack.mlength = cpu_to_le32(msg->msg_ev.mlength);
370
371                 rc = lnet_send(&msg->msg_ev.target.nid, msg,
372                                &msg->msg_from);
373
374                 lnet_net_lock(cpt);
375                 /*
376                  * NB: the message is committed for sending; on success we must
377                  * return because the LND will finalize this message later.
378                  *
379                  * There is also a possibility that the message was committed
380                  * for sending but failed before being delivered to the LND
381                  * (e.g. ENOMEM). In that case we can't fall through either,
382                  * because the CPT for sending can be different from the CPT
383                  * for receiving, so we must return to lnet_finalize() to make
384                  * sure we are locking the correct partition.
385                  */
386                 return rc;
387
388         } else if (status == 0 &&       /* OK so far */
389                    (msg->msg_routing && !msg->msg_sending)) {
390                 /* not forwarded */
391                 LASSERT(!msg->msg_receiving);   /* called back recv already */
392                 lnet_net_unlock(cpt);
393
394                 rc = lnet_send(NULL, msg, NULL);
395
396                 lnet_net_lock(cpt);
397                 /*
398                  * NB: the message is committed for sending; on success we must
399                  * return because the LND will finalize this message later.
400                  *
401                  * There is also a possibility that the message was committed
402                  * for sending but failed before being delivered to the LND
403                  * (e.g. ENOMEM). In that case we can't fall through either:
404                  * - The rule is that a message must be decommitted for sending
405                  *   first if it's committed for both sending and receiving.
406                  * - The CPT for sending can be different from the CPT for
407                  *   receiving, so we must return to lnet_finalize() to make
408                  *   sure we are locking the correct partition.
409                  */
410                 return rc;
411         }
412
413         lnet_msg_decommit(msg, cpt, status);
414         lnet_msg_free(msg);
415         return 0;
416 }
417
418 /* must hold net_lock/0 */
419 void
420 lnet_ni_add_to_recoveryq_locked(struct lnet_ni *ni,
421                                 struct list_head *recovery_queue, time64_t now)
422 {
423         if (!list_empty(&ni->ni_recovery))
424                 return;
425
426         if (atomic_read(&ni->ni_healthv) == LNET_MAX_HEALTH_VALUE)
427                 return;
428
429         /* This NI is going on the recovery queue, so take a ref on it */
430         lnet_ni_addref_locked(ni, 0);
431
432         lnet_ni_set_next_ping(ni, now);
433
434         CDEBUG(D_NET, "%s added to recovery queue. ping count: %u next ping: %lld health: %d\n",
435                libcfs_nidstr(&ni->ni_nid),
436                ni->ni_ping_count,
437                ni->ni_next_ping,
438                atomic_read(&ni->ni_healthv));
439
440         list_add_tail(&ni->ni_recovery, recovery_queue);
441 }
442
443 static void
444 lnet_handle_local_failure(struct lnet_ni *local_ni)
445 {
446         /*
447          * the lnet_net_lock(0) is used to protect the addref on the ni
448          * and the recovery queue.
449          */
450         lnet_net_lock(0);
451         /* the mt could've shutdown and cleaned up the queues */
452         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
453                 lnet_net_unlock(0);
454                 return;
455         }
456
457         lnet_dec_healthv_locked(&local_ni->ni_healthv, lnet_health_sensitivity);
458         lnet_ni_add_to_recoveryq_locked(local_ni, &the_lnet.ln_mt_localNIRecovq,
459                                         ktime_get_seconds());
460         lnet_net_unlock(0);
461 }
462
463 /* must hold net_lock/0 */
464 void
465 lnet_handle_remote_failure_locked(struct lnet_peer_ni *lpni)
466 {
467         lnet_dec_lpni_healthv_locked(lpni);
468
469         /*
470          * add the peer NI to the recovery queue if it's not already there
471          * and its health value is actually below the maximum. It's
472          * possible that the sensitivity might be set to 0, and the health
473          * value will not be reduced. In this case, there is no reason to
474          * invoke recovery
475          */
476         lnet_peer_ni_add_to_recoveryq_locked(lpni,
477                                              &the_lnet.ln_mt_peerNIRecovq,
478                                              ktime_get_seconds());
479 }
480
481 static void
482 lnet_handle_remote_failure(struct lnet_peer_ni *lpni)
483 {
484         /* lpni could be NULL if we're in the LOLND case */
485         if (!lpni)
486                 return;
487
488         lnet_net_lock(0);
489         /* the mt could've shutdown and cleaned up the queues */
490         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
491                 lnet_net_unlock(0);
492                 return;
493         }
494         lnet_handle_remote_failure_locked(lpni);
495         lnet_net_unlock(0);
496 }
497
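/* Increment the health statistics matching @hstatus: the global health
 * counters, plus the per-NI counters for local failures or the per-peer-NI
 * counters for remote failures. */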
498 static void
499 lnet_incr_hstats(struct lnet_ni *ni, struct lnet_peer_ni *lpni,
500                  enum lnet_msg_hstatus hstatus)
501 {
502         struct lnet_counters_health *health;
503
504         health = &the_lnet.ln_counters[0]->lct_health;
505
506         switch (hstatus) {
507         case LNET_MSG_STATUS_LOCAL_INTERRUPT:
508                 atomic_inc(&ni->ni_hstats.hlt_local_interrupt);
509                 health->lch_local_interrupt_count++;
510                 break;
511         case LNET_MSG_STATUS_LOCAL_DROPPED:
512                 atomic_inc(&ni->ni_hstats.hlt_local_dropped);
513                 health->lch_local_dropped_count++;
514                 break;
515         case LNET_MSG_STATUS_LOCAL_ABORTED:
516                 atomic_inc(&ni->ni_hstats.hlt_local_aborted);
517                 health->lch_local_aborted_count++;
518                 break;
519         case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
520                 atomic_inc(&ni->ni_hstats.hlt_local_no_route);
521                 health->lch_local_no_route_count++;
522                 break;
523         case LNET_MSG_STATUS_LOCAL_TIMEOUT:
524                 atomic_inc(&ni->ni_hstats.hlt_local_timeout);
525                 health->lch_local_timeout_count++;
526                 break;
527         case LNET_MSG_STATUS_LOCAL_ERROR:
528                 atomic_inc(&ni->ni_hstats.hlt_local_error);
529                 health->lch_local_error_count++;
530                 break;
531         case LNET_MSG_STATUS_REMOTE_DROPPED:
532                 if (lpni)
533                         atomic_inc(&lpni->lpni_hstats.hlt_remote_dropped);
534                 health->lch_remote_dropped_count++;
535                 break;
536         case LNET_MSG_STATUS_REMOTE_ERROR:
537                 if (lpni)
538                         atomic_inc(&lpni->lpni_hstats.hlt_remote_error);
539                 health->lch_remote_error_count++;
540                 break;
541         case LNET_MSG_STATUS_REMOTE_TIMEOUT:
542                 if (lpni)
543                         atomic_inc(&lpni->lpni_hstats.hlt_remote_timeout);
544                 health->lch_remote_timeout_count++;
545                 break;
546         case LNET_MSG_STATUS_NETWORK_TIMEOUT:
547                 if (lpni)
548                         atomic_inc(&lpni->lpni_hstats.hlt_network_timeout);
549                 health->lch_network_timeout_count++;
550                 break;
551         case LNET_MSG_STATUS_OK:
552                 break;
553         default:
554                 LBUG();
555         }
556 }
557
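/* Prepare @msg for a resend: bump its retry count, take it off the active
 * list (except for routed and REPLY messages), reset msg_target.nid from the
 * message header, return its tx credit via lnet_msg_decommit_tx() and queue
 * it on the per-CPT resend queue, waking the monitor thread to pick it up. */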
558 static void
559 lnet_resend_msg_locked(struct lnet_msg *msg)
560 {
561         msg->msg_retry_count++;
562
563         /*
564          * remove message from the active list and reset it to prepare
565          * for a resend. Two exceptions to this
566          *
567          * 1. the router case. When a message is being routed it is
568          * committed for rx when received and committed for tx when
569          * forwarded. We don't want to remove it from the active list, since
570          * code which handles receiving expects it to remain on the active
571          * list.
572          *
573          * 2. The REPLY case. Reply messages use the same message
574          * structure for the GET that was received.
575          */
576         if (!msg->msg_routing && msg->msg_type != LNET_MSG_REPLY) {
577                 list_del_init(&msg->msg_activelist);
578                 msg->msg_onactivelist = 0;
579         }
580         /*
581          * The msg_target.nid which was originally set
582          * when calling LNetGet() or LNetPut() might've
583          * been overwritten if we're routing this message.
584          * Call lnet_msg_decommit_tx() to return the credit
585          * this message consumed. The message will
586          * consume another credit when it gets resent.
587          */
588         msg->msg_target.nid = msg->msg_hdr.dest_nid;
589         lnet_msg_decommit_tx(msg, -EAGAIN);
590         msg->msg_sending = 0;
591         msg->msg_receiving = 0;
592         msg->msg_target_is_router = 0;
593
594         CDEBUG(D_NET, "%s->%s:%s:%s - queuing msg (%p) for resend\n",
595                libcfs_nidstr(&msg->msg_hdr.src_nid),
596                libcfs_nidstr(&msg->msg_hdr.dest_nid),
597                lnet_msgtyp2str(msg->msg_type),
598                lnet_health_error2str(msg->msg_health_status), msg);
599
600         list_add_tail(&msg->msg_list, the_lnet.ln_mt_resendqs[msg->msg_tx_cpt]);
601
602         complete(&the_lnet.ln_mt_wait_complete);
603 }
604
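/* Recursion/concurrency breaker shared by the finalize and resend paths.
 * Queue @msg on @containerq, then scan the @workers array (one slot per CPU
 * of the CPT): if the current task already owns a slot, or no free slot is
 * left, return -1 so the caller leaves the queued message to an existing
 * worker. Otherwise claim a free slot and return its index; the caller then
 * drains the queue and must clear the slot when it is done. */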
605 static int
606 lnet_check_finalize_recursion_locked(struct lnet_msg *msg,
607                                      struct list_head *containerq,
608                                      int nworkers, void **workers)
609 {
610         int my_slot = -1;
611         int i;
612
613         list_add_tail(&msg->msg_list, containerq);
614
615         for (i = 0; i < nworkers; i++) {
616                 if (workers[i] == current)
617                         break;
618
619                 if (my_slot < 0 && workers[i] == NULL)
620                         my_slot = i;
621         }
622
623         if (i < nworkers || my_slot < 0)
624                 return -1;
625
626         workers[my_slot] = current;
627
628         return my_slot;
629 }
630
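/* Decide whether @msg can be resent after a failure. Recovery pings,
 * messages flagged msg_no_resend and messages that have exhausted
 * lnet_retry_count are not retried (-ENOTRECOVERABLE). Otherwise queue the
 * message for resend, draining the per-CPT resend queue here if this thread
 * wins a resender slot. */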
631 static int
632 lnet_attempt_msg_resend(struct lnet_msg *msg)
633 {
634         struct lnet_msg_container *container;
635         int my_slot;
636         int cpt;
637
638         /* we can only resend tx_committed messages */
639         LASSERT(msg->msg_tx_committed);
640
641         /* don't resend recovery messages */
642         if (msg->msg_recovery) {
643                 CDEBUG(D_NET, "msg %s->%s is a recovery ping. retry# %d\n",
644                         libcfs_nidstr(&msg->msg_from),
645                         libcfs_nidstr(&msg->msg_target.nid),
646                         msg->msg_retry_count);
647                 return -ENOTRECOVERABLE;
648         }
649
650         /*
651          * if we explicitly indicated we don't want to resend then just
652          * return
653          */
654         if (msg->msg_no_resend) {
655                 CDEBUG(D_NET, "msg %s->%s requested no resend. retry# %d\n",
656                         libcfs_nidstr(&msg->msg_from),
657                         libcfs_nidstr(&msg->msg_target.nid),
658                         msg->msg_retry_count);
659                 return -ENOTRECOVERABLE;
660         }
661
662         /* check if the message has exceeded the number of retries */
663         if (msg->msg_retry_count >= lnet_retry_count) {
664                 CNETERR("msg %s->%s exceeded retry count %d\n",
665                         libcfs_nidstr(&msg->msg_from),
666                         libcfs_nidstr(&msg->msg_target.nid),
667                         msg->msg_retry_count);
668                 return -ENOTRECOVERABLE;
669         }
670
671         cpt = msg->msg_tx_cpt;
672         lnet_net_lock(cpt);
673
674         /* check again under lock */
675         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING) {
676                 lnet_net_unlock(cpt);
677                 return -ESHUTDOWN;
678         }
679
680         container = the_lnet.ln_msg_containers[cpt];
681         my_slot =
682                 lnet_check_finalize_recursion_locked(msg,
683                                         &container->msc_resending,
684                                         container->msc_nfinalizers,
685                                         container->msc_resenders);
686
687         /* enough threads are resending */
688         if (my_slot == -1) {
689                 lnet_net_unlock(cpt);
690                 return 0;
691         }
692
693         while ((msg = list_first_entry_or_null(&container->msc_resending,
694                                                struct lnet_msg,
695                                                msg_list)) != NULL) {
696                 list_del(&msg->msg_list);
697
698                 /*
699                  * resending the message will require us to call
700                  * lnet_msg_decommit_tx() which will return the credit
701                  * which this message holds. This could trigger another
702                  * queued message to be sent. If that message fails and
703                  * requires a resend we will recurse.
704                  * But since at this point the slot is taken, the message
705                  * will be queued in the container and dealt with
706                  * later. This breaks the recursion.
707                  */
708                 lnet_resend_msg_locked(msg);
709         }
710
711         /*
712          * msc_resenders is an array of process pointers. Each entry holds
713          * a pointer to the process currently resending messages from that
714          * slot. The array is allocated per CPT, with one slot per CPU in
715          * the CPT. If a slot is already set, it means a thread on this CPT
716          * is currently resending messages.
717          * Once the thread finishes, clear the slot so that it can take on
718          * more resend work.
719          */
720         container->msc_resenders[my_slot] = NULL;
721         lnet_net_unlock(cpt);
722
723         return 0;
724 }
725
726 /*
727  * Do a health check on the message:
728  * return -1 if we're not going to handle the error or
729  *   if we've reached the maximum number of retries.
730  *   The success case returns -1 as well.
731  * return 0 if the message is requeued for send.
732  */
733 static int
734 lnet_health_check(struct lnet_msg *msg)
735 {
736         enum lnet_msg_hstatus hstatus = msg->msg_health_status;
737         struct lnet_peer_ni *lpni;
738         struct lnet_ni *ni;
739         bool lo = false;
740         bool attempt_local_resend;
741         bool attempt_remote_resend;
742         bool handle_local_health;
743         bool handle_remote_health;
744         ktime_t now;
745
746         /* if we're shutting down no point in handling health. */
747         if (the_lnet.ln_mt_state != LNET_MT_STATE_RUNNING)
748                 return -1;
749
750         LASSERT(msg->msg_tx_committed || msg->msg_rx_committed);
751
752         /*
753          * if we're sending to the LOLND then the msg_txpeer will not be
754          * set. So no need to sanity check it.
755          */
756         if (msg->msg_tx_committed &&
757             !nid_is_lo0(&msg->msg_txni->ni_nid))
758                 LASSERT(msg->msg_txpeer);
759         else if (msg->msg_tx_committed &&
760                  nid_is_lo0(&msg->msg_txni->ni_nid))
761                 lo = true;
762
763         /*
764          * always prefer txni/txpeer if the message is committed for both
765          * directions.
766          */
767         if (msg->msg_tx_committed) {
768                 ni = msg->msg_txni;
769                 lpni = msg->msg_txpeer;
770                 attempt_local_resend = attempt_remote_resend = true;
771         } else {
772                 ni = msg->msg_rxni;
773                 lpni = msg->msg_rxpeer;
774                 attempt_local_resend = attempt_remote_resend = false;
775         }
776
777         if (!lo)
778                 LASSERT(ni && lpni);
779         else
780                 LASSERT(ni);
781
782         now = ktime_get();
783         if (ktime_after(now, msg->msg_deadline)) {
784                 s64 time = ktime_to_ns(ktime_sub(now, msg->msg_deadline));
785
786                 atomic64_add(time, &the_lnet.ln_late_msg_nsecs);
787                 atomic_inc(&the_lnet.ln_late_msg_count);
788
789                 if (hstatus != LNET_MSG_STATUS_OK)
790                         return -1;
791         }
792
793         CDEBUG(D_NET, "health check: %s->%s: %s: %s\n",
794                libcfs_nidstr(&ni->ni_nid),
795                (lo) ? "self" : libcfs_nidstr(&lpni->lpni_nid),
796                lnet_msgtyp2str(msg->msg_type),
797                lnet_health_error2str(hstatus));
798
799         /*
800          * stats are only incremented for errors so avoid wasting time
801          * incrementing statistics if there is no error. Similarly, whether to
802          * update health values or perform resends is only applicable for
803          * messages with a health status != OK.
804          */
805         if (hstatus != LNET_MSG_STATUS_OK) {
806                 struct lnet_ping_info *pi;
807
808                 /* Don't further decrement the health value if a recovery
809                  * message failed.
810                  */
811                 if (msg->msg_recovery)
812                         handle_local_health = handle_remote_health = false;
813                 else
814                         handle_local_health = handle_remote_health = true;
815
816                 /* For local failures, health/recovery/resends are not needed if
817                  * I only have a single (non-lolnd) interface.
818                  */
819                 pi = &the_lnet.ln_ping_target->pb_info;
820                 if (lnet_ping_at_least_two_entries(pi)) {
821                         handle_local_health = false;
822                         attempt_local_resend = false;
823                 }
824
825                 lnet_net_lock(0);
826                 lnet_incr_hstats(ni, lpni, hstatus);
827                 /* For remote failures, health/recovery/resends are not needed
828                  * if the peer only has a single interface. Special case for
829                  * routers where we rely on health feature to manage route
830                  * aliveness. NB: lp_nnis does _not_ include the lolnd, so a
831                  * single-rail node would have lp_nnis == 1.
832                  */
833                 if (lpni && lpni->lpni_peer_net &&
834                     lpni->lpni_peer_net->lpn_peer &&
835                     lpni->lpni_peer_net->lpn_peer->lp_nnis <= 1) {
836                         attempt_remote_resend = false;
837                         if (!lnet_isrouter(lpni))
838                                 handle_remote_health = false;
839                 }
840                 /* Do not put my interfaces into peer NI recovery. They should
841                  * be handled with local NI recovery.
842                  */
843                 if (handle_remote_health && lpni &&
844                     lnet_nid_to_ni_locked(&lpni->lpni_nid, 0))
845                         handle_remote_health = false;
846                 lnet_net_unlock(0);
847         }
848
849         switch (hstatus) {
850         case LNET_MSG_STATUS_OK:
851                 /*
852                  * increment the local ni health whether we successfully
853                  * received or sent a message on it.
854                  *
855                  * Ping counts are reset to 0 as appropriate to allow for
856                  * faster recovery.
857                  */
858                 lnet_inc_healthv(&ni->ni_healthv, lnet_health_sensitivity);
859                 /*
860                  * It's possible msg_txpeer is NULL in the LOLND
861                  * case. Only increment the peer's health if we're
862                  * receiving a message from it. It's the only sure way to
863                  * know that a remote interface is up.
864                  * If this interface is part of a router, then take that
865                  * as indication that the router is fully healthy.
866                  */
867                 if (lpni && msg->msg_rx_committed) {
868                         lnet_net_lock(0);
869                         lpni->lpni_ping_count = 0;
870                         ni->ni_ping_count = 0;
871                         /*
872                          * If we're receiving a message from the router or
873                          * I'm a router, then set that lpni's health to
874                          * maximum so we can commence communication
875                          */
876                         if (lnet_isrouter(lpni) || the_lnet.ln_routing) {
877                                 lnet_set_lpni_healthv_locked(lpni,
878                                         LNET_MAX_HEALTH_VALUE);
879                         } else {
880                                 lnet_inc_lpni_healthv_locked(lpni);
881                                 /* This peer NI may have previously aged out
882                                  * of recovery. Now that we've received a
883                                  * message from it, we can continue recovery
884                                  * if its health value is still below the
885                                  * maximum.
886                                  */
887                                 lnet_peer_ni_add_to_recoveryq_locked(lpni,
888                                                 &the_lnet.ln_mt_peerNIRecovq,
889                                                 ktime_get_seconds());
890                         }
891                         lnet_net_unlock(0);
892                 }
893
894                 /* we can finalize this message */
895                 return -1;
896         case LNET_MSG_STATUS_LOCAL_INTERRUPT:
897         case LNET_MSG_STATUS_LOCAL_DROPPED:
898         case LNET_MSG_STATUS_LOCAL_ABORTED:
899         case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
900         case LNET_MSG_STATUS_LOCAL_TIMEOUT:
901                 if (handle_local_health)
902                         lnet_handle_local_failure(ni);
903                 if (attempt_local_resend)
904                         return lnet_attempt_msg_resend(msg);
905                 break;
906         case LNET_MSG_STATUS_LOCAL_ERROR:
907                 if (handle_local_health)
908                         lnet_handle_local_failure(ni);
909                 return -1;
910         case LNET_MSG_STATUS_REMOTE_DROPPED:
911                 if (handle_remote_health)
912                         lnet_handle_remote_failure(lpni);
913                 if (attempt_remote_resend)
914                         return lnet_attempt_msg_resend(msg);
915                 break;
916         case LNET_MSG_STATUS_REMOTE_ERROR:
917         case LNET_MSG_STATUS_REMOTE_TIMEOUT:
918                 if (handle_remote_health)
919                         lnet_handle_remote_failure(lpni);
920                 return -1;
921         case LNET_MSG_STATUS_NETWORK_TIMEOUT:
922                 if (handle_remote_health)
923                         lnet_handle_remote_failure(lpni);
924                 if (handle_local_health)
925                         lnet_handle_local_failure(ni);
926                 return -1;
927         default:
928                 LBUG();
929         }
930
931         /* no resend is needed */
932         return -1;
933 }
934
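/* Drop the message's reference on its MD, waiting first for any running
 * event handler. Deliver the completion event through the MD's handler,
 * forcing -ETIMEDOUT if the MD was aborted even though the operation itself
 * reported success, and unlink the MD if it has become unlinkable. */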
935 static void
936 lnet_msg_detach_md(struct lnet_msg *msg, int status)
937 {
938         struct lnet_libmd *md = msg->msg_md;
939         lnet_handler_t handler = NULL;
940         int cpt = lnet_cpt_of_cookie(md->md_lh.lh_cookie);
941         int unlink;
942
943         lnet_res_lock(cpt);
944         while (md->md_flags & LNET_MD_FLAG_HANDLING)
945                 /* An event handler is running - wait for it to
946                  * complete to avoid races.
947                  */
948                 lnet_md_wait_handling(md, cpt);
949
950         /* Now it's safe to drop my caller's ref */
951         md->md_refcount--;
952         LASSERT(md->md_refcount >= 0);
953
954         unlink = lnet_md_unlinkable(md);
955         if (md->md_handler) {
956                 if ((md->md_flags & LNET_MD_FLAG_ABORTED) && !status) {
957                         msg->msg_ev.status   = -ETIMEDOUT;
958                         CDEBUG(D_NET, "md 0x%p already unlinked\n", md);
959                 } else {
960                         msg->msg_ev.status   = status;
961                 }
962                 msg->msg_ev.unlinked = unlink;
963                 handler = md->md_handler;
964                 if (!unlink)
965                         md->md_flags |= LNET_MD_FLAG_HANDLING;
966         }
967
968         if (unlink || (md->md_refcount == 0 &&
969                        md->md_threshold == LNET_MD_THRESH_INF))
970                 lnet_detach_rsp_tracker(md, cpt);
971
972         msg->msg_md = NULL;
973         if (unlink)
974                 lnet_md_unlink(md);
975
976         lnet_res_unlock(cpt);
977
978         if (handler) {
979                 handler(&msg->msg_ev);
980                 if (!unlink) {
981                         lnet_res_lock(cpt);
982                         md->md_flags &= ~LNET_MD_FLAG_HANDLING;
983                         wake_up_var(md);
984                         lnet_res_unlock(cpt);
985                 }
986         }
987 }
988
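/* Decide whether lnet_finalize() should run the health check / resend logic
 * on @msg: the message must be committed and on the active list, have the
 * relevant peer pointers set, and its msg_health_status must be consistent
 * with its event status. */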
989 static bool
990 lnet_is_health_check(struct lnet_msg *msg)
991 {
992         bool hc = true;
993         int status = msg->msg_ev.status;
994
995         if ((!msg->msg_tx_committed && !msg->msg_rx_committed) ||
996             !msg->msg_onactivelist) {
997                 CDEBUG(D_NET, "msg %p not committed for send or receive\n",
998                        msg);
999                 return false;
1000         }
1001
1002         if ((msg->msg_tx_committed && !msg->msg_txpeer) ||
1003             (msg->msg_rx_committed && !msg->msg_rxpeer)) {
1004                 /* The optimized GET case does not set msg_rxpeer, but status
1005                  * could be zero. Only print the error message if we have a
1006                  * non-zero status.
1007                  */
1008                 if (status)
1009                         CDEBUG(D_NET, "msg %p status %d cannot retry\n", msg,
1010                                status);
1011                 return false;
1012         }
1013
1014         /* Check for status inconsistencies */
1015         if ((!status && msg->msg_health_status != LNET_MSG_STATUS_OK) ||
1016              (status && msg->msg_health_status == LNET_MSG_STATUS_OK)) {
1017                 CDEBUG(D_NET, "Msg %p is in inconsistent state, don't perform health "
1018                        "checking (%d, %d)\n", msg, status,
1019                        msg->msg_health_status);
1020                 hc = false;
1021         }
1022
1023         CDEBUG(D_NET, "health check = %d, status = %d, hstatus = %d\n",
1024                hc, status, msg->msg_health_status);
1025
1026         return hc;
1027 }
1028
1029 char *
1030 lnet_health_error2str(enum lnet_msg_hstatus hstatus)
1031 {
1032         switch (hstatus) {
1033         case LNET_MSG_STATUS_LOCAL_INTERRUPT:
1034                 return "LOCAL_INTERRUPT";
1035         case LNET_MSG_STATUS_LOCAL_DROPPED:
1036                 return "LOCAL_DROPPED";
1037         case LNET_MSG_STATUS_LOCAL_ABORTED:
1038                 return "LOCAL_ABORTED";
1039         case LNET_MSG_STATUS_LOCAL_NO_ROUTE:
1040                 return "LOCAL_NO_ROUTE";
1041         case LNET_MSG_STATUS_LOCAL_TIMEOUT:
1042                 return "LOCAL_TIMEOUT";
1043         case LNET_MSG_STATUS_LOCAL_ERROR:
1044                 return "LOCAL_ERROR";
1045         case LNET_MSG_STATUS_REMOTE_DROPPED:
1046                 return "REMOTE_DROPPED";
1047         case LNET_MSG_STATUS_REMOTE_ERROR:
1048                 return "REMOTE_ERROR";
1049         case LNET_MSG_STATUS_REMOTE_TIMEOUT:
1050                 return "REMOTE_TIMEOUT";
1051         case LNET_MSG_STATUS_NETWORK_TIMEOUT:
1052                 return "NETWORK_TIMEOUT";
1053         case LNET_MSG_STATUS_OK:
1054                 return "OK";
1055         default:
1056                 return "<UNKNOWN>";
1057         }
1058 }
1059
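/* Fault-injection hook for the send path: if a configured LNet drop rule
 * matches this message's header, set *hstatus to the health status to
 * simulate and return true so the caller treats the send as failed. */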
1060 bool
1061 lnet_send_error_simulation(struct lnet_msg *msg,
1062                            enum lnet_msg_hstatus *hstatus)
1063 {
1064         if (!msg)
1065                 return false;
1066
1067         if (list_empty(&the_lnet.ln_drop_rules))
1068                 return false;
1069
1070         /* match only health rules */
1071         if (!lnet_drop_rule_match(&msg->msg_hdr, NULL, hstatus))
1072                 return false;
1073
1074         CDEBUG(D_NET, "src %s(%s)->dst %s: %s simulate health error: %s\n",
1075                 libcfs_nidstr(&msg->msg_hdr.src_nid),
1076                 libcfs_nidstr(&msg->msg_txni->ni_nid),
1077                 libcfs_nidstr(&msg->msg_hdr.dest_nid),
1078                 lnet_msgtyp2str(msg->msg_type),
1079                 lnet_health_error2str(*hstatus));
1080
1081         return true;
1082 }
1083 EXPORT_SYMBOL(lnet_send_error_simulation);
1084
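/* Common completion path, called by LNet itself and by the LNDs once a send
 * or receive is done: run the health check (which may queue the message for
 * resend instead), detach the MD and deliver its event, then decommit and
 * free the message. A per-CPT finalizer slot scheme bounds recursion when
 * completions trigger further sends. */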
1085 void
1086 lnet_finalize(struct lnet_msg *msg, int status)
1087 {
1088         struct lnet_msg_container *container;
1089         int my_slot;
1090         int cpt;
1091         int rc;
1092
1093         LASSERT(!in_interrupt());
1094
1095         if (msg == NULL)
1096                 return;
1097
1098         msg->msg_ev.status = status;
1099
1100         if (lnet_is_health_check(msg)) {
1101                 /*
1102                  * Check the health status of the message. If it has one
1103                  * of the errors that we're supposed to handle, and it has
1104                  * not timed out, then
1105                  *      1. Decrement the appropriate health_value
1106                  *      2. queue the message on the resend queue
1107                  *
1108                  * If the message send succeeded, timed out, or failed the
1109                  * health check for any reason, then we just finalize the
1110                  * message. Otherwise return, since the message has been
1111                  * put on the resend queue.
1112                  */
1113                 if (!lnet_health_check(msg))
1114                         return;
1115         }
1116
1117         /*
1118          * We're not going to resend this message so detach its MD and invoke
1119          * the appropriate callbacks
1120          */
1121         if (msg->msg_md != NULL)
1122                 lnet_msg_detach_md(msg, status);
1123
1124 again:
1125         if (!msg->msg_tx_committed && !msg->msg_rx_committed) {
1126                 /* not committed to network yet */
1127                 LASSERT(!msg->msg_onactivelist);
1128                 lnet_msg_free(msg);
1129                 return;
1130         }
1131
1132         /*
1133          * NB: routed message can be committed for both receiving and sending,
1134          * we should finalize in LIFO order and keep counters correct.
1135          * (finalize sending first then finalize receiving)
1136          */
1137         cpt = msg->msg_tx_committed ? msg->msg_tx_cpt : msg->msg_rx_cpt;
1138         lnet_net_lock(cpt);
1139
1140         container = the_lnet.ln_msg_containers[cpt];
1141
1142         /* Recursion breaker.  Don't complete the message here if I am (or
1143          * enough other threads are) already completing messages */
1144         my_slot = lnet_check_finalize_recursion_locked(msg,
1145                                                 &container->msc_finalizing,
1146                                                 container->msc_nfinalizers,
1147                                                 container->msc_finalizers);
1148
1149         /* enough threads are finalizing */
1150         if (my_slot == -1) {
1151                 lnet_net_unlock(cpt);
1152                 return;
1153         }
1154
1155         rc = 0;
1156         while ((msg = list_first_entry_or_null(&container->msc_finalizing,
1157                                                struct lnet_msg,
1158                                                msg_list)) != NULL) {
1159                 list_del_init(&msg->msg_list);
1160
1161                 /* NB drops and regains the lnet lock if it actually does
1162                  * anything, so my finalizing friends can chomp along too */
1163                 rc = lnet_complete_msg_locked(msg, cpt);
1164                 if (rc != 0)
1165                         break;
1166         }
1167
1168         if (unlikely(!list_empty(&the_lnet.ln_delay_rules))) {
1169                 lnet_net_unlock(cpt);
1170                 lnet_delay_rule_check();
1171                 lnet_net_lock(cpt);
1172         }
1173
1174         container->msc_finalizers[my_slot] = NULL;
1175         lnet_net_unlock(cpt);
1176
1177         if (rc != 0)
1178                 goto again;
1179 }
1180 EXPORT_SYMBOL(lnet_finalize);
1181
1182 void
1183 lnet_msg_container_cleanup(struct lnet_msg_container *container)
1184 {
1185         struct lnet_msg *msg;
1186         int count = 0;
1187
1188         if (container->msc_init == 0)
1189                 return;
1190
1191         while ((msg = list_first_entry_or_null(&container->msc_active,
1192                                                struct lnet_msg,
1193                                                msg_activelist)) != NULL) {
1194                 LASSERT(msg->msg_onactivelist);
1195                 msg->msg_onactivelist = 0;
1196                 list_del_init(&msg->msg_activelist);
1197                 lnet_msg_free(msg);
1198                 count++;
1199         }
1200
1201         if (count > 0)
1202                 CERROR("%d active msg on exit\n", count);
1203
1204         if (container->msc_finalizers != NULL) {
1205                 CFS_FREE_PTR_ARRAY(container->msc_finalizers,
1206                                    container->msc_nfinalizers);
1207                 container->msc_finalizers = NULL;
1208         }
1209
1210         if (container->msc_resenders != NULL) {
1211                 CFS_FREE_PTR_ARRAY(container->msc_resenders,
1212                                    container->msc_nfinalizers);
1213                 container->msc_resenders = NULL;
1214         }
1215         container->msc_init = 0;
1216 }
1217
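/* Initialize a per-CPT message container: the active, finalizing and
 * resending lists, plus one finalizer slot and one resender slot per CPU in
 * the CPT (at least one of each). */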
1218 int
1219 lnet_msg_container_setup(struct lnet_msg_container *container, int cpt)
1220 {
1221         int rc = 0;
1222
1223         container->msc_init = 1;
1224
1225         INIT_LIST_HEAD(&container->msc_active);
1226         INIT_LIST_HEAD(&container->msc_finalizing);
1227         INIT_LIST_HEAD(&container->msc_resending);
1228
1229         /* number of CPUs */
1230         container->msc_nfinalizers = cfs_cpt_weight(lnet_cpt_table(), cpt);
1231         if (container->msc_nfinalizers == 0)
1232                 container->msc_nfinalizers = 1;
1233
1234         LIBCFS_CPT_ALLOC(container->msc_finalizers, lnet_cpt_table(), cpt,
1235                          container->msc_nfinalizers *
1236                          sizeof(*container->msc_finalizers));
1237
1238         if (container->msc_finalizers == NULL) {
1239                 CERROR("Failed to allocate message finalizers\n");
1240                 lnet_msg_container_cleanup(container);
1241                 return -ENOMEM;
1242         }
1243
1244         LIBCFS_CPT_ALLOC(container->msc_resenders, lnet_cpt_table(), cpt,
1245                          container->msc_nfinalizers *
1246                          sizeof(*container->msc_resenders));
1247
1248         if (container->msc_resenders == NULL) {
1249                 CERROR("Failed to allocate message resenders\n");
1250                 lnet_msg_container_cleanup(container);
1251                 return -ENOMEM;
1252         }
1253
1254         return rc;
1255 }
1256
1257 void
1258 lnet_msg_containers_destroy(void)
1259 {
1260         struct lnet_msg_container *container;
1261         int     i;
1262
1263         if (the_lnet.ln_msg_containers == NULL)
1264                 return;
1265
1266         cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers)
1267                 lnet_msg_container_cleanup(container);
1268
1269         cfs_percpt_free(the_lnet.ln_msg_containers);
1270         the_lnet.ln_msg_containers = NULL;
1271 }
1272
1273 int
1274 lnet_msg_containers_create(void)
1275 {
1276         struct lnet_msg_container *container;
1277         int     rc;
1278         int     i;
1279
1280         the_lnet.ln_msg_containers = cfs_percpt_alloc(lnet_cpt_table(),
1281                                                       sizeof(*container));
1282
1283         if (the_lnet.ln_msg_containers == NULL) {
1284                 CERROR("Failed to allocate cpu-partition data for network\n");
1285                 return -ENOMEM;
1286         }
1287
1288         cfs_percpt_for_each(container, i, the_lnet.ln_msg_containers) {
1289                 rc = lnet_msg_container_setup(container, i);
1290                 if (rc != 0) {
1291                         lnet_msg_containers_destroy();
1292                         return rc;
1293                 }
1294         }
1295
1296         return 0;
1297 }