Whamcloud - gitweb
New release 2.15.64
[fs/lustre-release.git] / lnet / lnet / net_fault.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /* Copyright (c) 2014, 2017, Intel Corporation. */
4
5 /* This file is part of Lustre, http://www.lustre.org/
6  *
7  * Lustre network fault simulation
8  *
9  * Author: liang.zhen@intel.com
10  */
11
12 #define DEBUG_SUBSYSTEM S_LNET
13
14 #include <linux/random.h>
15 #include <lnet/lib-lnet.h>
16 #include <uapi/linux/lnet/lnetctl.h>
17
18 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
19                                  LNET_GET_BIT | LNET_REPLY_BIT)
20
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head		dr_link;
	/** attributes of this rule */
	struct lnet_fault_large_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t			dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.drs_count == dr_drop_at
	 */
	unsigned long			dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	time64_t			dr_drop_time;
	/** baseline to calculate dr_drop_time */
	time64_t			dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat		dr_stat;
};
42
43 static void
44 lnet_fault_attr_to_attr4(struct lnet_fault_large_attr *attr,
45                          struct lnet_fault_attr *attr4)
46 {
47         if (!attr)
48                 return;
49
50         attr4->fa_src = lnet_nid_to_nid4(&attr->fa_src);
51         attr4->fa_dst = lnet_nid_to_nid4(&attr->fa_dst);
52         attr4->fa_local_nid = lnet_nid_to_nid4(&attr->fa_local_nid);
53         attr4->fa_ptl_mask = attr->fa_ptl_mask;
54         attr4->fa_msg_mask = attr->fa_msg_mask;
55
56         memcpy(&attr4->u, &attr->u, sizeof(attr4->u));
57 }
58
59 static void
60 lnet_fault_attr4_to_attr(struct lnet_fault_attr *attr4,
61                          struct lnet_fault_large_attr *attr)
62 {
63         if (!attr4)
64                 return;
65
66         if (attr4->fa_src)
67                 lnet_nid4_to_nid(attr4->fa_src, &attr->fa_src);
68         else
69                 attr->fa_src = LNET_ANY_NID;
70
71         if (attr4->fa_dst)
72                 lnet_nid4_to_nid(attr4->fa_dst, &attr->fa_dst);
73         else
74                 attr->fa_dst = LNET_ANY_NID;
75
76         if (attr4->fa_local_nid)
77                 lnet_nid4_to_nid(attr4->fa_local_nid, &attr->fa_local_nid);
78         else
79                 attr->fa_local_nid = LNET_ANY_NID;
80
81         attr->fa_ptl_mask = attr4->fa_ptl_mask;
82         attr->fa_msg_mask = attr4->fa_msg_mask;
83
84         memcpy(&attr->u, &attr4->u, sizeof(attr->u));
85 }
86
87 static bool
88 lnet_fault_nid_match(struct lnet_nid *nid, struct lnet_nid *msg_nid)
89 {
90         if (LNET_NID_IS_ANY(nid))
91                 return true;
92         if (!msg_nid)
93                 return false;
94         if (nid_same(msg_nid, nid))
95                 return true;
96
97         if (LNET_NID_NET(nid) != LNET_NID_NET(msg_nid))
98                 return false;
99
100         /* 255.255.255.255@net is wildcard for all addresses in a network */
101         return __be32_to_cpu(nid->nid_addr[0]) == LNET_NIDADDR(LNET_NID_ANY);
102 }
103
104 static bool
105 lnet_fault_attr_match(struct lnet_fault_large_attr *attr,
106                       struct lnet_nid *src,
107                       struct lnet_nid *local_nid,
108                       struct lnet_nid *dst,
109                       unsigned int type, unsigned int portal)
110 {
111         if (!lnet_fault_nid_match(&attr->fa_src, src) ||
112             !lnet_fault_nid_match(&attr->fa_dst, dst) ||
113             !lnet_fault_nid_match(&attr->fa_local_nid, local_nid))
114                 return false;
115
116         if (!(attr->fa_msg_mask & BIT(type)))
117                 return false;
118
119         /* NB: ACK and REPLY have no portal, but they should have been
120          * rejected by message mask */
121         if (attr->fa_ptl_mask != 0 && /* has portal filter */
122             !(attr->fa_ptl_mask & (1ULL << portal)))
123                 return false;
124
125         return true;
126 }
127
128 static int
129 lnet_fault_attr_validate(struct lnet_fault_large_attr *attr)
130 {
131         if (attr->fa_msg_mask == 0)
132                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
133
134         if (attr->fa_ptl_mask == 0) /* no portal filter */
135                 return 0;
136
137         /* NB: only PUT and GET can be filtered if portal filter has been set */
138         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
139         if (attr->fa_msg_mask == 0) {
140                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
141                        attr->fa_msg_mask);
142                 return -EINVAL;
143         }
144         return 0;
145 }
146
147 static void
148 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
149 {
150         /* NB: fs_counter is NOT updated by this function */
151         switch (type) {
152         case LNET_MSG_PUT:
153                 stat->fs_put++;
154                 return;
155         case LNET_MSG_ACK:
156                 stat->fs_ack++;
157                 return;
158         case LNET_MSG_GET:
159                 stat->fs_get++;
160                 return;
161         case LNET_MSG_REPLY:
162                 stat->fs_reply++;
163                 return;
164         }
165 }
166
167 /**
168  * LNet message drop simulation
169  */
170
171 /**
172  * Add a new drop rule to LNet
173  * There is no check for duplicated drop rule, all rules will be checked for
174  * incoming message.
175  */
176 int lnet_drop_rule_add(struct lnet_fault_large_attr *attr)
177 {
178         struct lnet_drop_rule *rule;
179         ENTRY;
180
181         if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
182                 CDEBUG(D_NET,
183                        "please provide either drop rate or drop interval, "
184                        "but not both at the same time %d/%d\n",
185                        attr->u.drop.da_rate, attr->u.drop.da_interval);
186                 RETURN(-EINVAL);
187         }
188
189         if (lnet_fault_attr_validate(attr) != 0)
190                 RETURN(-EINVAL);
191
192         CFS_ALLOC_PTR(rule);
193         if (rule == NULL)
194                 RETURN(-ENOMEM);
195
196         spin_lock_init(&rule->dr_lock);
197
198         rule->dr_attr = *attr;
199         if (attr->u.drop.da_interval != 0) {
200                 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
201                 rule->dr_drop_time = ktime_get_seconds() +
202                                      get_random_u32_below(attr->u.drop.da_interval);
203         } else {
204                 rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
205         }
206
207         lnet_net_lock(LNET_LOCK_EX);
208         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
209         lnet_net_unlock(LNET_LOCK_EX);
210
211         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
212                libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
213                attr->u.drop.da_rate, attr->u.drop.da_interval);
214         RETURN(0);
215 }
216
217 /**
218  * Remove matched drop rules from lnet, all rules that can match \a src and
219  * \a dst will be removed.
220  * If \a src is zero, then all rules have \a dst as destination will be remove
221  * If \a dst is zero, then all rules have \a src as source will be removed
222  * If both of them are zero, all rules will be removed
223  */
224 int lnet_drop_rule_del(struct lnet_nid *src, struct lnet_nid *dst)
225 {
226         struct lnet_drop_rule *rule;
227         struct lnet_drop_rule *tmp;
228         LIST_HEAD(zombies);
229         int n = 0;
230         ENTRY;
231
232         CDEBUG(D_NET, "src %s dst %s\n", libcfs_nidstr(src),
233                libcfs_nidstr(dst));
234         lnet_net_lock(LNET_LOCK_EX);
235         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
236                 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dr_attr.fa_src, src)))
237                         continue;
238
239                 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dr_attr.fa_dst, dst)))
240                         continue;
241
242                 list_move(&rule->dr_link, &zombies);
243         }
244         lnet_net_unlock(LNET_LOCK_EX);
245
246         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
247                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
248                        libcfs_nidstr(&rule->dr_attr.fa_src),
249                        libcfs_nidstr(&rule->dr_attr.fa_dst),
250                        rule->dr_attr.u.drop.da_rate,
251                        rule->dr_attr.u.drop.da_interval);
252
253                 list_del(&rule->dr_link);
254                 CFS_FREE_PTR(rule);
255                 n++;
256         }
257
258         RETURN(n);
259 }
260
261 /**
262  * List drop rule at position of \a pos
263  */
264 static int
265 lnet_drop_rule_list(int pos, struct lnet_fault_large_attr *attr,
266                     struct lnet_fault_stat *stat)
267 {
268         struct lnet_drop_rule *rule;
269         int                    cpt;
270         int                    i = 0;
271         int                    rc = -ENOENT;
272         ENTRY;
273
274         cpt = lnet_net_lock_current();
275         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
276                 if (i++ < pos)
277                         continue;
278
279                 spin_lock(&rule->dr_lock);
280                 *attr = rule->dr_attr;
281                 *stat = rule->dr_stat;
282                 spin_unlock(&rule->dr_lock);
283                 rc = 0;
284                 break;
285         }
286
287         lnet_net_unlock(cpt);
288         RETURN(rc);
289 }
290
291 int lnet_drop_rule_collect(struct lnet_genl_fault_rule_list *rlist)
292 {
293         struct lnet_drop_rule *rule;
294         int cpt, rc = 0;
295
296         ENTRY;
297         cpt = lnet_net_lock_current();
298         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
299                 struct lnet_rule_properties *prop;
300
301                 prop = genradix_ptr_alloc(&rlist->lgfrl_list,
302                                           rlist->lgfrl_count++,
303                                           GFP_KERNEL);
304                 if (!prop) {
305                         rc = -ENOMEM;
306                         break;
307                 }
308                 spin_lock(&rule->dr_lock);
309                 prop->attr = rule->dr_attr;
310                 prop->stat = rule->dr_stat;
311                 spin_unlock(&rule->dr_lock);
312         }
313
314         lnet_net_unlock(cpt);
315         RETURN(rc);
316 }
317
318 /**
319  * reset counters for all drop rules
320  */
321 void lnet_drop_rule_reset(void)
322 {
323         struct lnet_drop_rule *rule;
324         int                    cpt;
325         ENTRY;
326
327         cpt = lnet_net_lock_current();
328
329         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
330                 struct lnet_fault_large_attr *attr = &rule->dr_attr;
331
332                 spin_lock(&rule->dr_lock);
333
334                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
335                 if (attr->u.drop.da_rate != 0) {
336                         rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
337                 } else {
338                         rule->dr_drop_time = ktime_get_seconds() +
339                                              get_random_u32_below(attr->u.drop.da_interval);
340                         rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
341                 }
342                 spin_unlock(&rule->dr_lock);
343         }
344
345         lnet_net_unlock(cpt);
346         EXIT;
347 }
348
/**
 * Select a simulated health-status error from \a mask for a dropped
 * message and store it in \a hstatus.
 *
 * A random status in (LNET_MSG_STATUS_OK, LNET_MSG_STATUS_END) is chosen
 * first; if \a mask does not enable that status, the closest enabled bit
 * is used instead.
 *
 * NOTE(review): the random range uses LNET_MSG_STATUS_* while the masks
 * and the rounding loop use HSTATUS_* — assumes the two enum ranges line
 * up; confirm against their definitions.
 */
