Whamcloud - gitweb
fea2f9292d0001b16ce3a70734d1ff973edcfb10
[fs/lustre-release.git] / lnet / lnet / net_fault.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2014, 2016, Intel Corporation.
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  * Lustre is a trademark of Sun Microsystems, Inc.
29  *
30  * lnet/lnet/net_fault.c
31  *
32  * Lustre network fault simulation
33  *
34  * Author: liang.zhen@intel.com
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38
39 #include <lnet/lib-lnet.h>
40 #include <lnet/lnetctl.h>
41
42 #define LNET_MSG_MASK           (LNET_PUT_BIT | LNET_ACK_BIT | \
43                                  LNET_GET_BIT | LNET_REPLY_BIT)
44
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means message is dropped when
	 * dr_stat.drs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	cfs_time_t		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	cfs_time_t		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
66
67 static bool
68 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
69 {
70         if (nid == msg_nid || nid == LNET_NID_ANY)
71                 return true;
72
73         if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
74                 return false;
75
76         /* 255.255.255.255@net is wildcard for all addresses in a network */
77         return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
78 }
79
80 static bool
81 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
82                       lnet_nid_t dst, unsigned int type, unsigned int portal)
83 {
84         if (!lnet_fault_nid_match(attr->fa_src, src) ||
85             !lnet_fault_nid_match(attr->fa_dst, dst))
86                 return false;
87
88         if (!(attr->fa_msg_mask & (1 << type)))
89                 return false;
90
91         /* NB: ACK and REPLY have no portal, but they should have been
92          * rejected by message mask */
93         if (attr->fa_ptl_mask != 0 && /* has portal filter */
94             !(attr->fa_ptl_mask & (1ULL << portal)))
95                 return false;
96
97         return true;
98 }
99
100 static int
101 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
102 {
103         if (attr->fa_msg_mask == 0)
104                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
105
106         if (attr->fa_ptl_mask == 0) /* no portal filter */
107                 return 0;
108
109         /* NB: only PUT and GET can be filtered if portal filter has been set */
110         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
111         if (attr->fa_msg_mask == 0) {
112                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
113                        attr->fa_msg_mask);
114                 return -EINVAL;
115         }
116         return 0;
117 }
118
119 static void
120 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
121 {
122         /* NB: fs_counter is NOT updated by this function */
123         switch (type) {
124         case LNET_MSG_PUT:
125                 stat->fs_put++;
126                 return;
127         case LNET_MSG_ACK:
128                 stat->fs_ack++;
129                 return;
130         case LNET_MSG_GET:
131                 stat->fs_get++;
132                 return;
133         case LNET_MSG_REPLY:
134                 stat->fs_reply++;
135                 return;
136         }
137 }
138
139 /**
140  * LNet message drop simulation
141  */
142
143 /**
144  * Add a new drop rule to LNet
145  * There is no check for duplicated drop rule, all rules will be checked for
146  * incoming message.
147  */
148 static int
149 lnet_drop_rule_add(struct lnet_fault_attr *attr)
150 {
151         struct lnet_drop_rule *rule;
152         ENTRY;
153
154         if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
155                 CDEBUG(D_NET,
156                        "please provide either drop rate or drop interval, "
157                        "but not both at the same time %d/%d\n",
158                        attr->u.drop.da_rate, attr->u.drop.da_interval);
159                 RETURN(-EINVAL);
160         }
161
162         if (lnet_fault_attr_validate(attr) != 0)
163                 RETURN(-EINVAL);
164
165         CFS_ALLOC_PTR(rule);
166         if (rule == NULL)
167                 RETURN(-ENOMEM);
168
169         spin_lock_init(&rule->dr_lock);
170
171         rule->dr_attr = *attr;
172         if (attr->u.drop.da_interval != 0) {
173                 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
174                 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
175                                                     attr->u.drop.da_interval);
176         } else {
177                 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
178         }
179
180         lnet_net_lock(LNET_LOCK_EX);
181         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
182         lnet_net_unlock(LNET_LOCK_EX);
183
184         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
185                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
186                attr->u.drop.da_rate, attr->u.drop.da_interval);
187         RETURN(0);
188 }
189
190 /**
191  * Remove matched drop rules from lnet, all rules that can match \a src and
192  * \a dst will be removed.
 * If \a src is zero, then all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, then all rules that have \a src as source will be removed.
195  * If both of them are zero, all rules will be removed
196  */
197 static int
198 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
199 {
200         struct lnet_drop_rule *rule;
201         struct lnet_drop_rule *tmp;
202         struct list_head       zombies;
203         int                    n = 0;
204         ENTRY;
205
206         INIT_LIST_HEAD(&zombies);
207
208         lnet_net_lock(LNET_LOCK_EX);
209         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
210                 if (rule->dr_attr.fa_src != src && src != 0)
211                         continue;
212
213                 if (rule->dr_attr.fa_dst != dst && dst != 0)
214                         continue;
215
216                 list_move(&rule->dr_link, &zombies);
217         }
218         lnet_net_unlock(LNET_LOCK_EX);
219
220         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
221                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
222                        libcfs_nid2str(rule->dr_attr.fa_src),
223                        libcfs_nid2str(rule->dr_attr.fa_dst),
224                        rule->dr_attr.u.drop.da_rate,
225                        rule->dr_attr.u.drop.da_interval);
226
227                 list_del(&rule->dr_link);
228                 CFS_FREE_PTR(rule);
229                 n++;
230         }
231
232         RETURN(n);
233 }
234
235 /**
236  * List drop rule at position of \a pos
237  */
238 static int
239 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
240                     struct lnet_fault_stat *stat)
241 {
242         struct lnet_drop_rule *rule;
243         int                    cpt;
244         int                    i = 0;
245         int                    rc = -ENOENT;
246         ENTRY;
247
248         cpt = lnet_net_lock_current();
249         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
250                 if (i++ < pos)
251                         continue;
252
253                 spin_lock(&rule->dr_lock);
254                 *attr = rule->dr_attr;
255                 *stat = rule->dr_stat;
256                 spin_unlock(&rule->dr_lock);
257                 rc = 0;
258                 break;
259         }
260
261         lnet_net_unlock(cpt);
262         RETURN(rc);
263 }
264
265 /**
266  * reset counters for all drop rules
267  */
268 static void
269 lnet_drop_rule_reset(void)
270 {
271         struct lnet_drop_rule *rule;
272         int                    cpt;
273         ENTRY;
274
275         cpt = lnet_net_lock_current();
276
277         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
278                 struct lnet_fault_attr *attr = &rule->dr_attr;
279
280                 spin_lock(&rule->dr_lock);
281
282                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
283                 if (attr->u.drop.da_rate != 0) {
284                         rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
285                 } else {
286                         rule->dr_drop_time = cfs_time_shift(cfs_rand() %
287                                                 attr->u.drop.da_interval);
288                         rule->dr_time_base = cfs_time_shift(attr->u.drop.
289                                                                   da_interval);
290                 }
291                 spin_unlock(&rule->dr_lock);
292         }
293
294         lnet_net_unlock(cpt);
295         EXIT;
296 }
297
298 /**
299  * check source/destination NID, portal, message type and drop rate,
300  * decide whether should drop this message or not
301  */
static bool
drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
		lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	struct lnet_fault_attr	*attr = &rule->dr_attr;
	bool			 drop;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	if (rule->dr_drop_time != 0) { /* time based drop */
		cfs_time_t now = cfs_time_current();

		rule->dr_stat.fs_count++;
		drop = cfs_time_aftereq(now, rule->dr_drop_time);
		if (drop) {
			/* keep dr_time_base from lagging behind the
			 * current time */
			if (cfs_time_after(now, rule->dr_time_base))
				rule->dr_time_base = now;

			/* schedule the next drop at a random point within
			 * the next interval */
			rule->dr_drop_time = rule->dr_time_base +
					     cfs_time_seconds(cfs_rand() %
						attr->u.drop.da_interval);
			rule->dr_time_base += cfs_time_seconds(attr->u.drop.
							       da_interval);

			CDEBUG(D_NET, "Drop Rule %s->%s: next drop : "
				      CFS_TIME_T"\n",
				      libcfs_nid2str(attr->fa_src),
				      libcfs_nid2str(attr->fa_dst),
				      rule->dr_drop_time);
		}

	} else { /* rate based drop */
		__u64 count;

		/* drop the message whose sequence number is dr_drop_at */
		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		count = rule->dr_stat.fs_count;
		/* do_div() yields the remainder: at each multiple of da_rate
		 * pick a new random drop point within the next da_rate
		 * messages */
		if (do_div(count, attr->u.drop.da_rate) == 0) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   cfs_rand() % attr->u.drop.da_rate;
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
		}
	}

	if (drop) { /* drop this message, update counters */
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
	return drop;
}
358
359 /**
360  * Check if message from \a src to \a dst can match any existed drop rule
361  */
362 bool
363 lnet_drop_rule_match(struct lnet_hdr *hdr)
364 {
365         struct lnet_drop_rule   *rule;
366         lnet_nid_t               src = le64_to_cpu(hdr->src_nid);
367         lnet_nid_t               dst = le64_to_cpu(hdr->dest_nid);
368         unsigned int             typ = le32_to_cpu(hdr->type);
369         unsigned int             ptl = -1;
370         bool                     drop = false;
371         int                      cpt;
372
373         /* NB: if Portal is specified, then only PUT and GET will be
374          * filtered by drop rule */
375         if (typ == LNET_MSG_PUT)
376                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
377         else if (typ == LNET_MSG_GET)
378                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
379
380         cpt = lnet_net_lock_current();
381         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
382                 drop = drop_rule_match(rule, src, dst, typ, ptl);
383                 if (drop)
384                         break;
385         }
386
387         lnet_net_unlock(cpt);
388         return drop;
389 }
390
391 /**
392  * LNet Delay Simulation
393  */
394 /** timestamp (second) to send delayed message */
395 #define msg_delay_send           msg_ev.hdr_data
396
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect \a below members */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	cfs_time_t		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	cfs_time_t		dl_time_base;
	/** jiffies to send the next delayed message (-1 when timer idle) */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
