Whamcloud - gitweb
LU-6142 lnet: SPDX for lnet/lnet/
[fs/lustre-release.git] / lnet / lnet / net_fault.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /* Copyright (c) 2014, 2017, Intel Corporation. */
4
5 /* This file is part of Lustre, http://www.lustre.org/
6  *
7  * Lustre network fault simulation
8  *
9  * Author: liang.zhen@intel.com
10  */
11
12 #define DEBUG_SUBSYSTEM S_LNET
13
14 #include <linux/random.h>
15 #include <lnet/lib-lnet.h>
16 #include <uapi/linux/lnet/lnetctl.h>
17
18 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
19                                  LNET_GET_BIT | LNET_REPLY_BIT)
20
/** A single message-drop simulation rule, listed on the_lnet.ln_drop_rules */
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head		dr_link;
	/** attributes of this rule */
	struct lnet_fault_large_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t			dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.drs_count == dr_drop_at
	 */
	unsigned long			dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	time64_t			dr_drop_time;
	/** baseline to calculate dr_drop_time */
	time64_t			dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat		dr_stat;
};
42
43 static void
44 lnet_fault_attr_to_attr4(struct lnet_fault_large_attr *attr,
45                          struct lnet_fault_attr *attr4)
46 {
47         if (!attr)
48                 return;
49
50         attr4->fa_src = lnet_nid_to_nid4(&attr->fa_src);
51         attr4->fa_dst = lnet_nid_to_nid4(&attr->fa_dst);
52         attr4->fa_local_nid = lnet_nid_to_nid4(&attr->fa_local_nid);
53         attr4->fa_ptl_mask = attr->fa_ptl_mask;
54         attr4->fa_msg_mask = attr->fa_msg_mask;
55
56         memcpy(&attr4->u, &attr->u, sizeof(attr4->u));
57 }
58
59 static void
60 lnet_fault_attr4_to_attr(struct lnet_fault_attr *attr4,
61                          struct lnet_fault_large_attr *attr)
62 {
63         if (!attr4)
64                 return;
65
66         if (attr4->fa_src)
67                 lnet_nid4_to_nid(attr4->fa_src, &attr->fa_src);
68         else
69                 attr->fa_src = LNET_ANY_NID;
70
71         if (attr4->fa_dst)
72                 lnet_nid4_to_nid(attr4->fa_dst, &attr->fa_dst);
73         else
74                 attr->fa_dst = LNET_ANY_NID;
75
76         if (attr4->fa_local_nid)
77                 lnet_nid4_to_nid(attr4->fa_local_nid, &attr->fa_local_nid);
78         else
79                 attr->fa_local_nid = LNET_ANY_NID;
80
81         attr->fa_ptl_mask = attr4->fa_ptl_mask;
82         attr->fa_msg_mask = attr4->fa_msg_mask;
83
84         memcpy(&attr->u, &attr4->u, sizeof(attr->u));
85 }
86
87 static bool
88 lnet_fault_nid_match(struct lnet_nid *nid, struct lnet_nid *msg_nid)
89 {
90         if (LNET_NID_IS_ANY(nid))
91                 return true;
92         if (!msg_nid)
93                 return false;
94         if (nid_same(msg_nid, nid))
95                 return true;
96
97         if (LNET_NID_NET(nid) != LNET_NID_NET(msg_nid))
98                 return false;
99
100         /* 255.255.255.255@net is wildcard for all addresses in a network */
101         return __be32_to_cpu(nid->nid_addr[0]) == LNET_NIDADDR(LNET_NID_ANY);
102 }
103
104 static bool
105 lnet_fault_attr_match(struct lnet_fault_large_attr *attr,
106                       struct lnet_nid *src,
107                       struct lnet_nid *local_nid,
108                       struct lnet_nid *dst,
109                       unsigned int type, unsigned int portal)
110 {
111         if (!lnet_fault_nid_match(&attr->fa_src, src) ||
112             !lnet_fault_nid_match(&attr->fa_dst, dst) ||
113             !lnet_fault_nid_match(&attr->fa_local_nid, local_nid))
114                 return false;
115
116         if (!(attr->fa_msg_mask & BIT(type)))
117                 return false;
118
119         /* NB: ACK and REPLY have no portal, but they should have been
120          * rejected by message mask */
121         if (attr->fa_ptl_mask != 0 && /* has portal filter */
122             !(attr->fa_ptl_mask & (1ULL << portal)))
123                 return false;
124
125         return true;
126 }
127
128 static int
129 lnet_fault_attr_validate(struct lnet_fault_large_attr *attr)
130 {
131         if (attr->fa_msg_mask == 0)
132                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
133
134         if (attr->fa_ptl_mask == 0) /* no portal filter */
135                 return 0;
136
137         /* NB: only PUT and GET can be filtered if portal filter has been set */
138         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
139         if (attr->fa_msg_mask == 0) {
140                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
141                        attr->fa_msg_mask);
142                 return -EINVAL;
143         }
144         return 0;
145 }
146
147 static void
148 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
149 {
150         /* NB: fs_counter is NOT updated by this function */
151         switch (type) {
152         case LNET_MSG_PUT:
153                 stat->fs_put++;
154                 return;
155         case LNET_MSG_ACK:
156                 stat->fs_ack++;
157                 return;
158         case LNET_MSG_GET:
159                 stat->fs_get++;
160                 return;
161         case LNET_MSG_REPLY:
162                 stat->fs_reply++;
163                 return;
164         }
165 }
166
167 /**
168  * LNet message drop simulation
169  */
170
171 /**
172  * Add a new drop rule to LNet
173  * There is no check for duplicated drop rule, all rules will be checked for
174  * incoming message.
175  */
176 static int
177 lnet_drop_rule_add(struct lnet_fault_large_attr *attr)
178 {
179         struct lnet_drop_rule *rule;
180         ENTRY;
181
182         if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
183                 CDEBUG(D_NET,
184                        "please provide either drop rate or drop interval, "
185                        "but not both at the same time %d/%d\n",
186                        attr->u.drop.da_rate, attr->u.drop.da_interval);
187                 RETURN(-EINVAL);
188         }
189
190         if (lnet_fault_attr_validate(attr) != 0)
191                 RETURN(-EINVAL);
192
193         CFS_ALLOC_PTR(rule);
194         if (rule == NULL)
195                 RETURN(-ENOMEM);
196
197         spin_lock_init(&rule->dr_lock);
198
199         rule->dr_attr = *attr;
200         if (attr->u.drop.da_interval != 0) {
201                 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
202                 rule->dr_drop_time = ktime_get_seconds() +
203                                      get_random_u32_below(attr->u.drop.da_interval);
204         } else {
205                 rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
206         }
207
208         lnet_net_lock(LNET_LOCK_EX);
209         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
210         lnet_net_unlock(LNET_LOCK_EX);
211
212         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
213                libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
214                attr->u.drop.da_rate, attr->u.drop.da_interval);
215         RETURN(0);
216 }
217
218 /**
219  * Remove matched drop rules from lnet, all rules that can match \a src and
220  * \a dst will be removed.
221  * If \a src is zero, then all rules have \a dst as destination will be remove
222  * If \a dst is zero, then all rules have \a src as source will be removed
223  * If both of them are zero, all rules will be removed
224  */
225 static int
226 lnet_drop_rule_del(struct lnet_nid *src, struct lnet_nid *dst)
227 {
228         struct lnet_drop_rule *rule;
229         struct lnet_drop_rule *tmp;
230         LIST_HEAD(zombies);
231         int n = 0;
232         ENTRY;
233
234         lnet_net_lock(LNET_LOCK_EX);
235         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
236                 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dr_attr.fa_src, src)))
237                         continue;
238
239                 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dr_attr.fa_dst, dst)))
240                         continue;
241
242                 list_move(&rule->dr_link, &zombies);
243         }
244         lnet_net_unlock(LNET_LOCK_EX);
245
246         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
247                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
248                        libcfs_nidstr(&rule->dr_attr.fa_src),
249                        libcfs_nidstr(&rule->dr_attr.fa_dst),
250                        rule->dr_attr.u.drop.da_rate,
251                        rule->dr_attr.u.drop.da_interval);
252
253                 list_del(&rule->dr_link);
254                 CFS_FREE_PTR(rule);
255                 n++;
256         }
257
258         RETURN(n);
259 }
260
261 /**
262  * List drop rule at position of \a pos
263  */
264 static int
265 lnet_drop_rule_list(int pos, struct lnet_fault_large_attr *attr,
266                     struct lnet_fault_stat *stat)
267 {
268         struct lnet_drop_rule *rule;
269         int                    cpt;
270         int                    i = 0;
271         int                    rc = -ENOENT;
272         ENTRY;
273
274         cpt = lnet_net_lock_current();
275         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
276                 if (i++ < pos)
277                         continue;
278
279                 spin_lock(&rule->dr_lock);
280                 *attr = rule->dr_attr;
281                 *stat = rule->dr_stat;
282                 spin_unlock(&rule->dr_lock);
283                 rc = 0;
284                 break;
285         }
286
287         lnet_net_unlock(cpt);
288         RETURN(rc);
289 }
290
/**
 * Reset counters for all drop rules and re-seed the next drop point
 * (random sequence number for rate-based rules, random time within the
 * next interval for time-based rules).
 */