static void
lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
{
	int choice;
	int delta;
	int best_delta;
	int i;

	/* assign a random failure; 0 would be "OK", so skip it */
	choice = get_random_u32_below(LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
	if (choice == 0)
		choice++;

	/* HSTATUS_RANDOM: any non-OK status is acceptable */
	if (mask == HSTATUS_RANDOM) {
		*hstatus = choice;
		return;
	}

	/* the randomly chosen status is enabled in the mask */
	if (mask & BIT(choice)) {
		*hstatus = choice;
		return;
	}

	/* round to the closest ON bit */
	i = HSTATUS_END;
	best_delta = HSTATUS_END;
	while (i > 0) {
		if (mask & BIT(i)) {
			delta = choice - i;
			if (delta < 0)
				delta *= -1;	/* absolute distance */
			if (delta < best_delta) {
				best_delta = delta;
				choice = i;
			}
		}
		i--;
	}

	*hstatus = choice;
}
390
391 /**
392  * check source/destination NID, portal, message type and drop rate,
393  * decide whether should drop this message or not
394  */
395 static bool
396 drop_rule_match(struct lnet_drop_rule *rule,
397                 struct lnet_nid *src,
398                 struct lnet_nid *local_nid,
399                 struct lnet_nid *dst,
400                 unsigned int type, unsigned int portal,
401                 enum lnet_msg_hstatus *hstatus)
402 {
403         struct lnet_fault_large_attr *attr = &rule->dr_attr;
404         bool drop;
405
406         if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))
407                 return false;
408
409         if (attr->u.drop.da_drop_all) {
410                 CDEBUG(D_NET, "set to drop all messages\n");
411                 drop = true;
412                 goto drop_matched;
413         }
414
415         /*
416          * if we're trying to match a health status error but it hasn't
417          * been set in the rule, then don't match
418          */
419         if ((hstatus && !attr->u.drop.da_health_error_mask) ||
420             (!hstatus && attr->u.drop.da_health_error_mask))
421                 return false;
422
423         /* match this rule, check drop rate now */
424         spin_lock(&rule->dr_lock);
425         if (attr->u.drop.da_random) {
426                 int value = get_random_u32_below(attr->u.drop.da_interval);
427                 if (value >= (attr->u.drop.da_interval / 2))
428                         drop = true;
429                 else
430                         drop = false;
431         } else if (rule->dr_drop_time != 0) { /* time based drop */
432                 time64_t now = ktime_get_seconds();
433
434                 rule->dr_stat.fs_count++;
435                 drop = now >= rule->dr_drop_time;
436                 if (drop) {
437                         if (now > rule->dr_time_base)
438                                 rule->dr_time_base = now;
439
440                         rule->dr_drop_time = rule->dr_time_base +
441                                              get_random_u32_below(attr->u.drop.da_interval);
442                         rule->dr_time_base += attr->u.drop.da_interval;
443
444                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
445                                libcfs_nidstr(&attr->fa_src),
446                                libcfs_nidstr(&attr->fa_dst),
447                                rule->dr_drop_time);
448                 }
449
450         } else { /* rate based drop */
451                 __u64 count;
452
453                 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
454                 count = rule->dr_stat.fs_count;
455                 if (do_div(count, attr->u.drop.da_rate) == 0) {
456                         rule->dr_drop_at = rule->dr_stat.fs_count +
457                                            get_random_u32_below(attr->u.drop.da_rate);
458                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
459                                libcfs_nidstr(&attr->fa_src),
460                                libcfs_nidstr(&attr->fa_dst), rule->dr_drop_at);
461                 }
462         }
463
464 drop_matched:
465
466         if (drop) { /* drop this message, update counters */
467                 if (hstatus)
468                         lnet_fault_match_health(hstatus,
469                                 attr->u.drop.da_health_error_mask);
470                 lnet_fault_stat_inc(&rule->dr_stat, type);
471                 rule->dr_stat.u.drop.ds_dropped++;
472         }
473
474         spin_unlock(&rule->dr_lock);
475         return drop;
476 }
477
478 /**
479  * Check if message from \a src to \a dst can match any existed drop rule
480  */
481 bool
482 lnet_drop_rule_match(struct lnet_hdr *hdr,
483                      struct lnet_nid *local_nid,
484                      enum lnet_msg_hstatus *hstatus)
485 {
486         unsigned int typ = hdr->type;
487         struct lnet_drop_rule *rule;
488         unsigned int ptl = -1;
489         bool drop = false;
490         int cpt;
491
492         /* NB: if Portal is specified, then only PUT and GET will be
493          * filtered by drop rule */
494         if (typ == LNET_MSG_PUT)
495                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
496         else if (typ == LNET_MSG_GET)
497                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
498
499         cpt = lnet_net_lock_current();
500         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
501                 drop = drop_rule_match(rule, &hdr->src_nid, local_nid,
502                                        &hdr->dest_nid, typ, ptl,
503                                        hstatus);
504                 if (drop)
505                         break;
506         }
507         lnet_net_unlock(cpt);
508
509         return drop;
510 }
511
512 /**
513  * LNet Delay Simulation
514  */
515 /** timestamp (second) to send delayed message */
516 #define msg_delay_send           msg_ev.hdr_data
517
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head		dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head		dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_large_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t			dl_lock;
	/** refcount of delay rule */
	atomic_t			dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long			dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	time64_t			dl_delay_time;
	/** baseline to calculate dl_delay_time */
	time64_t			dl_time_base;
	/** send time (seconds) of the earliest queued message, -1 when the
	 * queue is empty and the timer is idle */
	time64_t			dl_msg_send;
	/** delayed message list */
	struct list_head		dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat		dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list		dl_timer;
};
549
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon has stopped */
	unsigned int		dd_stopped;
};
566
567 static struct delay_daemon_data delay_dd;
568
569 static void
570 delay_rule_decref(struct lnet_delay_rule *rule)
571 {
572         if (atomic_dec_and_test(&rule->dl_refcount)) {
573                 LASSERT(list_empty(&rule->dl_sched_link));
574                 LASSERT(list_empty(&rule->dl_msg_list));
575                 LASSERT(list_empty(&rule->dl_link));
576
577                 CFS_FREE_PTR(rule);
578         }
579 }
580
/**
 * check source/destination NID, portal, message type and delay rate,
 * decide whether should delay this message or not
 *
 * \retval true  \a msg matched the rule and was queued on its delay list;
 *		 the rule now owns delivery of the message
 * \retval false \a msg is not delayed and should be processed normally
 */