428
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};
445
446 static struct delay_daemon_data delay_dd;
447
448 static cfs_time_t
449 round_timeout(cfs_time_t timeout)
450 {
451         return cfs_time_seconds((unsigned int)
452                         cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
453 }
454
455 static void
456 delay_rule_decref(struct lnet_delay_rule *rule)
457 {
458         if (atomic_dec_and_test(&rule->dl_refcount)) {
459                 LASSERT(list_empty(&rule->dl_sched_link));
460                 LASSERT(list_empty(&rule->dl_msg_list));
461                 LASSERT(list_empty(&rule->dl_link));
462
463                 CFS_FREE_PTR(rule);
464         }
465 }
466
467 /**
468  * check source/destination NID, portal, message type and delay rate,
469  * decide whether should delay this message or not
470  */
static bool
delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
		lnet_nid_t dst, unsigned int type, unsigned int portal,
		struct lnet_msg *msg)
{
	struct lnet_fault_attr	*attr = &rule->dl_attr;
	bool			 delay;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time != 0) { /* time based delay */
		cfs_time_t now = cfs_time_current();

		rule->dl_stat.fs_count++;
		delay = cfs_time_aftereq(now, rule->dl_delay_time);
		if (delay) {
			/* keep dl_time_base from lagging behind now */
			if (cfs_time_after(now, rule->dl_time_base))
				rule->dl_time_base = now;

			/* schedule the next delay at a random point within
			 * the next interval */
			rule->dl_delay_time = rule->dl_time_base +
					     cfs_time_seconds(cfs_rand() %
						attr->u.delay.la_interval);
			rule->dl_time_base += cfs_time_seconds(attr->u.delay.
							       la_interval);

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay : "
				      CFS_TIME_T"\n",
				      libcfs_nid2str(attr->fa_src),
				      libcfs_nid2str(attr->fa_dst),
				      rule->dl_delay_time);
		}

	} else { /* rate based delay */
		__u64 count;

		/* delay the message whose sequence number is dl_delay_at */
		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		count = rule->dl_stat.fs_count;
		/* at each multiple of la_rate pick a new random delay point
		 * within the next la_rate messages */
		if (do_div(count, attr->u.delay.la_rate) == 0) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    cfs_rand() % attr->u.delay.la_rate;
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	/* queue the message (constant per-rule latency keeps the list in
	 * send-time order) and arm the timer if it is currently idle,
	 * i.e. dl_msg_send == -1 */
	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = round_timeout(
			cfs_time_shift(attr->u.delay.la_latency));
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
541
542 /**
543  * check if \a msg can match any Delay Rule, receiving of this message
544  * will be delayed if there is a match.
545  */
546 bool
547 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
548 {
549         struct lnet_delay_rule  *rule;
550         lnet_nid_t               src = le64_to_cpu(hdr->src_nid);
551         lnet_nid_t               dst = le64_to_cpu(hdr->dest_nid);
552         unsigned int             typ = le32_to_cpu(hdr->type);
553         unsigned int             ptl = -1;
554
555         /* NB: called with hold of lnet_net_lock */
556
557         /* NB: if Portal is specified, then only PUT and GET will be
558          * filtered by delay rule */
559         if (typ == LNET_MSG_PUT)
560                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
561         else if (typ == LNET_MSG_GET)
562                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
563
564         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
565                 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
566                         return true;
567         }
568
569         return false;
570 }
571
572 /** check out delayed messages for send */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long	 now = cfs_time_current();

	/* lockless peek: nothing is due yet unless we are draining all */
	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	/* dl_msg_list is kept in send-time order, so stop at the first
	 * message that is not due yet */
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		/* no pending messages: stop the timer and mark it idle */
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/* dequeued some timedout messages, update timer for the
		 * next delayed message on rule */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}
