Whamcloud - gitweb
04c98d5f40d11cd203737b9bec1ba82ad75fff8d
[fs/lustre-release.git] / lnet / lnet / net_fault.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2014, 2017, Intel Corporation.
25  */
26 /*
27  * This file is part of Lustre, http://www.lustre.org/
28  * Lustre is a trademark of Sun Microsystems, Inc.
29  *
30  * lnet/lnet/net_fault.c
31  *
32  * Lustre network fault simulation
33  *
34  * Author: liang.zhen@intel.com
35  */
36
37 #define DEBUG_SUBSYSTEM S_LNET
38
39 #include <lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
41
/* every LNet message type: PUT, ACK, GET and REPLY */
#define LNET_MSG_MASK	(LNET_PUT_BIT | LNET_ACK_BIT | \
			 LNET_GET_BIT | LNET_REPLY_BIT)
44
45 struct lnet_drop_rule {
46         /** link chain on the_lnet.ln_drop_rules */
47         struct list_head        dr_link;
48         /** attributes of this rule */
49         struct lnet_fault_attr  dr_attr;
50         /** lock to protect \a dr_drop_at and \a dr_stat */
51         spinlock_t              dr_lock;
52         /**
53          * the message sequence to drop, which means message is dropped when
54          * dr_stat.drs_count == dr_drop_at
55          */
56         unsigned long           dr_drop_at;
57         /**
58          * seconds to drop the next message, it's exclusive with dr_drop_at
59          */
60         time64_t                dr_drop_time;
61         /** baseline to caculate dr_drop_time */
62         time64_t                dr_time_base;
63         /** statistic of dropped messages */
64         struct lnet_fault_stat  dr_stat;
65 };
66
67 static bool
68 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
69 {
70         if (nid == msg_nid || nid == LNET_NID_ANY)
71                 return true;
72
73         if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
74                 return false;
75
76         /* 255.255.255.255@net is wildcard for all addresses in a network */
77         return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
78 }
79
80 static bool
81 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
82                       lnet_nid_t dst, unsigned int type, unsigned int portal)
83 {
84         if (!lnet_fault_nid_match(attr->fa_src, src) ||
85             !lnet_fault_nid_match(attr->fa_dst, dst))
86                 return false;
87
88         if (!(attr->fa_msg_mask & (1 << type)))
89                 return false;
90
91         /* NB: ACK and REPLY have no portal, but they should have been
92          * rejected by message mask */
93         if (attr->fa_ptl_mask != 0 && /* has portal filter */
94             !(attr->fa_ptl_mask & (1ULL << portal)))
95                 return false;
96
97         return true;
98 }
99
100 static int
101 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
102 {
103         if (attr->fa_msg_mask == 0)
104                 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
105
106         if (attr->fa_ptl_mask == 0) /* no portal filter */
107                 return 0;
108
109         /* NB: only PUT and GET can be filtered if portal filter has been set */
110         attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
111         if (attr->fa_msg_mask == 0) {
112                 CDEBUG(D_NET, "can't find valid message type bits %x\n",
113                        attr->fa_msg_mask);
114                 return -EINVAL;
115         }
116         return 0;
117 }
118
119 static void
120 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
121 {
122         /* NB: fs_counter is NOT updated by this function */
123         switch (type) {
124         case LNET_MSG_PUT:
125                 stat->fs_put++;
126                 return;
127         case LNET_MSG_ACK:
128                 stat->fs_ack++;
129                 return;
130         case LNET_MSG_GET:
131                 stat->fs_get++;
132                 return;
133         case LNET_MSG_REPLY:
134                 stat->fs_reply++;
135                 return;
136         }
137 }
138
139 /**
140  * LNet message drop simulation
141  */
142
143 /**
144  * Add a new drop rule to LNet
145  * There is no check for duplicated drop rule, all rules will be checked for
146  * incoming message.
147  */
148 static int
149 lnet_drop_rule_add(struct lnet_fault_attr *attr)
150 {
151         struct lnet_drop_rule *rule;
152         ENTRY;
153
154         if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
155                 CDEBUG(D_NET,
156                        "please provide either drop rate or drop interval, "
157                        "but not both at the same time %d/%d\n",
158                        attr->u.drop.da_rate, attr->u.drop.da_interval);
159                 RETURN(-EINVAL);
160         }
161
162         if (lnet_fault_attr_validate(attr) != 0)
163                 RETURN(-EINVAL);
164
165         CFS_ALLOC_PTR(rule);
166         if (rule == NULL)
167                 RETURN(-ENOMEM);
168
169         spin_lock_init(&rule->dr_lock);
170
171         rule->dr_attr = *attr;
172         if (attr->u.drop.da_interval != 0) {
173                 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
174                 rule->dr_drop_time = ktime_get_seconds() +
175                                      cfs_rand() % attr->u.drop.da_interval;
176         } else {
177                 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
178         }
179
180         lnet_net_lock(LNET_LOCK_EX);
181         list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
182         lnet_net_unlock(LNET_LOCK_EX);
183
184         CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
185                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
186                attr->u.drop.da_rate, attr->u.drop.da_interval);
187         RETURN(0);
188 }
189
190 /**
191  * Remove matched drop rules from lnet, all rules that can match \a src and
192  * \a dst will be removed.
193  * If \a src is zero, then all rules have \a dst as destination will be remove
194  * If \a dst is zero, then all rules have \a src as source will be removed
195  * If both of them are zero, all rules will be removed
196  */
197 static int
198 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
199 {
200         struct lnet_drop_rule *rule;
201         struct lnet_drop_rule *tmp;
202         struct list_head       zombies;
203         int                    n = 0;
204         ENTRY;
205
206         INIT_LIST_HEAD(&zombies);
207
208         lnet_net_lock(LNET_LOCK_EX);
209         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
210                 if (rule->dr_attr.fa_src != src && src != 0)
211                         continue;
212
213                 if (rule->dr_attr.fa_dst != dst && dst != 0)
214                         continue;
215
216                 list_move(&rule->dr_link, &zombies);
217         }
218         lnet_net_unlock(LNET_LOCK_EX);
219
220         list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
221                 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
222                        libcfs_nid2str(rule->dr_attr.fa_src),
223                        libcfs_nid2str(rule->dr_attr.fa_dst),
224                        rule->dr_attr.u.drop.da_rate,
225                        rule->dr_attr.u.drop.da_interval);
226
227                 list_del(&rule->dr_link);
228                 CFS_FREE_PTR(rule);
229                 n++;
230         }
231
232         RETURN(n);
233 }
234
235 /**
236  * List drop rule at position of \a pos
237  */
238 static int
239 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
240                     struct lnet_fault_stat *stat)
241 {
242         struct lnet_drop_rule *rule;
243         int                    cpt;
244         int                    i = 0;
245         int                    rc = -ENOENT;
246         ENTRY;
247
248         cpt = lnet_net_lock_current();
249         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
250                 if (i++ < pos)
251                         continue;
252
253                 spin_lock(&rule->dr_lock);
254                 *attr = rule->dr_attr;
255                 *stat = rule->dr_stat;
256                 spin_unlock(&rule->dr_lock);
257                 rc = 0;
258                 break;
259         }
260
261         lnet_net_unlock(cpt);
262         RETURN(rc);
263 }
264
265 /**
266  * reset counters for all drop rules
267  */
268 static void
269 lnet_drop_rule_reset(void)
270 {
271         struct lnet_drop_rule *rule;
272         int                    cpt;
273         ENTRY;
274
275         cpt = lnet_net_lock_current();
276
277         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
278                 struct lnet_fault_attr *attr = &rule->dr_attr;
279
280                 spin_lock(&rule->dr_lock);
281
282                 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
283                 if (attr->u.drop.da_rate != 0) {
284                         rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
285                 } else {
286                         rule->dr_drop_time = ktime_get_seconds() +
287                                              cfs_rand() % attr->u.drop.da_interval;
288                         rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
289                 }
290                 spin_unlock(&rule->dr_lock);
291         }
292
293         lnet_net_unlock(cpt);
294         EXIT;
295 }
296
297 static void
298 lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
299 {
300         unsigned int random;
301         int choice;
302         int delta;
303         int best_delta;
304         int i;
305
306         /* assign a random failure */
307         random = cfs_rand();
308         choice = random % (LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
309         if (choice == 0)
310                 choice++;
311
312         if (mask == HSTATUS_RANDOM) {
313                 *hstatus = choice;
314                 return;
315         }
316
317         if (mask & (1 << choice)) {
318                 *hstatus = choice;
319                 return;
320         }
321
322         /* round to the closest ON bit */
323         i = HSTATUS_END;
324         best_delta = HSTATUS_END;
325         while (i > 0) {
326                 if (mask & (1 << i)) {
327                         delta = choice - i;
328                         if (delta < 0)
329                                 delta *= -1;
330                         if (delta < best_delta) {
331                                 best_delta = delta;
332                                 choice = i;
333                         }
334                 }
335                 i--;
336         }
337
338         *hstatus = choice;
339 }
340
341 /**
342  * check source/destination NID, portal, message type and drop rate,
343  * decide whether should drop this message or not
344  */
345 static bool
346 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
347                 lnet_nid_t dst, unsigned int type, unsigned int portal,
348                 enum lnet_msg_hstatus *hstatus)
349 {
350         struct lnet_fault_attr  *attr = &rule->dr_attr;
351         bool                     drop;
352
353         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
354                 return false;
355
356         /*
357          * if we're trying to match a health status error but it hasn't
358          * been set in the rule, then don't match
359          */
360         if ((hstatus && !attr->u.drop.da_health_error_mask) ||
361             (!hstatus && attr->u.drop.da_health_error_mask))
362                 return false;
363
364         /* match this rule, check drop rate now */
365         spin_lock(&rule->dr_lock);
366         if (attr->u.drop.da_random) {
367                 int value = cfs_rand() % attr->u.drop.da_interval;
368                 if (value >= (attr->u.drop.da_interval / 2))
369                         drop = true;
370                 else
371                         drop = false;
372         } else if (rule->dr_drop_time != 0) { /* time based drop */
373                 time64_t now = ktime_get_seconds();
374
375                 rule->dr_stat.fs_count++;
376                 drop = now >= rule->dr_drop_time;
377                 if (drop) {
378                         if (now > rule->dr_time_base)
379                                 rule->dr_time_base = now;
380
381                         rule->dr_drop_time = rule->dr_time_base +
382                                              cfs_rand() % attr->u.drop.da_interval;
383                         rule->dr_time_base += attr->u.drop.da_interval;
384
385                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
386                                libcfs_nid2str(attr->fa_src),
387                                libcfs_nid2str(attr->fa_dst),
388                                rule->dr_drop_time);
389                 }
390
391         } else { /* rate based drop */
392                 __u64 count;
393
394                 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
395                 count = rule->dr_stat.fs_count;
396                 if (do_div(count, attr->u.drop.da_rate) == 0) {
397                         rule->dr_drop_at = rule->dr_stat.fs_count +
398                                            cfs_rand() % attr->u.drop.da_rate;
399                         CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
400                                libcfs_nid2str(attr->fa_src),
401                                libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
402                 }
403         }
404
405         if (drop) { /* drop this message, update counters */
406                 if (hstatus)
407                         lnet_fault_match_health(hstatus,
408                                 attr->u.drop.da_health_error_mask);
409                 lnet_fault_stat_inc(&rule->dr_stat, type);
410                 rule->dr_stat.u.drop.ds_dropped++;
411         }
412
413         spin_unlock(&rule->dr_lock);
414         return drop;
415 }
416
417 /**
418  * Check if message from \a src to \a dst can match any existed drop rule
419  */
420 bool
421 lnet_drop_rule_match(struct lnet_hdr *hdr, enum lnet_msg_hstatus *hstatus)
422 {
423         lnet_nid_t src = le64_to_cpu(hdr->src_nid);
424         lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
425         unsigned int typ = le32_to_cpu(hdr->type);
426         struct lnet_drop_rule *rule;
427         unsigned int ptl = -1;
428         bool drop = false;
429         int cpt;
430
431         /* NB: if Portal is specified, then only PUT and GET will be
432          * filtered by drop rule */
433         if (typ == LNET_MSG_PUT)
434                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
435         else if (typ == LNET_MSG_GET)
436                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
437
438         cpt = lnet_net_lock_current();
439         list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
440                 drop = drop_rule_match(rule, src, dst, typ, ptl,
441                                        hstatus);
442                 if (drop)
443                         break;
444         }
445         lnet_net_unlock(cpt);
446
447         return drop;
448 }
449
/**
 * LNet Delay Simulation
 */

