1 // SPDX-License-Identifier: GPL-2.0
3 /* Copyright (c) 2014, 2017, Intel Corporation. */
5 /* This file is part of Lustre, http://www.lustre.org/
7 * Lustre network fault simulation
9 * Author: liang.zhen@intel.com
12 #define DEBUG_SUBSYSTEM S_LNET
14 #include <linux/random.h>
15 #include <lnet/lib-lnet.h>
16 #include <uapi/linux/lnet/lnetctl.h>
18 #define LNET_MSG_MASK (LNET_PUT_BIT | LNET_ACK_BIT | \
19 LNET_GET_BIT | LNET_REPLY_BIT)
21 struct lnet_drop_rule {
22 /** link chain on the_lnet.ln_drop_rules */
23 struct list_head dr_link;
24 /** attributes of this rule */
25 struct lnet_fault_large_attr dr_attr;
26 /** lock to protect \a dr_drop_at and \a dr_stat */
29 * the message sequence to drop, which means message is dropped when
30 * dr_stat.drs_count == dr_drop_at
32 unsigned long dr_drop_at;
34 * seconds to drop the next message, it's exclusive with dr_drop_at
36 time64_t dr_drop_time;
37 /** baseline to caculate dr_drop_time */
38 time64_t dr_time_base;
39 /** statistic of dropped messages */
40 struct lnet_fault_stat dr_stat;
44 lnet_fault_attr_to_attr4(struct lnet_fault_large_attr *attr,
45 struct lnet_fault_attr *attr4)
50 attr4->fa_src = lnet_nid_to_nid4(&attr->fa_src);
51 attr4->fa_dst = lnet_nid_to_nid4(&attr->fa_dst);
52 attr4->fa_local_nid = lnet_nid_to_nid4(&attr->fa_local_nid);
53 attr4->fa_ptl_mask = attr->fa_ptl_mask;
54 attr4->fa_msg_mask = attr->fa_msg_mask;
56 memcpy(&attr4->u, &attr->u, sizeof(attr4->u));
60 lnet_fault_attr4_to_attr(struct lnet_fault_attr *attr4,
61 struct lnet_fault_large_attr *attr)
67 lnet_nid4_to_nid(attr4->fa_src, &attr->fa_src);
69 attr->fa_src = LNET_ANY_NID;
72 lnet_nid4_to_nid(attr4->fa_dst, &attr->fa_dst);
74 attr->fa_dst = LNET_ANY_NID;
76 if (attr4->fa_local_nid)
77 lnet_nid4_to_nid(attr4->fa_local_nid, &attr->fa_local_nid);
79 attr->fa_local_nid = LNET_ANY_NID;
81 attr->fa_ptl_mask = attr4->fa_ptl_mask;
82 attr->fa_msg_mask = attr4->fa_msg_mask;
84 memcpy(&attr->u, &attr4->u, sizeof(attr->u));
88 lnet_fault_nid_match(struct lnet_nid *nid, struct lnet_nid *msg_nid)
90 if (LNET_NID_IS_ANY(nid))
94 if (nid_same(msg_nid, nid))
97 if (LNET_NID_NET(nid) != LNET_NID_NET(msg_nid))
100 /* 255.255.255.255@net is wildcard for all addresses in a network */
101 return __be32_to_cpu(nid->nid_addr[0]) == LNET_NIDADDR(LNET_NID_ANY);
105 lnet_fault_attr_match(struct lnet_fault_large_attr *attr,
106 struct lnet_nid *src,
107 struct lnet_nid *local_nid,
108 struct lnet_nid *dst,
109 unsigned int type, unsigned int portal)
111 if (!lnet_fault_nid_match(&attr->fa_src, src) ||
112 !lnet_fault_nid_match(&attr->fa_dst, dst) ||
113 !lnet_fault_nid_match(&attr->fa_local_nid, local_nid))
116 if (!(attr->fa_msg_mask & BIT(type)))
119 /* NB: ACK and REPLY have no portal, but they should have been
120 * rejected by message mask */
121 if (attr->fa_ptl_mask != 0 && /* has portal filter */
122 !(attr->fa_ptl_mask & (1ULL << portal)))
129 lnet_fault_attr_validate(struct lnet_fault_large_attr *attr)
131 if (attr->fa_msg_mask == 0)
132 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
134 if (attr->fa_ptl_mask == 0) /* no portal filter */
137 /* NB: only PUT and GET can be filtered if portal filter has been set */
138 attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
139 if (attr->fa_msg_mask == 0) {
140 CDEBUG(D_NET, "can't find valid message type bits %x\n",
148 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
150 /* NB: fs_counter is NOT updated by this function */
168 * LNet message drop simulation
172 * Add a new drop rule to LNet
173 * There is no check for duplicated drop rule, all rules will be checked for
176 int lnet_drop_rule_add(struct lnet_fault_large_attr *attr)
178 struct lnet_drop_rule *rule;
181 if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
183 "please provide either drop rate or drop interval, "
184 "but not both at the same time %d/%d\n",
185 attr->u.drop.da_rate, attr->u.drop.da_interval);
189 if (lnet_fault_attr_validate(attr) != 0)
196 spin_lock_init(&rule->dr_lock);
198 rule->dr_attr = *attr;
199 if (attr->u.drop.da_interval != 0) {
200 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
201 rule->dr_drop_time = ktime_get_seconds() +
202 get_random_u32_below(attr->u.drop.da_interval);
204 rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
207 lnet_net_lock(LNET_LOCK_EX);
208 list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
209 lnet_net_unlock(LNET_LOCK_EX);
211 CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
212 libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
213 attr->u.drop.da_rate, attr->u.drop.da_interval);
218 * Remove matched drop rules from lnet, all rules that can match \a src and
219 * \a dst will be removed.
220 * If \a src is zero, then all rules have \a dst as destination will be remove
221 * If \a dst is zero, then all rules have \a src as source will be removed
222 * If both of them are zero, all rules will be removed
224 int lnet_drop_rule_del(struct lnet_nid *src, struct lnet_nid *dst)
226 struct lnet_drop_rule *rule;
227 struct lnet_drop_rule *tmp;
232 CDEBUG(D_NET, "src %s dst %s\n", libcfs_nidstr(src),
234 lnet_net_lock(LNET_LOCK_EX);
235 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
236 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dr_attr.fa_src, src)))
239 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dr_attr.fa_dst, dst)))
242 list_move(&rule->dr_link, &zombies);
244 lnet_net_unlock(LNET_LOCK_EX);
246 list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
247 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
248 libcfs_nidstr(&rule->dr_attr.fa_src),
249 libcfs_nidstr(&rule->dr_attr.fa_dst),
250 rule->dr_attr.u.drop.da_rate,
251 rule->dr_attr.u.drop.da_interval);
253 list_del(&rule->dr_link);
262 * List drop rule at position of \a pos
265 lnet_drop_rule_list(int pos, struct lnet_fault_large_attr *attr,
266 struct lnet_fault_stat *stat)
268 struct lnet_drop_rule *rule;
274 cpt = lnet_net_lock_current();
275 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
279 spin_lock(&rule->dr_lock);
280 *attr = rule->dr_attr;
281 *stat = rule->dr_stat;
282 spin_unlock(&rule->dr_lock);
287 lnet_net_unlock(cpt);
291 int lnet_drop_rule_collect(struct lnet_genl_fault_rule_list *rlist)
293 struct lnet_drop_rule *rule;
297 cpt = lnet_net_lock_current();
298 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
299 struct lnet_rule_properties *prop;
301 prop = genradix_ptr_alloc(&rlist->lgfrl_list,
302 rlist->lgfrl_count++,
308 spin_lock(&rule->dr_lock);
309 prop->attr = rule->dr_attr;
310 prop->stat = rule->dr_stat;
311 spin_unlock(&rule->dr_lock);
314 lnet_net_unlock(cpt);
319 * reset counters for all drop rules
321 void lnet_drop_rule_reset(void)
323 struct lnet_drop_rule *rule;
327 cpt = lnet_net_lock_current();
329 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
330 struct lnet_fault_large_attr *attr = &rule->dr_attr;
332 spin_lock(&rule->dr_lock);
334 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
335 if (attr->u.drop.da_rate != 0) {
336 rule->dr_drop_at = get_random_u32_below(attr->u.drop.da_rate);
338 rule->dr_drop_time = ktime_get_seconds() +
339 get_random_u32_below(attr->u.drop.da_interval);
340 rule->dr_time_base = ktime_get_seconds() + attr->u.drop.da_interval;
342 spin_unlock(&rule->dr_lock);
345 lnet_net_unlock(cpt);
/*
 * Pick a health-status error to assign to a dropped message, constrained
 * by the rule's health-error \a mask.
 *
 * NOTE(review): this listing has line-number gaps - declarations, return
 * statements and the loop around the best_delta search are missing here.
 * Comments below describe only the visible lines.
 */
350 lnet_fault_match_health(enum lnet_msg_hstatus *hstatus, __u32 mask)
357 /* assign a random failure */
358 choice = get_random_u32_below(LNET_MSG_STATUS_END - LNET_MSG_STATUS_OK);
/* HSTATUS_RANDOM: any failure is acceptable, use the random choice */
362 if (mask == HSTATUS_RANDOM) {
/* random choice happens to be enabled in the mask: use it directly */
367 if (mask & BIT(choice)) {
372 /* round to the closest ON bit */
374 best_delta = HSTATUS_END;
/* track the enabled status bit nearest to the random choice */
380 if (delta < best_delta) {
/*
 * NOTE(review): this listing has line-number gaps - the opening brace,
 * the `drop` local declaration, several closing braces and the final
 * `return drop;` are missing from this fragment. Comments below annotate
 * only the visible lines.
 */
392 * check source/destination NID, portal, message type and drop rate,
393 * decide whether should drop this message or not
396 drop_rule_match(struct lnet_drop_rule *rule,
397 struct lnet_nid *src,
398 struct lnet_nid *local_nid,
399 struct lnet_nid *dst,
400 unsigned int type, unsigned int portal,
401 enum lnet_msg_hstatus *hstatus)
403 struct lnet_fault_large_attr *attr = &rule->dr_attr;
/* rule must match NIDs, message type and portal before rate is checked */
406 if (!lnet_fault_attr_match(attr, src, local_nid, dst, type, portal))
409 if (attr->u.drop.da_drop_all) {
410 CDEBUG(D_NET, "set to drop all messages\n");
416 * if we're trying to match a health status error but it hasn't
417 * been set in the rule, then don't match
419 if ((hstatus && !attr->u.drop.da_health_error_mask) ||
420 (!hstatus && attr->u.drop.da_health_error_mask))
423 /* match this rule, check drop rate now */
424 spin_lock(&rule->dr_lock);
/* random mode: drop roughly half of the matched messages */
425 if (attr->u.drop.da_random) {
426 int value = get_random_u32_below(attr->u.drop.da_interval);
427 if (value >= (attr->u.drop.da_interval / 2))
431 } else if (rule->dr_drop_time != 0) { /* time based drop */
432 time64_t now = ktime_get_seconds();
434 rule->dr_stat.fs_count++;
435 drop = now >= rule->dr_drop_time;
/* when a drop fires, advance dr_time_base/dr_drop_time to schedule
 * the next drop within the following interval */
437 if (now > rule->dr_time_base)
438 rule->dr_time_base = now;
440 rule->dr_drop_time = rule->dr_time_base +
441 get_random_u32_below(attr->u.drop.da_interval);
442 rule->dr_time_base += attr->u.drop.da_interval;
444 CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %lld\n",
445 libcfs_nidstr(&attr->fa_src),
446 libcfs_nidstr(&attr->fa_dst),
450 } else { /* rate based drop */
453 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
454 count = rule->dr_stat.fs_count;
/* at each da_rate boundary, pick a random drop point in the next window */
455 if (do_div(count, attr->u.drop.da_rate) == 0) {
456 rule->dr_drop_at = rule->dr_stat.fs_count +
457 get_random_u32_below(attr->u.drop.da_rate);
458 CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
459 libcfs_nidstr(&attr->fa_src),
460 libcfs_nidstr(&attr->fa_dst), rule->dr_drop_at);
466 if (drop) { /* drop this message, update counters */
468 lnet_fault_match_health(hstatus,
469 attr->u.drop.da_health_error_mask);
470 lnet_fault_stat_inc(&rule->dr_stat, type);
471 rule->dr_stat.u.drop.ds_dropped++;
474 spin_unlock(&rule->dr_lock);
479 * Check if message from \a src to \a dst can match any existed drop rule
482 lnet_drop_rule_match(struct lnet_hdr *hdr,
483 struct lnet_nid *local_nid,
484 enum lnet_msg_hstatus *hstatus)
486 unsigned int typ = hdr->type;
487 struct lnet_drop_rule *rule;
488 unsigned int ptl = -1;
492 /* NB: if Portal is specified, then only PUT and GET will be
493 * filtered by drop rule */
494 if (typ == LNET_MSG_PUT)
495 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
496 else if (typ == LNET_MSG_GET)
497 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
499 cpt = lnet_net_lock_current();
500 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
501 drop = drop_rule_match(rule, &hdr->src_nid, local_nid,
502 &hdr->dest_nid, typ, ptl,
507 lnet_net_unlock(cpt);
513 * LNet Delay Simulation
515 /** timestamp (second) to send delayed message */
516 #define msg_delay_send msg_ev.hdr_data
518 struct lnet_delay_rule {
519 /** link chain on the_lnet.ln_delay_rules */
520 struct list_head dl_link;
521 /** link chain on delay_dd.dd_sched_rules */
522 struct list_head dl_sched_link;
523 /** attributes of this rule */
524 struct lnet_fault_large_attr dl_attr;
525 /** lock to protect \a below members */
527 /** refcount of delay rule */
528 atomic_t dl_refcount;
530 * the message sequence to delay, which means message is delayed when
531 * dl_stat.fs_count == dl_delay_at
533 unsigned long dl_delay_at;
535 * seconds to delay the next message, it's exclusive with dl_delay_at
537 time64_t dl_delay_time;
538 /** baseline to caculate dl_delay_time */
539 time64_t dl_time_base;
540 /** seconds until we send the next delayed message */
541 time64_t dl_msg_send;
542 /** delayed message list */
543 struct list_head dl_msg_list;
544 /** statistic of delayed messages */
545 struct lnet_fault_stat dl_stat;
546 /** timer to wakeup delay_daemon */
547 struct timer_list dl_timer;
550 struct delay_daemon_data {
551 /** serialise rule add/remove */
552 struct mutex dd_mutex;
553 /** protect rules on \a dd_sched_rules */
555 /** scheduled delay rules (by timer) */
556 struct list_head dd_sched_rules;
557 /** deamon thread sleeps at here */
558 wait_queue_head_t dd_waitq;
559 /** controler (lctl command) wait at here */
560 wait_queue_head_t dd_ctl_waitq;
561 /** deamon is running */
562 unsigned int dd_running;
563 /** deamon stopped */
564 unsigned int dd_stopped;
567 static struct delay_daemon_data delay_dd;
570 delay_rule_decref(struct lnet_delay_rule *rule)
572 if (atomic_dec_and_test(&rule->dl_refcount)) {
573 LASSERT(list_empty(&rule->dl_sched_link));
574 LASSERT(list_empty(&rule->dl_msg_list));
575 LASSERT(list_empty(&rule->dl_link));
/*
 * NOTE(review): this listing has line-number gaps - the opening brace,
 * the `delay` local declaration, early-return for non-matching rules,
 * closing braces and `return` statements are missing from this fragment.
 * Comments below annotate only the visible lines.
 */
582 * check source/destination NID, portal, message type and delay rate,
583 * decide whether should delay this message or not
586 delay_rule_match(struct lnet_delay_rule *rule, struct lnet_nid *src,
587 struct lnet_nid *dst, unsigned int type, unsigned int portal,
588 struct lnet_msg *msg)
590 struct lnet_fault_large_attr *attr = &rule->dl_attr;
592 time64_t now = ktime_get_seconds();
/* delay rules have no local-NID filter: local_nid arg is NULL */
594 if (!lnet_fault_attr_match(attr, src, NULL,
598 /* match this rule, check delay rate now */
599 spin_lock(&rule->dl_lock);
600 if (rule->dl_delay_time != 0) { /* time based delay */
601 rule->dl_stat.fs_count++;
602 delay = now >= rule->dl_delay_time;
/* when a delay fires, advance dl_time_base/dl_delay_time to schedule
 * the next delay within the following interval */
604 if (now > rule->dl_time_base)
605 rule->dl_time_base = now;
607 rule->dl_delay_time = rule->dl_time_base +
608 get_random_u32_below(attr->u.delay.la_interval);
609 rule->dl_time_base += attr->u.delay.la_interval;
611 CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %lld\n",
612 libcfs_nidstr(&attr->fa_src),
613 libcfs_nidstr(&attr->fa_dst),
614 rule->dl_delay_time);
617 } else { /* rate based delay */
620 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
621 /* generate the next random rate sequence */
622 count = rule->dl_stat.fs_count;
623 if (do_div(count, attr->u.delay.la_rate) == 0) {
624 rule->dl_delay_at = rule->dl_stat.fs_count +
625 get_random_u32_below(attr->u.delay.la_rate);
626 CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
627 libcfs_nidstr(&attr->fa_src),
628 libcfs_nidstr(&attr->fa_dst), rule->dl_delay_at);
/* not delaying: release the lock (the early-exit path) */
633 spin_unlock(&rule->dl_lock);
637 /* delay this message, update counters */
638 lnet_fault_stat_inc(&rule->dl_stat, type);
639 rule->dl_stat.u.delay.ls_delayed++;
/* queue message on the rule; msg_delay_send holds its due time */
641 list_add_tail(&msg->msg_list, &rule->dl_msg_list);
642 msg->msg_delay_send = now + attr->u.delay.la_latency;
/* -1 means no timer armed yet: arm it for this message's latency */
643 if (rule->dl_msg_send == -1) {
644 rule->dl_msg_send = msg->msg_delay_send;
645 mod_timer(&rule->dl_timer,
646 jiffies + cfs_time_seconds(attr->u.delay.la_latency));
649 spin_unlock(&rule->dl_lock);
654 * check if \a msg can match any Delay Rule, receiving of this message
655 * will be delayed if there is a match.
658 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
660 struct lnet_delay_rule *rule;
661 unsigned int typ = hdr->type;
662 unsigned int ptl = -1;
664 /* NB: called with hold of lnet_net_lock */
666 /* NB: if Portal is specified, then only PUT and GET will be
667 * filtered by delay rule */
668 if (typ == LNET_MSG_PUT)
669 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
670 else if (typ == LNET_MSG_GET)
671 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
673 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
674 if (delay_rule_match(rule, &hdr->src_nid, &hdr->dest_nid,
682 /** check out delayed messages for send */
684 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
685 struct list_head *msg_list)
687 struct lnet_msg *msg;
688 struct lnet_msg *tmp;
689 time64_t now = ktime_get_seconds();
691 if (!all && rule->dl_msg_send > now)
694 spin_lock(&rule->dl_lock);
695 list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
696 if (!all && msg->msg_delay_send > now)
699 msg->msg_delay_send = 0;
700 list_move_tail(&msg->msg_list, msg_list);
703 if (list_empty(&rule->dl_msg_list)) {
704 timer_delete(&rule->dl_timer);
705 rule->dl_msg_send = -1;
707 } else if (!list_empty(msg_list)) {
708 /* dequeued some timedout messages, update timer for the
709 * next delayed message on rule */
710 msg = list_first_entry(&rule->dl_msg_list,
711 struct lnet_msg, msg_list);
712 rule->dl_msg_send = msg->msg_delay_send;
713 mod_timer(&rule->dl_timer,
715 cfs_time_seconds(msg->msg_delay_send - now));
717 spin_unlock(&rule->dl_lock);
/*
 * Send or receive (or drop, when \a drop is true) each message on
 * \a msg_list after its simulated delay has expired.
 *
 * NOTE(review): this listing has line-number gaps - the `ni`/`rc`/`cpt`
 * declarations, lock acquisitions, the switch statement opening and
 * several case/brace lines are missing from this fragment. Comments
 * below annotate only the visible lines.
 */
721 delayed_msg_process(struct list_head *msg_list, bool drop)
723 struct lnet_msg *msg;
/* pop messages until the list is empty */
725 while ((msg = list_first_entry_or_null(msg_list, struct lnet_msg,
726 msg_list)) != NULL) {
/* delayed send path */
731 if (msg->msg_sending) {
733 list_del_init(&msg->msg_list);
735 CDEBUG(D_NET, "TRACE: msg %p %s -> %s : %s\n", msg,
736 libcfs_nidstr(&ni->ni_nid),
737 libcfs_nidstr(&msg->msg_txpeer->lpni_nid),
738 lnet_msgtyp2str(msg->msg_type));
739 lnet_ni_send(ni, msg);
743 /* Delayed receive */
744 LASSERT(msg->msg_rxpeer != NULL);
745 LASSERT(msg->msg_rxni != NULL);
748 cpt = msg->msg_rx_cpt;
750 list_del_init(&msg->msg_list);
/* re-parse locally or forward, depending on routing */
754 } else if (!msg->msg_routing) {
755 rc = lnet_parse_local(ni, msg);
761 rc = lnet_parse_forward_locked(ni, msg);
762 lnet_net_unlock(cpt);
766 lnet_ni_recv(ni, msg->msg_private, msg, 0,
767 0, msg->msg_len, msg->msg_len);
769 case LNET_CREDIT_WAIT:
771 default: /* failures */
776 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len,
778 lnet_finalize(msg, rc);
783 * Process delayed messages for scheduled rules
784 * This function can either be called by delay_rule_daemon, or by lnet_finalise
787 lnet_delay_rule_check(void)
789 struct lnet_delay_rule *rule;
793 if (list_empty(&delay_dd.dd_sched_rules))
796 spin_lock_bh(&delay_dd.dd_lock);
797 if (list_empty(&delay_dd.dd_sched_rules)) {
798 spin_unlock_bh(&delay_dd.dd_lock);
802 rule = list_first_entry(&delay_dd.dd_sched_rules,
803 struct lnet_delay_rule, dl_sched_link);
804 list_del_init(&rule->dl_sched_link);
805 spin_unlock_bh(&delay_dd.dd_lock);
807 delayed_msg_check(rule, false, &msgs);
808 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
811 if (!list_empty(&msgs))
812 delayed_msg_process(&msgs, false);
815 /** deamon thread to handle delayed messages */
817 lnet_delay_rule_daemon(void *arg)
819 delay_dd.dd_running = 1;
820 wake_up(&delay_dd.dd_ctl_waitq);
822 while (delay_dd.dd_running) {
823 wait_event_interruptible(delay_dd.dd_waitq,
824 !delay_dd.dd_running ||
825 !list_empty(&delay_dd.dd_sched_rules));
826 lnet_delay_rule_check();
829 /* in case more rules have been enqueued after my last check */
830 lnet_delay_rule_check();
831 delay_dd.dd_stopped = 1;
832 wake_up(&delay_dd.dd_ctl_waitq);
838 delay_timer_cb(cfs_timer_cb_arg_t data)
840 struct lnet_delay_rule *rule = cfs_from_timer(rule, data, dl_timer);
842 spin_lock_bh(&delay_dd.dd_lock);
843 if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
844 atomic_inc(&rule->dl_refcount);
845 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
846 wake_up(&delay_dd.dd_waitq);
848 spin_unlock_bh(&delay_dd.dd_lock);
/*
 * NOTE(review): this listing has line-number gaps - the rule allocation,
 * `-EINVAL`/`-ENOMEM` returns, the kthread_run() failure path (`failed`
 * label) and closing braces are missing from this fragment. Comments
 * below annotate only the visible lines.
 */
852 * Add a new delay rule to LNet
853 * There is no check for duplicated delay rule, all rules will be checked for
857 lnet_delay_rule_add(struct lnet_fault_large_attr *attr)
859 struct lnet_delay_rule *rule;
/* exactly one of delay rate / delay interval must be given */
863 if (!((attr->u.delay.la_rate == 0) ^
864 (attr->u.delay.la_interval == 0))) {
866 "please provide either delay rate or delay interval, "
867 "but not both at the same time %d/%d\n",
868 attr->u.delay.la_rate, attr->u.delay.la_interval);
872 if (attr->u.delay.la_latency == 0) {
873 CDEBUG(D_NET, "delay latency cannot be zero\n");
877 if (lnet_fault_attr_validate(attr) != 0)
/* first delay rule: start the daemon thread under dd_mutex */
884 mutex_lock(&delay_dd.dd_mutex);
885 if (!delay_dd.dd_running) {
886 struct task_struct *task;
888 /* NB: although LND threads will process delayed message
889 * in lnet_finalize, but there is no guarantee that LND
890 * threads will be waken up if no other message needs to
892 * Only one daemon thread, performance is not the concern
893 * of this simulation module.
895 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
/* wait for the daemon to signal it is up */
900 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
903 cfs_timer_setup(&rule->dl_timer, delay_timer_cb,
904 (unsigned long)rule, 0);
906 spin_lock_init(&rule->dl_lock);
907 INIT_LIST_HEAD(&rule->dl_msg_list);
908 INIT_LIST_HEAD(&rule->dl_sched_link);
910 rule->dl_attr = *attr;
911 if (attr->u.delay.la_interval != 0) {
912 rule->dl_time_base = ktime_get_seconds() +
913 attr->u.delay.la_interval;
914 rule->dl_delay_time = ktime_get_seconds() +
915 get_random_u32_below(attr->u.delay.la_interval);
917 rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
/* -1 means no delayed message queued / timer not armed */
920 rule->dl_msg_send = -1;
922 lnet_net_lock(LNET_LOCK_EX);
923 atomic_set(&rule->dl_refcount, 1);
924 list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
925 lnet_net_unlock(LNET_LOCK_EX);
927 CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
928 libcfs_nidstr(&attr->fa_src), libcfs_nidstr(&attr->fa_dst),
929 attr->u.delay.la_rate);
/* success path unlock */
931 mutex_unlock(&delay_dd.dd_mutex);
/* failure path unlock (presumably after a `failed:` label - confirm) */
934 mutex_unlock(&delay_dd.dd_mutex);
/*
 * NOTE(review): this listing has line-number gaps - local declarations
 * (msg_list, cleanup, the return counter), `continue` statements after
 * the NID-match checks, the per-rule free/count lines and the final
 * return are missing from this fragment. Comments below annotate only
 * the visible lines.
 */
940 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
941 * and \a dst are zero, all rules will be removed, otherwise only matched rules
943 * If \a src is zero, then all rules have \a dst as destination will be remove
944 * If \a dst is zero, then all rules have \a src as source will be removed
946 * When a delay rule is removed, all delayed messages of this rule will be
947 * processed immediately.
950 lnet_delay_rule_del(struct lnet_nid *src, struct lnet_nid *dst, bool shutdown)
952 struct lnet_delay_rule *rule;
953 struct lnet_delay_rule *tmp;
954 LIST_HEAD(rule_list);
/* rule add/remove is serialised by dd_mutex; list ops under LNET_LOCK_EX */
960 mutex_lock(&delay_dd.dd_mutex);
961 lnet_net_lock(LNET_LOCK_EX);
963 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
964 CDEBUG(D_NET, "src %s dst %s fa_src %s fa_dst %s\n",
965 libcfs_nidstr(src), libcfs_nidstr(dst),
966 libcfs_nidstr(&rule->dl_attr.fa_src),
967 libcfs_nidstr(&rule->dl_attr.fa_dst));
968 if (!(LNET_NID_IS_ANY(src) || nid_same(&rule->dl_attr.fa_src, src)))
971 if (!(LNET_NID_IS_ANY(dst) || nid_same(&rule->dl_attr.fa_dst, dst)))
974 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
975 libcfs_nidstr(&rule->dl_attr.fa_src),
976 libcfs_nidstr(&rule->dl_attr.fa_dst),
977 rule->dl_attr.u.delay.la_rate,
978 rule->dl_attr.u.delay.la_interval);
979 /* refcount is taken over by rule_list */
980 list_move(&rule->dl_link, &rule_list);
983 /* check if we need to shutdown delay_daemon */
984 cleanup = list_empty(&the_lnet.ln_delay_rules) &&
985 !list_empty(&rule_list);
986 lnet_net_unlock(LNET_LOCK_EX);
/* flush each removed rule's queued messages outside the net lock */
988 list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
989 list_del_init(&rule->dl_link);
991 timer_delete_sync(&rule->dl_timer);
992 delayed_msg_check(rule, true, &msg_list);
993 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
997 if (cleanup) { /* no more delay rule, shutdown delay_daemon */
998 LASSERT(delay_dd.dd_running);
999 delay_dd.dd_running = 0;
1000 wake_up(&delay_dd.dd_waitq);
1002 while (!delay_dd.dd_stopped)
1003 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
1005 mutex_unlock(&delay_dd.dd_mutex);
/* dequeued messages are sent/received, or dropped on shutdown */
1007 if (!list_empty(&msg_list))
1008 delayed_msg_process(&msg_list, shutdown);
1014 * List Delay Rule at position of \a pos
1017 lnet_delay_rule_list(int pos, struct lnet_fault_large_attr *attr,
1018 struct lnet_fault_stat *stat)
1020 struct lnet_delay_rule *rule;
1026 cpt = lnet_net_lock_current();
1027 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1031 spin_lock(&rule->dl_lock);
1032 *attr = rule->dl_attr;
1033 *stat = rule->dl_stat;
1034 spin_unlock(&rule->dl_lock);
1039 lnet_net_unlock(cpt);
1043 int lnet_delay_rule_collect(struct lnet_genl_fault_rule_list *rlist)
1045 struct lnet_delay_rule *rule;
1049 cpt = lnet_net_lock_current();
1050 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1051 struct lnet_rule_properties *prop;
1053 prop = genradix_ptr_alloc(&rlist->lgfrl_list,
1054 rlist->lgfrl_count++,
1060 spin_lock(&rule->dl_lock);
1061 prop->attr = rule->dl_attr;
1062 prop->stat = rule->dl_stat;
1063 spin_unlock(&rule->dl_lock);
1066 lnet_net_unlock(cpt);
1071 * reset counters for all Delay Rules
1074 lnet_delay_rule_reset(void)
1076 struct lnet_delay_rule *rule;
1080 cpt = lnet_net_lock_current();
1082 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
1083 struct lnet_fault_large_attr *attr = &rule->dl_attr;
1085 spin_lock(&rule->dl_lock);
1087 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
1088 if (attr->u.delay.la_rate != 0) {
1089 rule->dl_delay_at = get_random_u32_below(attr->u.delay.la_rate);
1091 rule->dl_delay_time = ktime_get_seconds() +
1092 get_random_u32_below(attr->u.delay.la_interval);
1093 rule->dl_time_base = ktime_get_seconds() +
1094 attr->u.delay.la_interval;
1096 spin_unlock(&rule->dl_lock);
1099 lnet_net_unlock(cpt);
/*
 * ioctl entry point dispatching LNET_CTL_DROP_*/LNET_CTL_DELAY_* ops to
 * the rule add/del/reset/list helpers above.
 *
 * NOTE(review): this listing has line-number gaps - the opening brace,
 * the `switch (opc)` statement, `break`/`return` statements, the default
 * case and the closing brace are missing from this fragment. Comments
 * below annotate only the visible lines.
 */
1104 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
1106 struct lnet_fault_attr *attr4;
1107 struct lnet_fault_stat *stat;
1108 struct lnet_fault_large_attr attr = { { 0 } };
/* legacy nid4 attribute arrives in ioc_inlbuf1 */
1111 attr4 = (struct lnet_fault_attr *)data->ioc_inlbuf1;
/* widen to the large-NID form used internally */
1113 lnet_fault_attr4_to_attr(attr4, &attr);
1119 case LNET_CTL_DROP_ADD:
1123 return lnet_drop_rule_add(&attr);
1125 case LNET_CTL_DROP_DEL:
/* ioc_count returns the number of rules removed */
1129 data->ioc_count = lnet_drop_rule_del(&attr.fa_src,
1133 case LNET_CTL_DROP_RESET:
1134 lnet_drop_rule_reset();
1137 case LNET_CTL_DROP_LIST:
1138 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1139 if (!attr4 || !stat)
1142 rc = lnet_drop_rule_list(data->ioc_count, &attr, stat);
/* narrow the result back to the legacy layout for the caller */
1143 lnet_fault_attr_to_attr4(&attr, attr4);
1146 case LNET_CTL_DELAY_ADD:
1150 return lnet_delay_rule_add(&attr);
1152 case LNET_CTL_DELAY_DEL:
1156 data->ioc_count = lnet_delay_rule_del(&attr.fa_src,
1157 &attr.fa_dst, false);
1160 case LNET_CTL_DELAY_RESET:
1161 lnet_delay_rule_reset();
1164 case LNET_CTL_DELAY_LIST:
1165 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1166 if (!attr4 || !stat)
1169 rc = lnet_delay_rule_list(data->ioc_count, &attr, stat);
1170 lnet_fault_attr_to_attr4(&attr, attr4);
1176 lnet_fault_init(void)
1178 BUILD_BUG_ON(LNET_PUT_BIT != BIT(LNET_MSG_PUT));
1179 BUILD_BUG_ON(LNET_ACK_BIT != BIT(LNET_MSG_ACK));
1180 BUILD_BUG_ON(LNET_GET_BIT != BIT(LNET_MSG_GET));
1181 BUILD_BUG_ON(LNET_REPLY_BIT != BIT(LNET_MSG_REPLY));
1183 mutex_init(&delay_dd.dd_mutex);
1184 spin_lock_init(&delay_dd.dd_lock);
1185 init_waitqueue_head(&delay_dd.dd_waitq);
1186 init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1187 INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1193 lnet_fault_fini(void)
1195 lnet_drop_rule_del(NULL, NULL);
1196 lnet_delay_rule_del(NULL, NULL, true);
1198 LASSERT(list_empty(&the_lnet.ln_drop_rules));
1199 LASSERT(list_empty(&the_lnet.ln_delay_rules));
1200 LASSERT(list_empty(&delay_dd.dd_sched_rules));