Whamcloud - gitweb
LU-6939 nrs: add lock to protect TBF rule linkage 96/17596/3
authorLi Xi <lixi@ddn.com>
Thu, 3 Dec 2015 02:49:03 +0000 (10:49 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Mon, 25 Jan 2016 01:58:13 +0000 (01:58 +0000)
Due to lack of lock protection for operations on rule's list of
client (nrs_tbf_rule->tr_cli_list), it leads to assertion panic
"ASSERTION( cli->tc_rule == ((void *)0) ) failed". Add spinlock
to control concurrent access to the list of client for a rule to
avoid this assertion failure.

Also, some of the fields of rule is inited earlier in
nrs_tbf_rule_start() so as to prevent crash in nrs_tbf_rule_put().

Signed-off-by: Li Xi <lixi@ddn.com>
Change-Id: Ic9ecb4318329400ac0f5900d185c60d4b9a98c48
Reviewed-on: http://review.whamcloud.com/17596
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Henri Doreau <henri.doreau@cea.fr>
Reviewed-by: Nikitas Angelinas <nikitasangelinas@gmail.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/include/lustre_nrs_tbf.h
lustre/ptlrpc/nrs_tbf.c

index b36b549..04998c3 100644 (file)
@@ -58,7 +58,9 @@ struct nrs_tbf_client {
        char                             tc_jobid[LUSTRE_JOBID_SIZE];
        /** Reference number of the client. */
        atomic_t                         tc_ref;
        char                             tc_jobid[LUSTRE_JOBID_SIZE];
        /** Reference number of the client. */
        atomic_t                         tc_ref;
-       /** Likage to rule. */
+       /** Lock to protect rule and linkage. */
+       spinlock_t                       tc_rule_lock;
+       /** Linkage to rule. */
        struct list_head                 tc_linkage;
        /** Pointer to rule. */
        struct nrs_tbf_rule             *tc_rule;
        struct list_head                 tc_linkage;
        /** Pointer to rule. */
        struct nrs_tbf_rule             *tc_rule;
@@ -115,6 +117,8 @@ struct nrs_tbf_rule {
        __u64                            tr_nsecs;
        /** Token bucket depth. */
        __u64                            tr_depth;
        __u64                            tr_nsecs;
        /** Token bucket depth. */
        __u64                            tr_depth;
+       /** Lock to protect the list of clients. */
+       spinlock_t                       tr_rule_lock;
        /** List of client. */
        struct list_head                 tr_cli_list;
        /** Flags of the rule. */
        /** List of client. */
        struct list_head                 tr_cli_list;
        /** Flags of the rule. */
index 1162c6f..7f92e10 100644 (file)
@@ -116,7 +116,9 @@ nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
 {
        LASSERT(!list_empty(&cli->tc_linkage));
        LASSERT(cli->tc_rule);
 {
        LASSERT(!list_empty(&cli->tc_linkage));
        LASSERT(cli->tc_rule);
+       spin_lock(&cli->tc_rule->tr_rule_lock);
        list_del_init(&cli->tc_linkage);
        list_del_init(&cli->tc_linkage);
+       spin_unlock(&cli->tc_rule->tr_rule_lock);
        nrs_tbf_rule_put(cli->tc_rule);
        cli->tc_rule = NULL;
 }
        nrs_tbf_rule_put(cli->tc_rule);
        cli->tc_rule = NULL;
 }
@@ -146,7 +148,8 @@ nrs_tbf_cli_reset(struct nrs_tbf_head *head,
                  struct nrs_tbf_rule *rule,
                  struct nrs_tbf_client *cli)
 {
                  struct nrs_tbf_rule *rule,
                  struct nrs_tbf_client *cli)
 {
-       if (!list_empty(&cli->tc_linkage)) {
+       spin_lock(&cli->tc_rule_lock);
+       if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
                LASSERT(rule != cli->tc_rule);
                nrs_tbf_cli_rule_put(cli);
        }
                LASSERT(rule != cli->tc_rule);
                nrs_tbf_cli_rule_put(cli);
        }
@@ -154,7 +157,10 @@ nrs_tbf_cli_reset(struct nrs_tbf_head *head,
        LASSERT(list_empty(&cli->tc_linkage));
        /* Rule's ref is added before called */
        cli->tc_rule = rule;
        LASSERT(list_empty(&cli->tc_linkage));
        /* Rule's ref is added before called */
        cli->tc_rule = rule;
+       spin_lock(&rule->tr_rule_lock);
        list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
        list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
+       spin_unlock(&rule->tr_rule_lock);
+       spin_unlock(&cli->tc_rule_lock);
        nrs_tbf_cli_reset_value(head, cli);
 }
 
        nrs_tbf_cli_reset_value(head, cli);
 }
 
@@ -252,6 +258,7 @@ nrs_tbf_cli_init(struct nrs_tbf_head *head,
        head->th_ops->o_cli_init(cli, req);
        INIT_LIST_HEAD(&cli->tc_list);
        INIT_LIST_HEAD(&cli->tc_linkage);
        head->th_ops->o_cli_init(cli, req);
        INIT_LIST_HEAD(&cli->tc_list);
        INIT_LIST_HEAD(&cli->tc_linkage);
+       spin_lock_init(&cli->tc_rule_lock);
        atomic_set(&cli->tc_ref, 1);
        rule = nrs_tbf_rule_match(head, cli);
        nrs_tbf_cli_reset(head, rule, cli);
        atomic_set(&cli->tc_ref, 1);
        rule = nrs_tbf_rule_match(head, cli);
        nrs_tbf_cli_reset(head, rule, cli);
@@ -263,7 +270,9 @@ nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
        LASSERT(list_empty(&cli->tc_list));
        LASSERT(!cli->tc_in_heap);
        LASSERT(atomic_read(&cli->tc_ref) == 0);
        LASSERT(list_empty(&cli->tc_list));
        LASSERT(!cli->tc_in_heap);
        LASSERT(atomic_read(&cli->tc_ref) == 0);
+       spin_lock(&cli->tc_rule_lock);
        nrs_tbf_cli_rule_put(cli);
        nrs_tbf_cli_rule_put(cli);
+       spin_unlock(&cli->tc_rule_lock);
        OBD_FREE_PTR(cli);
 }
 
        OBD_FREE_PTR(cli);
 }
 
@@ -292,6 +301,9 @@ nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
        atomic_set(&rule->tr_ref, 1);
        INIT_LIST_HEAD(&rule->tr_cli_list);
        INIT_LIST_HEAD(&rule->tr_nids);
        atomic_set(&rule->tr_ref, 1);
        INIT_LIST_HEAD(&rule->tr_cli_list);
        INIT_LIST_HEAD(&rule->tr_nids);
+       INIT_LIST_HEAD(&rule->tr_linkage);
+       spin_lock_init(&rule->tr_rule_lock);
+       rule->tr_head = head;
 
        rc = head->th_ops->o_rule_init(policy, rule, start);
        if (rc) {
 
        rc = head->th_ops->o_rule_init(policy, rule, start);
        if (rc) {
@@ -309,7 +321,6 @@ nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
                return -EEXIST;
        }
        list_add(&rule->tr_linkage, &head->th_list);
                return -EEXIST;
        }
        list_add(&rule->tr_linkage, &head->th_list);
-       rule->tr_head = head;
        spin_unlock(&head->th_rule_lock);
        atomic_inc(&head->th_rule_sequence);
        if (start->tc_rule_flags & NTRS_DEFAULT) {
        spin_unlock(&head->th_rule_lock);
        atomic_inc(&head->th_rule_sequence);
        if (start->tc_rule_flags & NTRS_DEFAULT) {