607
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int		cpt;
		int		rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		ni = msg->msg_rxni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			/* rule removal with shutdown: cancel the message */
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			/* local delivery; on success the message has been
			 * handed off, nothing left to do here */
			rc = lnet_parse_local(ni, msg);
			if (rc == 0)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fallthrough: message is in progress */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		/* canceled or failed: drop and finalize the message */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(msg, rc);
	}
}
654
655 /**
656  * Process delayed messages for scheduled rules
 * This function can either be called by delay_rule_daemon, or by lnet_finalize
658  */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule	*rule;
	struct list_head	 msgs;

	INIT_LIST_HEAD(&msgs);
	while (1) {
		/* lockless check first to avoid taking dd_lock when there
		 * is nothing scheduled */
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		/* re-check under the lock: another thread may have drained
		 * the list in the meantime */
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_entry(delay_dd.dd_sched_rules.next,
				  struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}
688
/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	/* handshake with lnet_delay_rule_add(): announce startup */
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		/* sleep until delay_timer_cb() schedules a rule or
		 * shutdown is requested via dd_running */
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	/* handshake with lnet_delay_rule_del(): announce full stop */
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
710
711 static void
712 delay_timer_cb(unsigned long arg)
713 {
714         struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
715
716         spin_lock_bh(&delay_dd.dd_lock);
717         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
718                 atomic_inc(&rule->dl_refcount);
719                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
720                 wake_up(&delay_dd.dd_waitq);
721         }
722         spin_unlock_bh(&delay_dd.dd_lock);
723 }
724
725 /**
726  * Add a new delay rule to LNet
727  * There is no check for duplicated delay rule, all rules will be checked for
728  * incoming message.
729  */
730 int
731 lnet_delay_rule_add(struct lnet_fault_attr *attr)
732 {
733         struct lnet_delay_rule *rule;
734         int                     rc = 0;
735         ENTRY;
736
737         if (!((attr->u.delay.la_rate == 0) ^
738               (attr->u.delay.la_interval == 0))) {
739                 CDEBUG(D_NET,
740                        "please provide either delay rate or delay interval, "
741                        "but not both at the same time %d/%d\n",
742                        attr->u.delay.la_rate, attr->u.delay.la_interval);
743                 RETURN(-EINVAL);
744         }
745
746         if (attr->u.delay.la_latency == 0) {
747                 CDEBUG(D_NET, "delay latency cannot be zero\n");
748                 RETURN(-EINVAL);
749         }
750
751         if (lnet_fault_attr_validate(attr) != 0)
752                 RETURN(-EINVAL);
753
754         CFS_ALLOC_PTR(rule);
755         if (rule == NULL)
756                 RETURN(-ENOMEM);
757
758         mutex_lock(&delay_dd.dd_mutex);
759         if (!delay_dd.dd_running) {
760                 struct task_struct *task;
761
762                 /* NB: although LND threads will process delayed message
763                  * in lnet_finalize, but there is no guarantee that LND
764                  * threads will be waken up if no other message needs to
765                  * be handled.
766                  * Only one daemon thread, performance is not the concern
767                  * of this simualation module.
768                  */
769                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
770                 if (IS_ERR(task)) {
771                         rc = PTR_ERR(task);
772                         GOTO(failed, rc);
773                 }
774                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
775         }
776
777         init_timer(&rule->dl_timer);
778         rule->dl_timer.function = delay_timer_cb;
779         rule->dl_timer.data = (unsigned long)rule;
780
781         spin_lock_init(&rule->dl_lock);
782         INIT_LIST_HEAD(&rule->dl_msg_list);
783         INIT_LIST_HEAD(&rule->dl_sched_link);
784
785         rule->dl_attr = *attr;
786         if (attr->u.delay.la_interval != 0) {
787                 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
788                 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
789                                                      attr->u.delay.la_interval);
790         } else {
791                 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
792         }
793
794         rule->dl_msg_send = -1;
795
796         lnet_net_lock(LNET_LOCK_EX);
797         atomic_set(&rule->dl_refcount, 1);
798         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
799         lnet_net_unlock(LNET_LOCK_EX);
800
801         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
802                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
803                attr->u.delay.la_rate);
804
805         mutex_unlock(&delay_dd.dd_mutex);
806         RETURN(0);
807  failed:
808         mutex_unlock(&delay_dd.dd_mutex);
809         CFS_FREE_PTR(rule);
810         return rc;
811 }
812
813 /**
814  * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
815  * and \a dst are zero, all rules will be removed, otherwise only matched rules
816  * will be removed.
 * If \a src is zero, then all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, then all rules that have \a src as source will be removed.
819  *
820  * When a delay rule is removed, all delayed messages of this rule will be
821  * processed immediately.
822  */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule	*tmp;
	struct list_head	rule_list;
	struct list_head	msg_list;
	int			n = 0;
	bool			cleanup;
	ENTRY;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown)
		src = dst = 0; /* shutdown removes every rule */

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src != 0)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst != 0)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		/* stop the timer before draining so no new scheduling can
		 * race with the drain */
		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rule, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		/* wait for the daemon thread to acknowledge full stop */
		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	/* delayed messages are dropped on shutdown, re-posted otherwise */
	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	RETURN(n);
}
888
889 /**
890  * List Delay Rule at position of \a pos
891  */
892 int
893 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
894                     struct lnet_fault_stat *stat)
895 {
896         struct lnet_delay_rule *rule;
897         int                     cpt;
898         int                     i = 0;
899         int                     rc = -ENOENT;
900         ENTRY;
901
902         cpt = lnet_net_lock_current();
903         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
904                 if (i++ < pos)
905                         continue;
906
907                 spin_lock(&rule->dl_lock);
908                 *attr = rule->dl_attr;
909                 *stat = rule->dl_stat;
910                 spin_unlock(&rule->dl_lock);
911                 rc = 0;
912                 break;
913         }
914
915         lnet_net_unlock(cpt);
916         RETURN(rc);
917 }
918
919 /**
920  * reset counters for all Delay Rules
921  */
922 void
923 lnet_delay_rule_reset(void)
924 {
925         struct lnet_delay_rule *rule;
926         int                     cpt;
927         ENTRY;
928
929         cpt = lnet_net_lock_current();
930
931         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
932                 struct lnet_fault_attr *attr = &rule->dl_attr;
933
934                 spin_lock(&rule->dl_lock);
935
936                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
937                 if (attr->u.delay.la_rate != 0) {
938                         rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
939                 } else {
940                         rule->dl_delay_time = cfs_time_shift(cfs_rand() %
941                                                 attr->u.delay.la_interval);
942                         rule->dl_time_base = cfs_time_shift(attr->u.delay.
943                                                                   la_interval);
944                 }
945                 spin_unlock(&rule->dl_lock);
946         }
947
948         lnet_net_unlock(cpt);
949         EXIT;
950 }
951
952 int
953 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
954 {
955         struct lnet_fault_attr *attr;
956         struct lnet_fault_stat *stat;
957
958         attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
959
960         switch (opc) {
961         default:
962                 return -EINVAL;
963
964         case LNET_CTL_DROP_ADD:
965                 if (attr == NULL)
966                         return -EINVAL;
967
968                 return lnet_drop_rule_add(attr);
969
970         case LNET_CTL_DROP_DEL:
971                 if (attr == NULL)
972                         return -EINVAL;
973
974                 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
975                                                      attr->fa_dst);
976                 return 0;
977
978         case LNET_CTL_DROP_RESET:
979                 lnet_drop_rule_reset();
980                 return 0;
981
982         case LNET_CTL_DROP_LIST:
983                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
984                 if (attr == NULL || stat == NULL)
985                         return -EINVAL;
986
987                 return lnet_drop_rule_list(data->ioc_count, attr, stat);
988
989         case LNET_CTL_DELAY_ADD:
990                 if (attr == NULL)
991                         return -EINVAL;
992
993                 return lnet_delay_rule_add(attr);
994
995         case LNET_CTL_DELAY_DEL:
996                 if (attr == NULL)
997                         return -EINVAL;
998
999                 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
1000                                                       attr->fa_dst, false);
1001                 return 0;
1002
1003         case LNET_CTL_DELAY_RESET:
1004                 lnet_delay_rule_reset();
1005                 return 0;
1006
1007         case LNET_CTL_DELAY_LIST:
1008                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1009                 if (attr == NULL || stat == NULL)
1010                         return -EINVAL;
1011
1012                 return lnet_delay_rule_list(data->ioc_count, attr, stat);
1013         }
1014 }
1015
1016 int
1017 lnet_fault_init(void)
1018 {
1019         CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1020         CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1021         CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1022         CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1023
1024         mutex_init(&delay_dd.dd_mutex);
1025         spin_lock_init(&delay_dd.dd_lock);
1026         init_waitqueue_head(&delay_dd.dd_waitq);
1027         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1028         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1029
1030         return 0;
1031 }
1032
1033 void
1034 lnet_fault_fini(void)
1035 {
1036         lnet_drop_rule_del(0, 0);
1037         lnet_delay_rule_del(0, 0, true);
1038
1039         LASSERT(list_empty(&the_lnet.ln_drop_rules));
1040         LASSERT(list_empty(&the_lnet.ln_delay_rules));
1041         LASSERT(list_empty(&delay_dd.dd_sched_rules));
1042 }