Whamcloud - gitweb
LU-9019 lnet: move ping and delay injection to time64_t
[fs/lustre-release.git] / lnet / lnet / net_fault.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2014, 2017, Intel Corporation.
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  * Lustre is a trademark of Sun Microsystems, Inc.
29  *
30  * lnet/lnet/net_fault.c
31  *
32  * Lustre network fault simulation
33  *
34  * Author: liang.zhen@intel.com
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38
39 #include <lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
41
42 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
43                                  LNET_GET_BIT | LNET_REPLY_BIT)
44
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.drs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	time64_t		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	time64_t		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
66
67 static bool
68 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
69 {
70         if (nid == msg_nid || nid == LNET_NID_ANY)
71                 return true;
72
73         if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
74                 return false;
75
76         /* 255.255.255.255@net is wildcard for all addresses in a network */
77         return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
78 }
79
80 static bool
81 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
82                       lnet_nid_t dst, unsigned int type, unsigned int portal)
83 {
84         if (!lnet_fault_nid_match(attr->fa_src, src) ||
85             !lnet_fault_nid_match(attr->fa_dst, dst))
86                 return false;
87
88         if (!(attr->fa_msg_mask & (1 << type)))
89                 return false;
90
91         /* NB: ACK and REPLY have no portal, but they should have been
92          * rejected by message mask */
93         if (attr->fa_ptl_mask != 0 && /* has portal filter */
94             !(attr->fa_ptl_mask & (1ULL << portal)))
95                 return false;
96
97         return true;
98 }
99
100 static int
101 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
102 {
103         if (attr->fa_msg_mask == 0)
104                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
105
106         if (attr->fa_ptl_mask == 0) /* no portal filter */
107                 return 0;
108
109         /* NB: only PUT and GET can be filtered if portal filter has been set */
110         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
111         if (attr->fa_msg_mask == 0) {
112                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
113                        attr->fa_msg_mask);
114                 return -EINVAL;
115         }
116         return 0;
117 }
118
119 static void
120 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
121 {
122         /* NB: fs_counter is NOT updated by this function */
123         switch (type) {
124         case LNET_MSG_PUT:
125                 stat->fs_put++;
126                 return;
127         case LNET_MSG_ACK:
128                 stat->fs_ack++;
129                 return;
130         case LNET_MSG_GET:
131                 stat->fs_get++;
132                 return;
133         case LNET_MSG_REPLY:
134                 stat->fs_reply++;
135                 return;
136         }
137 }
138
139 /**
140  * LNet message drop simulation
141  */
142
143 /**
144  * Add a new drop rule to LNet
145  * There is no check for duplicated drop rule, all rules will be checked for
146  * incoming message.
147  */
148 static int
149 lnet_drop_rule_add(struct lnet_fault_attr *attr)
150 {
151         struct lnet_drop_rule *rule;
152         ENTRY;
153
154         if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
155                 CDEBUG(D_NET,
156                        "please provide either drop rate or drop interval, "
157                        "but not both at the same time %d/%d\n",
158                        attr->u.drop.da_rate, attr->u.drop.da_interval);
159                 RETURN(-EINVAL);
160         }
161
162         if (lnet_fault_attr_validate(attr) != 0)
163                 RETURN(-EINVAL);
164
165         CFS_ALLOC_PTR(rule);
166         if (rule == NULL)
167                 RETURN(-ENOMEM);
168
169         spin_lock_init(&rule->dr_lock);
170
171         rule->dr_attr = *attr;
172         if (attr->u.drop.da_interval != 0) {
173                 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
174                 rule->dr_drop_time = ktime_get_seconds() +
175                                      cfs_rand() % attr->u.drop.da_interval;
176         } else {
177                 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
178         }
179
180         lnet_net_lock(LNET_LOCK_EX);
181         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
182         lnet_net_unlock(LNET_LOCK_EX);
183
184         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
185                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
186                attr->u.drop.da_rate, attr->u.drop.da_interval);
187         RETURN(0);
188 }
189
190 /**
191  * Remove matched drop rules from lnet, all rules that can match \a src and
192  * \a dst will be removed.
193  * If \a src is zero, then all rules have \a dst as destination will be remove
194  * If \a dst is zero, then all rules have \a src as source will be removed
195  * If both of them are zero, all rules will be removed
196  */
197 static int
198 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
199 {
200         struct lnet_drop_rule *rule;
201         struct lnet_drop_rule *tmp;
202         struct list_head       zombies;
203         int                    n = 0;
204         ENTRY;
205
206         INIT_LIST_HEAD(&zombies);
207
208         lnet_net_lock(LNET_LOCK_EX);
209         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
210                 if (rule->dr_attr.fa_src != src && src != 0)
211                         continue;
212
213                 if (rule->dr_attr.fa_dst != dst && dst != 0)
214                         continue;
215
216                 list_move(&rule->dr_link, &zombies);
217         }
218         lnet_net_unlock(LNET_LOCK_EX);
219
220         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
221                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
222                        libcfs_nid2str(rule->dr_attr.fa_src),
223                        libcfs_nid2str(rule->dr_attr.fa_dst),
224                        rule->dr_attr.u.drop.da_rate,
225                        rule->dr_attr.u.drop.da_interval);
226
227                 list_del(&rule->dr_link);
228                 CFS_FREE_PTR(rule);
229                 n++;
230         }
231
232         RETURN(n);
233 }
234
235 /**
236  * List drop rule at position of \a pos
237  */
238 static int
239 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
240                     struct lnet_fault_stat *stat)
241 {
242         struct lnet_drop_rule *rule;
243         int                    cpt;
244         int                    i = 0;
245         int                    rc = -ENOENT;
246         ENTRY;
247
248         cpt = lnet_net_lock_current();
249         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
250                 if (i++ < pos)
251                         continue;
252
253                 spin_lock(&rule->dr_lock);
254                 *attr = rule->dr_attr;
255                 *stat = rule->dr_stat;
256                 spin_unlock(&rule->dr_lock);
257                 rc = 0;
258                 break;
259         }
260
261         lnet_net_unlock(cpt);
262         RETURN(rc);
263 }
264
265 /**
266  * reset counters for all drop rules
267  */
268 static void
269 lnet_drop_rule_reset(void)
270 {
271         struct lnet_drop_rule *rule;
272         int                    cpt;
273         ENTRY;
274
275         cpt = lnet_net_lock_current();
276
277         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
278                 struct lnet_fault_attr *attr = &rule->dr_attr;
279
280                 spin_lock(&rule->dr_lock);
281
282                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
283                 if (attr->u.drop.da_rate != 0) {
284                         rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
285                 } else {
286                         rule->dr_drop_time = ktime_get_seconds() +
287                                              cfs_rand() % attr->u.drop.da_interval;
288                         rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
289                 }
290                 spin_unlock(&rule->dr_lock);
291         }
292
293         lnet_net_unlock(cpt);
294         EXIT;
295 }
296
297 /**
298  * check source/destination NID, portal, message type and drop rate,
299  * decide whether should drop this message or not
300  */
static bool
drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
		lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	struct lnet_fault_attr	*attr = &rule->dr_attr;
	bool			 drop;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	if (rule->dr_drop_time != 0) { /* time based drop */
		time64_t now = ktime_get_seconds();

		rule->dr_stat.fs_count++;
		drop = now >= rule->dr_drop_time;
		if (drop) {
			/* if traffic was idle past the interval window,
			 * restart the window at the current time */
			if (now > rule->dr_time_base)
				rule->dr_time_base = now;

			/* pick a random second within the next interval
			 * for the following drop */
			rule->dr_drop_time = rule->dr_time_base +
					     cfs_rand() % attr->u.drop.da_interval;
			rule->dr_time_base += attr->u.drop.da_interval;

			CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dr_drop_time);
		}

	} else { /* rate based drop */
		__u64 count;

		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		count = rule->dr_stat.fs_count;
		/* at the start of each da_rate-sized window of messages,
		 * choose a random sequence inside the window to drop */
		if (do_div(count, attr->u.drop.da_rate) == 0) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   cfs_rand() % attr->u.drop.da_rate;
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
		}
	}

	if (drop) { /* drop this message, update counters */
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
	return drop;
}
354
355 /**
356  * Check if message from \a src to \a dst can match any existed drop rule
357  */
358 bool
359 lnet_drop_rule_match(struct lnet_hdr *hdr)
360 {
361         struct lnet_drop_rule   *rule;
362         lnet_nid_t               src = le64_to_cpu(hdr->src_nid);
363         lnet_nid_t               dst = le64_to_cpu(hdr->dest_nid);
364         unsigned int             typ = le32_to_cpu(hdr->type);
365         unsigned int             ptl = -1;
366         bool                     drop = false;
367         int                      cpt;
368
369         /* NB: if Portal is specified, then only PUT and GET will be
370          * filtered by drop rule */
371         if (typ == LNET_MSG_PUT)
372                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
373         else if (typ == LNET_MSG_GET)
374                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
375
376         cpt = lnet_net_lock_current();
377         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
378                 drop = drop_rule_match(rule, src, dst, typ, ptl);
379                 if (drop)
380                         break;
381         }
382
383         lnet_net_unlock(cpt);
384         return drop;
385 }
386
387 /**
388  * LNet Delay Simulation
389  */
390 /** timestamp (second) to send delayed message */
391 #define msg_delay_send           msg_ev.hdr_data
392
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	time64_t		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	time64_t		dl_time_base;
	/** send time (seconds) of the next delayed message, or -1 when no
	 * message is queued; assigned from msg_delay_send which is based on
	 * ktime_get_seconds() — NOTE(review): despite the type, this was
	 * historically documented as jiffies; confirm against timer users */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
