/*
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
 *
 * Copyright (c) 2014, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/net_fault.c
 *
 * Lustre network fault simulation
 *
 * Author: liang.zhen@intel.com
 */
#define DEBUG_SUBSYSTEM S_LNET

#include <lnet/lib-lnet.h>
#include <lnet/lnetctl.h>

#define LNET_MSG_MASK		(LNET_PUT_BIT | LNET_ACK_BIT | \
				 LNET_GET_BIT | LNET_REPLY_BIT)
struct lnet_drop_rule {
	/** link chain on the_lnet.ln_drop_rules */
	struct list_head	dr_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dr_attr;
	/** lock to protect \a dr_drop_at and \a dr_stat */
	spinlock_t		dr_lock;
	/**
	 * the message sequence to drop, i.e. the message is dropped when
	 * dr_stat.fs_count == dr_drop_at
	 */
	unsigned long		dr_drop_at;
	/**
	 * time (in seconds) to drop the next message; mutually exclusive
	 * with dr_drop_at
	 */
	cfs_time_t		dr_drop_time;
	/** baseline to calculate dr_drop_time */
	cfs_time_t		dr_time_base;
	/** statistic of dropped messages */
	struct lnet_fault_stat	dr_stat;
};
static bool
lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
{
	if (nid == msg_nid || nid == LNET_NID_ANY)
		return true;

	if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
		return false;

	/* 255.255.255.255@net is wildcard for all addresses in a network */
	return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
}
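
/*
 * Illustrative sketch (not from the original source; the function name is
 * hypothetical): how the matching above behaves. libcfs_str2nid() is the
 * standard libcfs NID parser, and "255.255.255.255@net" is the wildcard
 * form for every address on that network.
 */
static void __maybe_unused
lnet_fault_nid_match_example(void)
{
	lnet_nid_t any_tcp = libcfs_str2nid("255.255.255.255@tcp");

	/* wildcard address matches any NID on the same network... */
	LASSERT(lnet_fault_nid_match(any_tcp,
				     libcfs_str2nid("192.168.1.5@tcp")));
	/* ...but a NID on a different network never matches */
	LASSERT(!lnet_fault_nid_match(any_tcp,
				      libcfs_str2nid("192.168.1.5@o2ib")));
}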
static bool
lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
		      lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	if (!lnet_fault_nid_match(attr->fa_src, src) ||
	    !lnet_fault_nid_match(attr->fa_dst, dst))
		return false;

	if (!(attr->fa_msg_mask & (1 << type)))
		return false;

	/* NB: ACK and REPLY have no portal, but they should have been
	 * rejected by message mask */
	if (attr->fa_ptl_mask != 0 && /* has portal filter */
	    !(attr->fa_ptl_mask & (1ULL << portal)))
		return false;

	return true;
}
static int
lnet_fault_attr_validate(struct lnet_fault_attr *attr)
{
	if (attr->fa_msg_mask == 0)
		attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */

	if (attr->fa_ptl_mask == 0) /* no portal filter */
		return 0;

	/* NB: only PUT and GET can be filtered if portal filter has been set */
	attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
	if (attr->fa_msg_mask == 0) {
		CDEBUG(D_NET, "can't find valid message type bits %x\n",
		       attr->fa_msg_mask);
		return -EINVAL;
	}

	return 0;
}
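
/*
 * Sketch (added for illustration; hypothetical function name and values):
 * once a portal filter is set, validation strips every message type that
 * carries no portal, leaving only the PUT and GET bits.
 */
static void __maybe_unused
lnet_fault_attr_validate_example(void)
{
	struct lnet_fault_attr attr;

	memset(&attr, 0, sizeof(attr));
	attr.fa_ptl_mask = 1ULL << 10;	/* filter on portal 10 only */
	attr.fa_msg_mask = 0;		/* 0 means "all types" on input */

	LASSERT(lnet_fault_attr_validate(&attr) == 0);
	/* ACK/REPLY bits were stripped because they have no portal */
	LASSERT(attr.fa_msg_mask == (LNET_GET_BIT | LNET_PUT_BIT));
}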
static void
lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
{
	/* NB: fs_counter is NOT updated by this function; only the per-type
	 * counter for \a type is bumped */
	switch (type) {
	case LNET_MSG_PUT:
		stat->fs_put++;
		return;
	case LNET_MSG_ACK:
		stat->fs_ack++;
		return;
	case LNET_MSG_GET:
		stat->fs_get++;
		return;
	case LNET_MSG_REPLY:
		stat->fs_reply++;
		return;
	}
}
/**
 * LNet message drop simulation
 */

/**
 * Add a new drop rule to LNet
 * There is no check for duplicated drop rule, all rules will be checked for
 * each incoming message.
 */
int
lnet_drop_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_drop_rule *rule;

	if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
		CDEBUG(D_NET,
		       "please provide either drop rate or drop interval, "
		       "but not both at the same time %d/%d\n",
		       attr->u.drop.da_rate, attr->u.drop.da_interval);
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr) != 0)
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (rule == NULL)
		return -ENOMEM;

	spin_lock_init(&rule->dr_lock);

	rule->dr_attr = *attr;
	if (attr->u.drop.da_interval != 0) {
		rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
		rule->dr_drop_time = cfs_time_shift(cfs_rand() %
						    attr->u.drop.da_interval);
	} else {
		rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
	}

	lnet_net_lock(LNET_LOCK_EX);
	list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.drop.da_rate, attr->u.drop.da_interval);
	return 0;
}
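
/*
 * Usage sketch (added for illustration, not part of the original file;
 * the function name is hypothetical): build an attribute block that drops
 * roughly 1 out of every 100 PUTs between any pair of peers, then register
 * it. lctl normally reaches lnet_drop_rule_add() via lnet_fault_ctl().
 */
static int __maybe_unused
lnet_drop_rule_add_example(void)
{
	struct lnet_fault_attr attr;

	memset(&attr, 0, sizeof(attr));
	attr.fa_src = LNET_NID_ANY;		/* any source */
	attr.fa_dst = LNET_NID_ANY;		/* any destination */
	attr.fa_msg_mask = LNET_PUT_BIT;	/* PUTs only */
	attr.u.drop.da_rate = 100;		/* drop 1 message per 100 */
	/* da_interval must stay 0: rate and interval are exclusive */

	return lnet_drop_rule_add(&attr);
}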
/**
 * Remove matched drop rules from lnet, all rules that can match \a src and
 * \a dst will be removed.
 * If \a src is zero, then all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, then all rules that have \a src as source will be
 * removed.
 * If both of them are zero, all rules will be removed.
 */
int
lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
{
	struct lnet_drop_rule *rule;
	struct lnet_drop_rule *tmp;
	struct list_head zombies;
	int n = 0;

	INIT_LIST_HEAD(&zombies);

	lnet_net_lock(LNET_LOCK_EX);
	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
		if (rule->dr_attr.fa_src != src && src != 0)
			continue;

		if (rule->dr_attr.fa_dst != dst && dst != 0)
			continue;

		list_move(&rule->dr_link, &zombies);
	}
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
		CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dr_attr.fa_src),
		       libcfs_nid2str(rule->dr_attr.fa_dst),
		       rule->dr_attr.u.drop.da_rate,
		       rule->dr_attr.u.drop.da_interval);

		list_del(&rule->dr_link);
		CFS_FREE_PTR(rule);
		n++;
	}

	return n;
}
/**
 * List drop rule at position of \a pos
 */
int
lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
		    struct lnet_fault_stat *stat)
{
	struct lnet_drop_rule *rule;
	int rc = -ENOENT;
	int cpt;
	int i = 0;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		if (i++ < pos)
			continue;
		spin_lock(&rule->dr_lock);
		*attr = rule->dr_attr;
		*stat = rule->dr_stat;
		spin_unlock(&rule->dr_lock);
		rc = 0;
		break;
	}
	lnet_net_unlock(cpt);
	return rc;
}
/**
 * reset counters for all drop rules
 */
void
lnet_drop_rule_reset(void)
{
	struct lnet_drop_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		struct lnet_fault_attr *attr = &rule->dr_attr;

		spin_lock(&rule->dr_lock);

		memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
		if (attr->u.drop.da_rate != 0) {
			rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
		} else {
			rule->dr_drop_time = cfs_time_shift(cfs_rand() %
						attr->u.drop.da_interval);
			rule->dr_time_base = cfs_time_shift(attr->u.drop.
								da_interval);
		}
		spin_unlock(&rule->dr_lock);
	}

	lnet_net_unlock(cpt);
}
/**
 * check source/destination NID, portal, message type and drop rate,
 * decide whether to drop this message or not
 */
static bool
drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
		lnet_nid_t dst, unsigned int type, unsigned int portal)
{
	struct lnet_fault_attr *attr = &rule->dr_attr;
	bool drop;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check drop rate now */
	spin_lock(&rule->dr_lock);
	if (rule->dr_drop_time != 0) { /* time based drop */
		cfs_time_t now = cfs_time_current();

		rule->dr_stat.fs_count++;
		drop = cfs_time_aftereq(now, rule->dr_drop_time);
		if (drop) {
			if (cfs_time_after(now, rule->dr_time_base))
				rule->dr_time_base = now;

			rule->dr_drop_time = rule->dr_time_base +
					     cfs_time_seconds(cfs_rand() %
						attr->u.drop.da_interval);
			rule->dr_time_base += cfs_time_seconds(attr->u.drop.
								da_interval);

			CDEBUG(D_NET, "Drop Rule %s->%s: next drop : %ld\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dr_drop_time);
		}

	} else { /* rate based drop */
		__u64 count;

		drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
		count = rule->dr_stat.fs_count;
		if (do_div(count, attr->u.drop.da_rate) == 0) {
			rule->dr_drop_at = rule->dr_stat.fs_count +
					   cfs_rand() % attr->u.drop.da_rate;
			CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dr_drop_at);
		}
	}

	if (drop) { /* drop this message, update counters */
		lnet_fault_stat_inc(&rule->dr_stat, type);
		rule->dr_stat.u.drop.ds_dropped++;
	}

	spin_unlock(&rule->dr_lock);
	return drop;
}
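
/*
 * Worked example of the rate-based bookkeeping above (illustrative only;
 * hypothetical function name). With da_rate == 100, fs_count is carved
 * into windows of 100 messages; whenever fs_count reaches a multiple of
 * da_rate, dr_drop_at is re-armed to a random position inside the next
 * window, so exactly one message per window is dropped.
 */
static void __maybe_unused
drop_rate_window_example(void)
{
	unsigned int da_rate = 100;
	__u64 fs_count = 200;		/* a window boundary was reached */
	unsigned long drop_at;

	drop_at = fs_count + cfs_rand() % da_rate;	/* in [200, 300) */
	LASSERT(drop_at >= 200 && drop_at < 300);
}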
/**
 * Check if message from \a src to \a dst can match any existing drop rule
 */
bool
lnet_drop_rule_match(struct lnet_hdr *hdr)
{
	struct lnet_drop_rule *rule;
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	unsigned int ptl = -1;
	bool drop = false;
	int cpt;

	/* NB: if Portal is specified, then only PUT and GET will be
	 * filtered by drop rule */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
		drop = drop_rule_match(rule, src, dst, typ, ptl);
		if (drop)
			break;
	}
	lnet_net_unlock(cpt);

	return drop;
}
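
/*
 * Sketch (illustrative only; hypothetical function name): screening a wire
 * header. lnet_drop_rule_match() expects the header fields in little-endian
 * wire form, hence the cpu_to_le*() conversions here.
 */
static bool __maybe_unused
lnet_drop_rule_match_example(lnet_nid_t src, lnet_nid_t dst)
{
	struct lnet_hdr hdr;

	memset(&hdr, 0, sizeof(hdr));
	hdr.src_nid = cpu_to_le64(src);
	hdr.dest_nid = cpu_to_le64(dst);
	hdr.type = cpu_to_le32(LNET_MSG_PUT);
	hdr.msg.put.ptl_index = cpu_to_le32(0);	/* portal 0 */

	return lnet_drop_rule_match(&hdr);
}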
/**
 * LNet Delay Simulation
 */
/** timestamp (in seconds) to send the delayed message */
#define msg_delay_send		msg_ev.hdr_data

struct lnet_delay_rule {
	/** link chain on the_lnet.ln_delay_rules */
	struct list_head	dl_link;
	/** link chain on delay_dd.dd_sched_rules */
	struct list_head	dl_sched_link;
	/** attributes of this rule */
	struct lnet_fault_attr	dl_attr;
	/** lock to protect the members below */
	spinlock_t		dl_lock;
	/** refcount of delay rule */
	atomic_t		dl_refcount;
	/**
	 * the message sequence to delay, i.e. the message is delayed when
	 * dl_stat.fs_count == dl_delay_at
	 */
	unsigned long		dl_delay_at;
	/**
	 * time (in seconds) to delay the next message; mutually exclusive
	 * with dl_delay_at
	 */
	cfs_time_t		dl_delay_time;
	/** baseline to calculate dl_delay_time */
	cfs_time_t		dl_time_base;
	/** jiffies to send the next delayed message */
	unsigned long		dl_msg_send;
	/** delayed message list */
	struct list_head	dl_msg_list;
	/** statistic of delayed messages */
	struct lnet_fault_stat	dl_stat;
	/** timer to wakeup delay_daemon */
	struct timer_list	dl_timer;
};
struct delay_daemon_data {
	/** serialise rule add/remove */
	struct mutex		dd_mutex;
	/** protect rules on \a dd_sched_rules */
	spinlock_t		dd_lock;
	/** scheduled delay rules (by timer) */
	struct list_head	dd_sched_rules;
	/** daemon thread sleeps here */
	wait_queue_head_t	dd_waitq;
	/** controller (lctl command) waits here */
	wait_queue_head_t	dd_ctl_waitq;
	/** daemon is running */
	unsigned int		dd_running;
	/** daemon stopped */
	unsigned int		dd_stopped;
};

static struct delay_daemon_data	delay_dd;
static cfs_time_t
round_timeout(cfs_time_t timeout)
{
	return cfs_time_seconds((unsigned int)
			cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
}
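
/*
 * Behaviour sketch for round_timeout() (added for illustration; the
 * function name is hypothetical): the helper rounds a jiffies value up to
 * the next whole second, so three seconds plus one jiffy rounds to four.
 */
static void __maybe_unused
round_timeout_example(void)
{
	LASSERT(round_timeout(cfs_time_seconds(3) + 1) ==
		cfs_time_seconds(4));
}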
static void
delay_rule_decref(struct lnet_delay_rule *rule)
{
	if (atomic_dec_and_test(&rule->dl_refcount)) {
		LASSERT(list_empty(&rule->dl_sched_link));
		LASSERT(list_empty(&rule->dl_msg_list));
		LASSERT(list_empty(&rule->dl_link));
		CFS_FREE_PTR(rule);
	}
}
/**
 * check source/destination NID, portal, message type and delay rate,
 * decide whether to delay this message or not
 */
static bool
delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
		 lnet_nid_t dst, unsigned int type, unsigned int portal,
		 struct lnet_msg *msg)
{
	struct lnet_fault_attr *attr = &rule->dl_attr;
	bool delay;

	if (!lnet_fault_attr_match(attr, src, dst, type, portal))
		return false;

	/* match this rule, check delay rate now */
	spin_lock(&rule->dl_lock);
	if (rule->dl_delay_time != 0) { /* time based delay */
		cfs_time_t now = cfs_time_current();

		rule->dl_stat.fs_count++;
		delay = cfs_time_aftereq(now, rule->dl_delay_time);
		if (delay) {
			if (cfs_time_after(now, rule->dl_time_base))
				rule->dl_time_base = now;

			rule->dl_delay_time = rule->dl_time_base +
					      cfs_time_seconds(cfs_rand() %
						attr->u.delay.la_interval);
			rule->dl_time_base += cfs_time_seconds(attr->u.delay.
								la_interval);

			CDEBUG(D_NET, "Delay Rule %s->%s: next delay : %ld\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dl_delay_time);
		}

	} else { /* rate based delay */
		__u64 count;

		delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
		/* generate the next random rate sequence */
		count = rule->dl_stat.fs_count;
		if (do_div(count, attr->u.delay.la_rate) == 0) {
			rule->dl_delay_at = rule->dl_stat.fs_count +
					    cfs_rand() % attr->u.delay.la_rate;
			CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
			       libcfs_nid2str(attr->fa_src),
			       libcfs_nid2str(attr->fa_dst),
			       rule->dl_delay_at);
		}
	}

	if (!delay) {
		spin_unlock(&rule->dl_lock);
		return false;
	}

	/* delay this message, update counters */
	lnet_fault_stat_inc(&rule->dl_stat, type);
	rule->dl_stat.u.delay.ls_delayed++;

	list_add_tail(&msg->msg_list, &rule->dl_msg_list);
	msg->msg_delay_send = round_timeout(
			cfs_time_shift(attr->u.delay.la_latency));
	if (rule->dl_msg_send == -1) {
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}

	spin_unlock(&rule->dl_lock);
	return true;
}
/**
 * check if \a msg can match any Delay Rule; receiving of this message
 * will be delayed if there is a match.
 */
bool
lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
{
	struct lnet_delay_rule *rule;
	lnet_nid_t src = le64_to_cpu(hdr->src_nid);
	lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
	unsigned int typ = le32_to_cpu(hdr->type);
	unsigned int ptl = -1;

	/* NB: called with hold of lnet_net_lock */

	/* NB: if Portal is specified, then only PUT and GET will be
	 * filtered by delay rule */
	if (typ == LNET_MSG_PUT)
		ptl = le32_to_cpu(hdr->msg.put.ptl_index);
	else if (typ == LNET_MSG_GET)
		ptl = le32_to_cpu(hdr->msg.get.ptl_index);

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (delay_rule_match(rule, src, dst, typ, ptl, msg))
			return true;
	}

	return false;
}
/** check out delayed messages for send */
static void
delayed_msg_check(struct lnet_delay_rule *rule, bool all,
		  struct list_head *msg_list)
{
	struct lnet_msg *msg;
	struct lnet_msg *tmp;
	unsigned long now = cfs_time_current();

	if (!all && rule->dl_msg_send > now)
		return;

	spin_lock(&rule->dl_lock);
	list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
		if (!all && msg->msg_delay_send > now)
			break;

		msg->msg_delay_send = 0;
		list_move_tail(&msg->msg_list, msg_list);
	}

	if (list_empty(&rule->dl_msg_list)) {
		del_timer(&rule->dl_timer);
		rule->dl_msg_send = -1;

	} else if (!list_empty(msg_list)) {
		/* dequeued some timed-out messages, update timer for the
		 * next delayed message on rule */
		msg = list_entry(rule->dl_msg_list.next,
				 struct lnet_msg, msg_list);
		rule->dl_msg_send = msg->msg_delay_send;
		mod_timer(&rule->dl_timer, rule->dl_msg_send);
	}
	spin_unlock(&rule->dl_lock);
}
static void
delayed_msg_process(struct list_head *msg_list, bool drop)
{
	struct lnet_msg *msg;

	while (!list_empty(msg_list)) {
		struct lnet_ni *ni;
		int cpt;
		int rc;

		msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
		LASSERT(msg->msg_rxpeer != NULL);
		LASSERT(msg->msg_rxni != NULL);

		ni = msg->msg_rxni;
		cpt = msg->msg_rx_cpt;

		list_del_init(&msg->msg_list);
		if (drop) { /* discard instead of delivering */
			rc = -ESHUTDOWN;

		} else if (!msg->msg_routing) {
			rc = lnet_parse_local(ni, msg);
			if (rc == 0)
				continue;

		} else {
			lnet_net_lock(cpt);
			rc = lnet_parse_forward_locked(ni, msg);
			lnet_net_unlock(cpt);

			switch (rc) {
			case LNET_CREDIT_OK:
				lnet_ni_recv(ni, msg->msg_private, msg, 0,
					     0, msg->msg_len, msg->msg_len);
				/* fall through */
			case LNET_CREDIT_WAIT:
				continue;
			default: /* failures */
				break;
			}
		}

		lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
		lnet_finalize(msg, rc);
	}
}
/**
 * Process delayed messages for scheduled rules
 * This function can either be called by delay_rule_daemon, or by lnet_finalize
 */
void
lnet_delay_rule_check(void)
{
	struct lnet_delay_rule *rule;
	struct list_head msgs;

	while (1) {
		INIT_LIST_HEAD(&msgs);

		if (list_empty(&delay_dd.dd_sched_rules))
			break;

		spin_lock_bh(&delay_dd.dd_lock);
		if (list_empty(&delay_dd.dd_sched_rules)) {
			spin_unlock_bh(&delay_dd.dd_lock);
			break;
		}

		rule = list_entry(delay_dd.dd_sched_rules.next,
				  struct lnet_delay_rule, dl_sched_link);
		list_del_init(&rule->dl_sched_link);
		spin_unlock_bh(&delay_dd.dd_lock);

		delayed_msg_check(rule, false, &msgs);
		delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */

		if (!list_empty(&msgs))
			delayed_msg_process(&msgs, false);
	}
}
/** daemon thread to handle delayed messages */
static int
lnet_delay_rule_daemon(void *arg)
{
	delay_dd.dd_running = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	while (delay_dd.dd_running) {
		wait_event_interruptible(delay_dd.dd_waitq,
					 !delay_dd.dd_running ||
					 !list_empty(&delay_dd.dd_sched_rules));
		lnet_delay_rule_check();
	}

	/* in case more rules have been enqueued after my last check */
	lnet_delay_rule_check();
	delay_dd.dd_stopped = 1;
	wake_up(&delay_dd.dd_ctl_waitq);

	return 0;
}
static void
delay_timer_cb(unsigned long arg)
{
	struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;

	spin_lock_bh(&delay_dd.dd_lock);
	if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
		atomic_inc(&rule->dl_refcount);
		list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
		wake_up(&delay_dd.dd_waitq);
	}
	spin_unlock_bh(&delay_dd.dd_lock);
}
/**
 * Add a new delay rule to LNet
 * There is no check for duplicated delay rule, all rules will be checked for
 * each incoming message.
 */
int
lnet_delay_rule_add(struct lnet_fault_attr *attr)
{
	struct lnet_delay_rule *rule;
	int rc = 0;

	if (!((attr->u.delay.la_rate == 0) ^
	      (attr->u.delay.la_interval == 0))) {
		CDEBUG(D_NET,
		       "please provide either delay rate or delay interval, "
		       "but not both at the same time %d/%d\n",
		       attr->u.delay.la_rate, attr->u.delay.la_interval);
		return -EINVAL;
	}

	if (attr->u.delay.la_latency == 0) {
		CDEBUG(D_NET, "delay latency cannot be zero\n");
		return -EINVAL;
	}

	if (lnet_fault_attr_validate(attr) != 0)
		return -EINVAL;

	CFS_ALLOC_PTR(rule);
	if (rule == NULL)
		return -ENOMEM;

	mutex_lock(&delay_dd.dd_mutex);
	if (!delay_dd.dd_running) {
		struct task_struct *task;

		/* NB: although LND threads will process the delayed message
		 * in lnet_finalize, there is no guarantee that LND threads
		 * will be woken up if no other message needs to be handled.
		 * Only one daemon thread: performance is not the concern
		 * of this simulation module.
		 */
		task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			goto failed;
		}
		wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
	}

	init_timer(&rule->dl_timer);
	rule->dl_timer.function = delay_timer_cb;
	rule->dl_timer.data = (unsigned long)rule;

	spin_lock_init(&rule->dl_lock);
	INIT_LIST_HEAD(&rule->dl_msg_list);
	INIT_LIST_HEAD(&rule->dl_sched_link);

	rule->dl_attr = *attr;
	if (attr->u.delay.la_interval != 0) {
		rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
		rule->dl_delay_time = cfs_time_shift(cfs_rand() %
						     attr->u.delay.la_interval);
	} else {
		rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
	}

	rule->dl_msg_send = -1;

	lnet_net_lock(LNET_LOCK_EX);
	atomic_set(&rule->dl_refcount, 1);
	list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
	lnet_net_unlock(LNET_LOCK_EX);

	CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
	       libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_dst),
	       attr->u.delay.la_rate);

	mutex_unlock(&delay_dd.dd_mutex);
	return 0;

failed:
	mutex_unlock(&delay_dd.dd_mutex);
	CFS_FREE_PTR(rule);
	return rc;
}
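
/*
 * Usage sketch (added for illustration; hypothetical function name):
 * delay every 10th GET by 5 seconds. As with drop rules, exactly one of
 * la_rate / la_interval may be set, and la_latency gives the hold time.
 */
static int __maybe_unused
lnet_delay_rule_add_example(void)
{
	struct lnet_fault_attr attr;

	memset(&attr, 0, sizeof(attr));
	attr.fa_src = LNET_NID_ANY;
	attr.fa_dst = LNET_NID_ANY;
	attr.fa_msg_mask = LNET_GET_BIT;	/* GETs only */
	attr.u.delay.la_rate = 10;		/* delay 1 message per 10 */
	attr.u.delay.la_latency = 5;		/* hold it for 5 seconds */

	return lnet_delay_rule_add(&attr);
}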
/**
 * Remove matched Delay Rules from lnet; if \a shutdown is true or both \a src
 * and \a dst are zero, all rules will be removed, otherwise only matched rules
 * will be removed.
 * If \a src is zero, then all rules that have \a dst as destination will be
 * removed.
 * If \a dst is zero, then all rules that have \a src as source will be
 * removed.
 *
 * When a delay rule is removed, all delayed messages of this rule will be
 * processed immediately.
 */
int
lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
{
	struct lnet_delay_rule *rule;
	struct lnet_delay_rule *tmp;
	struct list_head rule_list;
	struct list_head msg_list;
	int n = 0;
	bool cleanup;

	INIT_LIST_HEAD(&rule_list);
	INIT_LIST_HEAD(&msg_list);

	if (shutdown)
		src = dst = 0;

	mutex_lock(&delay_dd.dd_mutex);
	lnet_net_lock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
		if (rule->dl_attr.fa_src != src && src != 0)
			continue;

		if (rule->dl_attr.fa_dst != dst && dst != 0)
			continue;

		CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
		       libcfs_nid2str(rule->dl_attr.fa_src),
		       libcfs_nid2str(rule->dl_attr.fa_dst),
		       rule->dl_attr.u.delay.la_rate,
		       rule->dl_attr.u.delay.la_interval);
		/* refcount is taken over by rule_list */
		list_move(&rule->dl_link, &rule_list);
	}

	/* check if we need to shutdown delay_daemon */
	cleanup = list_empty(&the_lnet.ln_delay_rules) &&
		  !list_empty(&rule_list);
	lnet_net_unlock(LNET_LOCK_EX);

	list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
		list_del_init(&rule->dl_link);

		del_timer_sync(&rule->dl_timer);
		delayed_msg_check(rule, true, &msg_list);
		delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
		n++;
	}

	if (cleanup) { /* no more delay rules, shutdown delay_daemon */
		LASSERT(delay_dd.dd_running);
		delay_dd.dd_running = 0;
		wake_up(&delay_dd.dd_waitq);

		while (!delay_dd.dd_stopped)
			wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
	}
	mutex_unlock(&delay_dd.dd_mutex);

	if (!list_empty(&msg_list))
		delayed_msg_process(&msg_list, shutdown);

	return n;
}
/**
 * List Delay Rule at position of \a pos
 */
int
lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
		     struct lnet_fault_stat *stat)
{
	struct lnet_delay_rule *rule;
	int rc = -ENOENT;
	int cpt;
	int i = 0;

	cpt = lnet_net_lock_current();
	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		if (i++ < pos)
			continue;
		spin_lock(&rule->dl_lock);
		*attr = rule->dl_attr;
		*stat = rule->dl_stat;
		spin_unlock(&rule->dl_lock);
		rc = 0;
		break;
	}
	lnet_net_unlock(cpt);
	return rc;
}
/**
 * reset counters for all Delay Rules
 */
void
lnet_delay_rule_reset(void)
{
	struct lnet_delay_rule *rule;
	int cpt;

	cpt = lnet_net_lock_current();

	list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
		struct lnet_fault_attr *attr = &rule->dl_attr;

		spin_lock(&rule->dl_lock);

		memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
		if (attr->u.delay.la_rate != 0) {
			rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
		} else {
			rule->dl_delay_time = cfs_time_shift(cfs_rand() %
						attr->u.delay.la_interval);
			rule->dl_time_base = cfs_time_shift(attr->u.delay.
								la_interval);
		}
		spin_unlock(&rule->dl_lock);
	}

	lnet_net_unlock(cpt);
}
int
lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
{
	struct lnet_fault_attr *attr;
	struct lnet_fault_stat *stat;

	attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;

	switch (opc) {
	default:
		return -EINVAL;

	case LNET_CTL_DROP_ADD:
		if (attr == NULL)
			return -EINVAL;
		return lnet_drop_rule_add(attr);

	case LNET_CTL_DROP_DEL:
		if (attr == NULL)
			return -EINVAL;
		data->ioc_count = lnet_drop_rule_del(attr->fa_src,
						     attr->fa_dst);
		return 0;

	case LNET_CTL_DROP_RESET:
		lnet_drop_rule_reset();
		return 0;

	case LNET_CTL_DROP_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (attr == NULL || stat == NULL)
			return -EINVAL;
		return lnet_drop_rule_list(data->ioc_count, attr, stat);

	case LNET_CTL_DELAY_ADD:
		if (attr == NULL)
			return -EINVAL;
		return lnet_delay_rule_add(attr);

	case LNET_CTL_DELAY_DEL:
		if (attr == NULL)
			return -EINVAL;
		data->ioc_count = lnet_delay_rule_del(attr->fa_src,
						      attr->fa_dst, false);
		return 0;

	case LNET_CTL_DELAY_RESET:
		lnet_delay_rule_reset();
		return 0;

	case LNET_CTL_DELAY_LIST:
		stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
		if (attr == NULL || stat == NULL)
			return -EINVAL;
		return lnet_delay_rule_list(data->ioc_count, attr, stat);
	}
}
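
/*
 * Dispatch sketch (added for illustration; hypothetical function name):
 * how a caller such as the lctl ioctl path would reset all fault-injection
 * counters through the control entry point above. The reset opcodes do not
 * dereference ioc_inlbuf1/ioc_inlbuf2.
 */
static int __maybe_unused
lnet_fault_reset_example(struct libcfs_ioctl_data *data)
{
	int rc;

	rc = lnet_fault_ctl(LNET_CTL_DROP_RESET, data);
	if (rc != 0)
		return rc;
	return lnet_fault_ctl(LNET_CTL_DELAY_RESET, data);
}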
int
lnet_fault_init(void)
{
	CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
	CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
	CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
	CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);

	mutex_init(&delay_dd.dd_mutex);
	spin_lock_init(&delay_dd.dd_lock);
	init_waitqueue_head(&delay_dd.dd_waitq);
	init_waitqueue_head(&delay_dd.dd_ctl_waitq);
	INIT_LIST_HEAD(&delay_dd.dd_sched_rules);

	return 0;
}
void
lnet_fault_fini(void)
{
	lnet_drop_rule_del(0, 0);
	lnet_delay_rule_del(0, 0, true);

	LASSERT(list_empty(&the_lnet.ln_drop_rules));
	LASSERT(list_empty(&the_lnet.ln_delay_rules));
	LASSERT(list_empty(&delay_dd.dd_sched_rules));
}