 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
 *
 * Copyright (c) 2014, 2017, Intel Corporation.
 *
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/net_fault.c
 *
 * Lustre network fault simulation
 *
 * Author: liang.zhen@intel.com
 */
#define DEBUG_SUBSYSTEM S_LNET

#include <lnet/lib-lnet.h>
#include <uapi/linux/lnet/lnetctl.h>

#define LNET_MSG_MASK		(LNET_PUT_BIT | LNET_ACK_BIT | \
				 LNET_GET_BIT | LNET_REPLY_BIT)
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means the message is dropped
	 * when dr_stat.fs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	time64_t		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	time64_t		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
static bool
lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
{
	if (nid == msg_nid || nid == LNET_NID_ANY)
		return true;

	if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
		return false;

	/* 255.255.255.255@net is wildcard for all addresses in a network */
	return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
}
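
/*
 * Usage sketch (illustrative, not part of the original file) of how the
 * wildcards above behave. The NID strings are hypothetical.
 */
#if 0
static void example_nid_wildcards(void)
{
	lnet_nid_t peer = libcfs_str2nid("192.168.1.5@tcp");

	/* LNET_NID_ANY matches every NID */
	LASSERT(lnet_fault_nid_match(LNET_NID_ANY, peer));
	/* 255.255.255.255@net matches every NID on that network */
	LASSERT(lnet_fault_nid_match(libcfs_str2nid("255.255.255.255@tcp"),
				     peer));
	/* a fully specified NID matches only itself */
	LASSERT(lnet_fault_nid_match(peer, peer));
}
#endif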
static bool
lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
		      lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	if (!lnet_fault_nid_match(attr->fa_src, src) ||
	    !lnet_fault_nid_match(attr->fa_dst, dst))
		return false;

	if (!(attr->fa_msg_mask & (1 << type)))
		return false;

	/* NB: ACK and REPLY have no portal, but they should have been
	 * rejected by message mask */
	if (attr->fa_ptl_mask != 0 && /* has portal filter */
	    !(attr->fa_ptl_mask & (1ULL << portal)))
		return false;

	return true;
}
static int
lnet_fault_attr_validate(struct lnet_fault_attr *attr)
{
	if (attr->fa_msg_mask == 0)
		attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */

	if (attr->fa_ptl_mask == 0) /* no portal filter */
		return 0;

	/* NB: only PUT and GET can be filtered if portal filter has been set */
	attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
	if (attr->fa_msg_mask == 0) {
		CDEBUG(D_NET, "can't find valid message type bits %x\n",
		       attr->fa_msg_mask);
		return -EINVAL;
	}

	return 0;
}

static void
lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
{
	/* NB: fs_counter is NOT updated by this function */
	switch (type) {
	case LNET_MSG_PUT:
		stat->fs_put++;
		return;
	case LNET_MSG_ACK:
		stat->fs_ack++;
		return;
	case LNET_MSG_GET:
		stat->fs_get++;
		return;
	case LNET_MSG_REPLY:
		stat->fs_reply++;
		return;
	}
}
/**
 * LNet message drop simulation
 */

/**
 * Add a new drop rule to LNet
 * There is no check for duplicated drop rule; all rules will be checked for
 * incoming messages.
 */
static int
lnet_drop_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_drop_rule *rule;

	if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
		CDEBUG(D_NET,
		       "please provide either drop rate or drop interval, "
		       "but not both at the same time %d/%d\n",
		       attr->u.drop.da_rate, attr->u.drop.da_interval);
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr) != 0)
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (rule == NULL)
		return -ENOMEM;

	spin_lock_init(&rule->dr_lock);

	rule->dr_attr = *attr;
	if (attr->u.drop.da_interval != 0) {
		rule->dr_time_base = ktime_get_seconds() +
				     attr->u.drop.da_interval;
		rule->dr_drop_time = ktime_get_seconds() +
				     cfs_rand() % attr->u.drop.da_interval;
	} else {
		rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
	}

	lnet_net_lock(LNET_LOCK_EX);
	list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.drop.da_rate, attr->u.drop.da_interval);

	return 0;
}
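
/*
 * Usage sketch (illustrative, not part of the original file): build an
 * attribute that drops 1 of every 10 PUTs between two hypothetical NIDs
 * and register it. da_rate and da_interval are mutually exclusive, as
 * checked above.
 */
#if 0
static int example_drop_one_in_ten_puts(void)
{
	struct lnet_fault_attr attr;

	memset(&attr, 0, sizeof(attr));
	attr.fa_src = libcfs_str2nid("10.0.0.1@tcp");
	attr.fa_dst = libcfs_str2nid("10.0.0.2@tcp");
	attr.fa_msg_mask = LNET_PUT_BIT;
	attr.u.drop.da_rate = 10;	/* drop 1 of every 10 matching messages */
	attr.u.drop.da_interval = 0;	/* exclusive with da_rate */

	return lnet_drop_rule_add(&attr);
}
#endif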
/**
 * Remove matched drop rules from LNet; all rules that match \a src and
 * \a dst will be removed.
 * If \a src is zero, then all rules with \a dst as destination will be removed.
 * If \a dst is zero, then all rules with \a src as source will be removed.
 * If both are zero, all rules will be removed.
 */
static int
lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
{
	struct lnet_drop_rule *rule;
	struct lnet_drop_rule *tmp;
	struct list_head zombies;
	int n = 0;

	INIT_LIST_HEAD(&zombies);

	lnet_net_lock(LNET_LOCK_EX);
	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
		if (rule->dr_attr.fa_src != src && src != 0)
			continue;

		if (rule->dr_attr.fa_dst != dst && dst != 0)
			continue;

		list_move(&rule->dr_link, &zombies);
	}
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
		CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dr_attr.fa_src),
		       libcfs_nid2str(rule->dr_attr.fa_dst),
		       rule->dr_attr.u.drop.da_rate,
		       rule->dr_attr.u.drop.da_interval);

		list_del(&rule->dr_link);
		CFS_FREE_PTR(rule);
		n++;
	}

	return n;
}
/**
 * List drop rule at position of \a pos
 */
static int
lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
		    struct lnet_fault_stat *stat)
{
	struct lnet_drop_rule *rule;
	int cpt;
	int i = 0;
	int rc = -ENOENT;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dr_lock);
		*attr = rule->dr_attr;
		*stat = rule->dr_stat;
		spin_unlock(&rule->dr_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}
/**
 * reset counters for all drop rules
 */
static void
lnet_drop_rule_reset(void)
{
	struct lnet_drop_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		struct lnet_fault_attr *attr = &rule->dr_attr;

		spin_lock(&rule->dr_lock);

		memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
		if (attr->u.drop.da_rate != 0) {
			rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
		} else {
			rule->dr_drop_time = ktime_get_seconds() +
					     cfs_rand() % attr->u.drop.da_interval;
			rule->dr_time_base = ktime_get_seconds() +
					     attr->u.drop.da_interval;
		}
		spin_unlock(&rule->dr_lock);
	}

	lnet_net_unlock(cpt);
}
static void
lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
{
	unsigned int random = cfs_rand();
	int choice;
	int delta;
	int best_delta;
	int best_choice = 0;
	int i;

	/* assign a random failure */
	choice = random % (LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);

	if (mask == HSTATUS_RANDOM) {
		*hstatus = choice;
		return;
	}

	if (mask & (1 << choice)) {
		*hstatus = choice;
		return;
	}

	/* round to the closest ON bit */
	best_delta = HSTATUS_END;
	for (i = 1; i < HSTATUS_END; i++) {
		if (mask & (1 << i)) {
			delta = choice > i ? choice - i : i - choice;
			if (delta < best_delta) {
				best_delta = delta;
				best_choice = i;
			}
		}
	}

	*hstatus = best_choice;
}
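
/*
 * Worked example (illustrative, not part of the original file): if the mask
 * enables only bits 2 and 5 and the random choice lands on 4, bit 4 is not
 * enabled, so the loop above rounds to the nearest enabled bit: bit 5
 * (delta 1) wins over bit 2 (delta 2).
 */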
/**
 * check source/destination NID, portal, message type and drop rate,
 * decide whether to drop this message or not
 */
static bool
drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
		lnet_nid_t dst, unsigned int type, unsigned int portal,
		enum lnet_msg_hstatus *hstatus)
{
	struct lnet_fault_attr *attr = &rule->dr_attr;
	bool drop;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* if we're trying to match a health status error but it hasn't
	 * been set in the rule, then don't match */
	if ((hstatus && !attr->u.drop.da_health_error_mask) ||
	    (!hstatus && attr->u.drop.da_health_error_mask))
		return false;

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	if (attr->u.drop.da_random) {
		int value = cfs_rand() % attr->u.drop.da_interval;

		if (value >= (attr->u.drop.da_interval / 2))
			drop = true;
		else
			drop = false;
	} else if (rule->dr_drop_time != 0) { /* time based drop */
		time64_t now = ktime_get_seconds();

		rule->dr_stat.fs_count++;
		drop = now >= rule->dr_drop_time;
		if (drop) {
			if (now > rule->dr_time_base)
				rule->dr_time_base = now;

			rule->dr_drop_time = rule->dr_time_base +
					     cfs_rand() % attr->u.drop.da_interval;
			rule->dr_time_base += attr->u.drop.da_interval;

			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lld\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dr_drop_time);
		}
	} else { /* rate based drop */
		__u64 count;

		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		count = rule->dr_stat.fs_count;
		if (do_div(count, attr->u.drop.da_rate) == 0) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   cfs_rand() % attr->u.drop.da_rate;
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
		}
	}

	if (drop) { /* drop this message, update counters */
		if (hstatus)
			lnet_fault_match_health(hstatus,
				attr->u.drop.da_health_error_mask);
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
	return drop;
}
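
/*
 * Worked example (illustrative, not part of the original file): with
 * da_rate == 8, dr_drop_at starts at a random value in [0, 8), so exactly
 * one message is dropped in each window of 8 matching messages; every time
 * fs_count completes a window, a new random dr_drop_at is picked inside
 * the next one. With da_interval == 10 instead, one message is dropped
 * somewhere in each 10-second window.
 */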
/**
 * Check if a message from \a src to \a dst can match any existing drop rule
 */
bool
lnet_drop_rule_match(struct lnet_hdr *hdr, enum lnet_msg_hstatus *hstatus)
{
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	struct lnet_drop_rule *rule;
	unsigned int ptl = -1;
	bool drop = false;
	int cpt;

	/* NB: if Portal is specified, then only PUT and GET will be
	 * filtered by drop rule */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		drop = drop_rule_match(rule, src, dst, typ, ptl, hstatus);
		if (drop)
			break;
	}
	lnet_net_unlock(cpt);

	return drop;
}
/**
 * LNet Delay Simulation
 */
/** timestamp (in seconds) at which to send a delayed message */
#define msg_delay_send msg_ev.hdr_data
struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect the members below */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means the message is delayed
	 * when dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	time64_t		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	time64_t		dl_time_base;
	/** jiffies to send the next delayed message */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};

static struct delay_daemon_data	delay_dd;
static void
delay_rule_decref(struct lnet_delay_rule *rule)
{
	if (atomic_dec_and_test(&rule->dl_refcount)) {
		LASSERT(list_empty(&rule->dl_sched_link));
		LASSERT(list_empty(&rule->dl_msg_list));
		LASSERT(list_empty(&rule->dl_link));

		CFS_FREE_PTR(rule);
	}
}
/**
 * check source/destination NID, portal, message type and delay rate,
 * decide whether to delay this message or not
 */
static bool
delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
		 lnet_nid_t dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
{
	struct lnet_fault_attr *attr = &rule->dl_attr;
	bool delay;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time != 0) { /* time based delay */
		time64_t now = ktime_get_seconds();

		rule->dl_stat.fs_count++;
		delay = now >= rule->dl_delay_time;
		if (delay) {
			if (now > rule->dl_time_base)
				rule->dl_time_base = now;

			rule->dl_delay_time = rule->dl_time_base +
					      cfs_rand() % attr->u.delay.la_interval;
			rule->dl_time_base += attr->u.delay.la_interval;

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lld\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dl_delay_time);
		}
	} else { /* rate based delay */
		__u64 count;

		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		count = rule->dl_stat.fs_count;
		if (do_div(count, attr->u.delay.la_rate) == 0) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    cfs_rand() % attr->u.delay.la_rate;
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = ktime_get_seconds() + attr->u.delay.la_latency;
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
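
/*
 * Worked example (illustrative, not part of the original file): with
 * la_latency == 5, a message matched at time t is queued with
 * msg_delay_send = t + 5. The rule's timer is armed for the earliest
 * queued message; when it fires, delay_timer_cb schedules the rule and
 * the daemon below resends whatever has expired.
 */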
/**
 * check if \a msg can match any delay rule; receiving of this message
 * will be delayed if there is a match.
 */
bool
lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
{
	struct lnet_delay_rule *rule;
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	unsigned int ptl = -1;

	/* NB: called while holding lnet_net_lock */

	/* NB: if Portal is specified, then only PUT and GET will be
	 * filtered by delay rule */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (delay_rule_match(rule, src, dst, typ, ptl, msg))
			return true;
	}

	return false;
}
/** check out delayed messages for send */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	time64_t now = ktime_get_seconds();

	if (!all && cfs_time_seconds(rule->dl_msg_send) > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;
	} else if (!list_empty(msg_list)) {
		/* dequeued some timed-out messages, update the timer for the
		 * next delayed message on the rule */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		ni = msg->msg_rxni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			rc = -ECANCELED;

		} else if (!msg->msg_routing) {
			rc = lnet_parse_local(ni, msg);
			if (rc == 0)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fall through */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
				  msg->msg_type);
		lnet_finalize(msg, rc);
	}
}
/**
 * Process delayed messages for scheduled rules
 * This function can be called by delay_rule_daemon, or by lnet_finalize
 */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule *rule;
	struct list_head msgs;

	INIT_LIST_HEAD(&msgs);
	while (1) {
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_entry(delay_dd.dd_sched_rules.next,
				  struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}
/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
static void
delay_timer_cb(cfs_timer_cb_arg_t data)
{
	struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);

	spin_lock_bh(&delay_dd.dd_lock);
	if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
		atomic_inc(&rule->dl_refcount);
		list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
		wake_up(&delay_dd.dd_waitq);
	}
	spin_unlock_bh(&delay_dd.dd_lock);
}
/**
 * Add a new delay rule to LNet
 * There is no check for duplicated delay rule; all rules will be checked for
 * incoming messages.
 */
static int
lnet_delay_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_delay_rule *rule;
	int rc;

	if (!((attr->u.delay.la_rate == 0) ^
	      (attr->u.delay.la_interval == 0))) {
		CDEBUG(D_NET,
		       "please provide either delay rate or delay interval, "
		       "but not both at the same time %d/%d\n",
		       attr->u.delay.la_rate, attr->u.delay.la_interval);
		return -EINVAL;
	}

	if (attr->u.delay.la_latency == 0) {
		CDEBUG(D_NET, "delay latency cannot be zero\n");
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr) != 0)
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (rule == NULL)
		return -ENOMEM;

	mutex_lock(&delay_dd.dd_mutex);
	if (!delay_dd.dd_running) {
		struct task_struct *task;

		/* NB: although LND threads will process delayed messages
		 * in lnet_finalize, there is no guarantee that LND
		 * threads will be woken up if no other message needs to
		 * be handled.
		 * Only one daemon thread, performance is not the concern
		 * of this simulation module.
		 */
		task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			goto failed;
		}
		wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
	}

	cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
			(unsigned long)rule, 0);

	spin_lock_init(&rule->dl_lock);
	INIT_LIST_HEAD(&rule->dl_msg_list);
	INIT_LIST_HEAD(&rule->dl_sched_link);

	rule->dl_attr = *attr;
	if (attr->u.delay.la_interval != 0) {
		rule->dl_time_base = ktime_get_seconds() +
				     attr->u.delay.la_interval;
		rule->dl_delay_time = ktime_get_seconds() +
				      cfs_rand() % attr->u.delay.la_interval;
	} else {
		rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
	}

	rule->dl_msg_send = -1;

	lnet_net_lock(LNET_LOCK_EX);
	atomic_set(&rule->dl_refcount, 1);
	list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.delay.la_rate);

	mutex_unlock(&delay_dd.dd_mutex);
	return 0;
failed:
	mutex_unlock(&delay_dd.dd_mutex);
	CFS_FREE_PTR(rule);
	return rc;
}
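
/*
 * Usage sketch (illustrative, not part of the original file): delay one GET
 * in every 10-second window by 3 seconds between two hypothetical NIDs.
 * la_rate and la_interval are mutually exclusive, as checked above.
 */
#if 0
static int example_delay_gets(void)
{
	struct lnet_fault_attr attr;

	memset(&attr, 0, sizeof(attr));
	attr.fa_src = libcfs_str2nid("10.0.0.1@tcp");
	attr.fa_dst = libcfs_str2nid("10.0.0.2@tcp");
	attr.fa_msg_mask = LNET_GET_BIT;
	attr.u.delay.la_rate = 0;	/* exclusive with la_interval */
	attr.u.delay.la_interval = 10;	/* one delay per 10-second window */
	attr.u.delay.la_latency = 3;	/* hold each delayed message 3 seconds */

	return lnet_delay_rule_add(&attr);
}
#endif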
/**
 * Remove matched delay rules from LNet. If \a shutdown is true or both \a src
 * and \a dst are zero, all rules will be removed; otherwise only matched
 * rules will be removed.
 * If \a src is zero, then all rules with \a dst as destination will be removed.
 * If \a dst is zero, then all rules with \a src as source will be removed.
 *
 * When a delay rule is removed, all delayed messages of this rule will be
 * processed immediately.
 */
static int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup = false;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown)
		src = dst = 0;

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src != 0)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst != 0)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rules, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}
/**
 * List delay rule at position of \a pos
 */
static int
lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
		     struct lnet_fault_stat *stat)
{
	struct lnet_delay_rule *rule;
	int cpt;
	int i = 0;
	int rc = -ENOENT;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dl_lock);
		*attr = rule->dl_attr;
		*stat = rule->dl_stat;
		spin_unlock(&rule->dl_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}
/**
 * reset counters for all delay rules
 */
static void
lnet_delay_rule_reset(void)
{
	struct lnet_delay_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		struct lnet_fault_attr *attr = &rule->dl_attr;

		spin_lock(&rule->dl_lock);

		memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
		if (attr->u.delay.la_rate != 0) {
			rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
		} else {
			rule->dl_delay_time = ktime_get_seconds() +
					      cfs_rand() % attr->u.delay.la_interval;
			rule->dl_time_base = ktime_get_seconds() +
					     attr->u.delay.la_interval;
		}
		spin_unlock(&rule->dl_lock);
	}

	lnet_net_unlock(cpt);
}
int
lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
{
	struct lnet_fault_attr *attr;
	struct lnet_fault_stat *stat;

	attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;

	switch (opc) {
	default:
		return -EINVAL;

	case LNET_CTL_DROP_ADD:
		if (attr == NULL)
			return -EINVAL;

		return lnet_drop_rule_add(attr);

	case LNET_CTL_DROP_DEL:
		if (attr == NULL)
			return -EINVAL;

		data->ioc_count = lnet_drop_rule_del(attr->fa_src,
						     attr->fa_dst);
		return 0;

	case LNET_CTL_DROP_RESET:
		lnet_drop_rule_reset();
		return 0;

	case LNET_CTL_DROP_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (attr == NULL || stat == NULL)
			return -EINVAL;

		return lnet_drop_rule_list(data->ioc_count, attr, stat);

	case LNET_CTL_DELAY_ADD:
		if (attr == NULL)
			return -EINVAL;

		return lnet_delay_rule_add(attr);

	case LNET_CTL_DELAY_DEL:
		if (attr == NULL)
			return -EINVAL;

		data->ioc_count = lnet_delay_rule_del(attr->fa_src,
						      attr->fa_dst, false);
		return 0;

	case LNET_CTL_DELAY_RESET:
		lnet_delay_rule_reset();
		return 0;

	case LNET_CTL_DELAY_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (attr == NULL || stat == NULL)
			return -EINVAL;

		return lnet_delay_rule_list(data->ioc_count, attr, stat);
	}
}
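
/*
 * Usage note (assumption, not part of the original file): these opcodes are
 * normally driven from userspace by lctl's fault-injection commands, e.g.
 * "lctl net_drop_add -s 10.0.0.1@tcp -d 10.0.0.2@tcp -r 10" for
 * LNET_CTL_DROP_ADD; the exact command and option names depend on the lctl
 * version in use.
 */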
int
lnet_fault_init(void)
{
	CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
	CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
	CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
	CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);

	mutex_init(&delay_dd.dd_mutex);
	spin_lock_init(&delay_dd.dd_lock);
	init_waitqueue_head(&delay_dd.dd_waitq);
	init_waitqueue_head(&delay_dd.dd_ctl_waitq);
	INIT_LIST_HEAD(&delay_dd.dd_sched_rules);

	return 0;
}
void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(0, 0);
	lnet_delay_rule_del(0, 0, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}