4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (C) 2013 DataDirect Networks, Inc.
25 * Copyright (c) 2014, 2016, Intel Corporation.
28 * lustre/ptlrpc/nrs_tbf.c
30 * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
39 #define DEBUG_SUBSYSTEM S_RPC
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre_req_layout.h>
44 #include "ptlrpc_internal.h"
49 * Token Bucket Filter over client NIDs
54 #define NRS_POL_NAME_TBF "tbf"
56 static int tbf_jobid_cache_size = 8192;
57 module_param(tbf_jobid_cache_size, int, 0644);
58 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
60 static int tbf_rate = 10000;
61 module_param(tbf_rate, int, 0644);
62 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
64 static int tbf_depth = 3;
65 module_param(tbf_depth, int, 0644);
66 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
68 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
70 struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
72 struct ptlrpc_nrs *nrs = head->th_res.res_policy->pol_nrs;
73 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
75 nrs->nrs_throttling = 0;
76 wake_up(&svcpt->scp_waitq);
78 return HRTIMER_NORESTART;
81 #define NRS_TBF_DEFAULT_RULE "default"
83 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
85 LASSERT(atomic_read(&rule->tr_ref) == 0);
86 LASSERT(list_empty(&rule->tr_cli_list));
87 LASSERT(list_empty(&rule->tr_linkage));
89 rule->tr_head->th_ops->o_rule_fini(rule);
94 * Decreases the rule's usage reference count, and stops the rule in case it
95 * was already stopping and have no more outstanding usage references (which
96 * indicates it has no more queued or started requests, and can be safely
99 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
101 if (atomic_dec_and_test(&rule->tr_ref))
102 nrs_tbf_rule_fini(rule);
106 * Increases the rule's usage reference count.
108 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
110 atomic_inc(&rule->tr_ref);
114 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
116 LASSERT(!list_empty(&cli->tc_linkage));
117 LASSERT(cli->tc_rule);
118 spin_lock(&cli->tc_rule->tr_rule_lock);
119 list_del_init(&cli->tc_linkage);
120 spin_unlock(&cli->tc_rule->tr_rule_lock);
121 nrs_tbf_rule_put(cli->tc_rule);
126 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
127 struct nrs_tbf_client *cli)
130 struct nrs_tbf_rule *rule = cli->tc_rule;
132 cli->tc_rpc_rate = rule->tr_rpc_rate;
133 cli->tc_nsecs = rule->tr_nsecs_per_rpc;
134 cli->tc_depth = rule->tr_depth;
135 cli->tc_ntoken = rule->tr_depth;
136 cli->tc_check_time = ktime_to_ns(ktime_get());
137 cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
138 cli->tc_rule_generation = rule->tr_generation;
141 binheap_relocate(head->th_binheap,
146 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
147 struct nrs_tbf_rule *rule,
148 struct nrs_tbf_client *cli)
150 spin_lock(&cli->tc_rule_lock);
151 if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
152 LASSERT(rule != cli->tc_rule);
153 nrs_tbf_cli_rule_put(cli);
155 LASSERT(cli->tc_rule == NULL);
156 LASSERT(list_empty(&cli->tc_linkage));
157 /* Rule's ref is added before called */
159 spin_lock(&rule->tr_rule_lock);
160 list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
161 spin_unlock(&rule->tr_rule_lock);
162 spin_unlock(&cli->tc_rule_lock);
163 nrs_tbf_cli_reset_value(head, cli);
167 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
169 return rule->tr_head->th_ops->o_rule_dump(rule, m);
173 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
175 struct nrs_tbf_rule *rule;
178 LASSERT(head != NULL);
179 spin_lock(&head->th_rule_lock);
180 /* List the rules from newest to oldest */
181 list_for_each_entry(rule, &head->th_list, tr_linkage) {
182 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
183 rc = nrs_tbf_rule_dump(rule, m);
189 spin_unlock(&head->th_rule_lock);
194 static struct nrs_tbf_rule *
195 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
198 struct nrs_tbf_rule *rule;
200 LASSERT(head != NULL);
201 list_for_each_entry(rule, &head->th_list, tr_linkage) {
202 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
203 if (strcmp(rule->tr_name, name) == 0) {
204 nrs_tbf_rule_get(rule);
211 static struct nrs_tbf_rule *
212 nrs_tbf_rule_find(struct nrs_tbf_head *head,
215 struct nrs_tbf_rule *rule;
217 LASSERT(head != NULL);
218 spin_lock(&head->th_rule_lock);
219 rule = nrs_tbf_rule_find_nolock(head, name);
220 spin_unlock(&head->th_rule_lock);
224 static struct nrs_tbf_rule *
225 nrs_tbf_rule_match(struct nrs_tbf_head *head,
226 struct nrs_tbf_client *cli)
228 struct nrs_tbf_rule *rule = NULL;
229 struct nrs_tbf_rule *tmp_rule;
231 spin_lock(&head->th_rule_lock);
232 /* Match the newest rule in the list */
233 list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
234 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
235 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
242 rule = head->th_rule;
244 nrs_tbf_rule_get(rule);
245 spin_unlock(&head->th_rule_lock);
250 nrs_tbf_cli_init(struct nrs_tbf_head *head,
251 struct nrs_tbf_client *cli,
252 struct ptlrpc_request *req)
254 struct nrs_tbf_rule *rule;
256 memset(cli, 0, sizeof(*cli));
257 cli->tc_in_heap = false;
258 head->th_ops->o_cli_init(cli, req);
259 INIT_LIST_HEAD(&cli->tc_list);
260 INIT_LIST_HEAD(&cli->tc_linkage);
261 spin_lock_init(&cli->tc_rule_lock);
262 atomic_set(&cli->tc_ref, 1);
263 rule = nrs_tbf_rule_match(head, cli);
264 nrs_tbf_cli_reset(head, rule, cli);
268 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
270 LASSERT(list_empty(&cli->tc_list));
271 LASSERT(!cli->tc_in_heap);
272 LASSERT(atomic_read(&cli->tc_ref) == 0);
273 spin_lock(&cli->tc_rule_lock);
274 nrs_tbf_cli_rule_put(cli);
275 spin_unlock(&cli->tc_rule_lock);
280 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
281 struct nrs_tbf_head *head,
282 struct nrs_tbf_cmd *start)
284 struct nrs_tbf_rule *rule;
285 struct nrs_tbf_rule *tmp_rule;
286 struct nrs_tbf_rule *next_rule;
287 char *next_name = start->u.tc_start.ts_next_name;
290 rule = nrs_tbf_rule_find(head, start->tc_name);
292 nrs_tbf_rule_put(rule);
296 OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
300 memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
301 rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
302 rule->tr_flags = start->u.tc_start.ts_rule_flags;
303 rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
304 rule->tr_depth = tbf_depth;
305 atomic_set(&rule->tr_ref, 1);
306 INIT_LIST_HEAD(&rule->tr_cli_list);
307 INIT_LIST_HEAD(&rule->tr_nids);
308 INIT_LIST_HEAD(&rule->tr_linkage);
309 spin_lock_init(&rule->tr_rule_lock);
310 rule->tr_head = head;
312 rc = head->th_ops->o_rule_init(policy, rule, start);
318 /* Add as the newest rule */
319 spin_lock(&head->th_rule_lock);
320 tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
322 spin_unlock(&head->th_rule_lock);
323 nrs_tbf_rule_put(tmp_rule);
324 nrs_tbf_rule_put(rule);
329 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
331 spin_unlock(&head->th_rule_lock);
332 nrs_tbf_rule_put(rule);
336 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
337 nrs_tbf_rule_put(next_rule);
339 /* Add on the top of the rule list */
340 list_add(&rule->tr_linkage, &head->th_list);
342 spin_unlock(&head->th_rule_lock);
343 atomic_inc(&head->th_rule_sequence);
344 if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
345 rule->tr_flags |= NTRS_DEFAULT;
346 LASSERT(head->th_rule == NULL);
347 head->th_rule = rule;
350 CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %u gen %llu\n",
351 rule, rule->tr_rpc_rate, rule->tr_generation);
357 * Change the rank of a rule in the rule list
359 * The matched rule will be moved to the position right before another
362 * \param[in] policy the policy instance
363 * \param[in] head the TBF policy instance
364 * \param[in] name the rule name to be moved
365 * \param[in] next_name the rule name before which the matched rule will be
370 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
371 struct nrs_tbf_head *head,
375 struct nrs_tbf_rule *rule = NULL;
376 struct nrs_tbf_rule *next_rule = NULL;
379 LASSERT(head != NULL);
381 spin_lock(&head->th_rule_lock);
382 rule = nrs_tbf_rule_find_nolock(head, name);
384 GOTO(out, rc = -ENOENT);
386 if (strcmp(name, next_name) == 0)
389 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
391 GOTO(out_put, rc = -ENOENT);
393 list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
394 nrs_tbf_rule_put(next_rule);
396 nrs_tbf_rule_put(rule);
398 spin_unlock(&head->th_rule_lock);
403 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
404 struct nrs_tbf_head *head,
408 struct nrs_tbf_rule *rule;
410 assert_spin_locked(&policy->pol_nrs->nrs_lock);
412 rule = nrs_tbf_rule_find(head, name);
416 rule->tr_rpc_rate = rate;
417 rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
418 rule->tr_generation++;
419 nrs_tbf_rule_put(rule);
425 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
426 struct nrs_tbf_head *head,
427 struct nrs_tbf_cmd *change)
429 __u64 rate = change->u.tc_change.tc_rpc_rate;
430 char *next_name = change->u.tc_change.tc_next_name;
434 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
441 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
451 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
452 struct nrs_tbf_head *head,
453 struct nrs_tbf_cmd *stop)
455 struct nrs_tbf_rule *rule;
457 assert_spin_locked(&policy->pol_nrs->nrs_lock);
459 if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
462 rule = nrs_tbf_rule_find(head, stop->tc_name);
466 list_del_init(&rule->tr_linkage);
467 rule->tr_flags |= NTRS_STOPPING;
468 nrs_tbf_rule_put(rule);
469 nrs_tbf_rule_put(rule);
475 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
476 struct nrs_tbf_head *head,
477 struct nrs_tbf_cmd *cmd)
481 assert_spin_locked(&policy->pol_nrs->nrs_lock);
483 switch (cmd->tc_cmd) {
484 case NRS_CTL_TBF_START_RULE:
485 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
488 spin_unlock(&policy->pol_nrs->nrs_lock);
489 rc = nrs_tbf_rule_start(policy, head, cmd);
490 spin_lock(&policy->pol_nrs->nrs_lock);
492 case NRS_CTL_TBF_CHANGE_RULE:
493 rc = nrs_tbf_rule_change(policy, head, cmd);
495 case NRS_CTL_TBF_STOP_RULE:
496 rc = nrs_tbf_rule_stop(policy, head, cmd);
497 /* Take it as a success, if not exists at all */
498 return rc == -ENOENT ? 0 : rc;
505 * Binary heap predicate.
507 * \param[in] e1 the first binheap node to compare
508 * \param[in] e2 the second binheap node to compare
514 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
516 struct nrs_tbf_client *cli1;
517 struct nrs_tbf_client *cli2;
519 cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
520 cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
522 if (cli1->tc_deadline < cli2->tc_deadline)
524 else if (cli1->tc_deadline > cli2->tc_deadline)
527 if (cli1->tc_check_time < cli2->tc_check_time)
529 else if (cli1->tc_check_time > cli2->tc_check_time)
532 /* Maybe need more comparasion, e.g. request number in the rules */
537 * TBF binary heap operations
539 static struct binheap_ops nrs_tbf_heap_ops = {
542 .hop_compare = tbf_cli_compare,
/* Hash callback: DJB2 over the NUL-terminated jobid string used as key. */
static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
				       unsigned mask)
{
	const char *jobid = key;

	return cfs_hash_djb2_hash(jobid, strlen(jobid), mask);
}
551 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
553 struct nrs_tbf_client *cli = hlist_entry(hnode,
554 struct nrs_tbf_client,
557 return (strcmp(cli->tc_jobid, key) == 0);
560 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
562 struct nrs_tbf_client *cli = hlist_entry(hnode,
563 struct nrs_tbf_client,
566 return cli->tc_jobid;
569 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
571 return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
574 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
576 struct nrs_tbf_client *cli = hlist_entry(hnode,
577 struct nrs_tbf_client,
580 atomic_inc(&cli->tc_ref);
583 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
585 struct nrs_tbf_client *cli = hlist_entry(hnode,
586 struct nrs_tbf_client,
589 atomic_dec(&cli->tc_ref);
593 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
596 struct nrs_tbf_client *cli = hlist_entry(hnode,
597 struct nrs_tbf_client,
600 LASSERT(atomic_read(&cli->tc_ref) == 0);
601 nrs_tbf_cli_fini(cli);
604 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
605 .hs_hash = nrs_tbf_jobid_hop_hash,
606 .hs_keycmp = nrs_tbf_jobid_hop_keycmp,
607 .hs_key = nrs_tbf_jobid_hop_key,
608 .hs_object = nrs_tbf_hop_object,
609 .hs_get = nrs_tbf_jobid_hop_get,
610 .hs_put = nrs_tbf_jobid_hop_put,
611 .hs_put_locked = nrs_tbf_jobid_hop_put,
612 .hs_exit = nrs_tbf_jobid_hop_exit,
615 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
616 CFS_HASH_NO_ITEMREF | \
619 static struct nrs_tbf_client *
620 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
621 struct cfs_hash_bd *bd,
624 struct hlist_node *hnode;
625 struct nrs_tbf_client *cli;
627 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
631 cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
632 if (!list_empty(&cli->tc_lru))
633 list_del_init(&cli->tc_lru);
637 #define NRS_TBF_JOBID_NULL ""
639 static struct nrs_tbf_client *
640 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
641 struct ptlrpc_request *req)
644 struct nrs_tbf_client *cli;
645 struct cfs_hash *hs = head->th_cli_hash;
646 struct cfs_hash_bd bd;
648 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
650 jobid = NRS_TBF_JOBID_NULL;
651 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
652 cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
653 cfs_hash_bd_unlock(hs, &bd, 1);
658 static struct nrs_tbf_client *
659 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
660 struct nrs_tbf_client *cli)
663 struct nrs_tbf_client *ret;
664 struct cfs_hash *hs = head->th_cli_hash;
665 struct cfs_hash_bd bd;
667 jobid = cli->tc_jobid;
668 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
669 ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
671 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
674 cfs_hash_bd_unlock(hs, &bd, 1);
680 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
681 struct nrs_tbf_client *cli)
683 struct cfs_hash_bd bd;
684 struct cfs_hash *hs = head->th_cli_hash;
685 struct nrs_tbf_bucket *bkt;
689 cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
690 bkt = cfs_hash_bd_extra_get(hs, &bd);
691 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
693 LASSERT(list_empty(&cli->tc_lru));
694 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
697 * Check and purge the LRU, there is at least one client in the LRU.
699 hw = tbf_jobid_cache_size >>
700 (hs->hs_cur_bits - hs->hs_bkt_bits);
701 while (cfs_hash_bd_count_get(&bd) > hw) {
702 if (unlikely(list_empty(&bkt->ntb_lru)))
704 cli = list_entry(bkt->ntb_lru.next,
705 struct nrs_tbf_client,
707 LASSERT(atomic_read(&cli->tc_ref) == 0);
708 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
709 list_move(&cli->tc_lru, &zombies);
711 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
713 while (!list_empty(&zombies)) {
714 cli = container_of(zombies.next,
715 struct nrs_tbf_client, tc_lru);
716 list_del_init(&cli->tc_lru);
717 nrs_tbf_cli_fini(cli);
722 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
723 struct ptlrpc_request *req)
725 char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
728 jobid = NRS_TBF_JOBID_NULL;
729 LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
730 INIT_LIST_HEAD(&cli->tc_lru);
731 memcpy(cli->tc_jobid, jobid, strlen(jobid));
734 static int nrs_tbf_jobid_hash_order(void)
738 for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
744 #define NRS_TBF_JOBID_BKT_BITS 10
747 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
748 struct nrs_tbf_head *head)
750 struct nrs_tbf_cmd start;
751 struct nrs_tbf_bucket *bkt;
755 struct cfs_hash_bd bd;
757 bits = nrs_tbf_jobid_hash_order();
758 if (bits < NRS_TBF_JOBID_BKT_BITS)
759 bits = NRS_TBF_JOBID_BKT_BITS;
760 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
763 NRS_TBF_JOBID_BKT_BITS,
767 &nrs_tbf_jobid_hash_ops,
768 NRS_TBF_JOBID_HASH_FLAGS);
769 if (head->th_cli_hash == NULL)
772 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
773 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
774 INIT_LIST_HEAD(&bkt->ntb_lru);
777 memset(&start, 0, sizeof(start));
778 start.u.tc_start.ts_jobids_str = "*";
780 start.u.tc_start.ts_rpc_rate = tbf_rate;
781 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
782 start.tc_name = NRS_TBF_DEFAULT_RULE;
783 INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
784 rc = nrs_tbf_rule_start(policy, head, &start);
786 cfs_hash_putref(head->th_cli_hash);
787 head->th_cli_hash = NULL;
794 * Frees jobid of \a list.
798 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
800 struct nrs_tbf_jobid *jobid, *n;
802 list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
803 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
804 list_del(&jobid->tj_linkage);
810 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
812 struct nrs_tbf_jobid *jobid;
815 OBD_ALLOC_PTR(jobid);
819 OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
820 if (jobid->tj_id == NULL) {
825 memcpy(jobid->tj_id, id->ls_str, id->ls_len);
826 ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
828 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
830 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
832 list_add_tail(&jobid->tj_linkage, jobid_list);
/*
 * Match \a content against \a pattern, where '*' in the pattern stands for
 * any (possibly empty) run of characters and every other character must
 * match literally. The whole of \a content has to be consumed.
 *
 * Implemented iteratively with a single backtrack point (the most recent
 * '*'), so it needs no recursion and runs in O(|pattern| * |content|) at
 * worst. A '*' in the pattern is always a wildcard, even when the content
 * contains a literal '*' at the same position.
 *
 * \retval true  \a content matches \a pattern
 * \retval false otherwise
 */
static bool
cfs_match_wildcard(const char *pattern, const char *content)
{
	const char *star = NULL;	/* most recent '*' in the pattern */
	const char *resume = NULL;	/* content position that '*' covers up to */

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Let the star match nothing for now */
			star = pattern++;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (star != NULL) {
			/* Mismatch: let the last star swallow one more char */
			pattern = star + 1;
			content = ++resume;
		} else {
			return false;
		}
	}

	/* Content exhausted: only trailing stars may remain */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
864 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
866 if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
867 return strcmp(jobid->tj_id, id) == 0;
869 if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
870 return cfs_match_wildcard(jobid->tj_id, id);
876 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
878 struct nrs_tbf_jobid *jobid;
880 list_for_each_entry(jobid, jobid_list, tj_linkage) {
881 if (nrs_tbf_jobid_match(jobid, id))
888 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
897 INIT_LIST_HEAD(jobid_list);
899 rc = cfs_gettok(&src, ' ', &res);
904 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
909 nrs_tbf_jobid_list_free(jobid_list);
913 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
915 if (!list_empty(&cmd->u.tc_start.ts_jobids))
916 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
917 if (cmd->u.tc_start.ts_jobids_str)
918 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
919 strlen(cmd->u.tc_start.ts_jobids_str) + 1);
922 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
925 int keylen = strlen(key);
928 rc = cfs_gettok(src, '=', &res);
929 if (rc == 0 || res.ls_len != keylen ||
930 strncmp(res.ls_str, key, keylen) != 0 ||
931 src->ls_len <= 2 || src->ls_str[0] != '{' ||
932 src->ls_str[src->ls_len - 1] != '}')
935 /* Skip '{' and '}' */
941 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
947 src.ls_len = strlen(id);
948 rc = nrs_tbf_check_id_value(&src, "jobid");
952 OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
953 if (cmd->u.tc_start.ts_jobids_str == NULL)
956 memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
958 /* parse jobid list */
959 rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
960 strlen(cmd->u.tc_start.ts_jobids_str),
961 &cmd->u.tc_start.ts_jobids);
963 nrs_tbf_jobid_cmd_fini(cmd);
968 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
969 struct nrs_tbf_rule *rule,
970 struct nrs_tbf_cmd *start)
974 LASSERT(start->u.tc_start.ts_jobids_str);
975 OBD_ALLOC(rule->tr_jobids_str,
976 strlen(start->u.tc_start.ts_jobids_str) + 1);
977 if (rule->tr_jobids_str == NULL)
980 memcpy(rule->tr_jobids_str,
981 start->u.tc_start.ts_jobids_str,
982 strlen(start->u.tc_start.ts_jobids_str));
984 INIT_LIST_HEAD(&rule->tr_jobids);
985 if (!list_empty(&start->u.tc_start.ts_jobids)) {
986 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
987 strlen(rule->tr_jobids_str),
990 CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
993 OBD_FREE(rule->tr_jobids_str,
994 strlen(start->u.tc_start.ts_jobids_str) + 1);
999 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1001 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1002 rule->tr_jobids_str, rule->tr_rpc_rate,
1003 atomic_read(&rule->tr_ref) - 1);
1008 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1009 struct nrs_tbf_client *cli)
1011 return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1014 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1016 if (!list_empty(&rule->tr_jobids))
1017 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1018 LASSERT(rule->tr_jobids_str != NULL);
1019 OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1022 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1023 .o_name = NRS_TBF_TYPE_JOBID,
1024 .o_startup = nrs_tbf_jobid_startup,
1025 .o_cli_find = nrs_tbf_jobid_cli_find,
1026 .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1027 .o_cli_put = nrs_tbf_jobid_cli_put,
1028 .o_cli_init = nrs_tbf_jobid_cli_init,
1029 .o_rule_init = nrs_tbf_jobid_rule_init,
1030 .o_rule_dump = nrs_tbf_jobid_rule_dump,
1031 .o_rule_match = nrs_tbf_jobid_rule_match,
1032 .o_rule_fini = nrs_tbf_jobid_rule_fini,
1036 * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1038 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
1039 * nrs_tbf_client objects.
1041 #define NRS_TBF_NID_BKT_BITS 8
1042 #define NRS_TBF_NID_BITS 16
1044 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1047 return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1050 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1052 lnet_nid_t *nid = (lnet_nid_t *)key;
1053 struct nrs_tbf_client *cli = hlist_entry(hnode,
1054 struct nrs_tbf_client,
1057 return *nid == cli->tc_nid;
1060 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1062 struct nrs_tbf_client *cli = hlist_entry(hnode,
1063 struct nrs_tbf_client,
1066 return &cli->tc_nid;
1069 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1071 struct nrs_tbf_client *cli = hlist_entry(hnode,
1072 struct nrs_tbf_client,
1075 atomic_inc(&cli->tc_ref);
1078 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1080 struct nrs_tbf_client *cli = hlist_entry(hnode,
1081 struct nrs_tbf_client,
1084 atomic_dec(&cli->tc_ref);
1087 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1089 struct nrs_tbf_client *cli = hlist_entry(hnode,
1090 struct nrs_tbf_client,
1093 LASSERTF(atomic_read(&cli->tc_ref) == 0,
1094 "Busy TBF object from client with NID %s, with %d refs\n",
1095 libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1097 nrs_tbf_cli_fini(cli);
1100 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1101 .hs_hash = nrs_tbf_nid_hop_hash,
1102 .hs_keycmp = nrs_tbf_nid_hop_keycmp,
1103 .hs_key = nrs_tbf_nid_hop_key,
1104 .hs_object = nrs_tbf_hop_object,
1105 .hs_get = nrs_tbf_nid_hop_get,
1106 .hs_put = nrs_tbf_nid_hop_put,
1107 .hs_put_locked = nrs_tbf_nid_hop_put,
1108 .hs_exit = nrs_tbf_nid_hop_exit,
1111 static struct nrs_tbf_client *
1112 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1113 struct ptlrpc_request *req)
1115 return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1118 static struct nrs_tbf_client *
1119 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1120 struct nrs_tbf_client *cli)
1122 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1127 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1128 struct nrs_tbf_client *cli)
1130 cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1134 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1135 struct nrs_tbf_head *head)
1137 struct nrs_tbf_cmd start;
1140 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1143 NRS_TBF_NID_BKT_BITS, 0,
1146 &nrs_tbf_nid_hash_ops,
1147 CFS_HASH_RW_BKTLOCK);
1148 if (head->th_cli_hash == NULL)
1151 memset(&start, 0, sizeof(start));
1152 start.u.tc_start.ts_nids_str = "*";
1154 start.u.tc_start.ts_rpc_rate = tbf_rate;
1155 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1156 start.tc_name = NRS_TBF_DEFAULT_RULE;
1157 INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1158 rc = nrs_tbf_rule_start(policy, head, &start);
1160 cfs_hash_putref(head->th_cli_hash);
1161 head->th_cli_hash = NULL;
1168 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1169 struct ptlrpc_request *req)
1171 cli->tc_nid = req->rq_peer.nid;
1174 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1175 struct nrs_tbf_rule *rule,
1176 struct nrs_tbf_cmd *start)
1178 LASSERT(start->u.tc_start.ts_nids_str);
1179 OBD_ALLOC(rule->tr_nids_str,
1180 strlen(start->u.tc_start.ts_nids_str) + 1);
1181 if (rule->tr_nids_str == NULL)
1184 memcpy(rule->tr_nids_str,
1185 start->u.tc_start.ts_nids_str,
1186 strlen(start->u.tc_start.ts_nids_str));
1188 INIT_LIST_HEAD(&rule->tr_nids);
1189 if (!list_empty(&start->u.tc_start.ts_nids)) {
1190 if (cfs_parse_nidlist(rule->tr_nids_str,
1191 strlen(rule->tr_nids_str),
1192 &rule->tr_nids) <= 0) {
1193 CERROR("nids {%s} illegal\n",
1195 OBD_FREE(rule->tr_nids_str,
1196 strlen(start->u.tc_start.ts_nids_str) + 1);
1204 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1206 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1207 rule->tr_nids_str, rule->tr_rpc_rate,
1208 atomic_read(&rule->tr_ref) - 1);
1213 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1214 struct nrs_tbf_client *cli)
1216 return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1219 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1221 if (!list_empty(&rule->tr_nids))
1222 cfs_free_nidlist(&rule->tr_nids);
1223 LASSERT(rule->tr_nids_str != NULL);
1224 OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1227 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1229 if (!list_empty(&cmd->u.tc_start.ts_nids))
1230 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1231 if (cmd->u.tc_start.ts_nids_str)
1232 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1233 strlen(cmd->u.tc_start.ts_nids_str) + 1);
1236 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1238 struct cfs_lstr src;
1242 src.ls_len = strlen(id);
1243 rc = nrs_tbf_check_id_value(&src, "nid");
1247 OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1248 if (cmd->u.tc_start.ts_nids_str == NULL)
1251 memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1253 /* parse NID list */
1254 if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1255 strlen(cmd->u.tc_start.ts_nids_str),
1256 &cmd->u.tc_start.ts_nids) <= 0) {
1257 nrs_tbf_nid_cmd_fini(cmd);
1264 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1265 .o_name = NRS_TBF_TYPE_NID,
1266 .o_startup = nrs_tbf_nid_startup,
1267 .o_cli_find = nrs_tbf_nid_cli_find,
1268 .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1269 .o_cli_put = nrs_tbf_nid_cli_put,
1270 .o_cli_init = nrs_tbf_nid_cli_init,
1271 .o_rule_init = nrs_tbf_nid_rule_init,
1272 .o_rule_dump = nrs_tbf_nid_rule_dump,
1273 .o_rule_match = nrs_tbf_nid_rule_match,
1274 .o_rule_fini = nrs_tbf_nid_rule_fini,
/* Hash callback: DJB2 over the NUL-terminated key string. */
static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
				 unsigned mask)
{
	const char *str = key;

	return cfs_hash_djb2_hash(str, strlen(str), mask);
}
1283 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1285 struct nrs_tbf_client *cli = hlist_entry(hnode,
1286 struct nrs_tbf_client,
1289 return (strcmp(cli->tc_key, key) == 0);
1292 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1294 struct nrs_tbf_client *cli = hlist_entry(hnode,
1295 struct nrs_tbf_client,
1300 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1302 struct nrs_tbf_client *cli = hlist_entry(hnode,
1303 struct nrs_tbf_client,
1306 atomic_inc(&cli->tc_ref);
1309 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1311 struct nrs_tbf_client *cli = hlist_entry(hnode,
1312 struct nrs_tbf_client,
1315 atomic_dec(&cli->tc_ref);
1318 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1321 struct nrs_tbf_client *cli = hlist_entry(hnode,
1322 struct nrs_tbf_client,
1325 LASSERT(atomic_read(&cli->tc_ref) == 0);
1326 nrs_tbf_cli_fini(cli);
1329 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1330 .hs_hash = nrs_tbf_hop_hash,
1331 .hs_keycmp = nrs_tbf_hop_keycmp,
1332 .hs_key = nrs_tbf_hop_key,
1333 .hs_object = nrs_tbf_hop_object,
1334 .hs_get = nrs_tbf_hop_get,
1335 .hs_put = nrs_tbf_hop_put,
1336 .hs_put_locked = nrs_tbf_hop_put,
1337 .hs_exit = nrs_tbf_hop_exit,
1340 #define NRS_TBF_GENERIC_BKT_BITS 10
1341 #define NRS_TBF_GENERIC_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
1342 CFS_HASH_NO_ITEMREF | \
1346 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1348 struct nrs_tbf_cmd start;
1349 struct nrs_tbf_bucket *bkt;
1353 struct cfs_hash_bd bd;
1355 bits = nrs_tbf_jobid_hash_order();
1356 if (bits < NRS_TBF_GENERIC_BKT_BITS)
1357 bits = NRS_TBF_GENERIC_BKT_BITS;
1358 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1360 NRS_TBF_GENERIC_BKT_BITS,
1363 NRS_TBF_GENERIC_HASH_FLAGS);
1364 if (head->th_cli_hash == NULL)
1367 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1368 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1369 INIT_LIST_HEAD(&bkt->ntb_lru);
1372 memset(&start, 0, sizeof(start));
1373 start.u.tc_start.ts_conds_str = "*";
1375 start.u.tc_start.ts_rpc_rate = tbf_rate;
1376 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1377 start.tc_name = NRS_TBF_DEFAULT_RULE;
1378 INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1379 rc = nrs_tbf_rule_start(policy, head, &start);
1381 cfs_hash_putref(head->th_cli_hash);
1386 static struct nrs_tbf_client *
1387 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1390 struct hlist_node *hnode;
1391 struct nrs_tbf_client *cli;
1393 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1397 cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1398 if (!list_empty(&cli->tc_lru))
1399 list_del_init(&cli->tc_lru);
1404 * ONLY opcode presented in this function will be checked in
1405 * nrs_tbf_id_cli_set(). That means, we can add or remove an
1406 * opcode to enable or disable requests handled in nrs_tbf
1408 static struct req_format *req_fmt(__u32 opcode)
1412 return &RQF_OST_GETATTR;
1414 return &RQF_OST_SETATTR;
1416 return &RQF_OST_BRW_READ;
1418 return &RQF_OST_BRW_WRITE;
1419 /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1420 * in most case. Should they be removed? */
1422 return &RQF_OST_CREATE;
1424 return &RQF_OST_DESTROY;
1426 return &RQF_OST_PUNCH;
1428 return &RQF_OST_SYNC;
1430 return &RQF_OST_LADVISE;
1432 return &RQF_MDS_GETATTR;
1433 case MDS_GETATTR_NAME:
1434 return &RQF_MDS_GETATTR_NAME;
1435 /* close is skipped to avoid LDLM cancel slowness */
1438 return &RQF_MDS_CLOSE;
1441 return &RQF_MDS_REINT;
1443 return &RQF_MDS_READPAGE;
1445 return &RQF_MDS_GET_ROOT;
1447 return &RQF_MDS_STATFS;
1449 return &RQF_MDS_SYNC;
1451 return &RQF_MDS_QUOTACTL;
1453 return &RQF_MDS_GETXATTR;
1455 return &RQF_MDS_GET_INFO;
1456 /* HSM op is skipped */
1458 case MDS_HSM_STATE_GET:
1459 return &RQF_MDS_HSM_STATE_GET;
1460 case MDS_HSM_STATE_SET:
1461 return &RQF_MDS_HSM_STATE_SET;
1462 case MDS_HSM_ACTION:
1463 return &RQF_MDS_HSM_ACTION;
1464 case MDS_HSM_CT_REGISTER:
1465 return &RQF_MDS_HSM_CT_REGISTER;
1466 case MDS_HSM_CT_UNREGISTER:
1467 return &RQF_MDS_HSM_CT_UNREGISTER;
1469 case MDS_SWAP_LAYOUTS:
1470 return &RQF_MDS_SWAP_LAYOUTS;
1472 return &RQF_LDLM_ENQUEUE;
1478 static struct req_format *intent_req_fmt(__u32 it_opc)
1480 if (it_opc & (IT_OPEN | IT_CREAT))
1481 return &RQF_LDLM_INTENT_OPEN;
1482 else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1483 return &RQF_LDLM_INTENT_GETATTR;
1484 else if (it_opc & IT_GETXATTR)
1485 return &RQF_LDLM_INTENT_GETXATTR;
1486 else if (it_opc & (IT_GLIMPSE | IT_BRW))
1487 return &RQF_LDLM_INTENT;
1492 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1495 struct ost_body *body;
1497 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1499 id->ti_uid = body->oa.o_uid;
1500 id->ti_gid = body->oa.o_gid;
1507 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1510 struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1514 /* TODO: nodemaping feature converts {ug}id from individual
1515 * clients to the actual ones of the file system. Some work
1516 * may be needed to fix this. */
1517 id->ti_uid = b->mbo_uid;
1518 id->ti_gid = b->mbo_gid;
1521 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1524 struct mdt_rec_reint *rec;
1526 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1527 LASSERT(rec != NULL);
1529 /* use the fs{ug}id as {ug}id of the process */
1530 id->ti_uid = rec->rr_fsuid;
1531 id->ti_gid = rec->rr_fsgid;
1534 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1537 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1542 case MDS_GETATTR_NAME:
1547 case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1548 unpack_ugid_from_mdt_body(req, id);
1552 unpack_ugid_from_mdt_rec_reint(req, id);
1561 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1564 struct ldlm_intent *lit;
1565 struct req_format *fmt;
1567 if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1570 req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1571 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1575 fmt = intent_req_fmt(lit->opc);
1579 req_capsule_extend(&req->rq_pill, fmt);
1581 if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1582 unpack_ugid_from_mdt_body(req, id);
1583 else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1584 unpack_ugid_from_mdt_rec_reint(req, id);
1590 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1591 enum nrs_tbf_flag ti_type)
1593 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1594 struct req_format *fmt = req_fmt(opc);
1595 bool fmt_unset = false;
1598 memset(id, 0, sizeof(struct tbf_id));
1599 id->ti_type = ti_type;
1603 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1604 if (req->rq_pill.rc_fmt == NULL) {
1605 req_capsule_set(&req->rq_pill, fmt);
1609 if (opc < OST_LAST_OPC)
1610 rc = ost_tbf_id_cli_set(req, id);
1611 else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1612 rc = mdt_tbf_id_cli_set(req, id);
1613 else if (opc == LDLM_ENQUEUE)
1614 rc = ldlm_tbf_id_cli_set(req, id);
1618 /* restore it to the initialized state */
1620 req->rq_pill.rc_fmt = NULL;
1624 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1625 struct ptlrpc_request *req,
1626 char *keystr, size_t keystr_sz)
1629 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1632 nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1633 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1635 jobid = NRS_TBF_JOBID_NULL;
1637 snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1638 libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1642 INIT_LIST_HEAD(&cli->tc_lru);
1643 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1644 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1645 cli->tc_nid = req->rq_peer.nid;
1646 cli->tc_opcode = opc;
1651 static struct nrs_tbf_client *
1652 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1654 struct nrs_tbf_client *cli;
1655 struct cfs_hash *hs = head->th_cli_hash;
1656 struct cfs_hash_bd bd;
1657 char keystr[NRS_TBF_KEY_LEN];
1659 nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1660 cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1661 cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1662 cfs_hash_bd_unlock(hs, &bd, 1);
1667 static struct nrs_tbf_client *
1668 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1669 struct nrs_tbf_client *cli)
1672 struct nrs_tbf_client *ret;
1673 struct cfs_hash *hs = head->th_cli_hash;
1674 struct cfs_hash_bd bd;
1677 cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1678 ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1680 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1683 cfs_hash_bd_unlock(hs, &bd, 1);
1689 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1691 struct cfs_hash_bd bd;
1692 struct cfs_hash *hs = head->th_cli_hash;
1693 struct nrs_tbf_bucket *bkt;
1697 cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1698 bkt = cfs_hash_bd_extra_get(hs, &bd);
1699 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1701 LASSERT(list_empty(&cli->tc_lru));
1702 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1705 * Check and purge the LRU, there is at least one client in the LRU.
1707 hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1708 while (cfs_hash_bd_count_get(&bd) > hw) {
1709 if (unlikely(list_empty(&bkt->ntb_lru)))
1711 cli = list_entry(bkt->ntb_lru.next,
1712 struct nrs_tbf_client,
1714 LASSERT(atomic_read(&cli->tc_ref) == 0);
1715 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1716 list_move(&cli->tc_lru, &zombies);
1718 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1720 while (!list_empty(&zombies)) {
1721 cli = container_of(zombies.next,
1722 struct nrs_tbf_client, tc_lru);
1723 list_del_init(&cli->tc_lru);
1724 nrs_tbf_cli_fini(cli);
1729 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1730 struct ptlrpc_request *req)
1732 char keystr[NRS_TBF_KEY_LEN];
1734 nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1738 nrs_tbf_id_list_free(struct list_head *uid_list)
1740 struct nrs_tbf_id *nti_id, *n;
1742 list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1743 list_del_init(&nti_id->nti_linkage);
1744 OBD_FREE_PTR(nti_id);
1749 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1751 LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1752 expr->te_field < NRS_TBF_FIELD_MAX);
1753 switch (expr->te_field) {
1754 case NRS_TBF_FIELD_NID:
1755 cfs_free_nidlist(&expr->te_cond);
1757 case NRS_TBF_FIELD_JOBID:
1758 nrs_tbf_jobid_list_free(&expr->te_cond);
1760 case NRS_TBF_FIELD_OPCODE:
1761 CFS_FREE_BITMAP(expr->te_opcodes);
1763 case NRS_TBF_FIELD_UID:
1764 case NRS_TBF_FIELD_GID:
1765 nrs_tbf_id_list_free(&expr->te_cond);
1774 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1776 struct nrs_tbf_expression *expression;
1777 struct nrs_tbf_expression *n;
1779 LASSERT(list_empty(&conjunction->tc_linkage));
1780 list_for_each_entry_safe(expression, n,
1781 &conjunction->tc_expressions,
1783 list_del_init(&expression->te_linkage);
1784 nrs_tbf_expression_free(expression);
1786 OBD_FREE_PTR(conjunction);
1790 nrs_tbf_conds_free(struct list_head *cond_list)
1792 struct nrs_tbf_conjunction *conjunction;
1793 struct nrs_tbf_conjunction *n;
1795 list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1796 list_del_init(&conjunction->tc_linkage);
1797 nrs_tbf_conjunction_free(conjunction);
1802 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1804 if (!list_empty(&cmd->u.tc_start.ts_conds))
1805 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1806 if (cmd->u.tc_start.ts_conds_str)
1807 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1808 strlen(cmd->u.tc_start.ts_conds_str) + 1);
/* Separators of the generic rule expression syntax */
#define NRS_TBF_DISJUNCTION_DELIM	(',')	/* between conjunctions */
#define NRS_TBF_CONJUNCTION_DELIM	('&')	/* between expressions */
#define NRS_TBF_EXPRESSION_DELIM	('=')	/* between field and value */
1816 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1818 int len = strlen(str);
1820 return (field->ls_len == len &&
1821 strncmp(field->ls_str, str, len) == 0);
/* Parsers defined further down but needed by the generic expression parser */
static int nrs_tbf_opcode_list_parse(char *str, int len,
				     struct cfs_bitmap **bitmaptr);
static int nrs_tbf_id_list_parse(char *str, int len,
				 struct list_head *id_list,
				 enum nrs_tbf_flag tif);
1831 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1833 struct nrs_tbf_expression *expr;
1834 struct cfs_lstr field;
1837 OBD_ALLOC_PTR(expr);
1841 rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1842 if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1843 src->ls_str[src->ls_len - 1] != '}')
1844 GOTO(out, rc = -EINVAL);
1846 /* Skip '{' and '}' */
1850 if (nrs_tbf_check_field(&field, "nid")) {
1851 if (cfs_parse_nidlist(src->ls_str,
1853 &expr->te_cond) <= 0)
1854 GOTO(out, rc = -EINVAL);
1855 expr->te_field = NRS_TBF_FIELD_NID;
1856 } else if (nrs_tbf_check_field(&field, "jobid")) {
1857 if (nrs_tbf_jobid_list_parse(src->ls_str,
1859 &expr->te_cond) < 0)
1860 GOTO(out, rc = -EINVAL);
1861 expr->te_field = NRS_TBF_FIELD_JOBID;
1862 } else if (nrs_tbf_check_field(&field, "opcode")) {
1863 if (nrs_tbf_opcode_list_parse(src->ls_str,
1865 &expr->te_opcodes) < 0)
1866 GOTO(out, rc = -EINVAL);
1867 expr->te_field = NRS_TBF_FIELD_OPCODE;
1868 } else if (nrs_tbf_check_field(&field, "uid")) {
1869 if (nrs_tbf_id_list_parse(src->ls_str,
1872 NRS_TBF_FLAG_UID) < 0)
1873 GOTO(out, rc = -EINVAL);
1874 expr->te_field = NRS_TBF_FIELD_UID;
1875 } else if (nrs_tbf_check_field(&field, "gid")) {
1876 if (nrs_tbf_id_list_parse(src->ls_str,
1879 NRS_TBF_FLAG_GID) < 0)
1880 GOTO(out, rc = -EINVAL);
1881 expr->te_field = NRS_TBF_FIELD_GID;
1883 GOTO(out, rc = -EINVAL);
1886 list_add_tail(&expr->te_linkage, cond_list);
1894 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1896 struct nrs_tbf_conjunction *conjunction;
1897 struct cfs_lstr expr;
1900 OBD_ALLOC_PTR(conjunction);
1901 if (conjunction == NULL)
1904 INIT_LIST_HEAD(&conjunction->tc_expressions);
1905 list_add_tail(&conjunction->tc_linkage, cond_list);
1907 while (src->ls_str) {
1908 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1913 rc = nrs_tbf_expression_parse(&expr,
1914 &conjunction->tc_expressions);
1922 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1924 struct cfs_lstr src;
1925 struct cfs_lstr res;
1930 INIT_LIST_HEAD(cond_list);
1931 while (src.ls_str) {
1932 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1937 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1945 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1949 OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1950 if (cmd->u.tc_start.ts_conds_str == NULL)
1953 memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1955 /* Parse hybird NID and JOBID conditions */
1956 rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1957 strlen(cmd->u.tc_start.ts_conds_str),
1958 &cmd->u.tc_start.ts_conds);
1960 nrs_tbf_generic_cmd_fini(cmd);
/* Defined with the uid/gid TBF type further down */
static int nrs_tbf_id_list_match(struct list_head *id_list,
				 struct tbf_id id);
1969 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1970 struct nrs_tbf_rule *rule,
1971 struct nrs_tbf_client *cli)
1973 switch (expr->te_field) {
1974 case NRS_TBF_FIELD_NID:
1975 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1976 case NRS_TBF_FIELD_JOBID:
1977 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1978 case NRS_TBF_FIELD_OPCODE:
1979 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1980 case NRS_TBF_FIELD_UID:
1981 case NRS_TBF_FIELD_GID:
1982 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1989 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1990 struct nrs_tbf_rule *rule,
1991 struct nrs_tbf_client *cli)
1993 struct nrs_tbf_expression *expr;
1996 list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1997 matched = nrs_tbf_expression_match(expr, rule, cli);
2006 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2008 struct nrs_tbf_conjunction *conjunction;
2011 list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2012 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2021 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2023 if (!list_empty(&rule->tr_conds))
2024 nrs_tbf_conds_free(&rule->tr_conds);
2025 LASSERT(rule->tr_conds_str != NULL);
2026 OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2030 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2031 struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2035 LASSERT(start->u.tc_start.ts_conds_str);
2036 OBD_ALLOC(rule->tr_conds_str,
2037 strlen(start->u.tc_start.ts_conds_str) + 1);
2038 if (rule->tr_conds_str == NULL)
2041 memcpy(rule->tr_conds_str,
2042 start->u.tc_start.ts_conds_str,
2043 strlen(start->u.tc_start.ts_conds_str));
2045 INIT_LIST_HEAD(&rule->tr_conds);
2046 if (!list_empty(&start->u.tc_start.ts_conds)) {
2047 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2048 strlen(rule->tr_conds_str),
2052 nrs_tbf_generic_rule_fini(rule);
2058 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2060 seq_printf(m, "%s %s %u, ref %d\n", rule->tr_name,
2061 rule->tr_conds_str, rule->tr_rpc_rate,
2062 atomic_read(&rule->tr_ref) - 1);
/**
 * Rule-match hook of the generic type; defers to nrs_tbf_cond_match().
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
			   struct nrs_tbf_client *cli)
{
	int matched = nrs_tbf_cond_match(rule, cli);

	return matched;
}
2073 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2074 .o_name = NRS_TBF_TYPE_GENERIC,
2075 .o_startup = nrs_tbf_startup,
2076 .o_cli_find = nrs_tbf_cli_find,
2077 .o_cli_findadd = nrs_tbf_cli_findadd,
2078 .o_cli_put = nrs_tbf_cli_put,
2079 .o_cli_init = nrs_tbf_generic_cli_init,
2080 .o_rule_init = nrs_tbf_rule_init,
2081 .o_rule_dump = nrs_tbf_generic_rule_dump,
2082 .o_rule_match = nrs_tbf_generic_rule_match,
2083 .o_rule_fini = nrs_tbf_generic_rule_fini,
2086 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2088 if (rule->tr_opcodes != NULL)
2089 CFS_FREE_BITMAP(rule->tr_opcodes);
2091 LASSERT(rule->tr_opcodes_str != NULL);
2092 OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2095 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2098 return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2101 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2103 const __u32 *opc = key;
2104 struct nrs_tbf_client *cli = hlist_entry(hnode,
2105 struct nrs_tbf_client,
2108 return *opc == cli->tc_opcode;
2111 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2113 struct nrs_tbf_client *cli = hlist_entry(hnode,
2114 struct nrs_tbf_client,
2117 return &cli->tc_opcode;
2120 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2121 struct hlist_node *hnode)
2123 struct nrs_tbf_client *cli = hlist_entry(hnode,
2124 struct nrs_tbf_client,
2127 atomic_inc(&cli->tc_ref);
2130 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2131 struct hlist_node *hnode)
2133 struct nrs_tbf_client *cli = hlist_entry(hnode,
2134 struct nrs_tbf_client,
2137 atomic_dec(&cli->tc_ref);
2140 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2141 struct hlist_node *hnode)
2143 struct nrs_tbf_client *cli = hlist_entry(hnode,
2144 struct nrs_tbf_client,
2147 LASSERTF(atomic_read(&cli->tc_ref) == 0,
2148 "Busy TBF object from client with opcode %s, with %d refs\n",
2149 ll_opcode2str(cli->tc_opcode),
2150 atomic_read(&cli->tc_ref));
2152 nrs_tbf_cli_fini(cli);
2154 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2155 .hs_hash = nrs_tbf_opcode_hop_hash,
2156 .hs_keycmp = nrs_tbf_opcode_hop_keycmp,
2157 .hs_key = nrs_tbf_opcode_hop_key,
2158 .hs_object = nrs_tbf_hop_object,
2159 .hs_get = nrs_tbf_opcode_hop_get,
2160 .hs_put = nrs_tbf_opcode_hop_put,
2161 .hs_put_locked = nrs_tbf_opcode_hop_put,
2162 .hs_exit = nrs_tbf_opcode_hop_exit,
2166 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2167 struct nrs_tbf_head *head)
2169 struct nrs_tbf_cmd start = { 0 };
2172 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2175 NRS_TBF_NID_BKT_BITS, 0,
2178 &nrs_tbf_opcode_hash_ops,
2179 CFS_HASH_RW_BKTLOCK);
2180 if (head->th_cli_hash == NULL)
2183 start.u.tc_start.ts_opcodes_str = "*";
2185 start.u.tc_start.ts_rpc_rate = tbf_rate;
2186 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2187 start.tc_name = NRS_TBF_DEFAULT_RULE;
2188 rc = nrs_tbf_rule_start(policy, head, &start);
2193 static struct nrs_tbf_client *
2194 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2195 struct ptlrpc_request *req)
2199 opc = lustre_msg_get_opc(req->rq_reqmsg);
2200 return cfs_hash_lookup(head->th_cli_hash, &opc);
2203 static struct nrs_tbf_client *
2204 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2205 struct nrs_tbf_client *cli)
2207 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2212 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2213 struct ptlrpc_request *req)
2215 cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2218 #define MAX_OPCODE_LEN 32
2220 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2223 char opcode_str[MAX_OPCODE_LEN];
2225 if (id->ls_len + 1 > MAX_OPCODE_LEN)
2228 memcpy(opcode_str, id->ls_str, id->ls_len);
2229 opcode_str[id->ls_len] = '\0';
2231 op = ll_str2opcode(opcode_str);
2235 cfs_bitmap_set(opcodes, op);
2240 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2242 struct cfs_bitmap *opcodes;
2243 struct cfs_lstr src;
2244 struct cfs_lstr res;
2248 opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2249 if (opcodes == NULL)
2254 while (src.ls_str) {
2255 rc = cfs_gettok(&src, ' ', &res);
2260 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2265 if (rc == 0 && bitmaptr)
2266 *bitmaptr = opcodes;
2268 CFS_FREE_BITMAP(opcodes);
2273 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2275 if (cmd->u.tc_start.ts_opcodes_str)
2276 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2277 strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2281 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2283 struct cfs_lstr src;
2287 src.ls_len = strlen(id);
2288 rc = nrs_tbf_check_id_value(&src, "opcode");
2292 OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2293 if (cmd->u.tc_start.ts_opcodes_str == NULL)
2296 memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2298 /* parse opcode list */
2299 rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2300 strlen(cmd->u.tc_start.ts_opcodes_str),
2303 nrs_tbf_opcode_cmd_fini(cmd);
2309 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2310 struct nrs_tbf_client *cli)
2312 if (rule->tr_opcodes == NULL)
2315 return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2318 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2319 struct nrs_tbf_rule *rule,
2320 struct nrs_tbf_cmd *start)
2324 LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2325 OBD_ALLOC(rule->tr_opcodes_str,
2326 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2327 if (rule->tr_opcodes_str == NULL)
2330 strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2331 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2333 /* Default rule '*' */
2334 if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2337 rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2338 strlen(rule->tr_opcodes_str),
2341 OBD_FREE(rule->tr_opcodes_str,
2342 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2348 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2350 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2351 rule->tr_opcodes_str, rule->tr_rpc_rate,
2352 atomic_read(&rule->tr_ref) - 1);
2357 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2358 .o_name = NRS_TBF_TYPE_OPCODE,
2359 .o_startup = nrs_tbf_opcode_startup,
2360 .o_cli_find = nrs_tbf_opcode_cli_find,
2361 .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2362 .o_cli_put = nrs_tbf_nid_cli_put,
2363 .o_cli_init = nrs_tbf_opcode_cli_init,
2364 .o_rule_init = nrs_tbf_opcode_rule_init,
2365 .o_rule_dump = nrs_tbf_opcode_rule_dump,
2366 .o_rule_match = nrs_tbf_opcode_rule_match,
2367 .o_rule_fini = nrs_tbf_opcode_rule_fini,
2370 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2373 return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2376 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2378 const struct tbf_id *opc = key;
2379 enum nrs_tbf_flag ntf;
2380 struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2382 ntf = opc->ti_type & cli->tc_id.ti_type;
2383 if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2386 if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2392 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2394 struct nrs_tbf_client *cli = hlist_entry(hnode,
2395 struct nrs_tbf_client,
2400 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2402 struct nrs_tbf_client *cli = hlist_entry(hnode,
2403 struct nrs_tbf_client,
2406 atomic_inc(&cli->tc_ref);
2409 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2411 struct nrs_tbf_client *cli = hlist_entry(hnode,
2412 struct nrs_tbf_client,
2415 atomic_dec(&cli->tc_ref);
2419 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2422 struct nrs_tbf_client *cli = hlist_entry(hnode,
2423 struct nrs_tbf_client,
2426 LASSERT(atomic_read(&cli->tc_ref) == 0);
2427 nrs_tbf_cli_fini(cli);
2430 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2431 .hs_hash = nrs_tbf_id_hop_hash,
2432 .hs_keycmp = nrs_tbf_id_hop_keycmp,
2433 .hs_key = nrs_tbf_id_hop_key,
2434 .hs_object = nrs_tbf_hop_object,
2435 .hs_get = nrs_tbf_id_hop_get,
2436 .hs_put = nrs_tbf_id_hop_put,
2437 .hs_put_locked = nrs_tbf_id_hop_put,
2438 .hs_exit = nrs_tbf_id_hop_exit,
2442 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2443 struct nrs_tbf_head *head)
2445 struct nrs_tbf_cmd start;
2448 head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2451 NRS_TBF_NID_BKT_BITS, 0,
2454 &nrs_tbf_id_hash_ops,
2455 CFS_HASH_RW_BKTLOCK);
2456 if (head->th_cli_hash == NULL)
2459 memset(&start, 0, sizeof(start));
2460 start.u.tc_start.ts_ids_str = "*";
2461 start.u.tc_start.ts_rpc_rate = tbf_rate;
2462 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2463 start.tc_name = NRS_TBF_DEFAULT_RULE;
2464 INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2465 rc = nrs_tbf_rule_start(policy, head, &start);
2467 cfs_hash_putref(head->th_cli_hash);
2468 head->th_cli_hash = NULL;
2474 static struct nrs_tbf_client *
2475 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2476 struct ptlrpc_request *req)
2480 LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2481 head->th_type_flag == NRS_TBF_FLAG_GID);
2483 nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2484 return cfs_hash_lookup(head->th_cli_hash, &id);
2487 static struct nrs_tbf_client *
2488 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2489 struct nrs_tbf_client *cli)
2491 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2496 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2497 struct ptlrpc_request *req)
2499 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2503 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2504 struct ptlrpc_request *req)
2506 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2510 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2512 struct nrs_tbf_id *nti_id;
2513 enum nrs_tbf_flag flag;
2515 list_for_each_entry(nti_id, id_list, nti_linkage) {
2516 flag = id.ti_type & nti_id->nti_id.ti_type;
2520 if ((flag & NRS_TBF_FLAG_UID) &&
2521 (id.ti_uid != nti_id->nti_id.ti_uid))
2524 if ((flag & NRS_TBF_FLAG_GID) &&
2525 (id.ti_gid != nti_id->nti_id.ti_gid))
2534 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2535 struct nrs_tbf_client *cli)
2537 return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2540 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2542 nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2544 if (cmd->u.tc_start.ts_ids_str)
2545 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2546 strlen(cmd->u.tc_start.ts_ids_str) + 1);
2550 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2551 enum nrs_tbf_flag tif)
2553 struct cfs_lstr src;
2554 struct cfs_lstr res;
2556 struct tbf_id id = { 0 };
2559 if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2564 INIT_LIST_HEAD(id_list);
2565 while (src.ls_str) {
2566 struct nrs_tbf_id *nti_id;
2568 if (cfs_gettok(&src, ' ', &res) == 0)
2569 GOTO(out, rc = -EINVAL);
2572 if (tif == NRS_TBF_FLAG_UID) {
2573 if (!cfs_str2num_check(res.ls_str, res.ls_len,
2574 &id.ti_uid, 0, (u32)~0U))
2575 GOTO(out, rc = -EINVAL);
2577 if (!cfs_str2num_check(res.ls_str, res.ls_len,
2578 &id.ti_gid, 0, (u32)~0U))
2579 GOTO(out, rc = -EINVAL);
2582 OBD_ALLOC_PTR(nti_id);
2584 GOTO(out, rc = -ENOMEM);
2586 nti_id->nti_id = id;
2587 list_add_tail(&nti_id->nti_linkage, id_list);
2591 nrs_tbf_id_list_free(id_list);
2595 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2597 struct cfs_lstr src;
2599 enum nrs_tbf_flag tif;
2601 tif = cmd->u.tc_start.ts_valid_type;
2604 src.ls_len = strlen(id);
2606 rc = nrs_tbf_check_id_value(&src,
2607 tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2611 OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2612 if (cmd->u.tc_start.ts_ids_str == NULL)
2615 strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2617 rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2618 strlen(cmd->u.tc_start.ts_ids_str),
2619 &cmd->u.tc_start.ts_ids, tif);
2621 nrs_tbf_id_cmd_fini(cmd);
2627 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2628 struct nrs_tbf_rule *rule,
2629 struct nrs_tbf_cmd *start)
2631 struct nrs_tbf_head *head = rule->tr_head;
2633 enum nrs_tbf_flag tif = head->th_type_flag;
2634 int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2636 LASSERT(start->u.tc_start.ts_ids_str);
2637 INIT_LIST_HEAD(&rule->tr_ids);
2639 OBD_ALLOC(rule->tr_ids_str, ids_len);
2640 if (rule->tr_ids_str == NULL)
2643 strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2646 if (!list_empty(&start->u.tc_start.ts_ids)) {
2647 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2648 strlen(rule->tr_ids_str),
2649 &rule->tr_ids, tif);
2651 CERROR("%ss {%s} illegal\n",
2652 tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2656 OBD_FREE(rule->tr_ids_str, ids_len);
2657 rule->tr_ids_str = NULL;
2663 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2665 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2666 rule->tr_ids_str, rule->tr_rpc_rate,
2667 atomic_read(&rule->tr_ref) - 1);
2671 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2673 nrs_tbf_id_list_free(&rule->tr_ids);
2674 if (rule->tr_ids_str != NULL)
2675 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2678 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2679 .o_name = NRS_TBF_TYPE_UID,
2680 .o_startup = nrs_tbf_id_startup,
2681 .o_cli_find = nrs_tbf_id_cli_find,
2682 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2683 .o_cli_put = nrs_tbf_nid_cli_put,
2684 .o_cli_init = nrs_tbf_uid_cli_init,
2685 .o_rule_init = nrs_tbf_id_rule_init,
2686 .o_rule_dump = nrs_tbf_id_rule_dump,
2687 .o_rule_match = nrs_tbf_id_rule_match,
2688 .o_rule_fini = nrs_tbf_id_rule_fini,
2691 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2692 .o_name = NRS_TBF_TYPE_GID,
2693 .o_startup = nrs_tbf_id_startup,
2694 .o_cli_find = nrs_tbf_id_cli_find,
2695 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2696 .o_cli_put = nrs_tbf_nid_cli_put,
2697 .o_cli_init = nrs_tbf_gid_cli_init,
2698 .o_rule_init = nrs_tbf_id_rule_init,
2699 .o_rule_dump = nrs_tbf_id_rule_dump,
2700 .o_rule_match = nrs_tbf_id_rule_match,
2701 .o_rule_fini = nrs_tbf_id_rule_fini,
2704 static struct nrs_tbf_type nrs_tbf_types[] = {
2706 .ntt_name = NRS_TBF_TYPE_JOBID,
2707 .ntt_flag = NRS_TBF_FLAG_JOBID,
2708 .ntt_ops = &nrs_tbf_jobid_ops,
2711 .ntt_name = NRS_TBF_TYPE_NID,
2712 .ntt_flag = NRS_TBF_FLAG_NID,
2713 .ntt_ops = &nrs_tbf_nid_ops,
2716 .ntt_name = NRS_TBF_TYPE_OPCODE,
2717 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2718 .ntt_ops = &nrs_tbf_opcode_ops,
2721 .ntt_name = NRS_TBF_TYPE_GENERIC,
2722 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2723 .ntt_ops = &nrs_tbf_generic_ops,
2726 .ntt_name = NRS_TBF_TYPE_UID,
2727 .ntt_flag = NRS_TBF_FLAG_UID,
2728 .ntt_ops = &nrs_tbf_uid_ops,
2731 .ntt_name = NRS_TBF_TYPE_GID,
2732 .ntt_flag = NRS_TBF_FLAG_GID,
2733 .ntt_ops = &nrs_tbf_gid_ops,
2738 * Is called before the policy transitions into
2739 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2740 * policy-specific private data structure.
2742 * \param[in] policy The policy to start
2744 * \retval -ENOMEM OOM error
2747 * \see nrs_policy_register()
2748 * \see nrs_policy_ctl()
2750 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2752 struct nrs_tbf_head *head;
2753 struct nrs_tbf_ops *ops;
2761 name = NRS_TBF_TYPE_GENERIC;
2762 else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2765 GOTO(out, rc = -EINVAL);
2767 for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2768 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2769 ops = nrs_tbf_types[i].ntt_ops;
2770 type = nrs_tbf_types[i].ntt_flag;
2776 GOTO(out, rc = -ENOTSUPP);
2778 OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2780 GOTO(out, rc = -ENOMEM);
2782 memcpy(head->th_type, name, strlen(name));
2783 head->th_type[strlen(name)] = '\0';
2785 head->th_type_flag = type;
2787 head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2788 CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2789 nrs_pol2cptab(policy),
2790 nrs_pol2cptid(policy));
2791 if (head->th_binheap == NULL)
2792 GOTO(out_free_head, rc = -ENOMEM);
2794 atomic_set(&head->th_rule_sequence, 0);
2795 spin_lock_init(&head->th_rule_lock);
2796 INIT_LIST_HEAD(&head->th_list);
2797 hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2798 head->th_timer.function = nrs_tbf_timer_cb;
2799 rc = head->th_ops->o_startup(policy, head);
2801 GOTO(out_free_heap, rc);
2803 policy->pol_private = head;
2806 binheap_destroy(head->th_binheap);
2814 * Is called before the policy transitions into
2815 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2816 * private data structure.
2818 * \param[in] policy The policy to stop
2820 * \see nrs_policy_stop0()
2822 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2824 struct nrs_tbf_head *head = policy->pol_private;
2825 struct ptlrpc_nrs *nrs = policy->pol_nrs;
2826 struct nrs_tbf_rule *rule, *n;
2828 LASSERT(head != NULL);
2829 LASSERT(head->th_cli_hash != NULL);
2830 hrtimer_cancel(&head->th_timer);
2831 /* Should cleanup hash first before free rules */
2832 cfs_hash_putref(head->th_cli_hash);
2833 list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2834 list_del_init(&rule->tr_linkage);
2835 nrs_tbf_rule_put(rule);
2837 LASSERT(list_empty(&head->th_list));
2838 LASSERT(head->th_binheap != NULL);
2839 LASSERT(binheap_is_empty(head->th_binheap));
2840 binheap_destroy(head->th_binheap);
2842 nrs->nrs_throttling = 0;
2843 wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2847 * Performs a policy-specific ctl function on TBF policy instances; similar
2850 * \param[in] policy the policy instance
2851 * \param[in] opc the opcode
2852 * \param[in,out] arg used for passing parameters and information
2854 * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2855 * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2857 * \retval 0 operation carried out successfully
/*
 * Policy-specific control entry point for TBF. Dispatches on opc (cast to
 * enum nrs_ctl_tbf); arg is interpreted per opcode as described at each case.
 * Must be entered with policy->pol_nrs->nrs_lock held (asserted below).
 */
2860 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2861 enum ptlrpc_nrs_ctl opc,
2867 assert_spin_locked(&policy->pol_nrs->nrs_lock);
2869 switch ((enum nrs_ctl_tbf)opc) {
2874 * Read RPC rate size of a policy instance.
/* RD_RULE: arg is a seq_file; print the CPT id, then dump all rules into it */
2876 case NRS_CTL_TBF_RD_RULE: {
2877 struct nrs_tbf_head *head = policy->pol_private;
2878 struct seq_file *m = arg;
2879 struct ptlrpc_service_part *svcpt;
2881 svcpt = policy->pol_nrs->nrs_svcpt;
2882 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2884 rc = nrs_tbf_rule_dump_all(head, m);
2889 * Write RPC rate of a policy instance.
/* WR_RULE: arg is a parsed struct nrs_tbf_cmd, handed to nrs_tbf_command() */
2891 case NRS_CTL_TBF_WR_RULE: {
2892 struct nrs_tbf_head *head = policy->pol_private;
2893 struct nrs_tbf_cmd *cmd;
2895 cmd = (struct nrs_tbf_cmd *)arg;
2896 rc = nrs_tbf_command(policy,
2902 * Read the TBF policy type of a policy instance.
/* RD_TYPE_FLAG: arg points at a __u32 that receives head->th_type_flag */
2904 case NRS_CTL_TBF_RD_TYPE_FLAG: {
2905 struct nrs_tbf_head *head = policy->pol_private;
2907 *(__u32 *)arg = head->th_type_flag;
2916 * Is called for obtaining a TBF policy resource.
2918 * \param[in] policy The policy on which the request is being asked for
2919 * \param[in] nrq The request for which resources are being taken
2920 * \param[in] parent Parent resource, unused in this policy
2921 * \param[out] resp Resources references are placed in this array
2922 * \param[in] moving_req Signifies limited caller context; unused in this
2926 * \see nrs_resource_get_safe()
/*
 * Resource lookup for a request. With parent == NULL the head's th_res is
 * handed back through resp. Otherwise the per-client structure for the
 * request is looked up via th_ops->o_cli_find(); an existing client has its
 * rule binding revalidated under scp_req_lock, and a new client is allocated,
 * initialised and published with th_ops->o_cli_findadd(). In both of the
 * latter paths resp receives &cli->tc_res.
 * NOTE(review): the return statements sit on lines not present in this
 * listing; confirm the return values against the full file.
 */
2928 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2929 struct ptlrpc_nrs_request *nrq,
2930 const struct ptlrpc_nrs_resource *parent,
2931 struct ptlrpc_nrs_resource **resp,
2934 struct nrs_tbf_head *head;
2935 struct nrs_tbf_client *cli;
2936 struct nrs_tbf_client *tmp;
2937 struct ptlrpc_request *req;
/* Top of the hierarchy: the policy head itself is the resource */
2939 if (parent == NULL) {
2940 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
/* Second level: recover the head and the request, then look the client up */
2944 head = container_of(parent, struct nrs_tbf_head, th_res);
2945 req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2946 cli = head->th_ops->o_cli_find(head, req);
2948 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2949 LASSERT(cli->tc_rule);
/*
 * The client's cached rule is stale when the head's rule sequence has moved
 * on or the bound rule is being stopped; re-run rule matching in that case.
 */
2950 if (cli->tc_rule_sequence !=
2951 atomic_read(&head->th_rule_sequence) ||
2952 cli->tc_rule->tr_flags & NTRS_STOPPING) {
2953 struct nrs_tbf_rule *rule;
2956 "TBF class@%p rate %u sequence %d, "
2957 "rule flags %d, head sequence %d\n",
2958 cli, cli->tc_rpc_rate,
2959 cli->tc_rule_sequence,
2960 cli->tc_rule->tr_flags,
2961 atomic_read(&head->th_rule_sequence));
2962 rule = nrs_tbf_rule_match(head, cli);
/* A different rule matched: rebind the client to it */
2963 if (rule != cli->tc_rule) {
2964 nrs_tbf_cli_reset(head, rule, cli);
/* Generation mismatch: refresh the client's values from its rule */
2966 if (cli->tc_rule_generation != rule->tr_generation)
2967 nrs_tbf_cli_reset_value(head, cli);
/* presumably balances a reference returned by nrs_tbf_rule_match(); verify */
2968 nrs_tbf_rule_put(rule);
2970 } else if (cli->tc_rule_generation !=
2971 cli->tc_rule->tr_generation) {
2972 nrs_tbf_cli_reset_value(head, cli);
2974 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
/* Allocate a new client; GFP_ATOMIC is used when moving_req is set, __GFP_IO otherwise */
2978 OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2979 sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2983 nrs_tbf_cli_init(head, cli, req);
/* Publish the new client, or get back whichever client o_cli_findadd() returns */
2984 tmp = head->th_ops->o_cli_findadd(head, cli);
/*
 * NOTE(review): the guard for the next two statements is not shown here;
 * it looks like the "another client was returned" path that discards the
 * freshly built one. Confirm against the full file.
 */
2986 atomic_dec(&cli->tc_ref);
2987 nrs_tbf_cli_fini(cli);
2991 *resp = &cli->tc_res;
2997 * Called when releasing references to the resource hierarchy obtained for a
2998 * request for scheduling using the TBF policy.
3000 * \param[in] policy the policy the resource belongs to
3001 * \param[in] res the resource to be released
/*
 * Release a resource obtained by nrs_tbf_res_get(). Client-level resources
 * are handed to th_ops->o_cli_put(); the parent-less (head) resource is
 * checked for first, before any client lookup is attempted.
 */
3003 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3004 const struct ptlrpc_nrs_resource *res)
3006 struct nrs_tbf_head *head;
3007 struct nrs_tbf_client *cli;
3010 * Do nothing for freeing parent, nrs_tbf_net resources
3012 if (res->res_parent == NULL)
/* res is embedded in a client; its parent is embedded in the head */
3015 cli = container_of(res, struct nrs_tbf_client, tc_res);
3016 head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3018 head->th_ops->o_cli_put(head, cli);
3022 * Called when getting a request from the TBF policy for handling, or just
3023 * peeking; removes the request from the policy when it is to be handled.
3025 * \param[in] policy The policy
3026 * \param[in] peek When set, signifies that we just want to examine the
3027 * request, and not handle it, so the request is not removed
3029 * \param[in] force Force the policy to return a request; unused in this
3032 * \retval The request to be handled; this is the next request in the TBF
3035 * \see ptlrpc_nrs_req_get_nolock()
3036 * \see nrs_request_get()
/*
 * Pick the next request from the TBF policy. The client at the root of the
 * binary heap is examined; the tokens it has accumulated since tc_check_time
 * are computed from tc_rpc_rate. When the request is taken it is unlinked
 * from the client's list and the heap is updated; the final part of the
 * function sets nrs_throttling and arms th_timer for a deadline instead.
 * Requires scp_req_lock to be held (asserted below).
 * NOTE(review): several conditionals and declarations are on lines missing
 * from this listing; the comments describe only the statements shown.
 */
3039 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3040 bool peek, bool force)
3042 struct nrs_tbf_head *head = policy->pol_private;
3043 struct ptlrpc_nrs_request *nrq = NULL;
3044 struct nrs_tbf_client *cli;
3045 struct binheap_node *node;
3047 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
/* Non-peek callers are tested against the throttling flag first */
3049 if (!peek && policy->pol_nrs->nrs_throttling)
/* The heap root is the client considered next; an empty heap has no root */
3052 node = binheap_root(head->th_binheap);
3053 if (unlikely(node == NULL))
3056 cli = container_of(node, struct nrs_tbf_client, tc_node);
3057 LASSERT(cli->tc_in_heap);
3059 nrq = list_entry(cli->tc_list.next,
3060 struct ptlrpc_nrs_request,
3063 struct nrs_tbf_rule *rule = cli->tc_rule;
3064 __u64 now = ktime_to_ns(ktime_get());
3068 __u64 old_resid = 0;
3070 deadline = cli->tc_check_time +
3072 LASSERT(now >= cli->tc_check_time);
/* Tokens earned since the last check: elapsed_ns * rate / NSEC_PER_SEC */
3073 passed = now - cli->tc_check_time;
3074 ntoken = passed * cli->tc_rpc_rate;
3075 do_div(ntoken, NSEC_PER_SEC);
/* Add the tokens the client had already saved up */
3077 ntoken += cli->tc_ntoken;
/*
 * Realtime rules track a residual in tc_nsecs_resid (old value kept in
 * old_resid so it can be restored below); other rules cap the token count
 * at tc_depth.
 */
3078 if (rule->tr_flags & NTRS_REALTIME) {
3079 LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3080 old_resid = cli->tc_nsecs_resid;
3081 cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3082 if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3084 cli->tc_nsecs_resid -= cli->tc_nsecs;
3086 } else if (ntoken > cli->tc_depth)
3087 ntoken = cli->tc_depth;
3090 struct ptlrpc_request *req;
3091 nrq = list_entry(cli->tc_list.next,
3092 struct ptlrpc_nrs_request,
3094 req = container_of(nrq,
3095 struct ptlrpc_request,
/* Dequeue path: commit the token state and unlink the request */
3098 cli->tc_ntoken = ntoken;
3099 cli->tc_check_time = now;
3100 list_del_init(&nrq->nr_u.tbf.tr_list);
/* A client with no queued requests leaves the heap */
3101 if (list_empty(&cli->tc_list)) {
3102 binheap_remove(head->th_binheap,
3104 cli->tc_in_heap = false;
/* Non-realtime clients get a fresh deadline one tc_nsecs interval from now */
3106 if (!(rule->tr_flags & NTRS_REALTIME))
3107 cli->tc_deadline = now + cli->tc_nsecs;
3108 binheap_relocate(head->th_binheap,
3112 "TBF dequeues: class@%p rate %u gen %llu "
3113 "token %llu, rule@%p rate %u gen %llu\n",
3114 cli, cli->tc_rpc_rate,
3115 cli->tc_rule_generation, cli->tc_ntoken,
3116 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3117 cli->tc_rule->tr_generation);
/*
 * Realtime rule on this path: record the computed deadline, restore the
 * residual saved above and re-sort the heap. If a different client is now
 * at the root, the function calls itself again.
 */
3121 if (rule->tr_flags & NTRS_REALTIME) {
3122 cli->tc_deadline = deadline;
3123 cli->tc_nsecs_resid = old_resid;
3124 binheap_relocate(head->th_binheap,
3126 if (node != binheap_root(head->th_binheap))
3127 return nrs_tbf_req_get(policy,
/* Enter throttled state and arm the absolute-time timer for the deadline */
3130 policy->pol_nrs->nrs_throttling = 1;
3131 head->th_deadline = deadline;
3132 time = ktime_set(0, 0);
3133 time = ktime_add_ns(time, deadline);
3134 hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3142 * Adds request \a nrq to \a policy's list of queued requests
3144 * \param[in] policy The policy
3145 * \param[in] nrq The request to add
3147 * \retval 0 success; nrs_request_enqueue() assumes this function will always
/*
 * Queue a request on its TBF client. A client with an empty request list is
 * given a deadline and inserted into the binary heap before the request is
 * appended; a client that is already in the heap simply gets the request
 * appended. Each request is stamped with head->th_sequence. Requires
 * scp_req_lock to be held (asserted below).
 */
3150 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3151 struct ptlrpc_nrs_request *nrq)
3153 struct nrs_tbf_head *head;
3154 struct nrs_tbf_client *cli;
3157 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
/* The request's resource is embedded in the client, its parent in the head */
3159 cli = container_of(nrs_request_resource(nrq),
3160 struct nrs_tbf_client, tc_res);
3161 head = container_of(nrs_request_resource(nrq)->res_parent,
3162 struct nrs_tbf_head, th_res);
/* First pending request for this client: it is not in the heap yet */
3163 if (list_empty(&cli->tc_list)) {
3164 LASSERT(!cli->tc_in_heap);
3165 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3166 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3168 cli->tc_in_heap = true;
3169 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3170 list_add_tail(&nrq->nr_u.tbf.tr_list,
/*
 * While throttled, a client whose deadline is earlier than the one the
 * timer was armed for causes the timer to be re-armed for that deadline,
 * provided the pending timer could be cancelled.
 */
3172 if (policy->pol_nrs->nrs_throttling) {
3173 __u64 deadline = cli->tc_deadline;
3174 if ((head->th_deadline > deadline) &&
3175 (hrtimer_try_to_cancel(&head->th_timer)
3178 head->th_deadline = deadline;
3179 time = ktime_set(0, 0);
3180 time = ktime_add_ns(time, deadline);
3181 hrtimer_start(&head->th_timer, time,
/* Client already has queued requests: it must already be in the heap */
3187 LASSERT(cli->tc_in_heap);
3188 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3189 list_add_tail(&nrq->nr_u.tbf.tr_list,
3195 "TBF enqueues: class@%p rate %u gen %llu "
3196 "token %llu, rule@%p rate %u gen %llu\n",
3197 cli, cli->tc_rpc_rate,
3198 cli->tc_rule_generation, cli->tc_ntoken,
3199 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3200 cli->tc_rule->tr_generation);
3206 * Removes request \a nrq from \a policy's list of queued requests.
3208 * \param[in] policy The policy
3209 * \param[in] nrq The request to remove
/*
 * Unlink a queued request from its TBF client. When that leaves the client
 * with no queued requests the client is removed from the binary heap;
 * the other visible heap operation is a relocate of the client's node.
 * Requires scp_req_lock to be held (asserted below).
 */
3211 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3212 struct ptlrpc_nrs_request *nrq)
3214 struct nrs_tbf_head *head;
3215 struct nrs_tbf_client *cli;
3217 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3219 cli = container_of(nrs_request_resource(nrq),
3220 struct nrs_tbf_client, tc_res);
3221 head = container_of(nrs_request_resource(nrq)->res_parent,
3222 struct nrs_tbf_head, th_res);
/* The request must currently be linked on a client list */
3224 LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3225 list_del_init(&nrq->nr_u.tbf.tr_list);
3226 if (list_empty(&cli->tc_list)) {
3227 binheap_remove(head->th_binheap,
3229 cli->tc_in_heap = false;
3231 binheap_relocate(head->th_binheap,
3237 * Prints a debug statement right before the request \a nrq stops being
3240 * \param[in] policy The policy handling the request
3241 * \param[in] nrq The request being handled
3243 * \see ptlrpc_server_finish_request()
3244 * \see ptlrpc_nrs_req_stop_nolock()
/*
 * Emit a D_RPCTRACE debug line naming the policy, the peer and the TBF
 * sequence number of the request. No scheduler state is modified by the
 * visible statements. Requires scp_req_lock to be held (asserted below).
 */
3246 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3247 struct ptlrpc_nrs_request *nrq)
/* req is only needed for the peer id printed in the trace message */
3249 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3252 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3254 CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3255 policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3256 nrq->nr_u.tbf.tr_sequence);
3264 * The maximum RPC rate.
/*
 * Upper bound used when validating a "rate=" value.
 * NOTE(review): nrs_tbf_parse_value_pair() tests "rate >= LPROCFS_NRS_RATE_MAX",
 * so this value itself is rejected; confirm whether the bound is meant to be
 * exclusive.
 */
3266 #define LPROCFS_NRS_RATE_MAX 65535
/*
 * seq_file show handler for the TBF rule file. Prints a "regular_requests:"
 * section by issuing NRS_CTL_TBF_RD_RULE on the regular queue and, when the
 * service has a high-priority NRS head, a "high_priority_requests:" section
 * from the HP queue. -ENOSPC and -ENODEV results are special-cased as the
 * existing comments below explain.
 */
3269 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3271 struct ptlrpc_service *svc = m->private;
3274 seq_printf(m, "regular_requests:\n");
3276 * Perform two separate calls to this as only one of the NRS heads'
3277 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3278 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3280 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3282 NRS_CTL_TBF_RD_RULE,
3286 * -ENOSPC means buf in the parameter m is overflow, return 0
3287 * here to let upper layer function seq_read alloc a larger
3288 * memory area and do this process again.
3290 } else if (rc == -ENOSPC) {
3294 * Ignore -ENODEV as the regular NRS head's policy may be in the
3295 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3297 } else if (rc != -ENODEV) {
/* Services without a high-priority head have nothing more to print */
3301 if (!nrs_svc_has_hp(svc))
3304 seq_printf(m, "high_priority_requests:\n");
3305 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3307 NRS_CTL_TBF_RD_RULE,
3311 * -ENOSPC means buf in the parameter m is overflow, return 0
3312 * here to let upper layer function seq_read alloc a larger
3313 * memory area and do this process again.
3315 } else if (rc == -ENOSPC) {
/*
 * Parse the ID-list part of a "start" command. The parser is selected by
 * cmd->u.tc_start.ts_valid_type; UID and GID share nrs_tbf_ug_id_parse().
 * The selected parser's result is stored in rc.
 */
3324 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3329 switch (cmd->u.tc_start.ts_valid_type) {
3330 case NRS_TBF_FLAG_JOBID:
3331 rc = nrs_tbf_jobid_parse(cmd, token);
3333 case NRS_TBF_FLAG_NID:
3334 rc = nrs_tbf_nid_parse(cmd, token);
3336 case NRS_TBF_FLAG_OPCODE:
3337 rc = nrs_tbf_opcode_parse(cmd, token);
3339 case NRS_TBF_FLAG_GENERIC:
3340 rc = nrs_tbf_generic_parse(cmd, token);
/* UID and GID rules use the same parser */
3342 case NRS_TBF_FLAG_UID:
3343 case NRS_TBF_FLAG_GID:
3344 rc = nrs_tbf_ug_id_parse(cmd, token);
/*
 * Release the type-specific state attached to a parsed command. Only
 * NRS_CTL_TBF_START_RULE commands are handled; the finaliser is chosen by
 * ts_valid_type, and an unrecognised type is reported with CWARN.
 * This function does not free cmd itself.
 */
3353 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3355 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3356 switch (cmd->u.tc_start.ts_valid_type) {
3357 case NRS_TBF_FLAG_JOBID:
3358 nrs_tbf_jobid_cmd_fini(cmd);
3360 case NRS_TBF_FLAG_NID:
3361 nrs_tbf_nid_cmd_fini(cmd);
3363 case NRS_TBF_FLAG_OPCODE:
3364 nrs_tbf_opcode_cmd_fini(cmd);
3366 case NRS_TBF_FLAG_GENERIC:
3367 nrs_tbf_generic_cmd_fini(cmd);
/* UID and GID rules use the same finaliser */
3369 case NRS_TBF_FLAG_UID:
3370 case NRS_TBF_FLAG_GID:
3371 nrs_tbf_id_cmd_fini(cmd);
3374 CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3375 cmd->u.tc_start.ts_valid_type);
/**
 * Check that a rule name consists only of alphanumerics and underscores.
 *
 * \param[in] name	NUL-terminated string to validate; must not be NULL
 *
 * \retval true		every character is alphanumeric or '_' (this includes
 *			the empty string, which has no character to reject)
 * \retval false	at least one other character is present
 */
static bool name_is_valid(const char *name)
{
	const char *p;

	/*
	 * Walk to the terminator directly instead of re-evaluating
	 * strlen(name) on every iteration.
	 */
	for (p = name; *p != '\0'; p++) {
		/* ctype classifiers expect an unsigned char value */
		if (!isalnum((unsigned char)*p) && *p != '_')
			return false;
	}
	return true;
}
/*
 * Parse one "key=value" token of a rule command into cmd. buffer is split in
 * place at the first '='. Recognised keys on the visible lines are "rate",
 * "rank" and "realtime". For "rank" the command stores val itself, i.e. a
 * pointer into buffer, so buffer has to stay alive as long as cmd is used.
 */
3393 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
/* strsep() leaves val NULL when no '=' is present; an empty value is tested as well */
3401 key = strsep(&val, "=");
3402 if (val == NULL || strlen(val) == 0)
3405 /* Key of the value pair */
3406 if (strcmp(key, "rate") == 0) {
3407 rc = kstrtoull(val, 10, &rate);
/*
 * NOTE(review): rate is filled by kstrtoull(), so "rate <= 0" can only mean
 * rate == 0 if rate is unsigned; and ">=" makes LPROCFS_NRS_RATE_MAX itself
 * fail the test. Confirm both are intended.
 */
3411 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
/* The rate is stored in the union member that matches the command type */
3414 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3415 cmd->u.tc_start.ts_rpc_rate = rate;
3416 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3417 cmd->u.tc_change.tc_rpc_rate = rate;
/* "rank" names another rule; the name must pass name_is_valid() */
3420 } else if (strcmp(key, "rank") == 0) {
3421 if (!name_is_valid(val))
3424 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3425 cmd->u.tc_start.ts_next_name = val;
3426 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3427 cmd->u.tc_change.tc_next_name = val;
3430 } else if (strcmp(key, "realtime") == 0) {
3431 unsigned long realtime;
3433 rc = kstrtoul(val, 10, &realtime);
/*
 * NOTE(review): the flag is written through u.tc_start with no visible
 * check of cmd->tc_cmd; confirm this is only reached for start commands.
 */
3438 cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
/*
 * Parse the space-separated "key=value" tail of a rule command, feeding each
 * token to nrs_tbf_parse_value_pair(), then apply per-command checks and
 * defaults: a start command with no rate gets the tbf_rate module default.
 * A NULL or empty buffer skips the loop entirely.
 */
3446 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
/* strsep() advances val past each token and sets it to NULL after the last one */
3453 while (val != NULL && strlen(val) != 0) {
3454 token = strsep(&val, " ");
3455 rc = nrs_tbf_parse_value_pair(cmd, token);
3460 switch (cmd->tc_cmd) {
3461 case NRS_CTL_TBF_START_RULE:
/* Rate 0 means "not given": fall back to the module-wide default */
3462 if (cmd->u.tc_start.ts_rpc_rate == 0)
3463 cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3465 case NRS_CTL_TBF_CHANGE_RULE:
/* Tests for a change command that supplied neither a rate nor a rank */
3466 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3467 cmd->u.tc_change.tc_next_name == NULL)
3470 case NRS_CTL_TBF_STOP_RULE:
3478 static struct nrs_tbf_cmd *
3479 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3481 static struct nrs_tbf_cmd *cmd;
3488 GOTO(out, rc = -ENOMEM);
3489 memset(cmd, 0, sizeof(*cmd));
3492 token = strsep(&val, " ");
3493 if (val == NULL || strlen(val) == 0)
3494 GOTO(out_free_cmd, rc = -EINVAL);
3496 /* Type of the command */
3497 if (strcmp(token, "start") == 0) {
3498 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3499 cmd->u.tc_start.ts_valid_type = type_flag;
3500 } else if (strcmp(token, "stop") == 0)
3501 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3502 else if (strcmp(token, "change") == 0)
3503 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3505 GOTO(out_free_cmd, rc = -EINVAL);
3507 /* Name of the rule */
3508 token = strsep(&val, " ");
3509 if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3510 !name_is_valid(token))
3511 GOTO(out_free_cmd, rc = -EINVAL);
3512 cmd->tc_name = token;
3514 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3518 val = strrchr(token, '}');
3520 GOTO(out_free_cmd, rc = -EINVAL);
3526 } else if (*val == ' ') {
3530 GOTO(out_free_cmd, rc = -EINVAL);
3532 rc = nrs_tbf_id_parse(cmd, token);
3534 GOTO(out_free_cmd, rc);
3537 rc = nrs_tbf_parse_value_pairs(cmd, val);
3539 GOTO(out_cmd_fini, rc = -EINVAL);
3542 nrs_tbf_cmd_fini(cmd);
3552 * Get the TBF policy type (nid, jobid, etc) preset by
3553 * proc entry 'nrs_policies' for command buffer parsing.
3555 * \param[in] svc the PTLRPC service
3556 * \param[in] queue the NRS queue type
3558 * \retval the preset TBF policy type flag
/*
 * Query the TBF type flag of the given queue's policy by issuing
 * NRS_CTL_TBF_RD_TYPE_FLAG through ptlrpc_nrs_policy_control(). The only
 * other visible assignment sets type to NRS_TBF_FLAG_INVALID.
 * NOTE(review): the condition guarding that assignment is not part of this
 * listing; it is presumably the failure path of the control call.
 */
3561 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3566 rc = ptlrpc_nrs_policy_control(svc, queue,
3568 NRS_CTL_TBF_RD_TYPE_FLAG,
3571 type = NRS_TBF_FLAG_INVALID;
/*
 * Size in bytes of the kernel buffer allocated for one write to the rule
 * file; writes longer than this minus one are rejected with -EINVAL.
 */
3576 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3578 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3579 const char __user *buffer,
3580 size_t count, loff_t *off)
3582 struct seq_file *m = file->private_data;
3583 struct ptlrpc_service *svc = m->private;
3587 static struct nrs_tbf_cmd *cmd;
3588 enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3589 unsigned long length;
3592 OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3593 if (kernbuf == NULL)
3594 GOTO(out, rc = -ENOMEM);
3596 if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3597 GOTO(out_free_kernbuff, rc = -EINVAL);
3599 if (copy_from_user(kernbuf, buffer, count))
3600 GOTO(out_free_kernbuff, rc = -EFAULT);
3603 token = strsep(&val, " ");
3605 GOTO(out_free_kernbuff, rc = -EINVAL);
3607 if (strcmp(token, "reg") == 0) {
3608 queue = PTLRPC_NRS_QUEUE_REG;
3609 } else if (strcmp(token, "hp") == 0) {
3610 queue = PTLRPC_NRS_QUEUE_HP;
3612 kernbuf[strlen(token)] = ' ';
3615 length = strlen(val);
3618 GOTO(out_free_kernbuff, rc = -EINVAL);
3620 if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3621 GOTO(out_free_kernbuff, rc = -ENODEV);
3622 else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3623 queue = PTLRPC_NRS_QUEUE_REG;
3625 cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3627 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3630 * Serialize NRS core lprocfs operations with policy registration/
3633 mutex_lock(&nrs_core.nrs_mutex);
3634 rc = ptlrpc_nrs_policy_control(svc, queue,
3636 NRS_CTL_TBF_WR_RULE,
3638 mutex_unlock(&nrs_core.nrs_mutex);
3640 nrs_tbf_cmd_fini(cmd);
3643 OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3645 return rc ? rc : count;
/*
 * Macro invocation for the "ptlrpc_lprocfs_nrs_tbf_rule" name; presumably it
 * defines ptlrpc_lprocfs_nrs_tbf_rule_fops (referenced by
 * nrs_tbf_lprocfs_init()) from the _seq_show/_seq_write handlers above.
 * TODO confirm against the LDEBUGFS_SEQ_FOPS definition.
 */
3648 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3651 * Initializes a TBF policy's lprocfs interface for service \a svc
3653 * \param[in] svc the service
3656 * \retval != 0 error
/*
 * Register the "nrs_tbf_rule" debugfs variable, backed by
 * ptlrpc_lprocfs_nrs_tbf_rule_fops, under the service's debugfs entry.
 * The service's srv_debugfs_entry is checked before anything is added.
 */
3658 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3660 struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3661 { .name = "nrs_tbf_rule",
3662 .fops = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3667 if (!svc->srv_debugfs_entry)
3670 ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3676 * TBF policy operations
/*
 * Operation table wiring the TBF callbacks defined in this file into the NRS
 * framework. Note that the enqueue/dequeue slots are served by
 * nrs_tbf_req_add()/nrs_tbf_req_del().
 */
3678 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3679 .op_policy_start = nrs_tbf_start,
3680 .op_policy_stop = nrs_tbf_stop,
3681 .op_policy_ctl = nrs_tbf_ctl,
3682 .op_res_get = nrs_tbf_res_get,
3683 .op_res_put = nrs_tbf_res_put,
3684 .op_req_get = nrs_tbf_req_get,
3685 .op_req_enqueue = nrs_tbf_req_add,
3686 .op_req_dequeue = nrs_tbf_req_del,
3687 .op_req_stop = nrs_tbf_req_stop,
3688 .op_lprocfs_init = nrs_tbf_lprocfs_init,
3692 * TBF policy configuration
/*
 * Policy descriptor for TBF: binds the policy name NRS_POL_NAME_TBF ("tbf")
 * to the nrs_tbf_ops table and uses nrs_policy_compat_all as its
 * compatibility callback. Not static, so it is visible to other files.
 */
3694 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3695 .nc_name = NRS_POL_NAME_TBF,
3696 .nc_ops = &nrs_tbf_ops,
3697 .nc_compat = nrs_policy_compat_all,