static void
lnet_drop_rule_reset(void)
{
	struct lnet_drop_rule *rule;
	int                    cpt;
	ENTRY;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		struct lnet_fault_large_attr *attr = &rule->dr_attr;

		spin_lock(&rule->dr_lock);

		/* clear statistics, then re-seed as lnet_drop_rule_add did */
		memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
		if (attr->u.drop.da_rate != 0) { /* rate based */
			rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
		} else { /* time based */
			rule->dr_drop_time = ktime_get_seconds() +
					     get_random_u32_below(attr->u.drop.da_interval);
			rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
		}
		spin_unlock(&rule->dr_lock);
	}

	lnet_net_unlock(cpt);
	EXIT;
}
322
/* Pick a simulated health error out of \a mask and store it in \a hstatus.
 * A random status is chosen first; if that bit is not enabled in the mask,
 * the numerically closest enabled bit is used instead.
 *
 * NOTE(review): \a choice is drawn from the LNET_MSG_STATUS_* range but is
 * then tested against HSTATUS_* bit positions — assumed to share the same
 * numbering; confirm against the lnet headers. */
static void
lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
{
	int choice;
	int delta;
	int best_delta;
	int i;

	/* assign a random failure; 0 would mean "OK", so bump it to the
	 * first real error status */
	choice = get_random_u32_below(LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
	if (choice == 0)
		choice++;

	/* HSTATUS_RANDOM accepts any failure status */
	if (mask == HSTATUS_RANDOM) {
		*hstatus = choice;
		return;
	}

	/* the random pick happens to be enabled in the mask */
	if (mask & BIT(choice)) {
		*hstatus = choice;
		return;
	}

	/* round to the closest ON bit */
	i = HSTATUS_END;
	best_delta = HSTATUS_END;
	while (i > 0) {
		if (mask & BIT(i)) {
			/* distance between the random pick and this ON bit */
			delta = choice - i;
			if (delta < 0)
				delta *= -1;
			if (delta < best_delta) {
				best_delta = delta;
				choice = i;
			}
		}
		i--;
	}

	*hstatus = choice;
}
364
365 /**
366  * check source/destination NID, portal, message type and drop rate,
367  * decide whether should drop this message or not
368  */
369 static bool
370 drop_rule_match(struct lnet_drop_rule *rule,
371                 struct lnet_nid *src,
372                 struct lnet_nid *local_nid,
373                 struct lnet_nid *dst,
374                 unsigned int type, unsigned int portal,
375                 enum lnet_msg_hstatus *hstatus)
376 {
377         struct lnet_fault_large_attr *attr = &rule->dr_attr;
378         bool drop;
379
380         if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))
381                 return false;
382
383         if (attr->u.drop.da_drop_all) {
384                 CDEBUG(D_NET, "set to drop all messages\n");
385                 drop = true;
386                 goto drop_matched;
387         }
388
389         /*
390          * if we're trying to match a health status error but it hasn't
391          * been set in the rule, then don't match
392          */
393         if ((hstatus && !attr->u.drop.da_health_error_mask) ||
394             (!hstatus && attr->u.drop.da_health_error_mask))
395                 return false;
396
397         /* match this rule, check drop rate now */
398         spin_lock(&rule->dr_lock);
399         if (attr->u.drop.da_random) {
400                 int value = get_random_u32_below(attr->u.drop.da_interval);
401                 if (value >= (attr->u.drop.da_interval / 2))
402                         drop = true;
403                 else
404                         drop = false;
405         } else if (rule->dr_drop_time != 0) { /* time based drop */
406                 time64_t now = ktime_get_seconds();
407
408                 rule->dr_stat.fs_count++;
409                 drop = now >= rule->dr_drop_time;
410                 if (drop) {
411                         if (now > rule->dr_time_base)
412                                 rule->dr_time_base = now;
413
414                         rule->dr_drop_time = rule->dr_time_base +
415                                              get_random_u32_below(attr->u.drop.da_interval);
416                         rule->dr_time_base += attr->u.drop.da_interval;
417
418                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
419                                libcfs_nidstr(&attr->fa_src),
420                                libcfs_nidstr(&attr->fa_dst),
421                                rule->dr_drop_time);
422                 }
423
424         } else { /* rate based drop */
425                 __u64 count;
426
427                 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
428                 count = rule->dr_stat.fs_count;
429                 if (do_div(count, attr->u.drop.da_rate) == 0) {
430                         rule->dr_drop_at = rule->dr_stat.fs_count +
431                                            get_random_u32_below(attr->u.drop.da_rate);
432                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
433                                libcfs_nidstr(&attr->fa_src),
434                                libcfs_nidstr(&attr->fa_dst), rule->dr_drop_at);
435                 }
436         }
437
438 drop_matched:
439
440         if (drop) { /* drop this message, update counters */
441                 if (hstatus)
442                         lnet_fault_match_health(hstatus,
443                                 attr->u.drop.da_health_error_mask);
444                 lnet_fault_stat_inc(&rule->dr_stat, type);
445                 rule->dr_stat.u.drop.ds_dropped++;
446         }
447
448         spin_unlock(&rule->dr_lock);
449         return drop;
450 }
451
452 /**
453  * Check if message from \a src to \a dst can match any existed drop rule
454  */
455 bool
456 lnet_drop_rule_match(struct lnet_hdr *hdr,
457                      struct lnet_nid *local_nid,
458                      enum lnet_msg_hstatus *hstatus)
459 {
460         unsigned int typ = hdr->type;
461         struct lnet_drop_rule *rule;
462         unsigned int ptl = -1;
463         bool drop = false;
464         int cpt;
465
466         /* NB: if Portal is specified, then only PUT and GET will be
467          * filtered by drop rule */
468         if (typ == LNET_MSG_PUT)
469                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
470         else if (typ == LNET_MSG_GET)
471                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
472
473         cpt = lnet_net_lock_current();
474         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
475                 drop = drop_rule_match(rule, &hdr->src_nid, local_nid,
476                                        &hdr->dest_nid, typ, ptl,
477                                        hstatus);
478                 if (drop)
479                         break;
480         }
481         lnet_net_unlock(cpt);
482
483         return drop;
484 }
485
486 /**
487  * LNet Delay Simulation
488  */
489 /** timestamp (second) to send delayed message */
490 #define msg_delay_send           msg_ev.hdr_data
491
/** A single message-delay simulation rule, listed on the_lnet.ln_delay_rules */
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head		dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head		dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_large_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t			dl_lock;
	/** refcount of delay rule */
	atomic_t			dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long			dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	time64_t			dl_delay_time;
	/** baseline to calculate dl_delay_time */
	time64_t			dl_time_base;
	/** seconds until we send the next delayed message (-1 when idle) */
	time64_t			dl_msg_send;
	/** delayed message list */
	struct list_head		dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat		dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list		dl_timer;
};
523
/** Shared state of the delay daemon thread (single instance: delay_dd) */
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps at here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) wait at here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};
540
541 static struct delay_daemon_data delay_dd;
542
543 static void
544 delay_rule_decref(struct lnet_delay_rule *rule)
545 {
546         if (atomic_dec_and_test(&rule->dl_refcount)) {
547                 LASSERT(list_empty(&rule->dl_sched_link));
548                 LASSERT(list_empty(&rule->dl_msg_list));
549                 LASSERT(list_empty(&rule->dl_link));
550
551                 CFS_FREE_PTR(rule);
552         }
553 }
554
/**
 * check source/destination NID, portal, message type and delay rate,
 * decide whether should delay this message or not
 *
 * On a match, \a msg is queued on the rule's delayed-message list and the
 * rule timer is armed if it was idle; the caller must not touch the
 * message afterwards.
 */
