4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 021110-1307, USA
24 * Copyright (c) 2014, 2017, Intel Corporation.
27 * This file is part of Lustre, http://www.lustre.org/
29 * lnet/lnet/net_fault.c
31 * Lustre network fault simulation
33 * Author: liang.zhen@intel.com
36 #define DEBUG_SUBSYSTEM S_LNET
38 #include <linux/random.h>
39 #include <lnet/lib-lnet.h>
40 #include <uapi/linux/lnet/lnetctl.h>
42 #define LNET_MSG_MASK (LNET_PUT_BIT | LNET_ACK_BIT | \
43 LNET_GET_BIT | LNET_REPLY_BIT)
45 struct lnet_drop_rule {
46 /** link chain on the_lnet.ln_drop_rules */
47 struct list_head dr_link;
48 /** attributes of this rule */
49 struct lnet_fault_attr dr_attr;
50 /** lock to protect \a dr_drop_at and \a dr_stat */
53 * the message sequence to drop, which means message is dropped when
54 * dr_stat.drs_count == dr_drop_at
56 unsigned long dr_drop_at;
58 * seconds to drop the next message, it's exclusive with dr_drop_at
60 time64_t dr_drop_time;
61 /** baseline to caculate dr_drop_time */
62 time64_t dr_time_base;
63 /** statistic of dropped messages */
64 struct lnet_fault_stat dr_stat;
68 lnet_fault_nid_match(lnet_nid_t nid, struct lnet_nid *msg_nid)
70 if (nid == LNET_NID_ANY)
74 if (lnet_nid_to_nid4(msg_nid) == nid)
77 if (LNET_NIDNET(nid) != LNET_NID_NET(msg_nid))
80 /* 255.255.255.255@net is wildcard for all addresses in a network */
81 return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
85 lnet_fault_attr_match(struct lnet_fault_attr *attr,
87 struct lnet_nid *local_nid,
89 unsigned int type, unsigned int portal)
91 if (!lnet_fault_nid_match(attr->fa_src, src) ||
92 !lnet_fault_nid_match(attr->fa_dst, dst) ||
93 !lnet_fault_nid_match(attr->fa_local_nid, local_nid))
96 if (!(attr->fa_msg_mask & BIT(type)))
99 /* NB: ACK and REPLY have no portal, but they should have been
100 * rejected by message mask */
101 if (attr->fa_ptl_mask != 0 && /* has portal filter */
102 !(attr->fa_ptl_mask & (1ULL << portal)))
109 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
111 if (attr->fa_msg_mask == 0)
112 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
114 if (attr->fa_ptl_mask == 0) /* no portal filter */
117 /* NB: only PUT and GET can be filtered if portal filter has been set */
118 attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
119 if (attr->fa_msg_mask == 0) {
120 CDEBUG(D_NET, "can't find valid message type bits %x\n",
128 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
130 /* NB: fs_counter is NOT updated by this function */
148 * LNet message drop simulation
152 * Add a new drop rule to LNet
153 * There is no check for duplicated drop rule, all rules will be checked for
157 lnet_drop_rule_add(struct lnet_fault_attr *attr)
159 struct lnet_drop_rule *rule;
162 if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
164 "please provide either drop rate or drop interval, "
165 "but not both at the same time %d/%d\n",
166 attr->u.drop.da_rate, attr->u.drop.da_interval);
170 if (lnet_fault_attr_validate(attr) != 0)
177 spin_lock_init(&rule->dr_lock);
179 rule->dr_attr = *attr;
180 if (attr->u.drop.da_interval != 0) {
181 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
182 rule->dr_drop_time = ktime_get_seconds() +
183 prandom_u32_max(attr->u.drop.da_interval);
185 rule->dr_drop_at = prandom_u32_max(attr->u.drop.da_rate);
188 lnet_net_lock(LNET_LOCK_EX);
189 list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
190 lnet_net_unlock(LNET_LOCK_EX);
192 CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
193 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
194 attr->u.drop.da_rate, attr->u.drop.da_interval);
199 * Remove matched drop rules from lnet, all rules that can match \a src and
200 * \a dst will be removed.
201 * If \a src is zero, then all rules have \a dst as destination will be remove
202 * If \a dst is zero, then all rules have \a src as source will be removed
203 * If both of them are zero, all rules will be removed
206 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
208 struct lnet_drop_rule *rule;
209 struct lnet_drop_rule *tmp;
214 lnet_net_lock(LNET_LOCK_EX);
215 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
216 if (rule->dr_attr.fa_src != src && src != 0)
219 if (rule->dr_attr.fa_dst != dst && dst != 0)
222 list_move(&rule->dr_link, &zombies);
224 lnet_net_unlock(LNET_LOCK_EX);
226 list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
227 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
228 libcfs_nid2str(rule->dr_attr.fa_src),
229 libcfs_nid2str(rule->dr_attr.fa_dst),
230 rule->dr_attr.u.drop.da_rate,
231 rule->dr_attr.u.drop.da_interval);
233 list_del(&rule->dr_link);
242 * List drop rule at position of \a pos
245 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
246 struct lnet_fault_stat *stat)
248 struct lnet_drop_rule *rule;
254 cpt = lnet_net_lock_current();
255 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
259 spin_lock(&rule->dr_lock);
260 *attr = rule->dr_attr;
261 *stat = rule->dr_stat;
262 spin_unlock(&rule->dr_lock);
267 lnet_net_unlock(cpt);
272 * reset counters for all drop rules
275 lnet_drop_rule_reset(void)
277 struct lnet_drop_rule *rule;
281 cpt = lnet_net_lock_current();
283 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
284 struct lnet_fault_attr *attr = &rule->dr_attr;
286 spin_lock(&rule->dr_lock);
288 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
289 if (attr->u.drop.da_rate != 0) {
290 rule->dr_drop_at = prandom_u32_max(attr->u.drop.da_rate);
292 rule->dr_drop_time = ktime_get_seconds() +
293 prandom_u32_max(attr->u.drop.da_interval);
294 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
296 spin_unlock(&rule->dr_lock);
299 lnet_net_unlock(cpt);
304 lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
311 /* assign a random failure */
312 choice = prandom_u32_max(LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
316 if (mask == HSTATUS_RANDOM) {
321 if (mask & BIT(choice)) {
326 /* round to the closest ON bit */
328 best_delta = HSTATUS_END;
334 if (delta < best_delta) {
346 * check source/destination NID, portal, message type and drop rate,
347 * decide whether should drop this message or not
350 drop_rule_match(struct lnet_drop_rule *rule,
351 struct lnet_nid *src,
352 struct lnet_nid *local_nid,
353 struct lnet_nid *dst,
354 unsigned int type, unsigned int portal,
355 enum lnet_msg_hstatus *hstatus)
357 struct lnet_fault_attr *attr = &rule->dr_attr;
360 if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))
363 if (attr->u.drop.da_drop_all) {
364 CDEBUG(D_NET, "set to drop all messages\n");
370 * if we're trying to match a health status error but it hasn't
371 * been set in the rule, then don't match
373 if ((hstatus && !attr->u.drop.da_health_error_mask) ||
374 (!hstatus && attr->u.drop.da_health_error_mask))
377 /* match this rule, check drop rate now */
378 spin_lock(&rule->dr_lock);
379 if (attr->u.drop.da_random) {
380 int value = prandom_u32_max(attr->u.drop.da_interval);
381 if (value >= (attr->u.drop.da_interval / 2))
385 } else if (rule->dr_drop_time != 0) { /* time based drop */
386 time64_t now = ktime_get_seconds();
388 rule->dr_stat.fs_count++;
389 drop = now >= rule->dr_drop_time;
391 if (now > rule->dr_time_base)
392 rule->dr_time_base = now;
394 rule->dr_drop_time = rule->dr_time_base +
395 prandom_u32_max(attr->u.drop.da_interval);
396 rule->dr_time_base += attr->u.drop.da_interval;
398 CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
399 libcfs_nid2str(attr->fa_src),
400 libcfs_nid2str(attr->fa_dst),
404 } else { /* rate based drop */
407 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
408 count = rule->dr_stat.fs_count;
409 if (do_div(count, attr->u.drop.da_rate) == 0) {
410 rule->dr_drop_at = rule->dr_stat.fs_count +
411 prandom_u32_max(attr->u.drop.da_rate);
412 CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
413 libcfs_nid2str(attr->fa_src),
414 libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
420 if (drop) { /* drop this message, update counters */
422 lnet_fault_match_health(hstatus,
423 attr->u.drop.da_health_error_mask);
424 lnet_fault_stat_inc(&rule->dr_stat, type);
425 rule->dr_stat.u.drop.ds_dropped++;
428 spin_unlock(&rule->dr_lock);
433 * Check if message from \a src to \a dst can match any existed drop rule
436 lnet_drop_rule_match(struct lnet_hdr *hdr,
437 struct lnet_nid *local_nid,
438 enum lnet_msg_hstatus *hstatus)
440 unsigned int typ = hdr->type;
441 struct lnet_drop_rule *rule;
442 unsigned int ptl = -1;
446 /* NB: if Portal is specified, then only PUT and GET will be
447 * filtered by drop rule */
448 if (typ == LNET_MSG_PUT)
449 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
450 else if (typ == LNET_MSG_GET)
451 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
453 cpt = lnet_net_lock_current();
454 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
455 drop = drop_rule_match(rule, &hdr->src_nid, local_nid,
456 &hdr->dest_nid, typ, ptl,
461 lnet_net_unlock(cpt);
467 * LNet Delay Simulation
469 /** timestamp (second) to send delayed message */
470 #define msg_delay_send msg_ev.hdr_data
472 struct lnet_delay_rule {
473 /** link chain on the_lnet.ln_delay_rules */
474 struct list_head dl_link;
475 /** link chain on delay_dd.dd_sched_rules */
476 struct list_head dl_sched_link;
477 /** attributes of this rule */
478 struct lnet_fault_attr dl_attr;
479 /** lock to protect \a below members */
481 /** refcount of delay rule */
482 atomic_t dl_refcount;
484 * the message sequence to delay, which means message is delayed when
485 * dl_stat.fs_count == dl_delay_at
487 unsigned long dl_delay_at;
489 * seconds to delay the next message, it's exclusive with dl_delay_at
491 time64_t dl_delay_time;
492 /** baseline to caculate dl_delay_time */
493 time64_t dl_time_base;
494 /** seconds until we send the next delayed message */
495 time64_t dl_msg_send;
496 /** delayed message list */
497 struct list_head dl_msg_list;
498 /** statistic of delayed messages */
499 struct lnet_fault_stat dl_stat;
500 /** timer to wakeup delay_daemon */
501 struct timer_list dl_timer;
504 struct delay_daemon_data {
505 /** serialise rule add/remove */
506 struct mutex dd_mutex;
507 /** protect rules on \a dd_sched_rules */
509 /** scheduled delay rules (by timer) */
510 struct list_head dd_sched_rules;
511 /** deamon thread sleeps at here */
512 wait_queue_head_t dd_waitq;
513 /** controler (lctl command) wait at here */
514 wait_queue_head_t dd_ctl_waitq;
515 /** deamon is running */
516 unsigned int dd_running;
517 /** deamon stopped */
518 unsigned int dd_stopped;
521 static struct delay_daemon_data delay_dd;
524 delay_rule_decref(struct lnet_delay_rule *rule)
526 if (atomic_dec_and_test(&rule->dl_refcount)) {
527 LASSERT(list_empty(&rule->dl_sched_link));
528 LASSERT(list_empty(&rule->dl_msg_list));
529 LASSERT(list_empty(&rule->dl_link));
536 * check source/destination NID, portal, message type and delay rate,
537 * decide whether should delay this message or not
540 delay_rule_match(struct lnet_delay_rule *rule, struct lnet_nid *src,
541 struct lnet_nid *dst, unsigned int type, unsigned int portal,
542 struct lnet_msg *msg)
544 struct lnet_fault_attr *attr = &rule->dl_attr;
546 time64_t now = ktime_get_seconds();
548 if (!lnet_fault_attr_match(attr, src, NULL,
552 /* match this rule, check delay rate now */
553 spin_lock(&rule->dl_lock);
554 if (rule->dl_delay_time != 0) { /* time based delay */
555 rule->dl_stat.fs_count++;
556 delay = now >= rule->dl_delay_time;
558 if (now > rule->dl_time_base)
559 rule->dl_time_base = now;
561 rule->dl_delay_time = rule->dl_time_base +
562 prandom_u32_max(attr->u.delay.la_interval);
563 rule->dl_time_base += attr->u.delay.la_interval;
565 CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
566 libcfs_nid2str(attr->fa_src),
567 libcfs_nid2str(attr->fa_dst),
568 rule->dl_delay_time);
571 } else { /* rate based delay */
574 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
575 /* generate the next random rate sequence */
576 count = rule->dl_stat.fs_count;
577 if (do_div(count, attr->u.delay.la_rate) == 0) {
578 rule->dl_delay_at = rule->dl_stat.fs_count +
579 prandom_u32_max(attr->u.delay.la_rate);
580 CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
581 libcfs_nid2str(attr->fa_src),
582 libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
587 spin_unlock(&rule->dl_lock);
591 /* delay this message, update counters */
592 lnet_fault_stat_inc(&rule->dl_stat, type);
593 rule->dl_stat.u.delay.ls_delayed++;
595 list_add_tail(&msg->msg_list, &rule->dl_msg_list);
596 msg->msg_delay_send = now + attr->u.delay.la_latency;
597 if (rule->dl_msg_send == -1) {
598 rule->dl_msg_send = msg->msg_delay_send;
599 mod_timer(&rule->dl_timer,
600 jiffies + cfs_time_seconds(attr->u.delay.la_latency));
603 spin_unlock(&rule->dl_lock);
608 * check if \a msg can match any Delay Rule, receiving of this message
609 * will be delayed if there is a match.
612 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
614 struct lnet_delay_rule *rule;
615 unsigned int typ = hdr->type;
616 unsigned int ptl = -1;
618 /* NB: called with hold of lnet_net_lock */
620 /* NB: if Portal is specified, then only PUT and GET will be
621 * filtered by delay rule */
622 if (typ == LNET_MSG_PUT)
623 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
624 else if (typ == LNET_MSG_GET)
625 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
627 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
628 if (delay_rule_match(rule, &hdr->src_nid, &hdr->dest_nid,
636 /** check out delayed messages for send */
638 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
639 struct list_head *msg_list)
641 struct lnet_msg *msg;
642 struct lnet_msg *tmp;
643 time64_t now = ktime_get_seconds();
645 if (!all && rule->dl_msg_send > now)
648 spin_lock(&rule->dl_lock);
649 list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
650 if (!all && msg->msg_delay_send > now)
653 msg->msg_delay_send = 0;
654 list_move_tail(&msg->msg_list, msg_list);
657 if (list_empty(&rule->dl_msg_list)) {
658 del_timer(&rule->dl_timer);
659 rule->dl_msg_send = -1;
661 } else if (!list_empty(msg_list)) {
662 /* dequeued some timedout messages, update timer for the
663 * next delayed message on rule */
664 msg = list_entry(rule->dl_msg_list.next,
665 struct lnet_msg, msg_list);
666 rule->dl_msg_send = msg->msg_delay_send;
667 mod_timer(&rule->dl_timer,
669 cfs_time_seconds(msg->msg_delay_send - now));
671 spin_unlock(&rule->dl_lock);
675 delayed_msg_process(struct list_head *msg_list, bool drop)
677 struct lnet_msg *msg;
679 while (!list_empty(msg_list)) {
684 msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
686 if (msg->msg_sending) {
688 list_del_init(&msg->msg_list);
690 CDEBUG(D_NET, "TRACE: msg %p %s -> %s : %s\n", msg,
691 libcfs_nidstr(&ni->ni_nid),
692 libcfs_nidstr(&msg->msg_txpeer->lpni_nid),
693 lnet_msgtyp2str(msg->msg_type));
694 lnet_ni_send(ni, msg);
698 /* Delayed receive */
699 LASSERT(msg->msg_rxpeer != NULL);
700 LASSERT(msg->msg_rxni != NULL);
703 cpt = msg->msg_rx_cpt;
705 list_del_init(&msg->msg_list);
709 } else if (!msg->msg_routing) {
710 rc = lnet_parse_local(ni, msg);
716 rc = lnet_parse_forward_locked(ni, msg);
717 lnet_net_unlock(cpt);
721 lnet_ni_recv(ni, msg->msg_private, msg, 0,
722 0, msg->msg_len, msg->msg_len);
724 case LNET_CREDIT_WAIT:
726 default: /* failures */
731 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
733 lnet_finalize(msg, rc);
738 * Process delayed messages for scheduled rules
739 * This function can either be called by delay_rule_daemon, or by lnet_finalise
742 lnet_delay_rule_check(void)
744 struct lnet_delay_rule *rule;
748 if (list_empty(&delay_dd.dd_sched_rules))
751 spin_lock_bh(&delay_dd.dd_lock);
752 if (list_empty(&delay_dd.dd_sched_rules)) {
753 spin_unlock_bh(&delay_dd.dd_lock);
757 rule = list_entry(delay_dd.dd_sched_rules.next,
758 struct lnet_delay_rule, dl_sched_link);
759 list_del_init(&rule->dl_sched_link);
760 spin_unlock_bh(&delay_dd.dd_lock);
762 delayed_msg_check(rule, false, &msgs);
763 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
766 if (!list_empty(&msgs))
767 delayed_msg_process(&msgs, false);
770 /** deamon thread to handle delayed messages */
772 lnet_delay_rule_daemon(void *arg)
774 delay_dd.dd_running = 1;
775 wake_up(&delay_dd.dd_ctl_waitq);
777 while (delay_dd.dd_running) {
778 wait_event_interruptible(delay_dd.dd_waitq,
779 !delay_dd.dd_running ||
780 !list_empty(&delay_dd.dd_sched_rules));
781 lnet_delay_rule_check();
784 /* in case more rules have been enqueued after my last check */
785 lnet_delay_rule_check();
786 delay_dd.dd_stopped = 1;
787 wake_up(&delay_dd.dd_ctl_waitq);
793 delay_timer_cb(cfs_timer_cb_arg_t data)
795 struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);
797 spin_lock_bh(&delay_dd.dd_lock);
798 if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
799 atomic_inc(&rule->dl_refcount);
800 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
801 wake_up(&delay_dd.dd_waitq);
803 spin_unlock_bh(&delay_dd.dd_lock);
807 * Add a new delay rule to LNet
808 * There is no check for duplicated delay rule, all rules will be checked for
812 lnet_delay_rule_add(struct lnet_fault_attr *attr)
814 struct lnet_delay_rule *rule;
818 if (!((attr->u.delay.la_rate == 0) ^
819 (attr->u.delay.la_interval == 0))) {
821 "please provide either delay rate or delay interval, "
822 "but not both at the same time %d/%d\n",
823 attr->u.delay.la_rate, attr->u.delay.la_interval);
827 if (attr->u.delay.la_latency == 0) {
828 CDEBUG(D_NET, "delay latency cannot be zero\n");
832 if (lnet_fault_attr_validate(attr) != 0)
839 mutex_lock(&delay_dd.dd_mutex);
840 if (!delay_dd.dd_running) {
841 struct task_struct *task;
843 /* NB: although LND threads will process delayed message
844 * in lnet_finalize, but there is no guarantee that LND
845 * threads will be waken up if no other message needs to
847 * Only one daemon thread, performance is not the concern
848 * of this simualation module.
850 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
855 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
858 cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
859 (unsigned long)rule, 0);
861 spin_lock_init(&rule->dl_lock);
862 INIT_LIST_HEAD(&rule->dl_msg_list);
863 INIT_LIST_HEAD(&rule->dl_sched_link);
865 rule->dl_attr = *attr;
866 if (attr->u.delay.la_interval != 0) {
867 rule->dl_time_base = ktime_get_seconds() +
868 attr->u.delay.la_interval;
869 rule->dl_delay_time = ktime_get_seconds() +
870 prandom_u32_max(attr->u.delay.la_interval);
872 rule->dl_delay_at = prandom_u32_max(attr->u.delay.la_rate);
875 rule->dl_msg_send = -1;
877 lnet_net_lock(LNET_LOCK_EX);
878 atomic_set(&rule->dl_refcount, 1);
879 list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
880 lnet_net_unlock(LNET_LOCK_EX);
882 CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
883 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
884 attr->u.delay.la_rate);
886 mutex_unlock(&delay_dd.dd_mutex);
889 mutex_unlock(&delay_dd.dd_mutex);
895 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
896 * and \a dst are zero, all rules will be removed, otherwise only matched rules
898 * If \a src is zero, then all rules have \a dst as destination will be remove
899 * If \a dst is zero, then all rules have \a src as source will be removed
901 * When a delay rule is removed, all delayed messages of this rule will be
902 * processed immediately.
905 lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
907 struct lnet_delay_rule *rule;
908 struct lnet_delay_rule *tmp;
909 LIST_HEAD(rule_list);
918 mutex_lock(&delay_dd.dd_mutex);
919 lnet_net_lock(LNET_LOCK_EX);
921 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
922 if (rule->dl_attr.fa_src != src && src != 0)
925 if (rule->dl_attr.fa_dst != dst && dst != 0)
928 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
929 libcfs_nid2str(rule->dl_attr.fa_src),
930 libcfs_nid2str(rule->dl_attr.fa_dst),
931 rule->dl_attr.u.delay.la_rate,
932 rule->dl_attr.u.delay.la_interval);
933 /* refcount is taken over by rule_list */
934 list_move(&rule->dl_link, &rule_list);
937 /* check if we need to shutdown delay_daemon */
938 cleanup = list_empty(&the_lnet.ln_delay_rules) &&
939 !list_empty(&rule_list);
940 lnet_net_unlock(LNET_LOCK_EX);
942 list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
943 list_del_init(&rule->dl_link);
945 del_timer_sync(&rule->dl_timer);
946 delayed_msg_check(rule, true, &msg_list);
947 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
951 if (cleanup) { /* no more delay rule, shutdown delay_daemon */
952 LASSERT(delay_dd.dd_running);
953 delay_dd.dd_running = 0;
954 wake_up(&delay_dd.dd_waitq);
956 while (!delay_dd.dd_stopped)
957 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
959 mutex_unlock(&delay_dd.dd_mutex);
961 if (!list_empty(&msg_list))
962 delayed_msg_process(&msg_list, shutdown);
968 * List Delay Rule at position of \a pos
971 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
972 struct lnet_fault_stat *stat)
974 struct lnet_delay_rule *rule;
980 cpt = lnet_net_lock_current();
981 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
985 spin_lock(&rule->dl_lock);
986 *attr = rule->dl_attr;
987 *stat = rule->dl_stat;
988 spin_unlock(&rule->dl_lock);
993 lnet_net_unlock(cpt);
998 * reset counters for all Delay Rules
1001 lnet_delay_rule_reset(void)
1003 struct lnet_delay_rule *rule;
1007 cpt = lnet_net_lock_current();
1009 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1010 struct lnet_fault_attr *attr = &rule->dl_attr;
1012 spin_lock(&rule->dl_lock);
1014 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
1015 if (attr->u.delay.la_rate != 0) {
1016 rule->dl_delay_at = prandom_u32_max(attr->u.delay.la_rate);
1018 rule->dl_delay_time = ktime_get_seconds() +
1019 prandom_u32_max(attr->u.delay.la_interval);
1020 rule->dl_time_base = ktime_get_seconds() +
1021 attr->u.delay.la_interval;
1023 spin_unlock(&rule->dl_lock);
1026 lnet_net_unlock(cpt);
1031 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
1033 struct lnet_fault_attr *attr;
1034 struct lnet_fault_stat *stat;
1036 attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
1042 case LNET_CTL_DROP_ADD:
1046 return lnet_drop_rule_add(attr);
1048 case LNET_CTL_DROP_DEL:
1052 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
1056 case LNET_CTL_DROP_RESET:
1057 lnet_drop_rule_reset();
1060 case LNET_CTL_DROP_LIST:
1061 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1062 if (attr == NULL || stat == NULL)
1065 return lnet_drop_rule_list(data->ioc_count, attr, stat);
1067 case LNET_CTL_DELAY_ADD:
1071 return lnet_delay_rule_add(attr);
1073 case LNET_CTL_DELAY_DEL:
1077 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
1078 attr->fa_dst, false);
1081 case LNET_CTL_DELAY_RESET:
1082 lnet_delay_rule_reset();
1085 case LNET_CTL_DELAY_LIST:
1086 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1087 if (attr == NULL || stat == NULL)
1090 return lnet_delay_rule_list(data->ioc_count, attr, stat);
1095 lnet_fault_init(void)
1097 BUILD_BUG_ON(LNET_PUT_BIT != BIT(LNET_MSG_PUT));
1098 BUILD_BUG_ON(LNET_ACK_BIT != BIT(LNET_MSG_ACK));
1099 BUILD_BUG_ON(LNET_GET_BIT != BIT(LNET_MSG_GET));
1100 BUILD_BUG_ON(LNET_REPLY_BIT != BIT(LNET_MSG_REPLY));
1102 mutex_init(&delay_dd.dd_mutex);
1103 spin_lock_init(&delay_dd.dd_lock);
1104 init_waitqueue_head(&delay_dd.dd_waitq);
1105 init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1106 INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1112 lnet_fault_fini(void)
1114 lnet_drop_rule_del(0, 0);
1115 lnet_delay_rule_del(0, 0, true);
1117 LASSERT(list_empty(&the_lnet.ln_drop_rules));
1118 LASSERT(list_empty(&the_lnet.ln_delay_rules));
1119 LASSERT(list_empty(&delay_dd.dd_sched_rules));