424
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};
441
442 static struct delay_daemon_data delay_dd;
443
444 static void
445 delay_rule_decref(struct lnet_delay_rule *rule)
446 {
447         if (atomic_dec_and_test(&rule->dl_refcount)) {
448                 LASSERT(list_empty(&rule->dl_sched_link));
449                 LASSERT(list_empty(&rule->dl_msg_list));
450                 LASSERT(list_empty(&rule->dl_link));
451
452                 CFS_FREE_PTR(rule);
453         }
454 }
455
456 /**
457  * check source/destination NID, portal, message type and delay rate,
458  * decide whether should delay this message or not
459  */
460 static bool
461 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
462                 lnet_nid_t dst, unsigned int type, unsigned int portal,
463                 struct lnet_msg *msg)
464 {
465         struct lnet_fault_attr  *attr = &rule->dl_attr;
466         bool                     delay;
467
468         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
469                 return false;
470
471         /* match this rule, check delay rate now */
472         spin_lock(&rule->dl_lock);
473         if (rule->dl_delay_time != 0) { /* time based delay */
474                 time64_t now = ktime_get_seconds();
475
476                 rule->dl_stat.fs_count++;
477                 delay = now >= rule->dl_delay_time;
478                 if (delay) {
479                         if (now > rule->dl_time_base)
480                                 rule->dl_time_base = now;
481
482                         rule->dl_delay_time = rule->dl_time_base +
483                                               cfs_rand() % attr->u.delay.la_interval;
484                         rule->dl_time_base += attr->u.delay.la_interval;
485
486                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
487                                libcfs_nid2str(attr->fa_src),
488                                libcfs_nid2str(attr->fa_dst),
489                                rule->dl_delay_time);
490                 }
491
492         } else { /* rate based delay */
493                 __u64 count;
494
495                 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
496                 /* generate the next random rate sequence */
497                 count = rule->dl_stat.fs_count;
498                 if (do_div(count, attr->u.delay.la_rate) == 0) {
499                         rule->dl_delay_at = rule->dl_stat.fs_count +
500                                             cfs_rand() % attr->u.delay.la_rate;
501                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
502                                libcfs_nid2str(attr->fa_src),
503                                libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
504                 }
505         }
506
507         if (!delay) {
508                 spin_unlock(&rule->dl_lock);
509                 return false;
510         }
511
512         /* delay this message, update counters */
513         lnet_fault_stat_inc(&rule->dl_stat, type);
514         rule->dl_stat.u.delay.ls_delayed++;
515
516         list_add_tail(&msg->msg_list, &rule->dl_msg_list);
517         msg->msg_delay_send = ktime_get_seconds() + attr->u.delay.la_latency;
518         if (rule->dl_msg_send == -1) {
519                 rule->dl_msg_send = msg->msg_delay_send;
520                 mod_timer(&rule->dl_timer, rule->dl_msg_send);
521         }
522
523         spin_unlock(&rule->dl_lock);
524         return true;
525 }
526
527 /**
528  * check if \a msg can match any Delay Rule, receiving of this message
529  * will be delayed if there is a match.
530  */
531 bool
532 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
533 {
534         struct lnet_delay_rule  *rule;
535         lnet_nid_t               src = le64_to_cpu(hdr->src_nid);
536         lnet_nid_t               dst = le64_to_cpu(hdr->dest_nid);
537         unsigned int             typ = le32_to_cpu(hdr->type);
538         unsigned int             ptl = -1;
539
540         /* NB: called with hold of lnet_net_lock */
541
542         /* NB: if Portal is specified, then only PUT and GET will be
543          * filtered by delay rule */
544         if (typ == LNET_MSG_PUT)
545                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
546         else if (typ == LNET_MSG_GET)
547                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
548
549         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
550                 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
551                         return true;
552         }
553
554         return false;
555 }
556
557 /** check out delayed messages for send */
558 static void
559 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
560                   struct list_head *msg_list)
561 {
562         struct lnet_msg *msg;
563         struct lnet_msg *tmp;
564         time64_t now = ktime_get_seconds();
565
566         if (!all && cfs_time_seconds(rule->dl_msg_send) > now)
567                 return;
568
569         spin_lock(&rule->dl_lock);
570         list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
571                 if (!all && msg->msg_delay_send > now)
572                         break;
573
574                 msg->msg_delay_send = 0;
575                 list_move_tail(&msg->msg_list, msg_list);
576         }
577
578         if (list_empty(&rule->dl_msg_list)) {
579                 del_timer(&rule->dl_timer);
580                 rule->dl_msg_send = -1;
581
582         } else if (!list_empty(msg_list)) {
583                 /* dequeued some timedout messages, update timer for the
584                  * next delayed message on rule */
585                 msg = list_entry(rule->dl_msg_list.next,
586                                  struct lnet_msg, msg_list);
587                 rule->dl_msg_send = msg->msg_delay_send;
588                 mod_timer(&rule->dl_timer, rule->dl_msg_send);
589         }
590         spin_unlock(&rule->dl_lock);
591 }
592
/**
 * Deliver (or drop) a list of messages whose delay has expired.
 * Local messages are re-parsed, routed messages are forwarded; any
 * failure path falls through to lnet_drop_message()/lnet_finalize().
 */
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int		cpt;
		int		rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		/* only received messages are ever queued for delay */
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		ni = msg->msg_rxni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			/* local delivery; on success the message is consumed */
			rc = lnet_parse_local(ni, msg);
			if (rc == 0)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fallthrough: message handed off, go on */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* drop requested, or local/forward delivery failed */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
				  msg->msg_type);
		lnet_finalize(msg, rc);
	}
}
640
641 /**
642  * Process delayed messages for scheduled rules
643  * This function can either be called by delay_rule_daemon, or by lnet_finalise
644  */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule	*rule;
	struct list_head	 msgs;

	INIT_LIST_HEAD(&msgs);
	while (1) {
		/* cheap lockless peek; a race here is harmless because the
		 * list is re-checked under dd_lock below */
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		/* detach one scheduled rule; its reference (taken by
		 * delay_timer_cb) is released after processing */
		rule = list_entry(delay_dd.dd_sched_rules.next,
				  struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	/* deliver expired messages outside dd_lock */
	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}
674
675 /** deamon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	/* signal lnet_delay_rule_add() that the daemon is up */
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		/* sleep until there is work or we are asked to stop */
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	/* signal lnet_delay_rule_del() that shutdown is complete */
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
696
697 static void
698 delay_timer_cb(unsigned long arg)
699 {
700         struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
701
702         spin_lock_bh(&delay_dd.dd_lock);
703         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
704                 atomic_inc(&rule->dl_refcount);
705                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
706                 wake_up(&delay_dd.dd_waitq);
707         }
708         spin_unlock_bh(&delay_dd.dd_lock);
709 }
710
711 /**
712  * Add a new delay rule to LNet
713  * There is no check for duplicated delay rule, all rules will be checked for
714  * incoming message.
715  */
716 int
717 lnet_delay_rule_add(struct lnet_fault_attr *attr)
718 {
719         struct lnet_delay_rule *rule;
720         int                     rc = 0;
721         ENTRY;
722
723         if (!((attr->u.delay.la_rate == 0) ^
724               (attr->u.delay.la_interval == 0))) {
725                 CDEBUG(D_NET,
726                        "please provide either delay rate or delay interval, "
727                        "but not both at the same time %d/%d\n",
728                        attr->u.delay.la_rate, attr->u.delay.la_interval);
729                 RETURN(-EINVAL);
730         }
731
732         if (attr->u.delay.la_latency == 0) {
733                 CDEBUG(D_NET, "delay latency cannot be zero\n");
734                 RETURN(-EINVAL);
735         }
736
737         if (lnet_fault_attr_validate(attr) != 0)
738                 RETURN(-EINVAL);
739
740         CFS_ALLOC_PTR(rule);
741         if (rule == NULL)
742                 RETURN(-ENOMEM);
743
744         mutex_lock(&delay_dd.dd_mutex);
745         if (!delay_dd.dd_running) {
746                 struct task_struct *task;
747
748                 /* NB: although LND threads will process delayed message
749                  * in lnet_finalize, but there is no guarantee that LND
750                  * threads will be waken up if no other message needs to
751                  * be handled.
752                  * Only one daemon thread, performance is not the concern
753                  * of this simualation module.
754                  */
755                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
756                 if (IS_ERR(task)) {
757                         rc = PTR_ERR(task);
758                         GOTO(failed, rc);
759                 }
760                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
761         }
762
763         init_timer(&rule->dl_timer);
764         rule->dl_timer.function = delay_timer_cb;
765         rule->dl_timer.data = (unsigned long)rule;
766
767         spin_lock_init(&rule->dl_lock);
768         INIT_LIST_HEAD(&rule->dl_msg_list);
769         INIT_LIST_HEAD(&rule->dl_sched_link);
770
771         rule->dl_attr = *attr;
772         if (attr->u.delay.la_interval != 0) {
773                 rule->dl_time_base = ktime_get_seconds() +
774                                      attr->u.delay.la_interval;
775                 rule->dl_delay_time = ktime_get_seconds() +
776                                       cfs_rand() % attr->u.delay.la_interval;
777         } else {
778                 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
779         }
780
781         rule->dl_msg_send = -1;
782
783         lnet_net_lock(LNET_LOCK_EX);
784         atomic_set(&rule->dl_refcount, 1);
785         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
786         lnet_net_unlock(LNET_LOCK_EX);
787
788         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
789                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
790                attr->u.delay.la_rate);
791
792         mutex_unlock(&delay_dd.dd_mutex);
793         RETURN(0);
794  failed:
795         mutex_unlock(&delay_dd.dd_mutex);
796         CFS_FREE_PTR(rule);
797         return rc;
798 }
799
800 /**
801  * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
802  * and \a dst are zero, all rules will be removed, otherwise only matched rules
803  * will be removed.
804  * If \a src is zero, then all rules have \a dst as destination will be remove
805  * If \a dst is zero, then all rules have \a src as source will be removed
806  *
807  * When a delay rule is removed, all delayed messages of this rule will be
808  * processed immediately.
809  */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule	*tmp;
	struct list_head	rule_list;
	struct list_head	msg_list;
	int			n = 0;
	bool			cleanup;
	ENTRY;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	/* shutdown is equivalent to "remove every rule" */
	if (shutdown)
		src = dst = 0;

	/* dd_mutex serialises against lnet_delay_rule_add and daemon
	 * start/stop; the LNet lock protects ln_delay_rules itself */
	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src != 0)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst != 0)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	/* tear down each removed rule outside the LNet lock: stop its
	 * timer, drain its pending messages, drop the list reference */
	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rule, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		/* wait for the daemon to acknowledge the stop */
		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	/* deliver (or drop, on shutdown) all drained messages */
	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	RETURN(n);
}
875
876 /**
877  * List Delay Rule at position of \a pos
878  */
879 int
880 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
881                     struct lnet_fault_stat *stat)
882 {
883         struct lnet_delay_rule *rule;
884         int                     cpt;
885         int                     i = 0;
886         int                     rc = -ENOENT;
887         ENTRY;
888
889         cpt = lnet_net_lock_current();
890         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
891                 if (i++ < pos)
892                         continue;
893
894                 spin_lock(&rule->dl_lock);
895                 *attr = rule->dl_attr;
896                 *stat = rule->dl_stat;
897                 spin_unlock(&rule->dl_lock);
898                 rc = 0;
899                 break;
900         }
901
902         lnet_net_unlock(cpt);
903         RETURN(rc);
904 }
905
906 /**
907  * reset counters for all Delay Rules
908  */
909 void
910 lnet_delay_rule_reset(void)
911 {
912         struct lnet_delay_rule *rule;
913         int                     cpt;
914         ENTRY;
915
916         cpt = lnet_net_lock_current();
917
918         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
919                 struct lnet_fault_attr *attr = &rule->dl_attr;
920
921                 spin_lock(&rule->dl_lock);
922
923                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
924                 if (attr->u.delay.la_rate != 0) {
925                         rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
926                 } else {
927                         rule->dl_delay_time = ktime_get_seconds() +
928                                               cfs_rand() % attr->u.delay.la_interval;
929                         rule->dl_time_base = ktime_get_seconds() +
930                                              attr->u.delay.la_interval;
931                 }
932                 spin_unlock(&rule->dl_lock);
933         }
934
935         lnet_net_unlock(cpt);
936         EXIT;
937 }
938
939 int
940 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
941 {
942         struct lnet_fault_attr *attr;
943         struct lnet_fault_stat *stat;
944
945         attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
946
947         switch (opc) {
948         default:
949                 return -EINVAL;
950
951         case LNET_CTL_DROP_ADD:
952                 if (attr == NULL)
953                         return -EINVAL;
954
955                 return lnet_drop_rule_add(attr);
956
957         case LNET_CTL_DROP_DEL:
958                 if (attr == NULL)
959                         return -EINVAL;
960
961                 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
962                                                      attr->fa_dst);
963                 return 0;
964
965         case LNET_CTL_DROP_RESET:
966                 lnet_drop_rule_reset();
967                 return 0;
968
969         case LNET_CTL_DROP_LIST:
970                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
971                 if (attr == NULL || stat == NULL)
972                         return -EINVAL;
973
974                 return lnet_drop_rule_list(data->ioc_count, attr, stat);
975
976         case LNET_CTL_DELAY_ADD:
977                 if (attr == NULL)
978                         return -EINVAL;
979
980                 return lnet_delay_rule_add(attr);
981
982         case LNET_CTL_DELAY_DEL:
983                 if (attr == NULL)
984                         return -EINVAL;
985
986                 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
987                                                       attr->fa_dst, false);
988                 return 0;
989
990         case LNET_CTL_DELAY_RESET:
991                 lnet_delay_rule_reset();
992                 return 0;
993
994         case LNET_CTL_DELAY_LIST:
995                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
996                 if (attr == NULL || stat == NULL)
997                         return -EINVAL;
998
999                 return lnet_delay_rule_list(data->ioc_count, attr, stat);
1000         }
1001 }
1002
1003 int
1004 lnet_fault_init(void)
1005 {
1006         CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1007         CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1008         CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1009         CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1010
1011         mutex_init(&delay_dd.dd_mutex);
1012         spin_lock_init(&delay_dd.dd_lock);
1013         init_waitqueue_head(&delay_dd.dd_waitq);
1014         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1015         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1016
1017         return 0;
1018 }
1019
1020 void
1021 lnet_fault_fini(void)
1022 {
1023         lnet_drop_rule_del(0, 0);
1024         lnet_delay_rule_del(0, 0, true);
1025
1026         LASSERT(list_empty(&the_lnet.ln_drop_rules));
1027         LASSERT(list_empty(&the_lnet.ln_delay_rules));
1028         LASSERT(list_empty(&delay_dd.dd_sched_rules));
1029 }