/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2014, 2016, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lnet/lnet/net_fault.c
 *
 * Lustre network fault simulation
 *
 * Author: liang.zhen@intel.com
 */
37 #define DEBUG_SUBSYSTEM S_LNET
39 #include <lnet/lib-lnet.h>
40 #include <lnet/lnetctl.h>
42 #define LNET_MSG_MASK (LNET_PUT_BIT | LNET_ACK_BIT | \
43 LNET_GET_BIT | LNET_REPLY_BIT)
45 struct lnet_drop_rule {
46 /** link chain on the_lnet.ln_drop_rules */
47 struct list_head dr_link;
48 /** attributes of this rule */
49 struct lnet_fault_attr dr_attr;
50 /** lock to protect \a dr_drop_at and \a dr_stat */
53 * the message sequence to drop, which means message is dropped when
54 * dr_stat.drs_count == dr_drop_at
56 unsigned long dr_drop_at;
58 * seconds to drop the next message, it's exclusive with dr_drop_at
60 cfs_time_t dr_drop_time;
61 /** baseline to caculate dr_drop_time */
62 cfs_time_t dr_time_base;
63 /** statistic of dropped messages */
64 struct lnet_fault_stat dr_stat;
68 lnet_fault_nid_match(lnet_nid_t nid, lnet_nid_t msg_nid)
70 if (nid == msg_nid || nid == LNET_NID_ANY)
73 if (LNET_NIDNET(nid) != LNET_NIDNET(msg_nid))
76 /* 255.255.255.255@net is wildcard for all addresses in a network */
77 return LNET_NIDADDR(nid) == LNET_NIDADDR(LNET_NID_ANY);
81 lnet_fault_attr_match(struct lnet_fault_attr *attr, lnet_nid_t src,
82 lnet_nid_t dst, unsigned int type, unsigned int portal)
84 if (!lnet_fault_nid_match(attr->fa_src, src) ||
85 !lnet_fault_nid_match(attr->fa_dst, dst))
88 if (!(attr->fa_msg_mask & (1 << type)))
91 /* NB: ACK and REPLY have no portal, but they should have been
92 * rejected by message mask */
93 if (attr->fa_ptl_mask != 0 && /* has portal filter */
94 !(attr->fa_ptl_mask & (1ULL << portal)))
101 lnet_fault_attr_validate(struct lnet_fault_attr *attr)
103 if (attr->fa_msg_mask == 0)
104 attr->fa_msg_mask = LNET_MSG_MASK; /* all message types */
106 if (attr->fa_ptl_mask == 0) /* no portal filter */
109 /* NB: only PUT and GET can be filtered if portal filter has been set */
110 attr->fa_msg_mask &= LNET_GET_BIT | LNET_PUT_BIT;
111 if (attr->fa_msg_mask == 0) {
112 CDEBUG(D_NET, "can't find valid message type bits %x\n",
120 lnet_fault_stat_inc(struct lnet_fault_stat *stat, unsigned int type)
122 /* NB: fs_counter is NOT updated by this function */
140 * LNet message drop simulation
144 * Add a new drop rule to LNet
145 * There is no check for duplicated drop rule, all rules will be checked for
149 lnet_drop_rule_add(struct lnet_fault_attr *attr)
151 struct lnet_drop_rule *rule;
154 if (!((attr->u.drop.da_rate == 0) ^ (attr->u.drop.da_interval == 0))) {
156 "please provide either drop rate or drop interval, "
157 "but not both at the same time %d/%d\n",
158 attr->u.drop.da_rate, attr->u.drop.da_interval);
162 if (lnet_fault_attr_validate(attr) != 0)
169 spin_lock_init(&rule->dr_lock);
171 rule->dr_attr = *attr;
172 if (attr->u.drop.da_interval != 0) {
173 rule->dr_time_base = cfs_time_shift(attr->u.drop.da_interval);
174 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
175 attr->u.drop.da_interval);
177 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
180 lnet_net_lock(LNET_LOCK_EX);
181 list_add(&rule->dr_link, &the_lnet.ln_drop_rules);
182 lnet_net_unlock(LNET_LOCK_EX);
184 CDEBUG(D_NET, "Added drop rule: src %s, dst %s, rate %d, interval %d\n",
185 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
186 attr->u.drop.da_rate, attr->u.drop.da_interval);
191 * Remove matched drop rules from lnet, all rules that can match \a src and
192 * \a dst will be removed.
193 * If \a src is zero, then all rules have \a dst as destination will be remove
194 * If \a dst is zero, then all rules have \a src as source will be removed
195 * If both of them are zero, all rules will be removed
198 lnet_drop_rule_del(lnet_nid_t src, lnet_nid_t dst)
200 struct lnet_drop_rule *rule;
201 struct lnet_drop_rule *tmp;
202 struct list_head zombies;
206 INIT_LIST_HEAD(&zombies);
208 lnet_net_lock(LNET_LOCK_EX);
209 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_drop_rules, dr_link) {
210 if (rule->dr_attr.fa_src != src && src != 0)
213 if (rule->dr_attr.fa_dst != dst && dst != 0)
216 list_move(&rule->dr_link, &zombies);
218 lnet_net_unlock(LNET_LOCK_EX);
220 list_for_each_entry_safe(rule, tmp, &zombies, dr_link) {
221 CDEBUG(D_NET, "Remove drop rule: src %s->dst: %s (1/%d, %d)\n",
222 libcfs_nid2str(rule->dr_attr.fa_src),
223 libcfs_nid2str(rule->dr_attr.fa_dst),
224 rule->dr_attr.u.drop.da_rate,
225 rule->dr_attr.u.drop.da_interval);
227 list_del(&rule->dr_link);
236 * List drop rule at position of \a pos
239 lnet_drop_rule_list(int pos, struct lnet_fault_attr *attr,
240 struct lnet_fault_stat *stat)
242 struct lnet_drop_rule *rule;
248 cpt = lnet_net_lock_current();
249 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
253 spin_lock(&rule->dr_lock);
254 *attr = rule->dr_attr;
255 *stat = rule->dr_stat;
256 spin_unlock(&rule->dr_lock);
261 lnet_net_unlock(cpt);
266 * reset counters for all drop rules
269 lnet_drop_rule_reset(void)
271 struct lnet_drop_rule *rule;
275 cpt = lnet_net_lock_current();
277 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
278 struct lnet_fault_attr *attr = &rule->dr_attr;
280 spin_lock(&rule->dr_lock);
282 memset(&rule->dr_stat, 0, sizeof(rule->dr_stat));
283 if (attr->u.drop.da_rate != 0) {
284 rule->dr_drop_at = cfs_rand() % attr->u.drop.da_rate;
286 rule->dr_drop_time = cfs_time_shift(cfs_rand() %
287 attr->u.drop.da_interval);
288 rule->dr_time_base = cfs_time_shift(attr->u.drop.
291 spin_unlock(&rule->dr_lock);
294 lnet_net_unlock(cpt);
299 * check source/destination NID, portal, message type and drop rate,
300 * decide whether should drop this message or not
303 drop_rule_match(struct lnet_drop_rule *rule, lnet_nid_t src,
304 lnet_nid_t dst, unsigned int type, unsigned int portal)
306 struct lnet_fault_attr *attr = &rule->dr_attr;
309 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
312 /* match this rule, check drop rate now */
313 spin_lock(&rule->dr_lock);
314 if (rule->dr_drop_time != 0) { /* time based drop */
315 cfs_time_t now = cfs_time_current();
317 rule->dr_stat.fs_count++;
318 drop = cfs_time_aftereq(now, rule->dr_drop_time);
320 if (cfs_time_after(now, rule->dr_time_base))
321 rule->dr_time_base = now;
323 rule->dr_drop_time = rule->dr_time_base +
324 cfs_time_seconds(cfs_rand() %
325 attr->u.drop.da_interval);
326 rule->dr_time_base += cfs_time_seconds(attr->u.drop.
329 CDEBUG(D_NET, "Drop Rule %s->%s: next drop : "
331 libcfs_nid2str(attr->fa_src),
332 libcfs_nid2str(attr->fa_dst),
336 } else { /* rate based drop */
339 drop = rule->dr_stat.fs_count++ == rule->dr_drop_at;
340 count = rule->dr_stat.fs_count;
341 if (do_div(count, attr->u.drop.da_rate) == 0) {
342 rule->dr_drop_at = rule->dr_stat.fs_count +
343 cfs_rand() % attr->u.drop.da_rate;
344 CDEBUG(D_NET, "Drop Rule %s->%s: next drop: %lu\n",
345 libcfs_nid2str(attr->fa_src),
346 libcfs_nid2str(attr->fa_dst), rule->dr_drop_at);
350 if (drop) { /* drop this message, update counters */
351 lnet_fault_stat_inc(&rule->dr_stat, type);
352 rule->dr_stat.u.drop.ds_dropped++;
355 spin_unlock(&rule->dr_lock);
360 * Check if message from \a src to \a dst can match any existed drop rule
363 lnet_drop_rule_match(struct lnet_hdr *hdr)
365 struct lnet_drop_rule *rule;
366 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
367 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
368 unsigned int typ = le32_to_cpu(hdr->type);
369 unsigned int ptl = -1;
373 /* NB: if Portal is specified, then only PUT and GET will be
374 * filtered by drop rule */
375 if (typ == LNET_MSG_PUT)
376 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
377 else if (typ == LNET_MSG_GET)
378 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
380 cpt = lnet_net_lock_current();
381 list_for_each_entry(rule, &the_lnet.ln_drop_rules, dr_link) {
382 drop = drop_rule_match(rule, src, dst, typ, ptl);
387 lnet_net_unlock(cpt);
392 * LNet Delay Simulation
394 /** timestamp (second) to send delayed message */
395 #define msg_delay_send msg_ev.hdr_data
397 struct lnet_delay_rule {
398 /** link chain on the_lnet.ln_delay_rules */
399 struct list_head dl_link;
400 /** link chain on delay_dd.dd_sched_rules */
401 struct list_head dl_sched_link;
402 /** attributes of this rule */
403 struct lnet_fault_attr dl_attr;
404 /** lock to protect \a below members */
406 /** refcount of delay rule */
407 atomic_t dl_refcount;
409 * the message sequence to delay, which means message is delayed when
410 * dl_stat.fs_count == dl_delay_at
412 unsigned long dl_delay_at;
414 * seconds to delay the next message, it's exclusive with dl_delay_at
416 cfs_time_t dl_delay_time;
417 /** baseline to caculate dl_delay_time */
418 cfs_time_t dl_time_base;
419 /** jiffies to send the next delayed message */
420 unsigned long dl_msg_send;
421 /** delayed message list */
422 struct list_head dl_msg_list;
423 /** statistic of delayed messages */
424 struct lnet_fault_stat dl_stat;
425 /** timer to wakeup delay_daemon */
426 struct timer_list dl_timer;
429 struct delay_daemon_data {
430 /** serialise rule add/remove */
431 struct mutex dd_mutex;
432 /** protect rules on \a dd_sched_rules */
434 /** scheduled delay rules (by timer) */
435 struct list_head dd_sched_rules;
436 /** deamon thread sleeps at here */
437 wait_queue_head_t dd_waitq;
438 /** controler (lctl command) wait at here */
439 wait_queue_head_t dd_ctl_waitq;
440 /** deamon is running */
441 unsigned int dd_running;
442 /** deamon stopped */
443 unsigned int dd_stopped;
446 static struct delay_daemon_data delay_dd;
449 round_timeout(cfs_time_t timeout)
451 return cfs_time_seconds((unsigned int)
452 cfs_duration_sec(cfs_time_sub(timeout, 0)) + 1);
456 delay_rule_decref(struct lnet_delay_rule *rule)
458 if (atomic_dec_and_test(&rule->dl_refcount)) {
459 LASSERT(list_empty(&rule->dl_sched_link));
460 LASSERT(list_empty(&rule->dl_msg_list));
461 LASSERT(list_empty(&rule->dl_link));
468 * check source/destination NID, portal, message type and delay rate,
469 * decide whether should delay this message or not
472 delay_rule_match(struct lnet_delay_rule *rule, lnet_nid_t src,
473 lnet_nid_t dst, unsigned int type, unsigned int portal,
474 struct lnet_msg *msg)
476 struct lnet_fault_attr *attr = &rule->dl_attr;
479 if (!lnet_fault_attr_match(attr, src, dst, type, portal))
482 /* match this rule, check delay rate now */
483 spin_lock(&rule->dl_lock);
484 if (rule->dl_delay_time != 0) { /* time based delay */
485 cfs_time_t now = cfs_time_current();
487 rule->dl_stat.fs_count++;
488 delay = cfs_time_aftereq(now, rule->dl_delay_time);
490 if (cfs_time_after(now, rule->dl_time_base))
491 rule->dl_time_base = now;
493 rule->dl_delay_time = rule->dl_time_base +
494 cfs_time_seconds(cfs_rand() %
495 attr->u.delay.la_interval);
496 rule->dl_time_base += cfs_time_seconds(attr->u.delay.
499 CDEBUG(D_NET, "Delay Rule %s->%s: next delay : "
501 libcfs_nid2str(attr->fa_src),
502 libcfs_nid2str(attr->fa_dst),
503 rule->dl_delay_time);
506 } else { /* rate based delay */
509 delay = rule->dl_stat.fs_count++ == rule->dl_delay_at;
510 /* generate the next random rate sequence */
511 count = rule->dl_stat.fs_count;
512 if (do_div(count, attr->u.delay.la_rate) == 0) {
513 rule->dl_delay_at = rule->dl_stat.fs_count +
514 cfs_rand() % attr->u.delay.la_rate;
515 CDEBUG(D_NET, "Delay Rule %s->%s: next delay: %lu\n",
516 libcfs_nid2str(attr->fa_src),
517 libcfs_nid2str(attr->fa_dst), rule->dl_delay_at);
522 spin_unlock(&rule->dl_lock);
526 /* delay this message, update counters */
527 lnet_fault_stat_inc(&rule->dl_stat, type);
528 rule->dl_stat.u.delay.ls_delayed++;
530 list_add_tail(&msg->msg_list, &rule->dl_msg_list);
531 msg->msg_delay_send = round_timeout(
532 cfs_time_shift(attr->u.delay.la_latency));
533 if (rule->dl_msg_send == -1) {
534 rule->dl_msg_send = msg->msg_delay_send;
535 mod_timer(&rule->dl_timer, rule->dl_msg_send);
538 spin_unlock(&rule->dl_lock);
543 * check if \a msg can match any Delay Rule, receiving of this message
544 * will be delayed if there is a match.
547 lnet_delay_rule_match_locked(struct lnet_hdr *hdr, struct lnet_msg *msg)
549 struct lnet_delay_rule *rule;
550 lnet_nid_t src = le64_to_cpu(hdr->src_nid);
551 lnet_nid_t dst = le64_to_cpu(hdr->dest_nid);
552 unsigned int typ = le32_to_cpu(hdr->type);
553 unsigned int ptl = -1;
555 /* NB: called with hold of lnet_net_lock */
557 /* NB: if Portal is specified, then only PUT and GET will be
558 * filtered by delay rule */
559 if (typ == LNET_MSG_PUT)
560 ptl = le32_to_cpu(hdr->msg.put.ptl_index);
561 else if (typ == LNET_MSG_GET)
562 ptl = le32_to_cpu(hdr->msg.get.ptl_index);
564 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
565 if (delay_rule_match(rule, src, dst, typ, ptl, msg))
572 /** check out delayed messages for send */
574 delayed_msg_check(struct lnet_delay_rule *rule, bool all,
575 struct list_head *msg_list)
577 struct lnet_msg *msg;
578 struct lnet_msg *tmp;
579 unsigned long now = cfs_time_current();
581 if (!all && rule->dl_msg_send > now)
584 spin_lock(&rule->dl_lock);
585 list_for_each_entry_safe(msg, tmp, &rule->dl_msg_list, msg_list) {
586 if (!all && msg->msg_delay_send > now)
589 msg->msg_delay_send = 0;
590 list_move_tail(&msg->msg_list, msg_list);
593 if (list_empty(&rule->dl_msg_list)) {
594 del_timer(&rule->dl_timer);
595 rule->dl_msg_send = -1;
597 } else if (!list_empty(msg_list)) {
598 /* dequeued some timedout messages, update timer for the
599 * next delayed message on rule */
600 msg = list_entry(rule->dl_msg_list.next,
601 struct lnet_msg, msg_list);
602 rule->dl_msg_send = msg->msg_delay_send;
603 mod_timer(&rule->dl_timer, rule->dl_msg_send);
605 spin_unlock(&rule->dl_lock);
609 delayed_msg_process(struct list_head *msg_list, bool drop)
611 struct lnet_msg *msg;
613 while (!list_empty(msg_list)) {
618 msg = list_entry(msg_list->next, struct lnet_msg, msg_list);
619 LASSERT(msg->msg_rxpeer != NULL);
620 LASSERT(msg->msg_rxni != NULL);
623 cpt = msg->msg_rx_cpt;
625 list_del_init(&msg->msg_list);
629 } else if (!msg->msg_routing) {
630 rc = lnet_parse_local(ni, msg);
636 rc = lnet_parse_forward_locked(ni, msg);
637 lnet_net_unlock(cpt);
641 lnet_ni_recv(ni, msg->msg_private, msg, 0,
642 0, msg->msg_len, msg->msg_len);
643 case LNET_CREDIT_WAIT:
645 default: /* failures */
650 lnet_drop_message(ni, cpt, msg->msg_private, msg->msg_len);
651 lnet_finalize(msg, rc);
656 * Process delayed messages for scheduled rules
657 * This function can either be called by delay_rule_daemon, or by lnet_finalise
660 lnet_delay_rule_check(void)
662 struct lnet_delay_rule *rule;
663 struct list_head msgs;
665 INIT_LIST_HEAD(&msgs);
667 if (list_empty(&delay_dd.dd_sched_rules))
670 spin_lock_bh(&delay_dd.dd_lock);
671 if (list_empty(&delay_dd.dd_sched_rules)) {
672 spin_unlock_bh(&delay_dd.dd_lock);
676 rule = list_entry(delay_dd.dd_sched_rules.next,
677 struct lnet_delay_rule, dl_sched_link);
678 list_del_init(&rule->dl_sched_link);
679 spin_unlock_bh(&delay_dd.dd_lock);
681 delayed_msg_check(rule, false, &msgs);
682 delay_rule_decref(rule); /* -1 for delay_dd.dd_sched_rules */
685 if (!list_empty(&msgs))
686 delayed_msg_process(&msgs, false);
689 /** deamon thread to handle delayed messages */
691 lnet_delay_rule_daemon(void *arg)
693 delay_dd.dd_running = 1;
694 wake_up(&delay_dd.dd_ctl_waitq);
696 while (delay_dd.dd_running) {
697 wait_event_interruptible(delay_dd.dd_waitq,
698 !delay_dd.dd_running ||
699 !list_empty(&delay_dd.dd_sched_rules));
700 lnet_delay_rule_check();
703 /* in case more rules have been enqueued after my last check */
704 lnet_delay_rule_check();
705 delay_dd.dd_stopped = 1;
706 wake_up(&delay_dd.dd_ctl_waitq);
712 delay_timer_cb(unsigned long arg)
714 struct lnet_delay_rule *rule = (struct lnet_delay_rule *)arg;
716 spin_lock_bh(&delay_dd.dd_lock);
717 if (list_empty(&rule->dl_sched_link) && delay_dd.dd_running) {
718 atomic_inc(&rule->dl_refcount);
719 list_add_tail(&rule->dl_sched_link, &delay_dd.dd_sched_rules);
720 wake_up(&delay_dd.dd_waitq);
722 spin_unlock_bh(&delay_dd.dd_lock);
726 * Add a new delay rule to LNet
727 * There is no check for duplicated delay rule, all rules will be checked for
731 lnet_delay_rule_add(struct lnet_fault_attr *attr)
733 struct lnet_delay_rule *rule;
737 if (!((attr->u.delay.la_rate == 0) ^
738 (attr->u.delay.la_interval == 0))) {
740 "please provide either delay rate or delay interval, "
741 "but not both at the same time %d/%d\n",
742 attr->u.delay.la_rate, attr->u.delay.la_interval);
746 if (attr->u.delay.la_latency == 0) {
747 CDEBUG(D_NET, "delay latency cannot be zero\n");
751 if (lnet_fault_attr_validate(attr) != 0)
758 mutex_lock(&delay_dd.dd_mutex);
759 if (!delay_dd.dd_running) {
760 struct task_struct *task;
762 /* NB: although LND threads will process delayed message
763 * in lnet_finalize, but there is no guarantee that LND
764 * threads will be waken up if no other message needs to
766 * Only one daemon thread, performance is not the concern
767 * of this simualation module.
769 task = kthread_run(lnet_delay_rule_daemon, NULL, "lnet_dd");
774 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_running);
777 init_timer(&rule->dl_timer);
778 rule->dl_timer.function = delay_timer_cb;
779 rule->dl_timer.data = (unsigned long)rule;
781 spin_lock_init(&rule->dl_lock);
782 INIT_LIST_HEAD(&rule->dl_msg_list);
783 INIT_LIST_HEAD(&rule->dl_sched_link);
785 rule->dl_attr = *attr;
786 if (attr->u.delay.la_interval != 0) {
787 rule->dl_time_base = cfs_time_shift(attr->u.delay.la_interval);
788 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
789 attr->u.delay.la_interval);
791 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
794 rule->dl_msg_send = -1;
796 lnet_net_lock(LNET_LOCK_EX);
797 atomic_set(&rule->dl_refcount, 1);
798 list_add(&rule->dl_link, &the_lnet.ln_delay_rules);
799 lnet_net_unlock(LNET_LOCK_EX);
801 CDEBUG(D_NET, "Added delay rule: src %s, dst %s, rate %d\n",
802 libcfs_nid2str(attr->fa_src), libcfs_nid2str(attr->fa_src),
803 attr->u.delay.la_rate);
805 mutex_unlock(&delay_dd.dd_mutex);
808 mutex_unlock(&delay_dd.dd_mutex);
814 * Remove matched Delay Rules from lnet, if \a shutdown is true or both \a src
815 * and \a dst are zero, all rules will be removed, otherwise only matched rules
817 * If \a src is zero, then all rules have \a dst as destination will be remove
818 * If \a dst is zero, then all rules have \a src as source will be removed
820 * When a delay rule is removed, all delayed messages of this rule will be
821 * processed immediately.
824 lnet_delay_rule_del(lnet_nid_t src, lnet_nid_t dst, bool shutdown)
826 struct lnet_delay_rule *rule;
827 struct lnet_delay_rule *tmp;
828 struct list_head rule_list;
829 struct list_head msg_list;
834 INIT_LIST_HEAD(&rule_list);
835 INIT_LIST_HEAD(&msg_list);
840 mutex_lock(&delay_dd.dd_mutex);
841 lnet_net_lock(LNET_LOCK_EX);
843 list_for_each_entry_safe(rule, tmp, &the_lnet.ln_delay_rules, dl_link) {
844 if (rule->dl_attr.fa_src != src && src != 0)
847 if (rule->dl_attr.fa_dst != dst && dst != 0)
850 CDEBUG(D_NET, "Remove delay rule: src %s->dst: %s (1/%d, %d)\n",
851 libcfs_nid2str(rule->dl_attr.fa_src),
852 libcfs_nid2str(rule->dl_attr.fa_dst),
853 rule->dl_attr.u.delay.la_rate,
854 rule->dl_attr.u.delay.la_interval);
855 /* refcount is taken over by rule_list */
856 list_move(&rule->dl_link, &rule_list);
859 /* check if we need to shutdown delay_daemon */
860 cleanup = list_empty(&the_lnet.ln_delay_rules) &&
861 !list_empty(&rule_list);
862 lnet_net_unlock(LNET_LOCK_EX);
864 list_for_each_entry_safe(rule, tmp, &rule_list, dl_link) {
865 list_del_init(&rule->dl_link);
867 del_timer_sync(&rule->dl_timer);
868 delayed_msg_check(rule, true, &msg_list);
869 delay_rule_decref(rule); /* -1 for the_lnet.ln_delay_rules */
873 if (cleanup) { /* no more delay rule, shutdown delay_daemon */
874 LASSERT(delay_dd.dd_running);
875 delay_dd.dd_running = 0;
876 wake_up(&delay_dd.dd_waitq);
878 while (!delay_dd.dd_stopped)
879 wait_event(delay_dd.dd_ctl_waitq, delay_dd.dd_stopped);
881 mutex_unlock(&delay_dd.dd_mutex);
883 if (!list_empty(&msg_list))
884 delayed_msg_process(&msg_list, shutdown);
890 * List Delay Rule at position of \a pos
893 lnet_delay_rule_list(int pos, struct lnet_fault_attr *attr,
894 struct lnet_fault_stat *stat)
896 struct lnet_delay_rule *rule;
902 cpt = lnet_net_lock_current();
903 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
907 spin_lock(&rule->dl_lock);
908 *attr = rule->dl_attr;
909 *stat = rule->dl_stat;
910 spin_unlock(&rule->dl_lock);
915 lnet_net_unlock(cpt);
920 * reset counters for all Delay Rules
923 lnet_delay_rule_reset(void)
925 struct lnet_delay_rule *rule;
929 cpt = lnet_net_lock_current();
931 list_for_each_entry(rule, &the_lnet.ln_delay_rules, dl_link) {
932 struct lnet_fault_attr *attr = &rule->dl_attr;
934 spin_lock(&rule->dl_lock);
936 memset(&rule->dl_stat, 0, sizeof(rule->dl_stat));
937 if (attr->u.delay.la_rate != 0) {
938 rule->dl_delay_at = cfs_rand() % attr->u.delay.la_rate;
940 rule->dl_delay_time = cfs_time_shift(cfs_rand() %
941 attr->u.delay.la_interval);
942 rule->dl_time_base = cfs_time_shift(attr->u.delay.
945 spin_unlock(&rule->dl_lock);
948 lnet_net_unlock(cpt);
953 lnet_fault_ctl(int opc, struct libcfs_ioctl_data *data)
955 struct lnet_fault_attr *attr;
956 struct lnet_fault_stat *stat;
958 attr = (struct lnet_fault_attr *)data->ioc_inlbuf1;
964 case LNET_CTL_DROP_ADD:
968 return lnet_drop_rule_add(attr);
970 case LNET_CTL_DROP_DEL:
974 data->ioc_count = lnet_drop_rule_del(attr->fa_src,
978 case LNET_CTL_DROP_RESET:
979 lnet_drop_rule_reset();
982 case LNET_CTL_DROP_LIST:
983 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
984 if (attr == NULL || stat == NULL)
987 return lnet_drop_rule_list(data->ioc_count, attr, stat);
989 case LNET_CTL_DELAY_ADD:
993 return lnet_delay_rule_add(attr);
995 case LNET_CTL_DELAY_DEL:
999 data->ioc_count = lnet_delay_rule_del(attr->fa_src,
1000 attr->fa_dst, false);
1003 case LNET_CTL_DELAY_RESET:
1004 lnet_delay_rule_reset();
1007 case LNET_CTL_DELAY_LIST:
1008 stat = (struct lnet_fault_stat *)data->ioc_inlbuf2;
1009 if (attr == NULL || stat == NULL)
1012 return lnet_delay_rule_list(data->ioc_count, attr, stat);
1017 lnet_fault_init(void)
1019 CLASSERT(LNET_PUT_BIT == 1 << LNET_MSG_PUT);
1020 CLASSERT(LNET_ACK_BIT == 1 << LNET_MSG_ACK);
1021 CLASSERT(LNET_GET_BIT == 1 << LNET_MSG_GET);
1022 CLASSERT(LNET_REPLY_BIT == 1 << LNET_MSG_REPLY);
1024 mutex_init(&delay_dd.dd_mutex);
1025 spin_lock_init(&delay_dd.dd_lock);
1026 init_waitqueue_head(&delay_dd.dd_waitq);
1027 init_waitqueue_head(&delay_dd.dd_ctl_waitq);
1028 INIT_LIST_HEAD(&delay_dd.dd_sched_rules);
1034 lnet_fault_fini(void)
1036 lnet_drop_rule_del(0, 0);
1037 lnet_delay_rule_del(0, 0, true);
1039 LASSERT(list_empty(&the_lnet.ln_drop_rules));
1040 LASSERT(list_empty(&the_lnet.ln_delay_rules));
1041 LASSERT(list_empty(&delay_dd.dd_sched_rules));