// SPDX-License-Identifier: GPL-2.0

/* Copyright (c) 2014, 2017, Intel Corporation. */

/* This file is part of Lustre, http://www.lustre.org/
 *
 * Lustre network fault simulation
 *
 * Author: liang.zhen@intel.com
 */

#define DEBUG_SUBSYSTEM S_LNET

#include <linux/random.h>
#include <lnet/lib-lnet.h>
#include <uapi/linux/lnet/lnetctl.h>
#define LNET_MSG_MASK		(LNET_PUT_BIT | LNET_ACK_BIT | \
				 LNET_GET_BIT | LNET_REPLY_BIT)
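
/*
 * Each fault rule filters messages by source/destination/local NID, portal
 * and message type, and then fires in one of two mutually exclusive modes
 * (see lnet_drop_rule_add() and lnet_delay_rule_add() below):
 *
 *  - rate based:     act on one out of every N matched messages, at a
 *                    random position inside each window of N messages
 *  - interval based: act on one matched message every N seconds
 */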
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_large_attr dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, which means the message is dropped
	 * when dr_stat.fs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * seconds to drop the next message, it's exclusive with dr_drop_at
	 */
	time64_t		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	time64_t		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
static void
lnet_fault_attr_to_attr4(struct lnet_fault_large_attr *attr,
			 struct lnet_fault_attr *attr4)
{
	if (!attr4)
		return;

	attr4->fa_src = lnet_nid_to_nid4(&attr->fa_src);
	attr4->fa_dst = lnet_nid_to_nid4(&attr->fa_dst);
	attr4->fa_local_nid = lnet_nid_to_nid4(&attr->fa_local_nid);
	attr4->fa_ptl_mask = attr->fa_ptl_mask;
	attr4->fa_msg_mask = attr->fa_msg_mask;

	memcpy(&attr4->u, &attr->u, sizeof(attr4->u));
}
static void
lnet_fault_attr4_to_attr(struct lnet_fault_attr *attr4,
			 struct lnet_fault_large_attr *attr)
{
	if (!attr4)
		return;

	if (attr4->fa_src)
		lnet_nid4_to_nid(attr4->fa_src, &attr->fa_src);
	else
		attr->fa_src = LNET_ANY_NID;

	if (attr4->fa_dst)
		lnet_nid4_to_nid(attr4->fa_dst, &attr->fa_dst);
	else
		attr->fa_dst = LNET_ANY_NID;

	if (attr4->fa_local_nid)
		lnet_nid4_to_nid(attr4->fa_local_nid, &attr->fa_local_nid);
	else
		attr->fa_local_nid = LNET_ANY_NID;

	attr->fa_ptl_mask = attr4->fa_ptl_mask;
	attr->fa_msg_mask = attr4->fa_msg_mask;

	memcpy(&attr->u, &attr4->u, sizeof(attr->u));
}
static bool
lnet_fault_nid_match(struct lnet_nid *nid, struct lnet_nid *msg_nid)
{
	if (LNET_NID_IS_ANY(nid))
		return true;

	if (nid_same(msg_nid, nid))
		return true;

	if (LNET_NID_NET(nid) != LNET_NID_NET(msg_nid))
		return false;

	/* 255.255.255.255@net is a wildcard for all addresses in a network */
	return __be32_to_cpu(nid->nid_addr[0]) == LNET_NIDADDR(LNET_NID_ANY);
}
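
/*
 * NB: a rule NID of LNET_ANY_NID matches every peer, while the per-network
 * wildcard (rendered as 255.255.255.255@<net>) matches any address on that
 * particular network, as implemented by lnet_fault_nid_match() above.
 */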
static bool
lnet_fault_attr_match(struct lnet_fault_large_attr *attr,
		      struct lnet_nid *src,
		      struct lnet_nid *local_nid,
		      struct lnet_nid *dst,
		      unsigned int type, unsigned int portal)
{
	if (!lnet_fault_nid_match(&attr->fa_src, src) ||
	    !lnet_fault_nid_match(&attr->fa_dst, dst) ||
	    !lnet_fault_nid_match(&attr->fa_local_nid, local_nid))
		return false;

	if (!(attr->fa_msg_mask & BIT(type)))
		return false;

	/* NB: ACK and REPLY have no portal, but they should have been
	 * rejected by the message mask
	 */
	if (attr->fa_ptl_mask != 0 && /* has portal filter */
	    !(attr->fa_ptl_mask & (1ULL << portal)))
		return false;

	return true;
}
static int
lnet_fault_attr_validate(struct lnet_fault_large_attr *attr)
{
	if (attr->fa_msg_mask == 0)
		attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */

	if (attr->fa_ptl_mask == 0) /* no portal filter */
		return 0;

	/* NB: only PUT and GET can be filtered if a portal filter has been set */
	attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
	if (attr->fa_msg_mask == 0) {
		CDEBUG(D_NET, "can't find valid message type bits %x\n",
		       attr->fa_msg_mask);
		return -EINVAL;
	}

	return 0;
}
static void
lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
{
	/* NB: fs_count is NOT updated by this function */
	switch (type) {
	case LNET_MSG_PUT:
		stat->fs_put++;
		return;
	case LNET_MSG_ACK:
		stat->fs_ack++;
		return;
	case LNET_MSG_GET:
		stat->fs_get++;
		return;
	case LNET_MSG_REPLY:
		stat->fs_reply++;
		return;
	}
}
/**
 * LNet message drop simulation
 */

/**
 * Add a new drop rule to LNet.
 * There is no check for duplicated drop rules; all rules will be checked
 * for each message.
 */
static int
lnet_drop_rule_add(struct lnet_fault_large_attr *attr)
{
	struct lnet_drop_rule *rule;

	if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
		CDEBUG(D_NET,
		       "please provide either drop rate or drop interval, "
		       "but not both at the same time %d/%d\n",
		       attr->u.drop.da_rate, attr->u.drop.da_interval);
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr) != 0)
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (!rule)
		return -ENOMEM;

	spin_lock_init(&rule->dr_lock);

	rule->dr_attr = *attr;
	if (attr->u.drop.da_interval != 0) {
		rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
		rule->dr_drop_time = ktime_get_seconds() +
				     get_random_u32_below(attr->u.drop.da_interval);
	} else {
		rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
	}

	lnet_net_lock(LNET_LOCK_EX);
	list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
	       libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
	       attr->u.drop.da_rate, attr->u.drop.da_interval);

	return 0;
}
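
/*
 * Once linked on ln_drop_rules a rule is live: lnet_drop_rule_match() below
 * walks this list for every message it is asked to check.  For example,
 * da_rate = 100 drops one message in each window of 100 matched messages at
 * a random position in the window, while da_interval = 60 drops one matched
 * message every 60 seconds.
 */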
/**
 * Remove matched drop rules from LNet; all rules that match \a src and
 * \a dst will be removed.
 * If \a src is zero, then all rules with \a dst as destination will be removed.
 * If \a dst is zero, then all rules with \a src as source will be removed.
 * If both of them are zero, all rules will be removed.
 */
static int
lnet_drop_rule_del(struct lnet_nid *src, struct lnet_nid *dst)
{
	struct lnet_drop_rule *rule;
	struct lnet_drop_rule *tmp;
	LIST_HEAD(zombies);
	int n = 0;

	lnet_net_lock(LNET_LOCK_EX);
	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
		if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dr_attr.fa_src, src)))
			continue;

		if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dr_attr.fa_dst, dst)))
			continue;

		list_move(&rule->dr_link, &zombies);
	}
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
		CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nidstr(&rule->dr_attr.fa_src),
		       libcfs_nidstr(&rule->dr_attr.fa_dst),
		       rule->dr_attr.u.drop.da_rate,
		       rule->dr_attr.u.drop.da_interval);

		list_del(&rule->dr_link);
		CFS_FREE_PTR(rule);
		n++;
	}

	return n;
}
/**
 * List the drop rule at position \a pos
 */
static int
lnet_drop_rule_list(int pos, struct lnet_fault_large_attr *attr,
		    struct lnet_fault_stat *stat)
{
	struct lnet_drop_rule *rule;
	int rc = -ENOENT;
	int cpt;
	int i = 0;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dr_lock);
		*attr = rule->dr_attr;
		*stat = rule->dr_stat;
		spin_unlock(&rule->dr_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}
/**
 * reset counters for all drop rules
 */
static void
lnet_drop_rule_reset(void)
{
	struct lnet_drop_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		struct lnet_fault_large_attr *attr = &rule->dr_attr;

		spin_lock(&rule->dr_lock);

		memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
		if (attr->u.drop.da_rate != 0) {
			rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
		} else {
			rule->dr_drop_time = ktime_get_seconds() +
					     get_random_u32_below(attr->u.drop.da_interval);
			rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
		}
		spin_unlock(&rule->dr_lock);
	}

	lnet_net_unlock(cpt);
}
static void
lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
{
	int best_choice = 0;
	int best_delta;
	int choice;
	int delta;
	int i;

	/* assign a random failure */
	choice = get_random_u32_below(LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
	if (choice == 0)
		choice = 1;

	if (mask == HSTATUS_RANDOM) {
		*hstatus = choice;
		return;
	}

	if (mask & BIT(choice)) {
		*hstatus = choice;
		return;
	}

	/* round to the closest ON bit */
	best_delta = HSTATUS_END;
	for (i = 1; i < HSTATUS_END; i++) {
		if (!(mask & BIT(i)))
			continue;

		delta = choice > i ? choice - i : i - choice;
		if (delta < best_delta) {
			best_delta = delta;
			best_choice = i;
		}
	}

	*hstatus = best_choice;
}
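
/*
 * When a drop rule carries a health error mask, a matched message is not
 * just silently discarded: lnet_fault_match_health() above picks one of the
 * failure statuses permitted by the mask (or a completely random one for
 * HSTATUS_RANDOM) and drop_rule_match() reports it through its \a hstatus
 * argument, so the caller can feed it to the health tracking code as if a
 * real transmission error had happened.
 */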
/**
 * check source/destination NID, portal, message type and drop rate,
 * decide whether this message should be dropped or not.
 */
static bool
drop_rule_match(struct lnet_drop_rule *rule,
		struct lnet_nid *src,
		struct lnet_nid *local_nid,
		struct lnet_nid *dst,
		unsigned int type, unsigned int portal,
		enum lnet_msg_hstatus *hstatus)
{
	struct lnet_fault_large_attr *attr = &rule->dr_attr;
	bool drop;

	if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))
		return false;

	if (attr->u.drop.da_drop_all) {
		CDEBUG(D_NET, "set to drop all messages\n");
		drop = true;
		spin_lock(&rule->dr_lock);
		goto drop_matched;
	}

	/*
	 * if we're trying to match a health status error but it hasn't
	 * been set in the rule, then don't match
	 */
	if ((hstatus && !attr->u.drop.da_health_error_mask) ||
	    (!hstatus && attr->u.drop.da_health_error_mask))
		return false;

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	if (attr->u.drop.da_random) {
		int value = get_random_u32_below(attr->u.drop.da_interval);

		if (value >= (attr->u.drop.da_interval / 2))
			drop = true;
		else
			drop = false;
	} else if (rule->dr_drop_time != 0) { /* time based drop */
		time64_t now = ktime_get_seconds();

		rule->dr_stat.fs_count++;
		drop = now >= rule->dr_drop_time;
		if (drop) {
			if (now > rule->dr_time_base)
				rule->dr_time_base = now;

			rule->dr_drop_time = rule->dr_time_base +
					     get_random_u32_below(attr->u.drop.da_interval);
			rule->dr_time_base += attr->u.drop.da_interval;

			CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst),
			       rule->dr_drop_time);
		}
	} else { /* rate based drop */
		__u64 count;

		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		count = rule->dr_stat.fs_count;
		if (do_div(count, attr->u.drop.da_rate) == 0) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   get_random_u32_below(attr->u.drop.da_rate);
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst), rule->dr_drop_at);
		}
	}

drop_matched:
	if (drop) { /* drop this message, update counters */
		if (hstatus)
			lnet_fault_match_health(hstatus,
						attr->u.drop.da_health_error_mask);
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
	return drop;
}
/**
 * Check if a message from \a src to \a dst can match any existing drop rule
 */
bool
lnet_drop_rule_match(struct lnet_hdr *hdr,
		     struct lnet_nid *local_nid,
		     enum lnet_msg_hstatus *hstatus)
{
	unsigned int typ = hdr->type;
	struct lnet_drop_rule *rule;
	unsigned int ptl = -1;
	bool drop = false;
	int cpt;

	/* NB: if a portal is specified, then only PUT and GET will be
	 * filtered by the drop rule
	 */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		drop = drop_rule_match(rule, &hdr->src_nid, local_nid,
				       &hdr->dest_nid, typ, ptl,
				       hstatus);
		if (drop)
			break;
	}
	lnet_net_unlock(cpt);

	return drop;
}
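
/*
 * NB: lnet_drop_rule_match() returns true when the message should be
 * dropped; if the matching rule defines a health error mask and \a hstatus
 * is non-NULL, the simulated failure status is also passed back through
 * \a hstatus.
 */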
/**
 * LNet Delay Simulation
 */
/** timestamp (in seconds) to send a delayed message */
#define msg_delay_send		msg_ev.hdr_data

struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_large_attr dl_attr;
	/** lock to protect \a below members */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, which means the message is delayed
	 * when dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * seconds to delay the next message, it's exclusive with dl_delay_at
	 */
	time64_t		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	time64_t		dl_time_base;
	/** timestamp (seconds) at which the next delayed message is sent */
	time64_t		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};

static struct delay_daemon_data	delay_dd;
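
/*
 * Delay rule lifetime: a rule holds one reference while it is linked on
 * the_lnet.ln_delay_rules, and delay_timer_cb() takes an extra reference
 * for as long as the rule is queued on delay_dd.dd_sched_rules waiting for
 * the daemon.  The rule is freed when the last reference is dropped.
 */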
static void
delay_rule_decref(struct lnet_delay_rule *rule)
{
	if (atomic_dec_and_test(&rule->dl_refcount)) {
		LASSERT(list_empty(&rule->dl_sched_link));
		LASSERT(list_empty(&rule->dl_msg_list));
		LASSERT(list_empty(&rule->dl_link));

		CFS_FREE_PTR(rule);
	}
}
/**
 * check source/destination NID, portal, message type and delay rate,
 * decide whether this message should be delayed or not.
 */
static bool
delay_rule_match(struct lnet_delay_rule *rule, struct lnet_nid *src,
		 struct lnet_nid *dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
{
	struct lnet_fault_large_attr *attr = &rule->dl_attr;
	bool delay;
	time64_t now = ktime_get_seconds();

	if (!lnet_fault_attr_match(attr, src, NULL,
				   dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time != 0) { /* time based delay */
		rule->dl_stat.fs_count++;
		delay = now >= rule->dl_delay_time;
		if (delay) {
			if (now > rule->dl_time_base)
				rule->dl_time_base = now;

			rule->dl_delay_time = rule->dl_time_base +
					      get_random_u32_below(attr->u.delay.la_interval);
			rule->dl_time_base += attr->u.delay.la_interval;

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst),
			       rule->dl_delay_time);
		}
	} else { /* rate based delay */
		__u64 count;

		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		count = rule->dl_stat.fs_count;
		if (do_div(count, attr->u.delay.la_rate) == 0) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    get_random_u32_below(attr->u.delay.la_rate);
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nidstr(&attr->fa_src),
			       libcfs_nidstr(&attr->fa_dst), rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = now + attr->u.delay.la_latency;
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer,
			  jiffies + cfs_time_seconds(attr->u.delay.la_latency));
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
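
/*
 * A delayed message is parked on rule->dl_msg_list and stamped (via the
 * msg_delay_send alias of msg_ev.hdr_data defined above) with the absolute
 * time at which it becomes sendable.  The rule's timer is only re-armed when
 * dl_msg_send was -1, i.e. when no delayed message was pending before this
 * one.
 */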
/**
 * check if \a msg can match any delay rule; receiving of this message
 * will be delayed if there is a match.
 */
bool
lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
{
	struct lnet_delay_rule *rule;
	unsigned int typ = hdr->type;
	unsigned int ptl = -1;

	/* NB: called with the lnet_net_lock held */

	/* NB: if a portal is specified, then only PUT and GET will be
	 * filtered by the delay rule
	 */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (delay_rule_match(rule, &hdr->src_nid, &hdr->dest_nid,
				     typ, ptl, msg))
			return true;
	}

	return false;
}
/** check out delayed messages for send */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	time64_t now = ktime_get_seconds();

	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		timer_delete(&rule->dl_timer);
		rule->dl_msg_send = -1;
	} else if (!list_empty(msg_list)) {
		/* dequeued some timed out messages, update the timer for the
		 * next delayed message on the rule
		 */
		msg = list_first_entry(&rule->dl_msg_list,
				       struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer,
			  jiffies +
			  cfs_time_seconds(msg->msg_delay_send - now));
	}
	spin_unlock(&rule->dl_lock);
}
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while ((msg = list_first_entry_or_null(msg_list, struct lnet_msg,
					       msg_list)) != NULL) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		if (msg->msg_sending) {
			/* Delayed send */
			list_del_init(&msg->msg_list);
			ni = msg->msg_txni;
			CDEBUG(D_NET, "TRACE: msg %p %s -> %s : %s\n", msg,
			       libcfs_nidstr(&ni->ni_nid),
			       libcfs_nidstr(&msg->msg_txpeer->lpni_nid),
			       lnet_msgtyp2str(msg->msg_type));
			lnet_ni_send(ni, msg);
			continue;
		}

		/* Delayed receive */
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		ni = msg->msg_rxni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) {
			rc = -ECANCELED;
		} else if (!msg->msg_routing) {
			rc = lnet_parse_local(ni, msg);
		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);
		}

		switch (rc) {
		case LNET_CREDIT_OK:
			lnet_ni_recv(ni, msg->msg_private, msg, 0,
				     0, msg->msg_len, msg->msg_len);
			fallthrough;
		case LNET_CREDIT_WAIT:
			continue;
		default: /* failures */
			break;
		}

		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
				  msg->msg_type);
		lnet_finalize(msg, rc);
	}
}
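
/*
 * delayed_msg_process() resumes a parked message on one of two paths: a
 * delayed send is handed back to lnet_ni_send(), while a delayed receive is
 * re-parsed (locally, or forwarded for routed messages) and is dropped with
 * the usual accounting if parsing fails or \a drop was requested.
 */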
/**
 * Process delayed messages for scheduled rules.
 * This function can either be called by delay_rule_daemon, or by lnet_finalize.
 */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule *rule;
	LIST_HEAD(msgs);

	while (1) {
		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_first_entry(&delay_dd.dd_sched_rules,
					struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
	}

	if (!list_empty(&msgs))
		delayed_msg_process(&msgs, false);
}
/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
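
/*
 * The daemon is started lazily by lnet_delay_rule_add() when the first
 * delay rule is created, and is told to stop by lnet_delay_rule_del() once
 * the last rule is gone; dd_ctl_waitq lets the controlling thread wait for
 * both the startup and the shutdown handshake.
 */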
static void
delay_timer_cb(cfs_timer_cb_arg_t data)
{
	struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);

	spin_lock_bh(&delay_dd.dd_lock);
	if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
		atomic_inc(&rule->dl_refcount);
		list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
		wake_up(&delay_dd.dd_waitq);
	}
	spin_unlock_bh(&delay_dd.dd_lock);
}
/**
 * Add a new delay rule to LNet.
 * There is no check for duplicated delay rules; all rules will be checked
 * for each incoming message.
 */
int
lnet_delay_rule_add(struct lnet_fault_large_attr *attr)
{
	struct lnet_delay_rule *rule;
	int rc = 0;

	if (!((attr->u.delay.la_rate == 0) ^
	      (attr->u.delay.la_interval == 0))) {
		CDEBUG(D_NET,
		       "please provide either delay rate or delay interval, "
		       "but not both at the same time %d/%d\n",
		       attr->u.delay.la_rate, attr->u.delay.la_interval);
		return -EINVAL;
	}

	if (attr->u.delay.la_latency == 0) {
		CDEBUG(D_NET, "delay latency cannot be zero\n");
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr) != 0)
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (!rule)
		return -ENOMEM;

	mutex_lock(&delay_dd.dd_mutex);
	if (!delay_dd.dd_running) {
		struct task_struct *task;

		/* NB: although LND threads will process delayed messages in
		 * lnet_finalize, there is no guarantee that LND threads will
		 * be woken up if no other message needs to be handled.
		 * Only one daemon thread; performance is not the concern
		 * of this simulation module.
		 */
		task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			goto failed;
		}
		wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
	}

	cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
			(unsigned long)rule, 0);

	spin_lock_init(&rule->dl_lock);
	INIT_LIST_HEAD(&rule->dl_msg_list);
	INIT_LIST_HEAD(&rule->dl_sched_link);

	rule->dl_attr = *attr;
	if (attr->u.delay.la_interval != 0) {
		rule->dl_time_base = ktime_get_seconds() +
				     attr->u.delay.la_interval;
		rule->dl_delay_time = ktime_get_seconds() +
				      get_random_u32_below(attr->u.delay.la_interval);
	} else {
		rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
	}

	rule->dl_msg_send = -1;

	lnet_net_lock(LNET_LOCK_EX);
	atomic_set(&rule->dl_refcount, 1);
	list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
	       libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
	       attr->u.delay.la_rate);

	mutex_unlock(&delay_dd.dd_mutex);
	return 0;

failed:
	mutex_unlock(&delay_dd.dd_mutex);
	CFS_FREE_PTR(rule);
	return rc;
}
/**
 * Remove matched delay rules from LNet. If \a shutdown is true or both \a src
 * and \a dst are zero, all rules will be removed, otherwise only matched rules
 * will be removed.
 * If \a src is zero, then all rules with \a dst as destination will be removed.
 * If \a dst is zero, then all rules with \a src as source will be removed.
 *
 * When a delay rule is removed, all delayed messages of this rule will be
 * processed immediately.
 */
int
lnet_delay_rule_del(struct lnet_nid *src, struct lnet_nid *dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	LIST_HEAD(rule_list);
	LIST_HEAD(msg_list);
	bool cleanup;
	int n = 0;

	if (shutdown) {
		src = NULL;
		dst = NULL;
	}

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dl_attr.fa_src, src)))
			continue;

		if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dl_attr.fa_dst, dst)))
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nidstr(&rule->dl_attr.fa_src),
		       libcfs_nidstr(&rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		timer_delete_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rules, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}
/**
 * List the delay rule at position \a pos
 */
int
lnet_delay_rule_list(int pos, struct lnet_fault_large_attr *attr,
		     struct lnet_fault_stat *stat)
{
	struct lnet_delay_rule *rule;
	int rc = -ENOENT;
	int cpt;
	int i = 0;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (i++ < pos)
			continue;

		spin_lock(&rule->dl_lock);
		*attr = rule->dl_attr;
		*stat = rule->dl_stat;
		spin_unlock(&rule->dl_lock);
		rc = 0;
		break;
	}

	lnet_net_unlock(cpt);
	return rc;
}
/**
 * reset counters for all delay rules
 */
void
lnet_delay_rule_reset(void)
{
	struct lnet_delay_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		struct lnet_fault_large_attr *attr = &rule->dl_attr;

		spin_lock(&rule->dl_lock);

		memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
		if (attr->u.delay.la_rate != 0) {
			rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
		} else {
			rule->dl_delay_time = ktime_get_seconds() +
					      get_random_u32_below(attr->u.delay.la_interval);
			rule->dl_time_base = ktime_get_seconds() +
					     attr->u.delay.la_interval;
		}
		spin_unlock(&rule->dl_lock);
	}

	lnet_net_unlock(cpt);
}
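
/*
 * lnet_fault_ctl() is the ioctl entry point for the fault simulation: the
 * legacy struct lnet_fault_attr (4-byte NIDs) passed in ioc_inlbuf1 is
 * widened to a struct lnet_fault_large_attr before the rule operation, and
 * converted back for the LIST operations so that callers using the legacy
 * attribute layout keep working.
 */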
int
lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
{
	struct lnet_fault_attr *attr4;
	struct lnet_fault_stat *stat;
	struct lnet_fault_large_attr attr = { { 0 } };
	int rc;

	attr4 = (struct lnet_fault_attr *)data->ioc_inlbuf1;

	lnet_fault_attr4_to_attr(attr4, &attr);

	switch (opc) {
	default:
		return -EINVAL;

	case LNET_CTL_DROP_ADD:
		if (!attr4)
			return -EINVAL;

		return lnet_drop_rule_add(&attr);

	case LNET_CTL_DROP_DEL:
		if (!attr4)
			return -EINVAL;

		data->ioc_count = lnet_drop_rule_del(&attr.fa_src,
						     &attr.fa_dst);
		return 0;

	case LNET_CTL_DROP_RESET:
		lnet_drop_rule_reset();
		return 0;

	case LNET_CTL_DROP_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (!attr4 || !stat)
			return -EINVAL;

		rc = lnet_drop_rule_list(data->ioc_count, &attr, stat);
		lnet_fault_attr_to_attr4(&attr, attr4);
		return rc;

	case LNET_CTL_DELAY_ADD:
		if (!attr4)
			return -EINVAL;

		return lnet_delay_rule_add(&attr);

	case LNET_CTL_DELAY_DEL:
		if (!attr4)
			return -EINVAL;

		data->ioc_count = lnet_delay_rule_del(&attr.fa_src,
						      &attr.fa_dst, false);
		return 0;

	case LNET_CTL_DELAY_RESET:
		lnet_delay_rule_reset();
		return 0;

	case LNET_CTL_DELAY_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (!attr4 || !stat)
			return -EINVAL;

		rc = lnet_delay_rule_list(data->ioc_count, &attr, stat);
		lnet_fault_attr_to_attr4(&attr, attr4);
		return rc;
	}
}
int
lnet_fault_init(void)
{
	BUILD_BUG_ON(LNET_PUT_BIT != BIT(LNET_MSG_PUT));
	BUILD_BUG_ON(LNET_ACK_BIT != BIT(LNET_MSG_ACK));
	BUILD_BUG_ON(LNET_GET_BIT != BIT(LNET_MSG_GET));
	BUILD_BUG_ON(LNET_REPLY_BIT != BIT(LNET_MSG_REPLY));

	mutex_init(&delay_dd.dd_mutex);
	spin_lock_init(&delay_dd.dd_lock);
	init_waitqueue_head(&delay_dd.dd_waitq);
	init_waitqueue_head(&delay_dd.dd_ctl_waitq);
	INIT_LIST_HEAD(&delay_dd.dd_sched_rules);

	return 0;
}

void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(NULL, NULL);
	lnet_delay_rule_del(NULL, NULL, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}