4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2014, 2017, Intel Corporation.
27 * This file is part of Lustre, http://www.lustre.org/
29 * lnet/lnet/net_fault.c
31 * Lustre network fault simulation
33 * Author: liang.zhen@intel.com
36 #define DEBUG_SUBSYSTEM S_LNET
38 #include <linux/random.h>
39 #include <lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
/* Bitmask covering all four LNet message types (PUT/ACK/GET/REPLY).
 * Used as the default fa_msg_mask when a fault rule does not specify one
 * (see lnet_fault_attr_validate()). */
#define LNET_MSG_MASK (LNET_PUT_BIT | LNET_ACK_BIT | \
		       LNET_GET_BIT | LNET_REPLY_BIT)
45 struct lnet_drop_rule {
46 /** link chain on the_lnet.ln_drop_rules */
47 struct list_head dr_link;
48 /** attributes of this rule */
49 struct lnet_fault_attr dr_attr;
50 /** lock to protect \a dr_drop_at and \a dr_stat */
53 * the message sequence to drop, which means message is dropped when
54 * dr_stat.drs_count == dr_drop_at
56 unsigned long dr_drop_at;
58 * seconds to drop the next message, it's exclusive with dr_drop_at
60 time64_t dr_drop_time;
61 /** baseline to caculate dr_drop_time */
62 time64_t dr_time_base;
63 /** statistic of dropped messages */
64 struct lnet_fault_stat dr_stat;
68 lnet_fault_nid_match(lnet_nid_t nid, struct lnet_nid *msg_nid)
70 if (nid == LNET_NID_ANY)
74 if (lnet_nid_to_nid4(msg_nid) == nid)
77 if (LNET_NIDNET(nid) != LNET_NID_NET(msg_nid))
80 /* 255.255.255.255@net is wildcard for all addresses in a network */
81 return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
85 lnet_fault_attr_match(struct lnet_fault_attr *attr,
87 struct lnet_nid *local_nid,
89 unsigned int type, unsigned int portal)
91 if (!lnet_fault_nid_match(attr->fa_src, src) ||
92 !lnet_fault_nid_match(attr->fa_dst, dst) ||
93 !lnet_fault_nid_match(attr->fa_local_nid, local_nid))
96 if (!(attr->fa_msg_mask & BIT(type)))
99 /* NB: ACK and REPLY have no portal, but they should have been
100 * rejected by message mask */
101 if (attr->fa_ptl_mask != 0 && /* has portal filter */
102 !(attr->fa_ptl_mask & (1ULL << portal)))
109 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
111 if (attr->fa_msg_mask == 0)
112 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
114 if (attr->fa_ptl_mask == 0) /* no portal filter */
117 /* NB: only PUT and GET can be filtered if portal filter has been set */
118 attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
119 if (attr->fa_msg_mask == 0) {
120 CDEBUG(D_NET, "can't find valid message type bits %x\n",
128 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
130 /* NB: fs_counter is NOT updated by this function */
148 * LNet message drop simulation
152 * Add a new drop rule to LNet
153 * There is no check for duplicated drop rule, all rules will be checked for
157 lnet_drop_rule_add(struct lnet_fault_attr *attr)
159 struct lnet_drop_rule *rule;
162 if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
164 "please provide either drop rate or drop interval, "
165 "but not both at the same time %d/%d\n",
166 attr->u.drop.da_rate, attr->u.drop.da_interval);
170 if (lnet_fault_attr_validate(attr) != 0)
177 spin_lock_init(&rule->dr_lock);
179 rule->dr_attr = *attr;
180 if (attr->u.drop.da_interval != 0) {
181 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
182 rule->dr_drop_time = ktime_get_seconds() +
183 prandom_u32_max(attr->u.drop.da_interval);
185 rule->dr_drop_at = prandom_u32_max(attr->u.drop.da_rate);
188 lnet_net_lock(LNET_LOCK_EX);
189 list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
190 lnet_net_unlock(LNET_LOCK_EX);
192 CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
193 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
194 attr->u.drop.da_rate, attr->u.drop.da_interval);
199 * Remove matched drop rules from lnet, all rules that can match \a src and
200 * \a dst will be removed.
201 * If \a src is zero, then all rules have \a dst as destination will be remove
202 * If \a dst is zero, then all rules have \a src as source will be removed
203 * If both of them are zero, all rules will be removed
206 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
208 struct lnet_drop_rule *rule;
209 struct lnet_drop_rule *tmp;
214 lnet_net_lock(LNET_LOCK_EX);
215 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
216 if (rule->dr_attr.fa_src != src && src != 0)
219 if (rule->dr_attr.fa_dst != dst && dst != 0)
222 list_move(&rule->dr_link, &zombies);
224 lnet_net_unlock(LNET_LOCK_EX);
226 list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
227 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
228 libcfs_nid2str(rule->dr_attr.fa_src),
229 libcfs_nid2str(rule->dr_attr.fa_dst),
230 rule->dr_attr.u.drop.da_rate,
231 rule->dr_attr.u.drop.da_interval);
233 list_del(&rule->dr_link);
242 * List drop rule at position of \a pos
245 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
246 struct lnet_fault_stat *stat)
248 struct lnet_drop_rule *rule;
254 cpt = lnet_net_lock_current();
255 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
259 spin_lock(&rule->dr_lock);
260 *attr = rule->dr_attr;
261 *stat = rule->dr_stat;
262 spin_unlock(&rule->dr_lock);
267 lnet_net_unlock(cpt);
272 * reset counters for all drop rules
275 lnet_drop_rule_reset(void)
277 struct lnet_drop_rule *rule;
281 cpt = lnet_net_lock_current();
283 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
284 struct lnet_fault_attr *attr = &rule->dr_attr;
286 spin_lock(&rule->dr_lock);
288 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
289 if (attr->u.drop.da_rate != 0) {
290 rule->dr_drop_at = prandom_u32_max(attr->u.drop.da_rate);
292 rule->dr_drop_time = ktime_get_seconds() +
293 prandom_u32_max(attr->u.drop.da_interval);
294 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
296 spin_unlock(&rule->dr_lock);
299 lnet_net_unlock(cpt);
/*
 * Choose a health status error to report for a faulted message.
 * NOTE(review): this excerpt is elided — several lines of the original
 * function body (declarations, braces, assignments) are missing here.
 * Comments describe only the visible fragments.
 */
lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
	/* assign a random failure */
	choice = prandom_u32_max(LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
	/* HSTATUS_RANDOM: any failure is acceptable, use the random choice */
	if (mask == HSTATUS_RANDOM) {
	/* the random choice happens to be enabled in the mask — use it */
	if (mask & BIT(choice)) {
	/* round to the closest ON bit */
	/* start with the worst possible distance, then scan for a better one */
	best_delta = HSTATUS_END;
	if (delta < best_delta) {
/**
 * check source/destination NID, portal, message type and drop rate,
 * decide whether should drop this message or not
 *
 * NOTE(review): this excerpt is elided — braces, early returns, variable
 * declarations and the "drop_matched" label of the original function are
 * missing.  Comments describe only what is visible; do not assume the
 * control flow is complete as shown.
 */
drop_rule_match(struct lnet_drop_rule *rule,
		struct lnet_nid *src,
		struct lnet_nid *local_nid,
		struct lnet_nid *dst,
		unsigned int type, unsigned int portal,
		enum lnet_msg_hstatus *hstatus)
	struct lnet_fault_attr *attr = &rule->dr_attr;

	/* the rule must match NIDs, message type and portal first */
	if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))

	/* unconditional drop: every matching message is discarded */
	if (attr->u.drop.da_drop_all) {
		CDEBUG(D_NET, "set to drop all messages\n");
	/*
	 * if we're trying to match a health status error but it hasn't
	 * been set in the rule, then don't match
	 */
	if ((hstatus && !attr->u.drop.da_health_error_mask) ||
	    (!hstatus && attr->u.drop.da_health_error_mask))

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	/* random drop: ~50% chance, derived from the interval value */
	if (attr->u.drop.da_random) {
		int value = prandom_u32_max(attr->u.drop.da_interval);
		if (value >= (attr->u.drop.da_interval / 2))
	} else if (rule->dr_drop_time != 0) { /* time based drop */
		time64_t now = ktime_get_seconds();

		rule->dr_stat.fs_count++;
		drop = now >= rule->dr_drop_time;
		/* re-arm: pick a random time inside the next interval window */
		if (now > rule->dr_time_base)
			rule->dr_time_base = now;

		rule->dr_drop_time = rule->dr_time_base +
				     prandom_u32_max(attr->u.drop.da_interval);
		rule->dr_time_base += attr->u.drop.da_interval;

		CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
		       libcfs_nid2str(attr->fa_src),
		       libcfs_nid2str(attr->fa_dst),
	} else { /* rate based drop */
		/* drop the dr_drop_at-th message of each da_rate window */
		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		count = rule->dr_stat.fs_count;
		/* window boundary: pick a random victim in the next window */
		if (do_div(count, attr->u.drop.da_rate) == 0) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   prandom_u32_max(attr->u.drop.da_rate);
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);

	if (drop) { /* drop this message, update counters */
		/* presumably guarded by "if (hstatus)" in the elided line above
		 * — TODO confirm against full source */
		lnet_fault_match_health(hstatus,
					attr->u.drop.da_health_error_mask);
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
433 * Check if message from \a src to \a dst can match any existed drop rule
436 lnet_drop_rule_match(struct lnet_hdr *hdr,
437 struct lnet_nid *local_nid,
438 enum lnet_msg_hstatus *hstatus)
440 unsigned int typ = hdr->type;
441 struct lnet_drop_rule *rule;
442 unsigned int ptl = -1;
446 /* NB: if Portal is specified, then only PUT and GET will be
447 * filtered by drop rule */
448 if (typ == LNET_MSG_PUT)
449 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
450 else if (typ == LNET_MSG_GET)
451 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
453 cpt = lnet_net_lock_current();
454 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
455 drop = drop_rule_match(rule, &hdr->src_nid, local_nid,
456 &hdr->dest_nid, typ, ptl,
461 lnet_net_unlock(cpt);
/**
 * LNet Delay Simulation
 */
/** timestamp (second) to send delayed message */
#define msg_delay_send msg_ev.hdr_data
472 struct lnet_delay_rule {
473 /** link chain on the_lnet.ln_delay_rules */
474 struct list_head dl_link;
475 /** link chain on delay_dd.dd_sched_rules */
476 struct list_head dl_sched_link;
477 /** attributes of this rule */
478 struct lnet_fault_attr dl_attr;
479 /** lock to protect \a below members */
481 /** refcount of delay rule */
482 atomic_t dl_refcount;
484 * the message sequence to delay, which means message is delayed when
485 * dl_stat.fs_count == dl_delay_at
487 unsigned long dl_delay_at;
489 * seconds to delay the next message, it's exclusive with dl_delay_at
491 time64_t dl_delay_time;
492 /** baseline to caculate dl_delay_time */
493 time64_t dl_time_base;
494 /** seconds until we send the next delayed message */
495 time64_t dl_msg_send;
496 /** delayed message list */
497 struct list_head dl_msg_list;
498 /** statistic of delayed messages */
499 struct lnet_fault_stat dl_stat;
500 /** timer to wakeup delay_daemon */
501 struct timer_list dl_timer;
504 struct delay_daemon_data {
505 /** serialise rule add/remove */
506 struct mutex dd_mutex;
507 /** protect rules on \a dd_sched_rules */
509 /** scheduled delay rules (by timer) */
510 struct list_head dd_sched_rules;
511 /** deamon thread sleeps at here */
512 wait_queue_head_t dd_waitq;
513 /** controler (lctl command) wait at here */
514 wait_queue_head_t dd_ctl_waitq;
515 /** deamon is running */
516 unsigned int dd_running;
517 /** deamon stopped */
518 unsigned int dd_stopped;
521 static struct delay_daemon_data delay_dd;
524 delay_rule_decref(struct lnet_delay_rule *rule)
526 if (atomic_dec_and_test(&rule->dl_refcount)) {
527 LASSERT(list_empty(&rule->dl_sched_link));
528 LASSERT(list_empty(&rule->dl_msg_list));
529 LASSERT(list_empty(&rule->dl_link));
/**
 * check source/destination NID, portal, message type and delay rate,
 * decide whether should delay this message or not
 *
 * NOTE(review): this excerpt is elided — braces, early returns and some
 * declarations are missing (note two spin_unlock(&rule->dl_lock) lines
 * below, implying an elided early-exit path between them).  Comments
 * describe only the visible fragments.
 */
delay_rule_match(struct lnet_delay_rule *rule, struct lnet_nid *src,
		 struct lnet_nid *dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
	struct lnet_fault_attr *attr = &rule->dl_attr;
	time64_t now = ktime_get_seconds();

	/* rule must match NIDs/type/portal; local NID is not filtered here */
	if (!lnet_fault_attr_match(attr, src, NULL,
	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time != 0) { /* time based delay */
		rule->dl_stat.fs_count++;
		delay = now >= rule->dl_delay_time;
		/* re-arm: pick a random time inside the next interval window */
		if (now > rule->dl_time_base)
			rule->dl_time_base = now;

		rule->dl_delay_time = rule->dl_time_base +
				      prandom_u32_max(attr->u.delay.la_interval);
		rule->dl_time_base += attr->u.delay.la_interval;

		CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
		       libcfs_nid2str(attr->fa_src),
		       libcfs_nid2str(attr->fa_dst),
		       rule->dl_delay_time);
	} else { /* rate based delay */
		/* delay the dl_delay_at-th message of each la_rate window */
		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		count = rule->dl_stat.fs_count;
		if (do_div(count, attr->u.delay.la_rate) == 0) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    prandom_u32_max(attr->u.delay.la_rate);
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);

	/* no-delay exit path (structure elided above) */
	spin_unlock(&rule->dl_lock);

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	/* queue the message; msg_delay_send is its due time in seconds */
	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = now + attr->u.delay.la_latency;
	/* -1 means no timer armed yet: arm it for this message */
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer,
			  jiffies + cfs_time_seconds(attr->u.delay.la_latency));

	spin_unlock(&rule->dl_lock);
608 * check if \a msg can match any Delay Rule, receiving of this message
609 * will be delayed if there is a match.
612 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
614 struct lnet_delay_rule *rule;
615 unsigned int typ = hdr->type;
616 unsigned int ptl = -1;
618 /* NB: called with hold of lnet_net_lock */
620 /* NB: if Portal is specified, then only PUT and GET will be
621 * filtered by delay rule */
622 if (typ == LNET_MSG_PUT)
623 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
624 else if (typ == LNET_MSG_GET)
625 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
627 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
628 if (delay_rule_match(rule, &hdr->src_nid, &hdr->dest_nid,
636 /** check out delayed messages for send */
638 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
639 struct list_head *msg_list)
641 struct lnet_msg *msg;
642 struct lnet_msg *tmp;
643 time64_t now = ktime_get_seconds();
645 if (!all && rule->dl_msg_send > now)
648 spin_lock(&rule->dl_lock);
649 list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
650 if (!all && msg->msg_delay_send > now)
653 msg->msg_delay_send = 0;
654 list_move_tail(&msg->msg_list, msg_list);
657 if (list_empty(&rule->dl_msg_list)) {
658 del_timer(&rule->dl_timer);
659 rule->dl_msg_send = -1;
661 } else if (!list_empty(msg_list)) {
662 /* dequeued some timedout messages, update timer for the
663 * next delayed message on rule */
664 msg = list_first_entry(&rule->dl_msg_list,
665 struct lnet_msg, msg_list);
666 rule->dl_msg_send = msg->msg_delay_send;
667 mod_timer(&rule->dl_timer,
669 cfs_time_seconds(msg->msg_delay_send - now));
671 spin_unlock(&rule->dl_lock);
/*
 * Dispatch messages whose simulated delay has expired: resume sending for
 * delayed sends, resume (or drop) receive processing for delayed receives.
 *
 * NOTE(review): this excerpt is elided — declarations (ni, rc, cpt),
 * braces, lock acquisitions and several branches of the original function
 * are missing.  Comments describe only the visible fragments.
 */
delayed_msg_process(struct list_head *msg_list, bool drop)
	struct lnet_msg *msg;

	/* drain msg_list front-to-back */
	while ((msg = list_first_entry_or_null(msg_list, struct lnet_msg,
					       msg_list)) != NULL) {
		/* delayed send path: hand the message back to the NI */
		if (msg->msg_sending) {
			list_del_init(&msg->msg_list);
			CDEBUG(D_NET, "TRACE: msg %p %s -> %s : %s\n", msg,
			       libcfs_nidstr(&ni->ni_nid),
			       libcfs_nidstr(&msg->msg_txpeer->lpni_nid),
			       lnet_msgtyp2str(msg->msg_type));
			lnet_ni_send(ni, msg);

		/* Delayed receive */
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		cpt = msg->msg_rx_cpt;
		list_del_init(&msg->msg_list);

		/* non-routed message: deliver locally */
		} else if (!msg->msg_routing) {
			rc = lnet_parse_local(ni, msg);
		/* routed message: forward it (needs cpt net lock) */
		rc = lnet_parse_forward_locked(ni, msg);
		lnet_net_unlock(cpt);

		/* credit available: complete the receive now */
		lnet_ni_recv(ni, msg->msg_private, msg, 0,
			     0, msg->msg_len, msg->msg_len);
		case LNET_CREDIT_WAIT:
		default: /* failures */
		/* drop requested or parse failed: account and finalize */
		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
		lnet_finalize(msg, rc);
737 * Process delayed messages for scheduled rules
738 * This function can either be called by delay_rule_daemon, or by lnet_finalise
741 lnet_delay_rule_check(void)
743 struct lnet_delay_rule *rule;
747 if (list_empty(&delay_dd.dd_sched_rules))
750 spin_lock_bh(&delay_dd.dd_lock);
751 if (list_empty(&delay_dd.dd_sched_rules)) {
752 spin_unlock_bh(&delay_dd.dd_lock);
756 rule = list_first_entry(&delay_dd.dd_sched_rules,
757 struct lnet_delay_rule, dl_sched_link);
758 list_del_init(&rule->dl_sched_link);
759 spin_unlock_bh(&delay_dd.dd_lock);
761 delayed_msg_check(rule, false, &msgs);
762 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
765 if (!list_empty(&msgs))
766 delayed_msg_process(&msgs, false);
769 /** deamon thread to handle delayed messages */
771 lnet_delay_rule_daemon(void *arg)
773 delay_dd.dd_running = 1;
774 wake_up(&delay_dd.dd_ctl_waitq);
776 while (delay_dd.dd_running) {
777 wait_event_interruptible(delay_dd.dd_waitq,
778 !delay_dd.dd_running ||
779 !list_empty(&delay_dd.dd_sched_rules));
780 lnet_delay_rule_check();
783 /* in case more rules have been enqueued after my last check */
784 lnet_delay_rule_check();
785 delay_dd.dd_stopped = 1;
786 wake_up(&delay_dd.dd_ctl_waitq);
792 delay_timer_cb(cfs_timer_cb_arg_t data)
794 struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);
796 spin_lock_bh(&delay_dd.dd_lock);
797 if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
798 atomic_inc(&rule->dl_refcount);
799 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
800 wake_up(&delay_dd.dd_waitq);
802 spin_unlock_bh(&delay_dd.dd_lock);
806 * Add a new delay rule to LNet
807 * There is no check for duplicated delay rule, all rules will be checked for
811 lnet_delay_rule_add(struct lnet_fault_attr *attr)
813 struct lnet_delay_rule *rule;
817 if (!((attr->u.delay.la_rate == 0) ^
818 (attr->u.delay.la_interval == 0))) {
820 "please provide either delay rate or delay interval, "
821 "but not both at the same time %d/%d\n",
822 attr->u.delay.la_rate, attr->u.delay.la_interval);
826 if (attr->u.delay.la_latency == 0) {
827 CDEBUG(D_NET, "delay latency cannot be zero\n");
831 if (lnet_fault_attr_validate(attr) != 0)
838 mutex_lock(&delay_dd.dd_mutex);
839 if (!delay_dd.dd_running) {
840 struct task_struct *task;
842 /* NB: although LND threads will process delayed message
843 * in lnet_finalize, but there is no guarantee that LND
844 * threads will be waken up if no other message needs to
846 * Only one daemon thread, performance is not the concern
847 * of this simualation module.
849 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
854 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
857 cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
858 (unsigned long)rule, 0);
860 spin_lock_init(&rule->dl_lock);
861 INIT_LIST_HEAD(&rule->dl_msg_list);
862 INIT_LIST_HEAD(&rule->dl_sched_link);
864 rule->dl_attr = *attr;
865 if (attr->u.delay.la_interval != 0) {
866 rule->dl_time_base = ktime_get_seconds() +
867 attr->u.delay.la_interval;
868 rule->dl_delay_time = ktime_get_seconds() +
869 prandom_u32_max(attr->u.delay.la_interval);
871 rule->dl_delay_at = prandom_u32_max(attr->u.delay.la_rate);
874 rule->dl_msg_send = -1;
876 lnet_net_lock(LNET_LOCK_EX);
877 atomic_set(&rule->dl_refcount, 1);
878 list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
879 lnet_net_unlock(LNET_LOCK_EX);
881 CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
882 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
883 attr->u.delay.la_rate);
885 mutex_unlock(&delay_dd.dd_mutex);
888 mutex_unlock(&delay_dd.dd_mutex);
894 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
895 * and \a dst are zero, all rules will be removed, otherwise only matched rules
897 * If \a src is zero, then all rules have \a dst as destination will be remove
898 * If \a dst is zero, then all rules have \a src as source will be removed
900 * When a delay rule is removed, all delayed messages of this rule will be
901 * processed immediately.
904 lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
906 struct lnet_delay_rule *rule;
907 struct lnet_delay_rule *tmp;
908 LIST_HEAD(rule_list);
917 mutex_lock(&delay_dd.dd_mutex);
918 lnet_net_lock(LNET_LOCK_EX);
920 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
921 if (rule->dl_attr.fa_src != src && src != 0)
924 if (rule->dl_attr.fa_dst != dst && dst != 0)
927 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
928 libcfs_nid2str(rule->dl_attr.fa_src),
929 libcfs_nid2str(rule->dl_attr.fa_dst),
930 rule->dl_attr.u.delay.la_rate,
931 rule->dl_attr.u.delay.la_interval);
932 /* refcount is taken over by rule_list */
933 list_move(&rule->dl_link, &rule_list);
936 /* check if we need to shutdown delay_daemon */
937 cleanup = list_empty(&the_lnet.ln_delay_rules) &&
938 !list_empty(&rule_list);
939 lnet_net_unlock(LNET_LOCK_EX);
941 list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
942 list_del_init(&rule->dl_link);
944 del_timer_sync(&rule->dl_timer);
945 delayed_msg_check(rule, true, &msg_list);
946 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
950 if (cleanup) { /* no more delay rule, shutdown delay_daemon */
951 LASSERT(delay_dd.dd_running);
952 delay_dd.dd_running = 0;
953 wake_up(&delay_dd.dd_waitq);
955 while (!delay_dd.dd_stopped)
956 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
958 mutex_unlock(&delay_dd.dd_mutex);
960 if (!list_empty(&msg_list))
961 delayed_msg_process(&msg_list, shutdown);
967 * List Delay Rule at position of \a pos
970 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
971 struct lnet_fault_stat *stat)
973 struct lnet_delay_rule *rule;
979 cpt = lnet_net_lock_current();
980 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
984 spin_lock(&rule->dl_lock);
985 *attr = rule->dl_attr;
986 *stat = rule->dl_stat;
987 spin_unlock(&rule->dl_lock);
992 lnet_net_unlock(cpt);
997 * reset counters for all Delay Rules
1000 lnet_delay_rule_reset(void)
1002 struct lnet_delay_rule *rule;
1006 cpt = lnet_net_lock_current();
1008 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1009 struct lnet_fault_attr *attr = &rule->dl_attr;
1011 spin_lock(&rule->dl_lock);
1013 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
1014 if (attr->u.delay.la_rate != 0) {
1015 rule->dl_delay_at = prandom_u32_max(attr->u.delay.la_rate);
1017 rule->dl_delay_time = ktime_get_seconds() +
1018 prandom_u32_max(attr->u.delay.la_interval);
1019 rule->dl_time_base = ktime_get_seconds() +
1020 attr->u.delay.la_interval;
1022 spin_unlock(&rule->dl_lock);
1025 lnet_net_unlock(cpt);
1030 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
1032 struct lnet_fault_attr *attr;
1033 struct lnet_fault_stat *stat;
1035 attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
1041 case LNET_CTL_DROP_ADD:
1045 return lnet_drop_rule_add(attr);
1047 case LNET_CTL_DROP_DEL:
1051 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
1055 case LNET_CTL_DROP_RESET:
1056 lnet_drop_rule_reset();
1059 case LNET_CTL_DROP_LIST:
1060 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1061 if (attr == NULL || stat == NULL)
1064 return lnet_drop_rule_list(data->ioc_count, attr, stat);
1066 case LNET_CTL_DELAY_ADD:
1070 return lnet_delay_rule_add(attr);
1072 case LNET_CTL_DELAY_DEL:
1076 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
1077 attr->fa_dst, false);
1080 case LNET_CTL_DELAY_RESET:
1081 lnet_delay_rule_reset();
1084 case LNET_CTL_DELAY_LIST:
1085 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1086 if (attr == NULL || stat == NULL)
1089 return lnet_delay_rule_list(data->ioc_count, attr, stat);
1094 lnet_fault_init(void)
1096 BUILD_BUG_ON(LNET_PUT_BIT != BIT(LNET_MSG_PUT));
1097 BUILD_BUG_ON(LNET_ACK_BIT != BIT(LNET_MSG_ACK));
1098 BUILD_BUG_ON(LNET_GET_BIT != BIT(LNET_MSG_GET));
1099 BUILD_BUG_ON(LNET_REPLY_BIT != BIT(LNET_MSG_REPLY));
1101 mutex_init(&delay_dd.dd_mutex);
1102 spin_lock_init(&delay_dd.dd_lock);
1103 init_waitqueue_head(&delay_dd.dd_waitq);
1104 init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1105 INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1111 lnet_fault_fini(void)
1113 lnet_drop_rule_del(0, 0);
1114 lnet_delay_rule_del(0, 0, true);
1116 LASSERT(list_empty(&the_lnet.ln_drop_rules));
1117 LASSERT(list_empty(&the_lnet.ln_delay_rules));
1118 LASSERT(list_empty(&delay_dd.dd_sched_rules));