static bool
delay_rule_match(struct lnet_delay_rule *rule, struct lnet_nid *src,
		 struct lnet_nid *dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
{
	struct lnet_fault_large_attr *attr = &rule->dl_attr;
	bool delay;
	time64_t now = ktime_get_seconds();

	/* NB: delay rules do not filter on local NID (NULL passed) */
	if (!lnet_fault_attr_match(attr, src, NULL,
				   dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time != 0) { /* time based delay */
		rule->dl_stat.fs_count++;
		delay = now >= rule->dl_delay_time;
		if (delay) {
			/* schedule the next delay at a random offset within
			 * the following la_interval window */
			if (now > rule->dl_time_base)
				rule->dl_time_base = now;

			rule->dl_delay_time = rule->dl_time_base +
					      get_random_u32_below(attr->u.delay.la_interval);
			rule->dl_time_base += attr->u.delay.la_interval;

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst),
			       rule->dl_delay_time);
		}

	} else { /* rate based delay */
		__u64 count;

		/* delay exactly the dl_delay_at-th message of each window */
		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		count = rule->dl_stat.fs_count;
		if (do_div(count, attr->u.delay.la_rate) == 0) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    get_random_u32_below(attr->u.delay.la_rate);
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst), rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	/* queue the message; it becomes due after la_latency seconds */
	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = now + attr->u.delay.la_latency;
	/* dl_msg_send == -1 means no timer is armed (queue was empty);
	 * arm it for this, the earliest, message */
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer,
			  jiffies + cfs_time_seconds(attr->u.delay.la_latency));
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
652
653 /**
654  * check if \a msg can match any Delay Rule, receiving of this message
655  * will be delayed if there is a match.
656  */
657 bool
658 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
659 {
660         struct lnet_delay_rule  *rule;
661         unsigned int             typ = hdr->type;
662         unsigned int             ptl = -1;
663
664         /* NB: called with hold of lnet_net_lock */
665
666         /* NB: if Portal is specified, then only PUT and GET will be
667          * filtered by delay rule */
668         if (typ == LNET_MSG_PUT)
669                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
670         else if (typ == LNET_MSG_GET)
671                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
672
673         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
674                 if (delay_rule_match(rule, &hdr->src_nid, &hdr->dest_nid,
675                                      typ, ptl, msg))
676                         return true;
677         }
678
679         return false;
680 }
681
/** check out delayed messages for send
 *
 * Move messages whose delay has expired (or every message when \a all is
 * true) from rule->dl_msg_list onto \a msg_list, then re-arm or disable
 * the rule's timer depending on what remains queued.
 */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	time64_t now = ktime_get_seconds();

	/* unlocked fast path: the earliest message is not due yet */
	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		/* messages are appended with a fixed per-rule latency, so
		 * the list is ordered by send time: stop at the first
		 * not-yet-due message */
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		/* nothing pending: stop the timer and mark it idle (-1) */
		timer_delete(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/* dequeued some timedout messages, update timer for the
		 * next delayed message on rule */
		msg = list_first_entry(&rule->dl_msg_list,
				       struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer,
			  jiffies +
			  cfs_time_seconds(msg->msg_delay_send - now));
	}
	spin_unlock(&rule->dl_lock);
}
719
/**
 * Dispose of the messages dequeued by delayed_msg_check(): resume delayed
 * sends, re-deliver or forward delayed receives, or drop everything when
 * \a drop is true (or a delivery attempt fails).
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while ((msg = list_first_entry_or_null(msg_list, struct lnet_msg,
					       msg_list)) != NULL) {
		struct lnet_ni *ni;
		int             cpt;
		int             rc;

		if (msg->msg_sending) {
			/* Delayed send */
			list_del_init(&msg->msg_list);
			ni = msg->msg_txni;
			CDEBUG(D_NET, "TRACE: msg %p %s -> %s : %s\n", msg,
			       libcfs_nidstr(&ni->ni_nid),
			       libcfs_nidstr(&msg->msg_txpeer->lpni_nid),
			       lnet_msgtyp2str(msg->msg_type));
			lnet_ni_send(ni, msg);
			continue;
		}

		/* Delayed receive */
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		ni = msg->msg_rxni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			/* caller asked for all delayed messages to be
			 * cancelled */
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			/* local delivery: success means the message has
			 * been consumed */
			rc = lnet_parse_local(ni, msg);
			if (rc == 0)
				continue;

		} else {
			/* routed message: try to forward it */
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				fallthrough;
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* message could not be delivered: account the drop and
		 * finalize it with \a rc */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
				  msg->msg_type);
		lnet_finalize(msg, rc);
	}
}
781
782 /**
783  * Process delayed messages for scheduled rules
784  * This function can either be called by delay_rule_daemon, or by lnet_finalise
785  */
786 void
787 lnet_delay_rule_check(void)
788 {
789         struct lnet_delay_rule *rule;
790         LIST_HEAD(msgs);
791
792         while (1) {
793                 if (list_empty(&delay_dd.dd_sched_rules))
794                         break;
795
796                 spin_lock_bh(&delay_dd.dd_lock);
797                 if (list_empty(&delay_dd.dd_sched_rules)) {
798                         spin_unlock_bh(&delay_dd.dd_lock);
799                         break;
800                 }
801
802                 rule = list_first_entry(&delay_dd.dd_sched_rules,
803                                         struct lnet_delay_rule, dl_sched_link);
804                 list_del_init(&rule->dl_sched_link);
805                 spin_unlock_bh(&delay_dd.dd_lock);
806
807                 delayed_msg_check(rule, false, &msgs);
808                 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
809         }
810
811         if (!list_empty(&msgs))
812                 delayed_msg_process(&msgs, false);
813 }
814
815 /** deamon thread to handle delayed messages */
816 static int
817 lnet_delay_rule_daemon(void *arg)
818 {
819         delay_dd.dd_running = 1;
820         wake_up(&delay_dd.dd_ctl_waitq);
821
822         while (delay_dd.dd_running) {
823                 wait_event_interruptible(delay_dd.dd_waitq,
824                                          !delay_dd.dd_running ||
825                                          !list_empty(&delay_dd.dd_sched_rules));
826                 lnet_delay_rule_check();
827         }
828
829         /* in case more rules have been enqueued after my last check */
830         lnet_delay_rule_check();
831         delay_dd.dd_stopped = 1;
832         wake_up(&delay_dd.dd_ctl_waitq);
833
834         return 0;
835 }
836
837 static void
838 delay_timer_cb(cfs_timer_cb_arg_t data)
839 {
840         struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);
841
842         spin_lock_bh(&delay_dd.dd_lock);
843         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
844                 atomic_inc(&rule->dl_refcount);
845                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
846                 wake_up(&delay_dd.dd_waitq);
847         }
848         spin_unlock_bh(&delay_dd.dd_lock);
849 }
850
851 /**
852  * Add a new delay rule to LNet
853  * There is no check for duplicated delay rule, all rules will be checked for
854  * incoming message.
855  */
856 int
857 lnet_delay_rule_add(struct lnet_fault_large_attr *attr)
858 {
859         struct lnet_delay_rule *rule;
860         int rc = 0;
861         ENTRY;
862
863         if (!((attr->u.delay.la_rate == 0) ^
864               (attr->u.delay.la_interval == 0))) {
865                 CDEBUG(D_NET,
866                        "please provide either delay rate or delay interval, "
867                        "but not both at the same time %d/%d\n",
868                        attr->u.delay.la_rate, attr->u.delay.la_interval);
869                 RETURN(-EINVAL);
870         }
871
872         if (attr->u.delay.la_latency == 0) {
873                 CDEBUG(D_NET, "delay latency cannot be zero\n");
874                 RETURN(-EINVAL);
875         }
876
877         if (lnet_fault_attr_validate(attr) != 0)
878                 RETURN(-EINVAL);
879
880         CFS_ALLOC_PTR(rule);
881         if (rule == NULL)
882                 RETURN(-ENOMEM);
883
884         mutex_lock(&delay_dd.dd_mutex);
885         if (!delay_dd.dd_running) {
886                 struct task_struct *task;
887
888                 /* NB: although LND threads will process delayed message
889                  * in lnet_finalize, but there is no guarantee that LND
890                  * threads will be waken up if no other message needs to
891                  * be handled.
892                  * Only one daemon thread, performance is not the concern
893                  * of this simualation module.
894                  */
895                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
896                 if (IS_ERR(task)) {
897                         rc = PTR_ERR(task);
898                         GOTO(failed, rc);
899                 }
900                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
901         }
902
903         cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
904                         (unsigned long)rule, 0);
905
906         spin_lock_init(&rule->dl_lock);
907         INIT_LIST_HEAD(&rule->dl_msg_list);
908         INIT_LIST_HEAD(&rule->dl_sched_link);
909
910         rule->dl_attr = *attr;
911         if (attr->u.delay.la_interval != 0) {
912                 rule->dl_time_base = ktime_get_seconds() +
913                                      attr->u.delay.la_interval;
914                 rule->dl_delay_time = ktime_get_seconds() +
915                                       get_random_u32_below(attr->u.delay.la_interval);
916         } else {
917                 rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
918         }
919
920         rule->dl_msg_send = -1;
921
922         lnet_net_lock(LNET_LOCK_EX);
923         atomic_set(&rule->dl_refcount, 1);
924         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
925         lnet_net_unlock(LNET_LOCK_EX);
926
927         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
928                libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
929                attr->u.delay.la_rate);
930
931         mutex_unlock(&delay_dd.dd_mutex);
932         RETURN(0);
933  failed:
934         mutex_unlock(&delay_dd.dd_mutex);
935         CFS_FREE_PTR(rule);
936         return rc;
937 }
938
939 /**
940  * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
941  * and \a dst are zero, all rules will be removed, otherwise only matched rules
942  * will be removed.
943  * If \a src is zero, then all rules have \a dst as destination will be remove
944  * If \a dst is zero, then all rules have \a src as source will be removed
945  *
946  * When a delay rule is removed, all delayed messages of this rule will be
947  * processed immediately.
948  */
949 int
950 lnet_delay_rule_del(struct lnet_nid *src, struct lnet_nid *dst, bool shutdown)
951 {
952         struct lnet_delay_rule *rule;
953         struct lnet_delay_rule *tmp;
954         LIST_HEAD(rule_list);
955         LIST_HEAD(msg_list);
956         int n = 0;
957         bool cleanup;
958         ENTRY;
959
960         mutex_lock(&delay_dd.dd_mutex);
961         lnet_net_lock(LNET_LOCK_EX);
962
963         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
964                 CDEBUG(D_NET, "src %s dst %s fa_src %s fa_dst %s\n",
965                        libcfs_nidstr(src), libcfs_nidstr(dst),
966                        libcfs_nidstr(&rule->dl_attr.fa_src),
967                        libcfs_nidstr(&rule->dl_attr.fa_dst));
968                 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dl_attr.fa_src, src)))
969                         continue;
970
971                 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dl_attr.fa_dst, dst)))
972                         continue;
973
974                 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
975                        libcfs_nidstr(&rule->dl_attr.fa_src),
976                        libcfs_nidstr(&rule->dl_attr.fa_dst),
977                        rule->dl_attr.u.delay.la_rate,
978                        rule->dl_attr.u.delay.la_interval);
979                 /* refcount is taken over by rule_list */
980                 list_move(&rule->dl_link, &rule_list);
981         }
982
983         /* check if we need to shutdown delay_daemon */
984         cleanup = list_empty(&the_lnet.ln_delay_rules) &&
985                   !list_empty(&rule_list);
986         lnet_net_unlock(LNET_LOCK_EX);
987
988         list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
989                 list_del_init(&rule->dl_link);
990
991                 timer_delete_sync(&rule->dl_timer);
992                 delayed_msg_check(rule, true, &msg_list);
993                 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
994                 n++;
995         }
996
997         if (cleanup) { /* no more delay rule, shutdown delay_daemon */
998                 LASSERT(delay_dd.dd_running);
999                 delay_dd.dd_running = 0;
1000                 wake_up(&delay_dd.dd_waitq);
1001
1002                 while (!delay_dd.dd_stopped)
1003                         wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
1004         }
1005         mutex_unlock(&delay_dd.dd_mutex);
1006
1007         if (!list_empty(&msg_list))
1008                 delayed_msg_process(&msg_list, shutdown);
1009
1010         RETURN(n);
1011 }
1012
1013 /**
1014  * List Delay Rule at position of \a pos
1015  */
1016 int
1017 lnet_delay_rule_list(int pos, struct lnet_fault_large_attr *attr,
1018                     struct lnet_fault_stat *stat)
1019 {
1020         struct lnet_delay_rule *rule;
1021         int                     cpt;
1022         int                     i = 0;
1023         int                     rc = -ENOENT;
1024         ENTRY;
1025
1026         cpt = lnet_net_lock_current();
1027         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1028                 if (i++ < pos)
1029                         continue;
1030
1031                 spin_lock(&rule->dl_lock);
1032                 *attr = rule->dl_attr;
1033                 *stat = rule->dl_stat;
1034                 spin_unlock(&rule->dl_lock);
1035                 rc = 0;
1036                 break;
1037         }
1038
1039         lnet_net_unlock(cpt);
1040         RETURN(rc);
1041 }
1042
1043 int lnet_delay_rule_collect(struct lnet_genl_fault_rule_list *rlist)
1044 {
1045         struct lnet_delay_rule *rule;
1046         int cpt, rc = 0;
1047
1048         ENTRY;
1049         cpt = lnet_net_lock_current();
1050         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1051                 struct lnet_rule_properties *prop;
1052
1053                 prop = genradix_ptr_alloc(&rlist->lgfrl_list,
1054                                           rlist->lgfrl_count++,
1055                                           GFP_KERNEL);
1056                 if (!prop) {
1057                         rc = -ENOMEM;
1058                         break;
1059                 }
1060                 spin_lock(&rule->dl_lock);
1061                 prop->attr = rule->dl_attr;
1062                 prop->stat = rule->dl_stat;
1063                 spin_unlock(&rule->dl_lock);
1064         }
1065
1066         lnet_net_unlock(cpt);
1067         RETURN(rc);
1068 }
1069
1070 /**
1071  * reset counters for all Delay Rules
1072  */
1073 void
1074 lnet_delay_rule_reset(void)
1075 {
1076         struct lnet_delay_rule *rule;
1077         int                     cpt;
1078         ENTRY;
1079
1080         cpt = lnet_net_lock_current();
1081
1082         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1083                 struct lnet_fault_large_attr *attr = &rule->dl_attr;
1084
1085                 spin_lock(&rule->dl_lock);
1086
1087                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
1088                 if (attr->u.delay.la_rate != 0) {
1089                         rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
1090                 } else {
1091                         rule->dl_delay_time = ktime_get_seconds() +
1092                                               get_random_u32_below(attr->u.delay.la_interval);
1093                         rule->dl_time_base = ktime_get_seconds() +
1094                                              attr->u.delay.la_interval;
1095                 }
1096                 spin_unlock(&rule->dl_lock);
1097         }
1098
1099         lnet_net_unlock(cpt);
1100         EXIT;
1101 }
1102
1103 int
1104 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
1105 {
1106         struct lnet_fault_attr *attr4;
1107         struct lnet_fault_stat *stat;
1108         struct lnet_fault_large_attr attr = { { 0 } };
1109         int rc;
1110
1111         attr4 = (struct lnet_fault_attr *)data->ioc_inlbuf1;
1112
1113         lnet_fault_attr4_to_attr(attr4, &attr);
1114
1115         switch (opc) {
1116         default:
1117                 return -EINVAL;
1118
1119         case LNET_CTL_DROP_ADD:
1120                 if (!attr4)
1121                         return -EINVAL;
1122
1123                 return lnet_drop_rule_add(&attr);
1124
1125         case LNET_CTL_DROP_DEL:
1126                 if (!attr4)
1127                         return -EINVAL;
1128
1129                 data->ioc_count = lnet_drop_rule_del(&attr.fa_src,
1130                                                      &attr.fa_dst);
1131                 return 0;
1132
1133         case LNET_CTL_DROP_RESET:
1134                 lnet_drop_rule_reset();
1135                 return 0;
1136
1137         case LNET_CTL_DROP_LIST:
1138                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1139                 if (!attr4 || !stat)
1140                         return -EINVAL;
1141
1142                 rc = lnet_drop_rule_list(data->ioc_count, &attr, stat);
1143                 lnet_fault_attr_to_attr4(&attr, attr4);
1144                 return rc;
1145
1146         case LNET_CTL_DELAY_ADD:
1147                 if (!attr4)
1148                         return -EINVAL;
1149
1150                 return lnet_delay_rule_add(&attr);
1151
1152         case LNET_CTL_DELAY_DEL:
1153                 if (!attr4)
1154                         return -EINVAL;
1155
1156                 data->ioc_count = lnet_delay_rule_del(&attr.fa_src,
1157                                                       &attr.fa_dst, false);
1158                 return 0;
1159
1160         case LNET_CTL_DELAY_RESET:
1161                 lnet_delay_rule_reset();
1162                 return 0;
1163
1164         case LNET_CTL_DELAY_LIST:
1165                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1166                 if (!attr4 || !stat)
1167                         return -EINVAL;
1168
1169                 rc = lnet_delay_rule_list(data->ioc_count, &attr, stat);
1170                 lnet_fault_attr_to_attr4(&attr, attr4);
1171                 return rc;
1172         }
1173 }
1174
1175 int
1176 lnet_fault_init(void)
1177 {
1178         BUILD_BUG_ON(LNET_PUT_BIT != BIT(LNET_MSG_PUT));
1179         BUILD_BUG_ON(LNET_ACK_BIT != BIT(LNET_MSG_ACK));
1180         BUILD_BUG_ON(LNET_GET_BIT != BIT(LNET_MSG_GET));
1181         BUILD_BUG_ON(LNET_REPLY_BIT != BIT(LNET_MSG_REPLY));
1182
1183         mutex_init(&delay_dd.dd_mutex);
1184         spin_lock_init(&delay_dd.dd_lock);
1185         init_waitqueue_head(&delay_dd.dd_waitq);
1186         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1187         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1188
1189         return 0;
1190 }
1191
/**
 * Tear down the fault-simulation module: remove every drop and delay
 * rule, then assert that all rule lists are empty.
 */
void
lnet_fault_fini(void)
{
	/* NULL src/dst matches every rule; shutdown=true makes
	 * lnet_delay_rule_del() process pending messages with
	 * shutdown semantics */
	lnet_drop_rule_del(NULL, NULL);
	lnet_delay_rule_del(NULL, NULL, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}