static bool
delay_rule_match(struct lnet_delay_rule *rule, struct lnet_nid *src,
		 struct lnet_nid *dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
{
	struct lnet_fault_large_attr *attr = &rule->dl_attr;
	bool delay;
	time64_t now = ktime_get_seconds();

	/* NB: no local-NID filtering for delay rules (NULL local_nid) */
	if (!lnet_fault_attr_match(attr, src, NULL,
				   dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time != 0) { /* time based delay */
		rule->dl_stat.fs_count++;
		delay = now >= rule->dl_delay_time;
		if (delay) {
			/* catch up if we fell behind the schedule */
			if (now > rule->dl_time_base)
				rule->dl_time_base = now;

			/* next delay at a random point in the next interval */
			rule->dl_delay_time = rule->dl_time_base +
					      get_random_u32_below(attr->u.delay.la_interval);
			rule->dl_time_base += attr->u.delay.la_interval;

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst),
			       rule->dl_delay_time);
		}

	} else { /* rate based delay */
		__u64 count;

		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		count = rule->dl_stat.fs_count;
		if (do_div(count, attr->u.delay.la_rate) == 0) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    get_random_u32_below(attr->u.delay.la_rate);
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst), rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	/* queue the message; arm the timer only when it was idle
	 * (dl_msg_send == -1), otherwise an earlier message owns it */
	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = now + attr->u.delay.la_latency;
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer,
			  jiffies + cfs_time_seconds(attr->u.delay.la_latency));
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
626
627 /**
628  * check if \a msg can match any Delay Rule, receiving of this message
629  * will be delayed if there is a match.
630  */
631 bool
632 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
633 {
634         struct lnet_delay_rule  *rule;
635         unsigned int             typ = hdr->type;
636         unsigned int             ptl = -1;
637
638         /* NB: called with hold of lnet_net_lock */
639
640         /* NB: if Portal is specified, then only PUT and GET will be
641          * filtered by delay rule */
642         if (typ == LNET_MSG_PUT)
643                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
644         else if (typ == LNET_MSG_GET)
645                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
646
647         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
648                 if (delay_rule_match(rule, &hdr->src_nid, &hdr->dest_nid,
649                                      typ, ptl, msg))
650                         return true;
651         }
652
653         return false;
654 }
655
/** check out delayed messages for send
 *
 * Moves messages whose delay has expired (or all of them when \a all is
 * true) from the rule's queue onto \a msg_list, and re-arms or disarms
 * the rule timer accordingly.
 */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	time64_t now = ktime_get_seconds();

	/* cheap unlocked check: nothing can be due before dl_msg_send */
	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		/* messages are appended with a fixed per-rule latency, so
		 * the list is in send-time order: stop at the first one
		 * that is not due yet */
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		/* no pending message: disarm the timer and mark it idle */
		timer_delete(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/* dequeued some timedout messages, update timer for the
		 * next delayed message on rule */
		msg = list_first_entry(&rule->dl_msg_list,
				       struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer,
			  jiffies +
			  cfs_time_seconds(msg->msg_delay_send - now));
	}
	spin_unlock(&rule->dl_lock);
}
693
/* Deliver, forward or cancel a list of previously delayed messages.
 *
 * \param msg_list	messages collected by delayed_msg_check()
 * \param drop		when true, cancel the messages (-ECANCELED)
 *			instead of processing them
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while ((msg = list_first_entry_or_null(msg_list, struct lnet_msg,
					       msg_list)) != NULL) {
		struct lnet_ni *ni;
		int             cpt;
		int             rc;

		if (msg->msg_sending) {
			/* Delayed send */
			list_del_init(&msg->msg_list);
			ni = msg->msg_txni;
			CDEBUG(D_NET, "TRACE: msg %p %s -> %s : %s\n", msg,
			       libcfs_nidstr(&ni->ni_nid),
			       libcfs_nidstr(&msg->msg_txpeer->lpni_nid),
			       lnet_msgtyp2str(msg->msg_type));
			lnet_ni_send(ni, msg);
			continue;
		}

		/* Delayed receive */
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		ni = msg->msg_rxni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			/* local delivery; rc == 0 means the message needs
			 * no further handling here */
			rc = lnet_parse_local(ni, msg);
			if (rc == 0)
				continue;

		} else {
			/* forwarding path */
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				fallthrough;
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* messages that could not be processed are dropped */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
				  msg->msg_type);
		lnet_finalize(msg, rc);
	}
}
755
/**
 * Process delayed messages for scheduled rules
 * This function can either be called by delay_rule_daemon, or by lnet_finalise
 */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule *rule;
	LIST_HEAD(msgs);

	while (1) {
		/* unlocked peek to avoid taking dd_lock when idle */
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		/* recheck under the lock before dequeuing */
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_first_entry(&delay_dd.dd_sched_rules,
					struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		/* collect due messages outside of dd_lock */
		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}
788
789 /** deamon thread to handle delayed messages */
790 static int
791 lnet_delay_rule_daemon(void *arg)
792 {
793         delay_dd.dd_running = 1;
794         wake_up(&delay_dd.dd_ctl_waitq);
795
796         while (delay_dd.dd_running) {
797                 wait_event_interruptible(delay_dd.dd_waitq,
798                                          !delay_dd.dd_running ||
799                                          !list_empty(&delay_dd.dd_sched_rules));
800                 lnet_delay_rule_check();
801         }
802
803         /* in case more rules have been enqueued after my last check */
804         lnet_delay_rule_check();
805         delay_dd.dd_stopped = 1;
806         wake_up(&delay_dd.dd_ctl_waitq);
807
808         return 0;
809 }
810
811 static void
812 delay_timer_cb(cfs_timer_cb_arg_t data)
813 {
814         struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);
815
816         spin_lock_bh(&delay_dd.dd_lock);
817         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
818                 atomic_inc(&rule->dl_refcount);
819                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
820                 wake_up(&delay_dd.dd_waitq);
821         }
822         spin_unlock_bh(&delay_dd.dd_lock);
823 }
824
825 /**
826  * Add a new delay rule to LNet
827  * There is no check for duplicated delay rule, all rules will be checked for
828  * incoming message.
829  */
830 int
831 lnet_delay_rule_add(struct lnet_fault_large_attr *attr)
832 {
833         struct lnet_delay_rule *rule;
834         int rc = 0;
835         ENTRY;
836
837         if (!((attr->u.delay.la_rate == 0) ^
838               (attr->u.delay.la_interval == 0))) {
839                 CDEBUG(D_NET,
840                        "please provide either delay rate or delay interval, "
841                        "but not both at the same time %d/%d\n",
842                        attr->u.delay.la_rate, attr->u.delay.la_interval);
843                 RETURN(-EINVAL);
844         }
845
846         if (attr->u.delay.la_latency == 0) {
847                 CDEBUG(D_NET, "delay latency cannot be zero\n");
848                 RETURN(-EINVAL);
849         }
850
851         if (lnet_fault_attr_validate(attr) != 0)
852                 RETURN(-EINVAL);
853
854         CFS_ALLOC_PTR(rule);
855         if (rule == NULL)
856                 RETURN(-ENOMEM);
857
858         mutex_lock(&delay_dd.dd_mutex);
859         if (!delay_dd.dd_running) {
860                 struct task_struct *task;
861
862                 /* NB: although LND threads will process delayed message
863                  * in lnet_finalize, but there is no guarantee that LND
864                  * threads will be waken up if no other message needs to
865                  * be handled.
866                  * Only one daemon thread, performance is not the concern
867                  * of this simualation module.
868                  */
869                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
870                 if (IS_ERR(task)) {
871                         rc = PTR_ERR(task);
872                         GOTO(failed, rc);
873                 }
874                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
875         }
876
877         cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
878                         (unsigned long)rule, 0);
879
880         spin_lock_init(&rule->dl_lock);
881         INIT_LIST_HEAD(&rule->dl_msg_list);
882         INIT_LIST_HEAD(&rule->dl_sched_link);
883
884         rule->dl_attr = *attr;
885         if (attr->u.delay.la_interval != 0) {
886                 rule->dl_time_base = ktime_get_seconds() +
887                                      attr->u.delay.la_interval;
888                 rule->dl_delay_time = ktime_get_seconds() +
889                                       get_random_u32_below(attr->u.delay.la_interval);
890         } else {
891                 rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
892         }
893
894         rule->dl_msg_send = -1;
895
896         lnet_net_lock(LNET_LOCK_EX);
897         atomic_set(&rule->dl_refcount, 1);
898         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
899         lnet_net_unlock(LNET_LOCK_EX);
900
901         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
902                libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
903                attr->u.delay.la_rate);
904
905         mutex_unlock(&delay_dd.dd_mutex);
906         RETURN(0);
907  failed:
908         mutex_unlock(&delay_dd.dd_mutex);
909         CFS_FREE_PTR(rule);
910         return rc;
911 }
912
913 /**
914  * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
915  * and \a dst are zero, all rules will be removed, otherwise only matched rules
916  * will be removed.
917  * If \a src is zero, then all rules have \a dst as destination will be remove
918  * If \a dst is zero, then all rules have \a src as source will be removed
919  *
920  * When a delay rule is removed, all delayed messages of this rule will be
921  * processed immediately.
922  */
923 int
924 lnet_delay_rule_del(struct lnet_nid *src, struct lnet_nid *dst, bool shutdown)
925 {
926         struct lnet_delay_rule *rule;
927         struct lnet_delay_rule *tmp;
928         LIST_HEAD(rule_list);
929         LIST_HEAD(msg_list);
930         int n = 0;
931         bool cleanup;
932         ENTRY;
933
934         if (shutdown)
935                 src = dst = 0;
936
937         mutex_lock(&delay_dd.dd_mutex);
938         lnet_net_lock(LNET_LOCK_EX);
939
940         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
941                 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dl_attr.fa_src, src)))
942                         continue;
943
944                 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dl_attr.fa_dst, dst)))
945                         continue;
946
947                 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
948                        libcfs_nidstr(&rule->dl_attr.fa_src),
949                        libcfs_nidstr(&rule->dl_attr.fa_dst),
950                        rule->dl_attr.u.delay.la_rate,
951                        rule->dl_attr.u.delay.la_interval);
952                 /* refcount is taken over by rule_list */
953                 list_move(&rule->dl_link, &rule_list);
954         }
955
956         /* check if we need to shutdown delay_daemon */
957         cleanup = list_empty(&the_lnet.ln_delay_rules) &&
958                   !list_empty(&rule_list);
959         lnet_net_unlock(LNET_LOCK_EX);
960
961         list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
962                 list_del_init(&rule->dl_link);
963
964                 timer_delete_sync(&rule->dl_timer);
965                 delayed_msg_check(rule, true, &msg_list);
966                 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
967                 n++;
968         }
969
970         if (cleanup) { /* no more delay rule, shutdown delay_daemon */
971                 LASSERT(delay_dd.dd_running);
972                 delay_dd.dd_running = 0;
973                 wake_up(&delay_dd.dd_waitq);
974
975                 while (!delay_dd.dd_stopped)
976                         wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
977         }
978         mutex_unlock(&delay_dd.dd_mutex);
979
980         if (!list_empty(&msg_list))
981                 delayed_msg_process(&msg_list, shutdown);
982
983         RETURN(n);
984 }
985
986 /**
987  * List Delay Rule at position of \a pos
988  */
989 int
990 lnet_delay_rule_list(int pos, struct lnet_fault_large_attr *attr,
991                     struct lnet_fault_stat *stat)
992 {
993         struct lnet_delay_rule *rule;
994         int                     cpt;
995         int                     i = 0;
996         int                     rc = -ENOENT;
997         ENTRY;
998
999         cpt = lnet_net_lock_current();
1000         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1001                 if (i++ < pos)
1002                         continue;
1003
1004                 spin_lock(&rule->dl_lock);
1005                 *attr = rule->dl_attr;
1006                 *stat = rule->dl_stat;
1007                 spin_unlock(&rule->dl_lock);
1008                 rc = 0;
1009                 break;
1010         }
1011
1012         lnet_net_unlock(cpt);
1013         RETURN(rc);
1014 }
1015
1016 /**
1017  * reset counters for all Delay Rules
1018  */
1019 void
1020 lnet_delay_rule_reset(void)
1021 {
1022         struct lnet_delay_rule *rule;
1023         int                     cpt;
1024         ENTRY;
1025
1026         cpt = lnet_net_lock_current();
1027
1028         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1029                 struct lnet_fault_large_attr *attr = &rule->dl_attr;
1030
1031                 spin_lock(&rule->dl_lock);
1032
1033                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
1034                 if (attr->u.delay.la_rate != 0) {
1035                         rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
1036                 } else {
1037                         rule->dl_delay_time = ktime_get_seconds() +
1038                                               get_random_u32_below(attr->u.delay.la_interval);
1039                         rule->dl_time_base = ktime_get_seconds() +
1040                                              attr->u.delay.la_interval;
1041                 }
1042                 spin_unlock(&rule->dl_lock);
1043         }
1044
1045         lnet_net_unlock(cpt);
1046         EXIT;
1047 }
1048
1049 int
1050 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
1051 {
1052         struct lnet_fault_attr *attr4;
1053         struct lnet_fault_stat *stat;
1054         struct lnet_fault_large_attr attr = { { 0 } };
1055         int rc;
1056
1057         attr4 = (struct lnet_fault_attr *)data->ioc_inlbuf1;
1058
1059         lnet_fault_attr4_to_attr(attr4, &attr);
1060
1061         switch (opc) {
1062         default:
1063                 return -EINVAL;
1064
1065         case LNET_CTL_DROP_ADD:
1066                 if (!attr4)
1067                         return -EINVAL;
1068
1069                 return lnet_drop_rule_add(&attr);
1070
1071         case LNET_CTL_DROP_DEL:
1072                 if (!attr4)
1073                         return -EINVAL;
1074
1075                 data->ioc_count = lnet_drop_rule_del(&attr.fa_src,
1076                                                      &attr.fa_dst);
1077                 return 0;
1078
1079         case LNET_CTL_DROP_RESET:
1080                 lnet_drop_rule_reset();
1081                 return 0;
1082
1083         case LNET_CTL_DROP_LIST:
1084                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1085                 if (!attr4 || !stat)
1086                         return -EINVAL;
1087
1088                 rc = lnet_drop_rule_list(data->ioc_count, &attr, stat);
1089                 lnet_fault_attr_to_attr4(&attr, attr4);
1090                 return rc;
1091
1092         case LNET_CTL_DELAY_ADD:
1093                 if (!attr4)
1094                         return -EINVAL;
1095
1096                 return lnet_delay_rule_add(&attr);
1097
1098         case LNET_CTL_DELAY_DEL:
1099                 if (!attr4)
1100                         return -EINVAL;
1101
1102                 data->ioc_count = lnet_delay_rule_del(&attr.fa_src,
1103                                                       &attr.fa_dst, false);
1104                 return 0;
1105
1106         case LNET_CTL_DELAY_RESET:
1107                 lnet_delay_rule_reset();
1108                 return 0;
1109
1110         case LNET_CTL_DELAY_LIST:
1111                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1112                 if (!attr4 || !stat)
1113                         return -EINVAL;
1114
1115                 rc = lnet_delay_rule_list(data->ioc_count, &attr, stat);
1116                 lnet_fault_attr_to_attr4(&attr, attr4);
1117                 return rc;
1118         }
1119 }
1120
1121 int
1122 lnet_fault_init(void)
1123 {
1124         BUILD_BUG_ON(LNET_PUT_BIT != BIT(LNET_MSG_PUT));
1125         BUILD_BUG_ON(LNET_ACK_BIT != BIT(LNET_MSG_ACK));
1126         BUILD_BUG_ON(LNET_GET_BIT != BIT(LNET_MSG_GET));
1127         BUILD_BUG_ON(LNET_REPLY_BIT != BIT(LNET_MSG_REPLY));
1128
1129         mutex_init(&delay_dd.dd_mutex);
1130         spin_lock_init(&delay_dd.dd_lock);
1131         init_waitqueue_head(&delay_dd.dd_waitq);
1132         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1133         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1134
1135         return 0;
1136 }
1137
/* Module teardown: remove every drop and delay rule (shutdown=true makes
 * lnet_delay_rule_del drop all delayed messages and stop the delay daemon),
 * then assert that no rule or scheduled work is left behind. */
void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(NULL, NULL);
	lnet_delay_rule_del(NULL, NULL, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}