/**
 * Time (seconds, ktime_get_seconds() base) at which a delayed message is
 * due. Set when the message is queued on a delay rule and reset to 0 when
 * it is taken off the rule's list.
 */
#define msg_delay_send		msg_ev.hdr_data
455
456 struct lnet_delay_rule {
457         /** link chain on the_lnet.ln_delay_rules */
458         struct list_head        dl_link;
459         /** link chain on delay_dd.dd_sched_rules */
460         struct list_head        dl_sched_link;
461         /** attributes of this rule */
462         struct lnet_fault_attr  dl_attr;
463         /** lock to protect \a below members */
464         spinlock_t              dl_lock;
465         /** refcount of delay rule */
466         atomic_t                dl_refcount;
467         /**
468          * the message sequence to delay, which means message is delayed when
469          * dl_stat.fs_count == dl_delay_at
470          */
471         unsigned long           dl_delay_at;
472         /**
473          * seconds to delay the next message, it's exclusive with dl_delay_at
474          */
475         time64_t                dl_delay_time;
476         /** baseline to caculate dl_delay_time */
477         time64_t                dl_time_base;
478         /** jiffies to send the next delayed message */
479         unsigned long           dl_msg_send;
480         /** delayed message list */
481         struct list_head        dl_msg_list;
482         /** statistic of delayed messages */
483         struct lnet_fault_stat  dl_stat;
484         /** timer to wakeup delay_daemon */
485         struct timer_list       dl_timer;
486 };
487
488 struct delay_daemon_data {
489         /** serialise rule add/remove */
490         struct mutex            dd_mutex;
491         /** protect rules on \a dd_sched_rules */
492         spinlock_t              dd_lock;
493         /** scheduled delay rules (by timer) */
494         struct list_head        dd_sched_rules;
495         /** deamon thread sleeps at here */
496         wait_queue_head_t       dd_waitq;
497         /** controler (lctl command) wait at here */
498         wait_queue_head_t       dd_ctl_waitq;
499         /** deamon is running */
500         unsigned int            dd_running;
501         /** deamon stopped */
502         unsigned int            dd_stopped;
503 };
504
505 static struct delay_daemon_data delay_dd;
506
507 static void
508 delay_rule_decref(struct lnet_delay_rule *rule)
509 {
510         if (atomic_dec_and_test(&rule->dl_refcount)) {
511                 LASSERT(list_empty(&rule->dl_sched_link));
512                 LASSERT(list_empty(&rule->dl_msg_list));
513                 LASSERT(list_empty(&rule->dl_link));
514
515                 CFS_FREE_PTR(rule);
516         }
517 }
518
519 /**
520  * check source/destination NID, portal, message type and delay rate,
521  * decide whether should delay this message or not
522  */
523 static bool
524 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
525                 lnet_nid_t dst, unsigned int type, unsigned int portal,
526                 struct lnet_msg *msg)
527 {
528         struct lnet_fault_attr  *attr = &rule->dl_attr;
529         bool                     delay;
530
531         if (!lnet_fault_attr_match(attr, src, dst, type, portal))
532                 return false;
533
534         /* match this rule, check delay rate now */
535         spin_lock(&rule->dl_lock);
536         if (rule->dl_delay_time != 0) { /* time based delay */
537                 time64_t now = ktime_get_seconds();
538
539                 rule->dl_stat.fs_count++;
540                 delay = now >= rule->dl_delay_time;
541                 if (delay) {
542                         if (now > rule->dl_time_base)
543                                 rule->dl_time_base = now;
544
545                         rule->dl_delay_time = rule->dl_time_base +
546                                               cfs_rand() % attr->u.delay.la_interval;
547                         rule->dl_time_base += attr->u.delay.la_interval;
548
549                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
550                                libcfs_nid2str(attr->fa_src),
551                                libcfs_nid2str(attr->fa_dst),
552                                rule->dl_delay_time);
553                 }
554
555         } else { /* rate based delay */
556                 __u64 count;
557
558                 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
559                 /* generate the next random rate sequence */
560                 count = rule->dl_stat.fs_count;
561                 if (do_div(count, attr->u.delay.la_rate) == 0) {
562                         rule->dl_delay_at = rule->dl_stat.fs_count +
563                                             cfs_rand() % attr->u.delay.la_rate;
564                         CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
565                                libcfs_nid2str(attr->fa_src),
566                                libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
567                 }
568         }
569
570         if (!delay) {
571                 spin_unlock(&rule->dl_lock);
572                 return false;
573         }
574
575         /* delay this message, update counters */
576         lnet_fault_stat_inc(&rule->dl_stat, type);
577         rule->dl_stat.u.delay.ls_delayed++;
578
579         list_add_tail(&msg->msg_list, &rule->dl_msg_list);
580         msg->msg_delay_send = ktime_get_seconds() + attr->u.delay.la_latency;
581         if (rule->dl_msg_send == -1) {
582                 rule->dl_msg_send = msg->msg_delay_send;
583                 mod_timer(&rule->dl_timer, rule->dl_msg_send);
584         }
585
586         spin_unlock(&rule->dl_lock);
587         return true;
588 }
589
590 /**
591  * check if \a msg can match any Delay Rule, receiving of this message
592  * will be delayed if there is a match.
593  */
594 bool
595 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
596 {
597         struct lnet_delay_rule  *rule;
598         lnet_nid_t               src = le64_to_cpu(hdr->src_nid);
599         lnet_nid_t               dst = le64_to_cpu(hdr->dest_nid);
600         unsigned int             typ = le32_to_cpu(hdr->type);
601         unsigned int             ptl = -1;
602
603         /* NB: called with hold of lnet_net_lock */
604
605         /* NB: if Portal is specified, then only PUT and GET will be
606          * filtered by delay rule */
607         if (typ == LNET_MSG_PUT)
608                 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
609         else if (typ == LNET_MSG_GET)
610                 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
611
612         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
613                 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
614                         return true;
615         }
616
617         return false;
618 }
619
620 /** check out delayed messages for send */
621 static void
622 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
623                   struct list_head *msg_list)
624 {
625         struct lnet_msg *msg;
626         struct lnet_msg *tmp;
627         time64_t now = ktime_get_seconds();
628
629         if (!all && cfs_time_seconds(rule->dl_msg_send) > now)
630                 return;
631
632         spin_lock(&rule->dl_lock);
633         list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
634                 if (!all && msg->msg_delay_send > now)
635                         break;
636
637                 msg->msg_delay_send = 0;
638                 list_move_tail(&msg->msg_list, msg_list);
639         }
640
641         if (list_empty(&rule->dl_msg_list)) {
642                 del_timer(&rule->dl_timer);
643                 rule->dl_msg_send = -1;
644
645         } else if (!list_empty(msg_list)) {
646                 /* dequeued some timedout messages, update timer for the
647                  * next delayed message on rule */
648                 msg = list_entry(rule->dl_msg_list.next,
649                                  struct lnet_msg, msg_list);
650                 rule->dl_msg_send = msg->msg_delay_send;
651                 mod_timer(&rule->dl_timer, rule->dl_msg_send);
652         }
653         spin_unlock(&rule->dl_lock);
654 }
655
656 static void
657 delayed_msg_process(struct list_head *msg_list, bool drop)
658 {
659         struct lnet_msg *msg;
660
661         while (!list_empty(msg_list)) {
662                 struct lnet_ni *ni;
663                 int             cpt;
664                 int             rc;
665
666                 msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
667                 LASSERT(msg->msg_rxpeer != NULL);
668                 LASSERT(msg->msg_rxni != NULL);
669
670                 ni = msg->msg_rxni;
671                 cpt = msg->msg_rx_cpt;
672
673                 list_del_init(&msg->msg_list);
674                 if (drop) {
675                         rc = -ECANCELED;
676
677                 } else if (!msg->msg_routing) {
678                         rc = lnet_parse_local(ni, msg);
679                         if (rc == 0)
680                                 continue;
681
682                 } else {
683                         lnet_net_lock(cpt);
684                         rc = lnet_parse_forward_locked(ni, msg);
685                         lnet_net_unlock(cpt);
686
687                         switch (rc) {
688                         case LNET_CREDIT_OK:
689                                 lnet_ni_recv(ni, msg->msg_private, msg, 0,
690                                              0, msg->msg_len, msg->msg_len);
691                         case LNET_CREDIT_WAIT:
692                                 continue;
693                         default: /* failures */
694                                 break;
695                         }
696                 }
697
698                 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
699                                   msg->msg_type);
700                 lnet_finalize(msg, rc);
701         }
702 }
703
704 /**
705  * Process delayed messages for scheduled rules
706  * This function can either be called by delay_rule_daemon, or by lnet_finalise
707  */
708 void
709 lnet_delay_rule_check(void)
710 {
711         struct lnet_delay_rule  *rule;
712         struct list_head         msgs;
713
714         INIT_LIST_HEAD(&msgs);
715         while (1) {
716                 if (list_empty(&delay_dd.dd_sched_rules))
717                         break;
718
719                 spin_lock_bh(&delay_dd.dd_lock);
720                 if (list_empty(&delay_dd.dd_sched_rules)) {
721                         spin_unlock_bh(&delay_dd.dd_lock);
722                         break;
723                 }
724
725                 rule = list_entry(delay_dd.dd_sched_rules.next,
726                                   struct lnet_delay_rule, dl_sched_link);
727                 list_del_init(&rule->dl_sched_link);
728                 spin_unlock_bh(&delay_dd.dd_lock);
729
730                 delayed_msg_check(rule, false, &msgs);
731                 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
732         }
733
734         if (!list_empty(&msgs))
735                 delayed_msg_process(&msgs, false);
736 }
737
738 /** deamon thread to handle delayed messages */
739 static int
740 lnet_delay_rule_daemon(void *arg)
741 {
742         delay_dd.dd_running = 1;
743         wake_up(&delay_dd.dd_ctl_waitq);
744
745         while (delay_dd.dd_running) {
746                 wait_event_interruptible(delay_dd.dd_waitq,
747                                          !delay_dd.dd_running ||
748                                          !list_empty(&delay_dd.dd_sched_rules));
749                 lnet_delay_rule_check();
750         }
751
752         /* in case more rules have been enqueued after my last check */
753         lnet_delay_rule_check();
754         delay_dd.dd_stopped = 1;
755         wake_up(&delay_dd.dd_ctl_waitq);
756
757         return 0;
758 }
759
760 static void
761 delay_timer_cb(cfs_timer_cb_arg_t data)
762 {
763         struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);
764
765         spin_lock_bh(&delay_dd.dd_lock);
766         if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
767                 atomic_inc(&rule->dl_refcount);
768                 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
769                 wake_up(&delay_dd.dd_waitq);
770         }
771         spin_unlock_bh(&delay_dd.dd_lock);
772 }
773
774 /**
775  * Add a new delay rule to LNet
776  * There is no check for duplicated delay rule, all rules will be checked for
777  * incoming message.
778  */
779 int
780 lnet_delay_rule_add(struct lnet_fault_attr *attr)
781 {
782         struct lnet_delay_rule *rule;
783         int                     rc = 0;
784         ENTRY;
785
786         if (!((attr->u.delay.la_rate == 0) ^
787               (attr->u.delay.la_interval == 0))) {
788                 CDEBUG(D_NET,
789                        "please provide either delay rate or delay interval, "
790                        "but not both at the same time %d/%d\n",
791                        attr->u.delay.la_rate, attr->u.delay.la_interval);
792                 RETURN(-EINVAL);
793         }
794
795         if (attr->u.delay.la_latency == 0) {
796                 CDEBUG(D_NET, "delay latency cannot be zero\n");
797                 RETURN(-EINVAL);
798         }
799
800         if (lnet_fault_attr_validate(attr) != 0)
801                 RETURN(-EINVAL);
802
803         CFS_ALLOC_PTR(rule);
804         if (rule == NULL)
805                 RETURN(-ENOMEM);
806
807         mutex_lock(&delay_dd.dd_mutex);
808         if (!delay_dd.dd_running) {
809                 struct task_struct *task;
810
811                 /* NB: although LND threads will process delayed message
812                  * in lnet_finalize, but there is no guarantee that LND
813                  * threads will be waken up if no other message needs to
814                  * be handled.
815                  * Only one daemon thread, performance is not the concern
816                  * of this simualation module.
817                  */
818                 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
819                 if (IS_ERR(task)) {
820                         rc = PTR_ERR(task);
821                         GOTO(failed, rc);
822                 }
823                 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
824         }
825
826         cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
827                         (unsigned long)rule, 0);
828
829         spin_lock_init(&rule->dl_lock);
830         INIT_LIST_HEAD(&rule->dl_msg_list);
831         INIT_LIST_HEAD(&rule->dl_sched_link);
832
833         rule->dl_attr = *attr;
834         if (attr->u.delay.la_interval != 0) {
835                 rule->dl_time_base = ktime_get_seconds() +
836                                      attr->u.delay.la_interval;
837                 rule->dl_delay_time = ktime_get_seconds() +
838                                       cfs_rand() % attr->u.delay.la_interval;
839         } else {
840                 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
841         }
842
843         rule->dl_msg_send = -1;
844
845         lnet_net_lock(LNET_LOCK_EX);
846         atomic_set(&rule->dl_refcount, 1);
847         list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
848         lnet_net_unlock(LNET_LOCK_EX);
849
850         CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
851                libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
852                attr->u.delay.la_rate);
853
854         mutex_unlock(&delay_dd.dd_mutex);
855         RETURN(0);
856  failed:
857         mutex_unlock(&delay_dd.dd_mutex);
858         CFS_FREE_PTR(rule);
859         return rc;
860 }
861
862 /**
863  * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
864  * and \a dst are zero, all rules will be removed, otherwise only matched rules
865  * will be removed.
866  * If \a src is zero, then all rules have \a dst as destination will be remove
867  * If \a dst is zero, then all rules have \a src as source will be removed
868  *
869  * When a delay rule is removed, all delayed messages of this rule will be
870  * processed immediately.
871  */
872 int
873 lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
874 {
875         struct lnet_delay_rule *rule;
876         struct lnet_delay_rule  *tmp;
877         struct list_head        rule_list;
878         struct list_head        msg_list;
879         int                     n = 0;
880         bool                    cleanup;
881         ENTRY;
882
883         INIT_LIST_HEAD(&rule_list);
884         INIT_LIST_HEAD(&msg_list);
885
886         if (shutdown)
887                 src = dst = 0;
888
889         mutex_lock(&delay_dd.dd_mutex);
890         lnet_net_lock(LNET_LOCK_EX);
891
892         list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
893                 if (rule->dl_attr.fa_src != src && src != 0)
894                         continue;
895
896                 if (rule->dl_attr.fa_dst != dst && dst != 0)
897                         continue;
898
899                 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
900                        libcfs_nid2str(rule->dl_attr.fa_src),
901                        libcfs_nid2str(rule->dl_attr.fa_dst),
902                        rule->dl_attr.u.delay.la_rate,
903                        rule->dl_attr.u.delay.la_interval);
904                 /* refcount is taken over by rule_list */
905                 list_move(&rule->dl_link, &rule_list);
906         }
907
908         /* check if we need to shutdown delay_daemon */
909         cleanup = list_empty(&the_lnet.ln_delay_rules) &&
910                   !list_empty(&rule_list);
911         lnet_net_unlock(LNET_LOCK_EX);
912
913         list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
914                 list_del_init(&rule->dl_link);
915
916                 del_timer_sync(&rule->dl_timer);
917                 delayed_msg_check(rule, true, &msg_list);
918                 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
919                 n++;
920         }
921
922         if (cleanup) { /* no more delay rule, shutdown delay_daemon */
923                 LASSERT(delay_dd.dd_running);
924                 delay_dd.dd_running = 0;
925                 wake_up(&delay_dd.dd_waitq);
926
927                 while (!delay_dd.dd_stopped)
928                         wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
929         }
930         mutex_unlock(&delay_dd.dd_mutex);
931
932         if (!list_empty(&msg_list))
933                 delayed_msg_process(&msg_list, shutdown);
934
935         RETURN(n);
936 }
937
938 /**
939  * List Delay Rule at position of \a pos
940  */
941 int
942 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
943                     struct lnet_fault_stat *stat)
944 {
945         struct lnet_delay_rule *rule;
946         int                     cpt;
947         int                     i = 0;
948         int                     rc = -ENOENT;
949         ENTRY;
950
951         cpt = lnet_net_lock_current();
952         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
953                 if (i++ < pos)
954                         continue;
955
956                 spin_lock(&rule->dl_lock);
957                 *attr = rule->dl_attr;
958                 *stat = rule->dl_stat;
959                 spin_unlock(&rule->dl_lock);
960                 rc = 0;
961                 break;
962         }
963
964         lnet_net_unlock(cpt);
965         RETURN(rc);
966 }
967
968 /**
969  * reset counters for all Delay Rules
970  */
971 void
972 lnet_delay_rule_reset(void)
973 {
974         struct lnet_delay_rule *rule;
975         int                     cpt;
976         ENTRY;
977
978         cpt = lnet_net_lock_current();
979
980         list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
981                 struct lnet_fault_attr *attr = &rule->dl_attr;
982
983                 spin_lock(&rule->dl_lock);
984
985                 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
986                 if (attr->u.delay.la_rate != 0) {
987                         rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
988                 } else {
989                         rule->dl_delay_time = ktime_get_seconds() +
990                                               cfs_rand() % attr->u.delay.la_interval;
991                         rule->dl_time_base = ktime_get_seconds() +
992                                              attr->u.delay.la_interval;
993                 }
994                 spin_unlock(&rule->dl_lock);
995         }
996
997         lnet_net_unlock(cpt);
998         EXIT;
999 }
1000
1001 int
1002 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
1003 {
1004         struct lnet_fault_attr *attr;
1005         struct lnet_fault_stat *stat;
1006
1007         attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
1008
1009         switch (opc) {
1010         default:
1011                 return -EINVAL;
1012
1013         case LNET_CTL_DROP_ADD:
1014                 if (attr == NULL)
1015                         return -EINVAL;
1016
1017                 return lnet_drop_rule_add(attr);
1018
1019         case LNET_CTL_DROP_DEL:
1020                 if (attr == NULL)
1021                         return -EINVAL;
1022
1023                 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
1024                                                      attr->fa_dst);
1025                 return 0;
1026
1027         case LNET_CTL_DROP_RESET:
1028                 lnet_drop_rule_reset();
1029                 return 0;
1030
1031         case LNET_CTL_DROP_LIST:
1032                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1033                 if (attr == NULL || stat == NULL)
1034                         return -EINVAL;
1035
1036                 return lnet_drop_rule_list(data->ioc_count, attr, stat);
1037
1038         case LNET_CTL_DELAY_ADD:
1039                 if (attr == NULL)
1040                         return -EINVAL;
1041
1042                 return lnet_delay_rule_add(attr);
1043
1044         case LNET_CTL_DELAY_DEL:
1045                 if (attr == NULL)
1046                         return -EINVAL;
1047
1048                 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
1049                                                       attr->fa_dst, false);
1050                 return 0;
1051
1052         case LNET_CTL_DELAY_RESET:
1053                 lnet_delay_rule_reset();
1054                 return 0;
1055
1056         case LNET_CTL_DELAY_LIST:
1057                 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1058                 if (attr == NULL || stat == NULL)
1059                         return -EINVAL;
1060
1061                 return lnet_delay_rule_list(data->ioc_count, attr, stat);
1062         }
1063 }
1064
1065 int
1066 lnet_fault_init(void)
1067 {
1068         CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1069         CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1070         CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1071         CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1072
1073         mutex_init(&delay_dd.dd_mutex);
1074         spin_lock_init(&delay_dd.dd_lock);
1075         init_waitqueue_head(&delay_dd.dd_waitq);
1076         init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1077         INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1078
1079         return 0;
1080 }
1081
1082 void
1083 lnet_fault_fini(void)
1084 {
1085         lnet_drop_rule_del(0, 0);
1086         lnet_delay_rule_del(0, 0, true);
1087
1088         LASSERT(list_empty(&the_lnet.ln_drop_rules));
1089         LASSERT(list_empty(&the_lnet.ln_delay_rules));
1090         LASSERT(list_empty(&delay_dd.dd_sched_rules));
1091 }