Whamcloud - gitweb
f66a6dfb9e4740d8e49570590375e76268f81138
[fs/lustre-release.git] / lnet / lnet / net_fault.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2014, 2017, Intel Corporation.
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  *
29  * lnet/lnet/net_fault.c
30  *
31  * Lustre network fault simulation
32  *
33  * Author: liang.zhen@intel.com
34  */
35
36 #define DEBUG_SUBSYSTEM S_LNET
37
38 #include <linux/random.h>
39 #include <lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
41
42 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
43                                  LNET_GET_BIT | LNET_REPLY_BIT)
44
45 struct lnet_drop_rule {
46         /** link chain on the_lnet.ln_drop_rules */
47         struct list_head                dr_link;
48         /** attributes of this rule */
49         struct lnet_fault_large_attr    dr_attr;
50         /** lock to protect \a dr_drop_at and \a dr_stat */
51         spinlock_t                      dr_lock;
52         /**
53          * the message sequence to drop, which means message is dropped when
54          * dr_stat.drs_count == dr_drop_at
55          */
56         unsigned long                   dr_drop_at;
57         /**
58          * seconds to drop the next message, it's exclusive with dr_drop_at
59          */
60         time64_t                        dr_drop_time;
61         /** baseline to caculate dr_drop_time */
62         time64_t                        dr_time_base;
63         /** statistic of dropped messages */
64         struct lnet_fault_stat          dr_stat;
65 };
66
67 static void
68 lnet_fault_attr_to_attr4(struct lnet_fault_large_attr *attr,
69                          struct lnet_fault_attr *attr4)
70 {
71         if (!attr)
72                 return;
73
74         attr4->fa_src = lnet_nid_to_nid4(&attr->fa_src);
75         attr4->fa_dst = lnet_nid_to_nid4(&attr->fa_dst);
76         attr4->fa_local_nid = lnet_nid_to_nid4(&attr->fa_local_nid);
77         attr4->fa_ptl_mask = attr->fa_ptl_mask;
78         attr4->fa_msg_mask = attr->fa_msg_mask;
79
80         memcpy(&attr4->u, &attr->u, sizeof(attr4->u));
81 }
82
83 static void
84 lnet_fault_attr4_to_attr(struct lnet_fault_attr *attr4,
85                          struct lnet_fault_large_attr *attr)
86 {
87         if (!attr4)
88                 return;
89
90         if (attr4->fa_src)
91                 lnet_nid4_to_nid(attr4->fa_src, &attr->fa_src);
92         else
93                 attr->fa_src = LNET_ANY_NID;
94
95         if (attr4->fa_dst)
96                 lnet_nid4_to_nid(attr4->fa_dst, &attr->fa_dst);
97         else
98                 attr->fa_dst = LNET_ANY_NID;
99
100         if (attr4->fa_local_nid)
101                 lnet_nid4_to_nid(attr4->fa_local_nid, &attr->fa_local_nid);
102         else
103                 attr->fa_local_nid = LNET_ANY_NID;
104
105         attr->fa_ptl_mask = attr4->fa_ptl_mask;
106         attr->fa_msg_mask = attr4->fa_msg_mask;
107
108         memcpy(&attr->u, &attr4->u, sizeof(attr->u));
109 }
110
111 static bool
112 lnet_fault_nid_match(struct lnet_nid *nid, struct lnet_nid *msg_nid)
113 {
114         if (LNET_NID_IS_ANY(nid))
115                 return true;
116         if (!msg_nid)
117                 return false;
118         if (nid_same(msg_nid, nid))
119                 return true;
120
121         if (LNET_NID_NET(nid) != LNET_NID_NET(msg_nid))
122                 return false;
123
124         /* 255.255.255.255@net is wildcard for all addresses in a network */
125         return __be32_to_cpu(nid->nid_addr[0]) == LNET_NIDADDR(LNET_NID_ANY);
126 }
127
128 static bool
129 lnet_fault_attr_match(struct lnet_fault_large_attr *attr,
130                       struct lnet_nid *src,
131                       struct lnet_nid *local_nid,
132                       struct lnet_nid *dst,
133                       unsigned int type, unsigned int portal)
134 {
135         if (!lnet_fault_nid_match(&attr->fa_src, src) ||
136             !lnet_fault_nid_match(&attr->fa_dst, dst) ||
137             !lnet_fault_nid_match(&attr->fa_local_nid, local_nid))
138                 return false;
139
140         if (!(attr->fa_msg_mask & BIT(type)))
141                 return false;
142
143         /* NB: ACK and REPLY have no portal, but they should have been
144          * rejected by message mask */
145         if (attr->fa_ptl_mask != 0 && /* has portal filter */
146             !(attr->fa_ptl_mask & (1ULL << portal)))
147                 return false;
148
149         return true;
150 }
151
152 static int
153 lnet_fault_attr_validate(struct lnet_fault_large_attr *attr)
154 {
155         if (attr->fa_msg_mask == 0)
156                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
157
158         if (attr->fa_ptl_mask == 0) /* no portal filter */
159                 return 0;
160
161         /* NB: only PUT and GET can be filtered if portal filter has been set */
162         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
163         if (attr->fa_msg_mask == 0) {
164                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
165                        attr->fa_msg_mask);
166                 return -EINVAL;
167         }
168         return 0;
169 }
170
171 static void
172 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
173 {
174         /* NB: fs_counter is NOT updated by this function */
175         switch (type) {
176         case LNET_MSG_PUT:
177                 stat->fs_put++;
178                 return;
179         case LNET_MSG_ACK:
180                 stat->fs_ack++;
181                 return;
182         case LNET_MSG_GET:
183                 stat->fs_get++;
184                 return;
185         case LNET_MSG_REPLY:
186                 stat->fs_reply++;
187                 return;
188         }
189 }
190
191 /**
192  * LNet message drop simulation
193  */
194
195 /**
196  * Add a new drop rule to LNet
197  * There is no check for duplicated drop rule, all rules will be checked for
198  * incoming message.
199  */
200 static int
201 lnet_drop_rule_add(struct lnet_fault_large_attr *attr)
202 {
203         struct lnet_drop_rule *rule;
204         ENTRY;
205
206         if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
207                 CDEBUG(D_NET,
208                        "please provide either drop rate or drop interval, "
209                        "but not both at the same time %d/%d\n",
210                        attr->u.drop.da_rate, attr->u.drop.da_interval);
211                 RETURN(-EINVAL);
212         }
213
214         if (lnet_fault_attr_validate(attr) != 0)
215                 RETURN(-EINVAL);
216
217         CFS_ALLOC_PTR(rule);
218         if (rule == NULL)
219                 RETURN(-ENOMEM);
220
221         spin_lock_init(&rule->dr_lock);
222
223         rule->dr_attr = *attr;
224         if (attr->u.drop.da_interval != 0) {
225                 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
226                 rule->dr_drop_time = ktime_get_seconds() +
227                                      get_random_u32_below(attr->u.drop.da_interval);
228         } else {
229                 rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
230         }
231
232         lnet_net_lock(LNET_LOCK_EX);
233         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
234         lnet_net_unlock(LNET_LOCK_EX);
235
236         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
237                libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
238                attr->u.drop.da_rate, attr->u.drop.da_interval);
239         RETURN(0);
240 }
241
242 /**
243  * Remove matched drop rules from lnet, all rules that can match \a src and
244  * \a dst will be removed.
245  * If \a src is zero, then all rules have \a dst as destination will be remove
246  * If \a dst is zero, then all rules have \a src as source will be removed
247  * If both of them are zero, all rules will be removed
248  */
249 static int
250 lnet_drop_rule_del(struct lnet_nid *src, struct lnet_nid *dst)
251 {
252         struct lnet_drop_rule *rule;
253         struct lnet_drop_rule *tmp;
254         LIST_HEAD(zombies);
255         int n = 0;
256         ENTRY;
257
258         lnet_net_lock(LNET_LOCK_EX);
259         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
260                 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dr_attr.fa_src, src)))
261                         continue;
262
263                 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dr_attr.fa_dst, dst)))
264                         continue;
265
266                 list_move(&rule->dr_link, &zombies);
267         }
268         lnet_net_unlock(LNET_LOCK_EX);
269
270         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
271                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
272                        libcfs_nidstr(&rule->dr_attr.fa_src),
273                        libcfs_nidstr(&rule->dr_attr.fa_dst),
274                        rule->dr_attr.u.drop.da_rate,
275                        rule->dr_attr.u.drop.da_interval);
276
277                 list_del(&rule->dr_link);
278                 CFS_FREE_PTR(rule);
279                 n++;
280         }
281
282         RETURN(n);
283 }
284
285 /**
286  * List drop rule at position of \a pos
287  */
288 static int
289 lnet_drop_rule_list(int pos, struct lnet_fault_large_attr *attr,
290                     struct lnet_fault_stat *stat)
291 {
292         struct lnet_drop_rule *rule;
293         int                    cpt;
294         int                    i = 0;
295         int                    rc = -ENOENT;
296         ENTRY;
297
298         cpt = lnet_net_lock_current();
299         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
300                 if (i++ < pos)
301                         continue;
302
303                 spin_lock(&rule->dr_lock);
304                 *attr = rule->dr_attr;
305                 *stat = rule->dr_stat;
306                 spin_unlock(&rule->dr_lock);
307                 rc = 0;
308                 break;
309         }
310
311         lnet_net_unlock(cpt);
312         RETURN(rc);
313 }
314
315 /**
316  * reset counters for all drop rules
317  */
318 static void
319 lnet_drop_rule_reset(void)
320 {
321         struct lnet_drop_rule *rule;
322         int                    cpt;
323         ENTRY;
324
325         cpt = lnet_net_lock_current();
326
327         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
328                 struct lnet_fault_large_attr *attr = &rule->dr_attr;
329
330                 spin_lock(&rule->dr_lock);
331
332                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
333                 if (attr->u.drop.da_rate != 0) {
334                         rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
335                 } else {
336                         rule->dr_drop_time = ktime_get_seconds() +
337                                              get_random_u32_below(attr->u.drop.da_interval);
338                         rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
339                 }
340                 spin_unlock(&rule->dr_lock);
341         }
342
343         lnet_net_unlock(cpt);
344         EXIT;
345 }
346
347 static void
348 lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
349 {
350         int choice;
351         int delta;
352         int best_delta;
353         int i;
354
355         /* assign a random failure */
356         choice = get_random_u32_below(LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
357         if (choice == 0)
358                 choice++;
359
360         if (mask == HSTATUS_RANDOM) {
361                 *hstatus = choice;
362                 return;
363         }
364
365         if (mask & BIT(choice)) {
366                 *hstatus = choice;
367                 return;
368         }
369
370         /* round to the closest ON bit */
371         i = HSTATUS_END;
372         best_delta = HSTATUS_END;
373         while (i > 0) {
374                 if (mask & BIT(i)) {
375                         delta = choice - i;
376                         if (delta < 0)
377                                 delta *= -1;
378                         if (delta < best_delta) {
379                                 best_delta = delta;
380                                 choice = i;
381                         }
382                 }
383                 i--;
384         }
385
386         *hstatus = choice;
387 }
388
389 /**
390  * check source/destination NID, portal, message type and drop rate,
391  * decide whether should drop this message or not
392  */
393 static bool
394 drop_rule_match(struct lnet_drop_rule *rule,
395                 struct lnet_nid *src,
396                 struct lnet_nid *local_nid,
397                 struct lnet_nid *dst,
398                 unsigned int type, unsigned int portal,
399                 enum lnet_msg_hstatus *hstatus)
400 {
401         struct lnet_fault_large_attr *attr = &rule->dr_attr;
402         bool drop;
403
404         if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))
405                 return false;
406
407         if (attr->u.drop.da_drop_all) {
408                 CDEBUG(D_NET, "set to drop all messages\n");
409                 drop = true;
410                 goto drop_matched;
411         }
412
413         /*
414          * if we're trying to match a health status error but it hasn't
415          * been set in the rule, then don't match
416          */
417         if ((hstatus && !attr->u.drop.da_health_error_mask) ||
418             (!hstatus && attr->u.drop.da_health_error_mask))
419                 return false;
420
421         /* match this rule, check drop rate now */
422         spin_lock(&rule->dr_lock);
423         if (attr->u.drop.da_random) {
424                 int value = get_random_u32_below(attr->u.drop.da_interval);
425                 if (value >= (attr->u.drop.da_interval / 2))
426                         drop = true;
427                 else
428                         drop = false;
429         } else if (rule->dr_drop_time != 0) { /* time based drop */
430                 time64_t now = ktime_get_seconds();
431
432                 rule->dr_stat.fs_count++;
433                 drop = now >= rule->dr_drop_time;
434                 if (drop) {
435                         if (now > rule->dr_time_base)
436                                 rule->dr_time_base = now;
437
438                         rule->dr_drop_time = rule->dr_time_base +
439                                              get_random_u32_below(attr->u.drop.da_interval);
440                         rule->dr_time_base += attr->u.drop.da_interval;
441
442                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
443                                libcfs_nidstr(&attr->fa_src),
444                                libcfs_nidstr(&attr->fa_dst),
445                                rule->dr_drop_time);
446                 }
447
448         } else { /* rate based drop */
449                 __u64 count;
450
451                 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
452                 count = rule->dr_stat.fs_count;
453                 if (do_div(count, attr->u.drop.da_rate) == 0) {
454                         rule->dr_drop_at = rule->dr_stat.fs_count +
455                                            get_random_u32_below(attr->u.drop.da_rate);
456                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
457                                libcfs_nidstr(&attr->fa_src),
458                                libcfs_nidstr(&attr->fa_dst), rule->dr_drop_at);
459                 }
460         }
461
462 drop_matched:
463
464         if (drop) { /* drop this message, update counters */
465                 if (hstatus)
466                         lnet_fault_match_health(hstatus,
467                                 attr->u.drop.da_health_error_mask);
468                 lnet_fault_stat_inc(&rule->dr_stat, type);
469                 rule->dr_stat.u.drop.ds_dropped++;
470         }
471
472         spin_unlock(&rule->dr_lock);
473         return drop;
474 }
475
476 /**
477  * Check if message from \a src to \a dst can match any existed drop rule
478  */
479 bool
480 lnet_drop_rule_match(struct lnet_hdr *hdr,
481                      struct lnet_nid *local_nid,
482                      enum lnet_msg_hstatus *hstatus)
483 {
484         unsigned int typ = hdr->type;
485         struct lnet_drop_rule *rule;
486         unsigned int ptl = -1;
487         bool drop = false;
488         int cpt;
489
490         /* NB: if Portal is specified, then only PUT and GET will be
491          * filtered by drop rule */
492         if (typ == LNET_MSG_PUT)
493                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
494         else if (typ == LNET_MSG_GET)
495                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
496
497         cpt = lnet_net_lock_current();
498         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
499                 drop = drop_rule_match(rule, &hdr->src_nid, local_nid,
500                                        &hdr->dest_nid, typ, ptl,
501                                        hstatus);
502                 if (drop)
503                         break;
504         }
505         lnet_net_unlock(cpt);
506
507         return drop;
508 }
509
510 /**
511  * LNet Delay Simulation
512  */
513 /** timestamp (second) to send delayed message */
514 #define msg_delay_send           msg_ev.hdr_data
515
516 struct lnet_delay_rule {
517         /** link chain on the_lnet.ln_delay_rules */
518         struct list_head                dl_link;
519         /** link chain on delay_dd.dd_sched_rules */
520         struct list_head                dl_sched_link;
521         /** attributes of this rule */
522         struct lnet_fault_large_attr    dl_attr;
523         /** lock to protect \a below members */
524         spinlock_t                      dl_lock;
525         /** refcount of delay rule */
526         atomic_t                        dl_refcount;
527         /**
528          * the message sequence to delay, which means message is delayed when
529          * dl_stat.fs_count == dl_delay_at
530          */
531         unsigned long                   dl_delay_at;
532         /**
533          * seconds to delay the next message, it's exclusive with dl_delay_at
534          */
535         time64_t                        dl_delay_time;
536         /** baseline to caculate dl_delay_time */
537         time64_t                        dl_time_base;
538         /** seconds until we send the next delayed message */
539         time64_t                        dl_msg_send;
540         /** delayed message list */
541         struct list_head                dl_msg_list;
542         /** statistic of delayed messages */
543         struct lnet_fault_stat          dl_stat;
544         /** timer to wakeup delay_daemon */
545         struct timer_list               dl_timer;
546 };
547
548 struct delay_daemon_data {
549         /** serialise rule add/remove */
550         struct mutex            dd_mutex;
551         /** protect rules on \a dd_sched_rules */
552         spinlock_t              dd_lock;
553         /** scheduled delay rules (by timer) */
554         struct list_head        dd_sched_rules;
555         /** deamon thread sleeps at here */
556         wait_queue_head_t       dd_waitq;
557         /** controler (lctl command) wait at here */
558         wait_queue_head_t       dd_ctl_waitq;
559         /** deamon is running */
560         unsigned int            dd_running;
561         /** deamon stopped */
562         unsigned int            dd_stopped;
563 };
564
565 static struct delay_daemon_data delay_dd;
566
567 static void
568 delay_rule_decref(struct lnet_delay_rule *rule)
569 {
570         if (atomic_dec_and_test(&rule->dl_refcount)) {
571                 LASSERT(list_empty(&rule->dl_sched_link));
572                 LASSERT(list_empty(&rule->dl_msg_list));
573                 LASSERT(list_empty(&rule->dl_link));
574
575                 CFS_FREE_PTR(rule);
576         }
577 }
578
579 /**
580  * check source/destination NID, portal, message type and delay rate,
581  * decide whether should delay this message or not
582  */
583 static bool
584 delay_rule_match(struct lnet_delay_rule *rule, struct lnet_nid *src,
585                  struct lnet_nid *dst, unsigned int type, unsigned int portal,
586                  struct lnet_msg *msg)
587 {
588         struct lnet_fault_large_attr *attr = &rule->dl_attr;
589         bool delay;
590         time64_t now = ktime_get_seconds();
591
592         if (!lnet_fault_attr_match(attr, src, NULL,
593                                    dst, type, portal))
594                 return false;
595
596         /* match this rule, check delay rate now */
597         spin_lock(&rule->dl_lock);
598         if (rule->dl_delay_time != 0) { /* time based delay */
599                 rule->dl_stat.fs_count++;
600                 delay = now >= rule->dl_delay_time;
601                 if (delay) {
602                         if (now > rule->dl_time_base)
603                                 rule->dl_time_base = now;
604
605                         rule->dl_delay_time = rule->dl_time_base +
606                                               get_random_u32_below(attr->u.delay.la_interval);
607                         rule->dl_time_base += attr->u.delay.la_interval;
608
609                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
610                                libcfs_nidstr(&attr->fa_src),
611                                libcfs_nidstr(&attr->fa_dst),
612                                rule->dl_delay_time);
613                 }
614
615         } else { /* rate based delay */
616                 __u64 count;
617
618                 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
619                 /* generate the next random rate sequence */
620                 count = rule->dl_stat.fs_count;
621                 if (do_div(count, attr->u.delay.la_rate) == 0) {
622                         rule->dl_delay_at = rule->dl_stat.fs_count +
623                                             get_random_u32_below(attr->u.delay.la_rate);
624                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
625                                libcfs_nidstr(&attr->fa_src),
626                                libcfs_nidstr(&attr->fa_dst), rule->dl_delay_at);
627                 }
628         }
629
630         if (!delay) {
631                 spin_unlock(&rule->dl_lock);
632                 return false;
633         }
634
635         /* delay this message, update counters */
636         lnet_fault_stat_inc(&rule->dl_stat, type);
637         rule->dl_stat.u.delay.ls_delayed++;
638
639         list_add_tail(&msg->msg_list, &rule->dl_msg_list);
640         msg->msg_delay_send = now + attr->u.delay.la_latency;
641         if (rule->dl_msg_send == -1) {
642                 rule->dl_msg_send = msg->msg_delay_send;
643                 mod_timer(&rule->dl_timer,
644                           jiffies + cfs_time_seconds(attr->u.delay.la_latency));
645         }
646
647         spin_unlock(&rule->dl_lock);
648         return true;
649 }
650
651 /**
652  * check if \a msg can match any Delay Rule, receiving of this message
653  * will be delayed if there is a match.
654  */
655 bool
656 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
657 {
658         struct lnet_delay_rule  *rule;
659         unsigned int             typ = hdr->type;
660         unsigned int             ptl = -1;
661
662         /* NB: called with hold of lnet_net_lock */
663
664         /* NB: if Portal is specified, then only PUT and GET will be
665          * filtered by delay rule */
666         if (typ == LNET_MSG_PUT)
667                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
668         else if (typ == LNET_MSG_GET)
669                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
670
671         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
672                 if (delay_rule_match(rule, &hdr->src_nid, &hdr->dest_nid,
673                                      typ, ptl, msg))
674                         return true;
675         }
676
677         return false;
678 }
679
680 /** check out delayed messages for send */
681 static void
682 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
683                   struct list_head *msg_list)
684 {
685         struct lnet_msg *msg;
686         struct lnet_msg *tmp;
687         time64_t now = ktime_get_seconds();
688
689         if (!all && rule->dl_msg_send > now)
690                 return;
691
692         spin_lock(&rule->dl_lock);
693         list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
694                 if (!all && msg->msg_delay_send > now)
695                         break;
696
697                 msg->msg_delay_send = 0;
698                 list_move_tail(&msg->msg_list, msg_list);
699         }
700
701         if (list_empty(&rule->dl_msg_list)) {
702                 timer_delete(&rule->dl_timer);
703                 rule->dl_msg_send = -1;
704
705         } else if (!list_empty(msg_list)) {
706                 /* dequeued some timedout messages, update timer for the
707                  * next delayed message on rule */
708                 msg = list_first_entry(&rule->dl_msg_list,
709                                        struct lnet_msg, msg_list);
710                 rule->dl_msg_send = msg->msg_delay_send;
711                 mod_timer(&rule->dl_timer,
712                           jiffies +
713                           cfs_time_seconds(msg->msg_delay_send - now));
714         }
715         spin_unlock(&rule->dl_lock);
716 }
717
718 static void
719 delayed_msg_process(struct list_head *msg_list, bool drop)
720 {
721         struct lnet_msg *msg;
722
723         while ((msg = list_first_entry_or_null(msg_list, struct lnet_msg,
724                                                msg_list)) != NULL) {
725                 struct lnet_ni *ni;
726                 int             cpt;
727                 int             rc;
728
729                 if (msg->msg_sending) {
730                         /* Delayed send */
731                         list_del_init(&msg->msg_list);
732                         ni = msg->msg_txni;
733                         CDEBUG(D_NET, "TRACE: msg %p %s -> %s : %s\n", msg,
734                                libcfs_nidstr(&ni->ni_nid),
735                                libcfs_nidstr(&msg->msg_txpeer->lpni_nid),
736                                lnet_msgtyp2str(msg->msg_type));
737                         lnet_ni_send(ni, msg);
738                         continue;
739                 }
740
741                 /* Delayed receive */
742                 LASSERT(msg->msg_rxpeer != NULL);
743                 LASSERT(msg->msg_rxni != NULL);
744
745                 ni = msg->msg_rxni;
746                 cpt = msg->msg_rx_cpt;
747
748                 list_del_init(&msg->msg_list);
749                 if (drop) {
750                         rc = -ECANCELED;
751
752                 } else if (!msg->msg_routing) {
753                         rc = lnet_parse_local(ni, msg);
754                         if (rc == 0)
755                                 continue;
756
757                 } else {
758                         lnet_net_lock(cpt);
759                         rc = lnet_parse_forward_locked(ni, msg);
760                         lnet_net_unlock(cpt);
761
762                         switch (rc) {
763                         case LNET_CREDIT_OK:
764                                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
765                                              0, msg->msg_len, msg->msg_len);
766                                 fallthrough;
767                         case LNET_CREDIT_WAIT:
768                                 continue;
769                         default: /* failures */
770                                 break;
771                         }
772                 }
773
774                 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
775                                   msg->msg_type);
776                 lnet_finalize(msg, rc);
777         }
778 }
779
780 /**
781  * Process delayed messages for scheduled rules
782  * This function can either be called by delay_rule_daemon, or by lnet_finalise
783  */
784 void
785 lnet_delay_rule_check(void)
786 {
787         struct lnet_delay_rule *rule;
788         LIST_HEAD(msgs);
789
790         while (1) {
791                 if (list_empty(&delay_dd.dd_sched_rules))
792                         break;
793
794                 spin_lock_bh(&delay_dd.dd_lock);
795                 if (list_empty(&delay_dd.dd_sched_rules)) {
796                         spin_unlock_bh(&delay_dd.dd_lock);
797                         break;
798                 }
799
800                 rule = list_first_entry(&delay_dd.dd_sched_rules,
801                                         struct lnet_delay_rule, dl_sched_link);
802                 list_del_init(&rule->dl_sched_link);
803                 spin_unlock_bh(&delay_dd.dd_lock);
804
805                 delayed_msg_check(rule, false, &msgs);
806                 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
807         }
808
809         if (!list_empty(&msgs))
810                 delayed_msg_process(&msgs, false);
811 }
812
813 /** deamon thread to handle delayed messages */
814 static int
815 lnet_delay_rule_daemon(void *arg)
816 {
817         delay_dd.dd_running = 1;
818         wake_up(&delay_dd.dd_ctl_waitq);
819
820         while (delay_dd.dd_running) {
821                 wait_event_interruptible(delay_dd.dd_waitq,
822                                          !delay_dd.dd_running ||
823                                          !list_empty(&delay_dd.dd_sched_rules));
824                 lnet_delay_rule_check();
825         }
826
827         /* in case more rules have been enqueued after my last check */
828         lnet_delay_rule_check();
829         delay_dd.dd_stopped = 1;
830         wake_up(&delay_dd.dd_ctl_waitq);
831
832         return 0;
833 }
834
835 static void
836 delay_timer_cb(cfs_timer_cb_arg_t data)
837 {
838         struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);
839
840         spin_lock_bh(&delay_dd.dd_lock);
841         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
842                 atomic_inc(&rule->dl_refcount);
843                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
844                 wake_up(&delay_dd.dd_waitq);
845         }
846         spin_unlock_bh(&delay_dd.dd_lock);
847 }
848
849 /**
850  * Add a new delay rule to LNet
851  * There is no check for duplicated delay rule, all rules will be checked for
852  * incoming message.
853  */
854 int
855 lnet_delay_rule_add(struct lnet_fault_large_attr *attr)
856 {
857         struct lnet_delay_rule *rule;
858         int rc = 0;
859         ENTRY;
860
861         if (!((attr->u.delay.la_rate == 0) ^
862               (attr->u.delay.la_interval == 0))) {
863                 CDEBUG(D_NET,
864                        "please provide either delay rate or delay interval, "
865                        "but not both at the same time %d/%d\n",
866                        attr->u.delay.la_rate, attr->u.delay.la_interval);
867                 RETURN(-EINVAL);
868         }
869
870         if (attr->u.delay.la_latency == 0) {
871                 CDEBUG(D_NET, "delay latency cannot be zero\n");
872                 RETURN(-EINVAL);
873         }
874
875         if (lnet_fault_attr_validate(attr) != 0)
876                 RETURN(-EINVAL);
877
878         CFS_ALLOC_PTR(rule);
879         if (rule == NULL)
880                 RETURN(-ENOMEM);
881
882         mutex_lock(&delay_dd.dd_mutex);
883         if (!delay_dd.dd_running) {
884                 struct task_struct *task;
885
886                 /* NB: although LND threads will process delayed message
887                  * in lnet_finalize, but there is no guarantee that LND
888                  * threads will be waken up if no other message needs to
889                  * be handled.
890                  * Only one daemon thread, performance is not the concern
891                  * of this simualation module.
892                  */
893                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
894                 if (IS_ERR(task)) {
895                         rc = PTR_ERR(task);
896                         GOTO(failed, rc);
897                 }
898                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
899         }
900
901         cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
902                         (unsigned long)rule, 0);
903
904         spin_lock_init(&rule->dl_lock);
905         INIT_LIST_HEAD(&rule->dl_msg_list);
906         INIT_LIST_HEAD(&rule->dl_sched_link);
907
908         rule->dl_attr = *attr;
909         if (attr->u.delay.la_interval != 0) {
910                 rule->dl_time_base = ktime_get_seconds() +
911                                      attr->u.delay.la_interval;
912                 rule->dl_delay_time = ktime_get_seconds() +
913                                       get_random_u32_below(attr->u.delay.la_interval);
914         } else {
915                 rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
916         }
917
918         rule->dl_msg_send = -1;
919
920         lnet_net_lock(LNET_LOCK_EX);
921         atomic_set(&rule->dl_refcount, 1);
922         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
923         lnet_net_unlock(LNET_LOCK_EX);
924
925         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
926                libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
927                attr->u.delay.la_rate);
928
929         mutex_unlock(&delay_dd.dd_mutex);
930         RETURN(0);
931  failed:
932         mutex_unlock(&delay_dd.dd_mutex);
933         CFS_FREE_PTR(rule);
934         return rc;
935 }
936
937 /**
938  * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
939  * and \a dst are zero, all rules will be removed, otherwise only matched rules
940  * will be removed.
941  * If \a src is zero, then all rules have \a dst as destination will be remove
942  * If \a dst is zero, then all rules have \a src as source will be removed
943  *
944  * When a delay rule is removed, all delayed messages of this rule will be
945  * processed immediately.
946  */
947 int
948 lnet_delay_rule_del(struct lnet_nid *src, struct lnet_nid *dst, bool shutdown)
949 {
950         struct lnet_delay_rule *rule;
951         struct lnet_delay_rule *tmp;
952         LIST_HEAD(rule_list);
953         LIST_HEAD(msg_list);
954         int n = 0;
955         bool cleanup;
956         ENTRY;
957
958         if (shutdown)
959                 src = dst = 0;
960
961         mutex_lock(&delay_dd.dd_mutex);
962         lnet_net_lock(LNET_LOCK_EX);
963
964         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
965                 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dl_attr.fa_src, src)))
966                         continue;
967
968                 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dl_attr.fa_dst, dst)))
969                         continue;
970
971                 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
972                        libcfs_nidstr(&rule->dl_attr.fa_src),
973                        libcfs_nidstr(&rule->dl_attr.fa_dst),
974                        rule->dl_attr.u.delay.la_rate,
975                        rule->dl_attr.u.delay.la_interval);
976                 /* refcount is taken over by rule_list */
977                 list_move(&rule->dl_link, &rule_list);
978         }
979
980         /* check if we need to shutdown delay_daemon */
981         cleanup = list_empty(&the_lnet.ln_delay_rules) &&
982                   !list_empty(&rule_list);
983         lnet_net_unlock(LNET_LOCK_EX);
984
985         list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
986                 list_del_init(&rule->dl_link);
987
988                 timer_delete_sync(&rule->dl_timer);
989                 delayed_msg_check(rule, true, &msg_list);
990                 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
991                 n++;
992         }
993
994         if (cleanup) { /* no more delay rule, shutdown delay_daemon */
995                 LASSERT(delay_dd.dd_running);
996                 delay_dd.dd_running = 0;
997                 wake_up(&delay_dd.dd_waitq);
998
999                 while (!delay_dd.dd_stopped)
1000                         wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
1001         }
1002         mutex_unlock(&delay_dd.dd_mutex);
1003
1004         if (!list_empty(&msg_list))
1005                 delayed_msg_process(&msg_list, shutdown);
1006
1007         RETURN(n);
1008 }
1009
1010 /**
1011  * List Delay Rule at position of \a pos
1012  */
1013 int
1014 lnet_delay_rule_list(int pos, struct lnet_fault_large_attr *attr,
1015                     struct lnet_fault_stat *stat)
1016 {
1017         struct lnet_delay_rule *rule;
1018         int                     cpt;
1019         int                     i = 0;
1020         int                     rc = -ENOENT;
1021         ENTRY;
1022
1023         cpt = lnet_net_lock_current();
1024         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1025                 if (i++ < pos)
1026                         continue;
1027
1028                 spin_lock(&rule->dl_lock);
1029                 *attr = rule->dl_attr;
1030                 *stat = rule->dl_stat;
1031                 spin_unlock(&rule->dl_lock);
1032                 rc = 0;
1033                 break;
1034         }
1035
1036         lnet_net_unlock(cpt);
1037         RETURN(rc);
1038 }
1039
1040 /**
1041  * reset counters for all Delay Rules
1042  */
1043 void
1044 lnet_delay_rule_reset(void)
1045 {
1046         struct lnet_delay_rule *rule;
1047         int                     cpt;
1048         ENTRY;
1049
1050         cpt = lnet_net_lock_current();
1051
1052         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1053                 struct lnet_fault_large_attr *attr = &rule->dl_attr;
1054
1055                 spin_lock(&rule->dl_lock);
1056
1057                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
1058                 if (attr->u.delay.la_rate != 0) {
1059                         rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
1060                 } else {
1061                         rule->dl_delay_time = ktime_get_seconds() +
1062                                               get_random_u32_below(attr->u.delay.la_interval);
1063                         rule->dl_time_base = ktime_get_seconds() +
1064                                              attr->u.delay.la_interval;
1065                 }
1066                 spin_unlock(&rule->dl_lock);
1067         }
1068
1069         lnet_net_unlock(cpt);
1070         EXIT;
1071 }
1072
1073 int
1074 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
1075 {
1076         struct lnet_fault_attr *attr4;
1077         struct lnet_fault_stat *stat;
1078         struct lnet_fault_large_attr attr = { { 0 } };
1079         int rc;
1080
1081         attr4 = (struct lnet_fault_attr *)data->ioc_inlbuf1;
1082
1083         lnet_fault_attr4_to_attr(attr4, &attr);
1084
1085         switch (opc) {
1086         default:
1087                 return -EINVAL;
1088
1089         case LNET_CTL_DROP_ADD:
1090                 if (!attr4)
1091                         return -EINVAL;
1092
1093                 return lnet_drop_rule_add(&attr);
1094
1095         case LNET_CTL_DROP_DEL:
1096                 if (!attr4)
1097                         return -EINVAL;
1098
1099                 data->ioc_count = lnet_drop_rule_del(&attr.fa_src,
1100                                                      &attr.fa_dst);
1101                 return 0;
1102
1103         case LNET_CTL_DROP_RESET:
1104                 lnet_drop_rule_reset();
1105                 return 0;
1106
1107         case LNET_CTL_DROP_LIST:
1108                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1109                 if (!attr4 || !stat)
1110                         return -EINVAL;
1111
1112                 rc = lnet_drop_rule_list(data->ioc_count, &attr, stat);
1113                 lnet_fault_attr_to_attr4(&attr, attr4);
1114                 return rc;
1115
1116         case LNET_CTL_DELAY_ADD:
1117                 if (!attr4)
1118                         return -EINVAL;
1119
1120                 return lnet_delay_rule_add(&attr);
1121
1122         case LNET_CTL_DELAY_DEL:
1123                 if (!attr4)
1124                         return -EINVAL;
1125
1126                 data->ioc_count = lnet_delay_rule_del(&attr.fa_src,
1127                                                       &attr.fa_dst, false);
1128                 return 0;
1129
1130         case LNET_CTL_DELAY_RESET:
1131                 lnet_delay_rule_reset();
1132                 return 0;
1133
1134         case LNET_CTL_DELAY_LIST:
1135                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1136                 if (!attr4 || !stat)
1137                         return -EINVAL;
1138
1139                 rc = lnet_delay_rule_list(data->ioc_count, &attr, stat);
1140                 lnet_fault_attr_to_attr4(&attr, attr4);
1141                 return rc;
1142         }
1143 }
1144
1145 int
1146 lnet_fault_init(void)
1147 {
1148         BUILD_BUG_ON(LNET_PUT_BIT != BIT(LNET_MSG_PUT));
1149         BUILD_BUG_ON(LNET_ACK_BIT != BIT(LNET_MSG_ACK));
1150         BUILD_BUG_ON(LNET_GET_BIT != BIT(LNET_MSG_GET));
1151         BUILD_BUG_ON(LNET_REPLY_BIT != BIT(LNET_MSG_REPLY));
1152
1153         mutex_init(&delay_dd.dd_mutex);
1154         spin_lock_init(&delay_dd.dd_lock);
1155         init_waitqueue_head(&delay_dd.dd_waitq);
1156         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1157         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1158
1159         return 0;
1160 }
1161
/**
 * Tear down fault simulation: remove every drop and delay rule and
 * flush any messages still being delayed.
 */
void
lnet_fault_fini(void)
{
	/* NULL src/dst is a wildcard: removes all drop rules */
	lnet_drop_rule_del(NULL, NULL);
	/* shutdown == true removes all delay rules, stops the delay daemon
	 * and processes the remaining delayed messages immediately */
	lnet_delay_rule_del(NULL, NULL, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}