4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (C) 2013 DataDirect Networks, Inc.
25 * Copyright (c) 2014, 2016, Intel Corporation.
28 * lustre/ptlrpc/nrs_tbf.c
30 * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
34 #ifdef HAVE_SERVER_SUPPORT
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include <lustre_req_layout.h>
46 #include "ptlrpc_internal.h"
51 * Token Bucket Filter over client NIDs
/*
 * Policy name and module tunables.  The three integers below are exposed as
 * module parameters with mode 0644; their defaults are 8192 (jobid cache
 * size), 10000 (default rate in RPCs/s) and 3 (bucket depth).
 */
56 #define NRS_POL_NAME_TBF "tbf"
58 static int tbf_jobid_cache_size = 8192;
59 module_param(tbf_jobid_cache_size, int, 0644);
60 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
62 static int tbf_rate = 10000;
63 module_param(tbf_rate, int, 0644);
64 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
66 static int tbf_depth = 3;
67 module_param(tbf_depth, int, 0644);
68 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
/*
 * hrtimer callback.  Recovers the TBF head that embeds the timer, clears the
 * nrs_throttling flag of the NRS head it belongs to and wakes up the service
 * partition's wait queue.  The timer is not re-armed from here
 * (HRTIMER_NORESTART).
 */
70 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
72 struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
74 struct ptlrpc_nrs *nrs = head->th_res.res_policy->pol_nrs;
75 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
77 nrs->nrs_throttling = 0;
78 wake_up(&svcpt->scp_waitq);
80 return HRTIMER_NORESTART;
/*
 * Name given to the rule created by the *_startup() functions in this file;
 * nrs_tbf_rule_stop() special-cases this name.
 */
83 #define NRS_TBF_DEFAULT_RULE "default"
/*
 * Final teardown of a rule.  Asserts that the rule has no references left,
 * no clients attached and is no longer linked into a rule list, then calls
 * the type-specific o_rule_fini() hook of the owning head.
 */
85 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
87 LASSERT(atomic_read(&rule->tr_ref) == 0);
88 LASSERT(list_empty(&rule->tr_cli_list));
89 LASSERT(list_empty(&rule->tr_linkage));
91 rule->tr_head->th_ops->o_rule_fini(rule);
96 * Decreases the rule's usage reference count, and stops the rule in case it
97 * was already stopping and has no more outstanding usage references (which
98 * indicates it has no more queued or started requests, and can be safely
/* Drops one reference; the drop that brings tr_ref to zero finalizes the rule. */
101 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
103 if (atomic_dec_and_test(&rule->tr_ref))
104 nrs_tbf_rule_fini(rule);
108 * Increases the rule's usage reference count.
/* Atomically takes one more reference on \a rule (tr_ref). */
110 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
112 atomic_inc(&rule->tr_ref);
/*
 * Detaches \a cli from its current rule: the client must be linked and must
 * have a rule.  The client is removed from the rule's client list under the
 * rule's tr_rule_lock, then the reference the client held on the rule is
 * dropped.
 */
116 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
118 LASSERT(!list_empty(&cli->tc_linkage));
119 LASSERT(cli->tc_rule);
120 spin_lock(&cli->tc_rule->tr_rule_lock);
121 list_del_init(&cli->tc_linkage);
122 spin_unlock(&cli->tc_rule->tr_rule_lock);
123 nrs_tbf_rule_put(cli->tc_rule);
/*
 * Reloads the client's token-bucket state from its current rule: copies the
 * rate, the nanoseconds-per-RPC interval and the depth, refills the bucket to
 * the rule's full depth, restarts the check time at "now", and records the
 * head's rule sequence and the rule's generation.
 */
128 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
129 struct nrs_tbf_client *cli)
132 struct nrs_tbf_rule *rule = cli->tc_rule;
134 cli->tc_rpc_rate = rule->tr_rpc_rate;
135 cli->tc_nsecs = rule->tr_nsecs_per_rpc;
136 cli->tc_depth = rule->tr_depth;
137 cli->tc_ntoken = rule->tr_depth;
138 cli->tc_check_time = ktime_to_ns(ktime_get());
139 cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
140 cli->tc_rule_generation = rule->tr_generation;
/* NOTE(review): the condition guarding this relocate is not visible here;
 * presumably it only applies to clients currently in the heap - confirm. */
143 cfs_binheap_relocate(head->th_binheap,
/*
 * Binds \a cli to \a rule.  Under the client's tc_rule_lock, a client that is
 * still linked to a (different) rule is detached from it first; the client is
 * then appended to the new rule's client list under that rule's tr_rule_lock.
 * Once the locks are dropped the client's bucket values are reloaded from the
 * rule.  The caller must already hold the reference on \a rule that the
 * client will own.
 */
148 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
149 struct nrs_tbf_rule *rule,
150 struct nrs_tbf_client *cli)
152 spin_lock(&cli->tc_rule_lock);
153 if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
154 LASSERT(rule != cli->tc_rule);
155 nrs_tbf_cli_rule_put(cli);
157 LASSERT(cli->tc_rule == NULL);
158 LASSERT(list_empty(&cli->tc_linkage));
159 /* Rule's ref is added before called */
161 spin_lock(&rule->tr_rule_lock);
162 list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
163 spin_unlock(&rule->tr_rule_lock);
164 spin_unlock(&cli->tc_rule_lock);
165 nrs_tbf_cli_reset_value(head, cli);
/* Prints one rule into \a m via the type-specific o_rule_dump() hook. */
169 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
171 return rule->tr_head->th_ops->o_rule_dump(rule, m);
/*
 * Dumps every rule on head->th_list into \a m while holding th_rule_lock.
 * No rule on the list may carry NTRS_STOPPING.  How a non-zero result from
 * nrs_tbf_rule_dump() is handled is not visible in this listing.
 */
175 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
177 struct nrs_tbf_rule *rule;
180 LASSERT(head != NULL);
181 spin_lock(&head->th_rule_lock);
182 /* List the rules from newest to oldest */
183 list_for_each_entry(rule, &head->th_list, tr_linkage) {
184 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
185 rc = nrs_tbf_rule_dump(rule, m);
191 spin_unlock(&head->th_rule_lock);
/*
 * Searches head->th_list for a rule whose tr_name equals the requested name.
 * A matching rule gets an extra reference, which the caller must release with
 * nrs_tbf_rule_put().  No lock is taken here; the callers in this file hold
 * th_rule_lock around the call.
 */
196 static struct nrs_tbf_rule *
197 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
200 struct nrs_tbf_rule *rule;
202 LASSERT(head != NULL);
203 list_for_each_entry(rule, &head->th_list, tr_linkage) {
204 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
205 if (strcmp(rule->tr_name, name) == 0) {
206 nrs_tbf_rule_get(rule);
/* Locked wrapper: runs nrs_tbf_rule_find_nolock() under th_rule_lock. */
213 static struct nrs_tbf_rule *
214 nrs_tbf_rule_find(struct nrs_tbf_head *head,
217 struct nrs_tbf_rule *rule;
219 LASSERT(head != NULL);
220 spin_lock(&head->th_rule_lock);
221 rule = nrs_tbf_rule_find_nolock(head, name);
222 spin_unlock(&head->th_rule_lock);
/*
 * Selects the rule that applies to \a cli.  Walks head->th_list under
 * th_rule_lock and asks the type-specific o_rule_match() hook about each
 * rule.  A reference is taken on the selected rule before the lock is
 * released.
 */
226 static struct nrs_tbf_rule *
227 nrs_tbf_rule_match(struct nrs_tbf_head *head,
228 struct nrs_tbf_client *cli)
230 struct nrs_tbf_rule *rule = NULL;
231 struct nrs_tbf_rule *tmp_rule;
233 spin_lock(&head->th_rule_lock);
234 /* Match the newest rule in the list */
235 list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
236 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
237 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
/* NOTE(review): the guard for this assignment is not visible here; presumably
 * head->th_rule is the fallback when no listed rule matched - confirm. */
244 rule = head->th_rule;
246 nrs_tbf_rule_get(rule);
247 spin_unlock(&head->th_rule_lock);
/*
 * Initializes a freshly allocated client.  The structure is zeroed first, so
 * the type-specific o_cli_init() hook fills its key into cleared memory.  The
 * client starts with one reference, is matched against the rule list, and is
 * bound to the matching rule (nrs_tbf_rule_match() returns the rule with a
 * reference that the client then owns).
 */
252 nrs_tbf_cli_init(struct nrs_tbf_head *head,
253 struct nrs_tbf_client *cli,
254 struct ptlrpc_request *req)
256 struct nrs_tbf_rule *rule;
258 memset(cli, 0, sizeof(*cli));
259 cli->tc_in_heap = false;
260 head->th_ops->o_cli_init(cli, req);
261 INIT_LIST_HEAD(&cli->tc_list);
262 INIT_LIST_HEAD(&cli->tc_linkage);
263 spin_lock_init(&cli->tc_rule_lock);
264 atomic_set(&cli->tc_ref, 1);
265 rule = nrs_tbf_rule_match(head, cli);
266 nrs_tbf_cli_reset(head, rule, cli);
/*
 * Tears down a client.  The client must have no queued requests (tc_list
 * empty), must not be in the binary heap and must have no references.  It is
 * detached from its rule under tc_rule_lock.
 */
270 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
272 LASSERT(list_empty(&cli->tc_list));
273 LASSERT(!cli->tc_in_heap);
274 LASSERT(atomic_read(&cli->tc_ref) == 0);
275 spin_lock(&cli->tc_rule_lock);
276 nrs_tbf_cli_rule_put(cli);
277 spin_unlock(&cli->tc_rule_lock);
/*
 * Creates a rule from \a start and inserts it into head->th_list.
 *
 * Steps visible here:
 *  - a first lookup by name (the reference taken by the lookup is dropped
 *    again);
 *  - allocation on the policy's CPT, followed by initialization of name,
 *    rate, flags, per-RPC interval, depth (from the tbf_depth tunable),
 *    lists and lock, with an initial reference count of 1;
 *  - the type-specific o_rule_init() hook;
 *  - under th_rule_lock, a second lookup by name, then insertion either
 *    immediately before the rule named by ts_next_name or at the head of the
 *    list;
 *  - the head's rule sequence is bumped, and a rule flagged NTRS_DEFAULT is
 *    recorded in head->th_rule (which must still be NULL).
 *
 * The error codes returned on the failure paths are not visible in this
 * listing.
 */
282 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
283 struct nrs_tbf_head *head,
284 struct nrs_tbf_cmd *start)
286 struct nrs_tbf_rule *rule;
287 struct nrs_tbf_rule *tmp_rule;
288 struct nrs_tbf_rule *next_rule;
289 char *next_name = start->u.tc_start.ts_next_name;
292 rule = nrs_tbf_rule_find(head, start->tc_name);
294 nrs_tbf_rule_put(rule);
298 OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
/* NOTE(review): copies strlen() bytes without a terminator and without a
 * visible bound against the size of tr_name; presumably the allocation is
 * zero-filled and the name length is validated by the caller - confirm. */
302 memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
303 rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
304 rule->tr_flags = start->u.tc_start.ts_rule_flags;
/* NOTE(review): divides by the requested rate; no zero check is visible in
 * this function - confirm the rate is validated upstream. */
305 rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
306 rule->tr_depth = tbf_depth;
307 atomic_set(&rule->tr_ref, 1);
308 INIT_LIST_HEAD(&rule->tr_cli_list);
309 INIT_LIST_HEAD(&rule->tr_nids);
310 INIT_LIST_HEAD(&rule->tr_linkage);
311 spin_lock_init(&rule->tr_rule_lock);
312 rule->tr_head = head;
314 rc = head->th_ops->o_rule_init(policy, rule, start);
320 /* Add as the newest rule */
321 spin_lock(&head->th_rule_lock);
322 tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
324 spin_unlock(&head->th_rule_lock);
325 nrs_tbf_rule_put(tmp_rule);
326 nrs_tbf_rule_put(rule);
331 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
333 spin_unlock(&head->th_rule_lock);
334 nrs_tbf_rule_put(rule);
/* list_add() after next_rule's predecessor, i.e. right before next_rule */
338 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
339 nrs_tbf_rule_put(next_rule);
341 /* Add on the top of the rule list */
342 list_add(&rule->tr_linkage, &head->th_list);
344 spin_unlock(&head->th_rule_lock);
345 atomic_inc(&head->th_rule_sequence);
346 if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
347 rule->tr_flags |= NTRS_DEFAULT;
348 LASSERT(head->th_rule == NULL);
349 head->th_rule = rule;
352 CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %u gen %llu\n",
353 rule, rule->tr_rpc_rate, rule->tr_generation);
359 * Change the rank of a rule in the rule list
361 * The matched rule will be moved to the position right before another
364 * \param[in] policy the policy instance
365 * \param[in] head the TBF policy instance
366 * \param[in] name the rule name to be moved
367 * \param[in] next_name the rule name before which the matched rule will be
/*
 * Runs entirely under th_rule_lock.  Both rules are looked up by name; each
 * lookup takes a reference that is dropped again before returning.  A missing
 * rule yields -ENOENT.  When the two names are equal the move is skipped
 * (the statement on that path is not visible in this listing).
 */
372 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
373 struct nrs_tbf_head *head,
377 struct nrs_tbf_rule *rule = NULL;
378 struct nrs_tbf_rule *next_rule = NULL;
381 LASSERT(head != NULL);
383 spin_lock(&head->th_rule_lock);
384 rule = nrs_tbf_rule_find_nolock(head, name);
386 GOTO(out, rc = -ENOENT);
388 if (strcmp(name, next_name) == 0)
391 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
393 GOTO(out_put, rc = -ENOENT);
/* re-link rule right before next_rule */
395 list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
396 nrs_tbf_rule_put(next_rule);
398 nrs_tbf_rule_put(rule);
400 spin_unlock(&head->th_rule_lock);
/*
 * Changes the rate of the named rule.  Must be called with the NRS head's
 * nrs_lock held.  The per-RPC interval is recomputed from the new rate and
 * the rule's generation is incremented; the reference taken by the lookup is
 * dropped before returning.
 */
405 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
406 struct nrs_tbf_head *head,
410 struct nrs_tbf_rule *rule;
412 assert_spin_locked(&policy->pol_nrs->nrs_lock);
414 rule = nrs_tbf_rule_find(head, name);
418 rule->tr_rpc_rate = rate;
/* NOTE(review): no zero check on the rate is visible in this function -
 * confirm it is validated by the caller. */
419 rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
420 rule->tr_generation++;
421 nrs_tbf_rule_put(rule);
/*
 * Handles a "change rule" command, which can carry a new rate and/or the name
 * of the rule to move in front of; it dispatches to
 * nrs_tbf_rule_change_rate() and nrs_tbf_rule_change_rank().  The conditions
 * selecting each call are not visible in this listing.
 */
427 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
428 struct nrs_tbf_head *head,
429 struct nrs_tbf_cmd *change)
431 __u64 rate = change->u.tc_change.tc_rpc_rate;
432 char *next_name = change->u.tc_change.tc_next_name;
436 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
443 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
/*
 * Stops the named rule.  Must be called with nrs_lock held.  The rule named
 * NRS_TBF_DEFAULT_RULE is special-cased (the result on that path is not
 * visible in this listing).  A found rule is unlinked from the rule list and
 * flagged NTRS_STOPPING.  Two references are then dropped: one balances the
 * lookup above, the other releases the initial reference set when the rule
 * was created in nrs_tbf_rule_start().
 */
453 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
454 struct nrs_tbf_head *head,
455 struct nrs_tbf_cmd *stop)
457 struct nrs_tbf_rule *rule;
459 assert_spin_locked(&policy->pol_nrs->nrs_lock);
461 if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
464 rule = nrs_tbf_rule_find(head, stop->tc_name);
468 list_del_init(&rule->tr_linkage);
469 rule->tr_flags |= NTRS_STOPPING;
470 nrs_tbf_rule_put(rule);
471 nrs_tbf_rule_put(rule);
/*
 * Dispatches a TBF rule command.  Entered with nrs_lock held.
 *  - START: the command's type must equal the head's type flag; nrs_lock is
 *    released around nrs_tbf_rule_start() and re-taken afterwards.
 *  - CHANGE: forwarded to nrs_tbf_rule_change() with the lock held.
 *  - STOP: forwarded to nrs_tbf_rule_stop(); -ENOENT is reported as success.
 */
477 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
478 struct nrs_tbf_head *head,
479 struct nrs_tbf_cmd *cmd)
483 assert_spin_locked(&policy->pol_nrs->nrs_lock);
485 switch (cmd->tc_cmd) {
486 case NRS_CTL_TBF_START_RULE:
487 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
490 spin_unlock(&policy->pol_nrs->nrs_lock);
491 rc = nrs_tbf_rule_start(policy, head, cmd);
492 spin_lock(&policy->pol_nrs->nrs_lock);
494 case NRS_CTL_TBF_CHANGE_RULE:
495 rc = nrs_tbf_rule_change(policy, head, cmd);
497 case NRS_CTL_TBF_STOP_RULE:
498 rc = nrs_tbf_rule_stop(policy, head, cmd);
499 /* Take it as a success, if not exists at all */
500 return rc == -ENOENT ? 0 : rc;
507 * Binary heap predicate.
509 * \param[in] e1 the first binheap node to compare
510 * \param[in] e2 the second binheap node to compare
/*
 * Compares two clients by tc_deadline first and by tc_check_time as the
 * tie-breaker.  The value returned on each branch is not visible in this
 * listing.
 */
516 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
518 struct nrs_tbf_client *cli1;
519 struct nrs_tbf_client *cli2;
521 cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
522 cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
524 if (cli1->tc_deadline < cli2->tc_deadline)
526 else if (cli1->tc_deadline > cli2->tc_deadline)
529 if (cli1->tc_check_time < cli2->tc_check_time)
531 else if (cli1->tc_check_time > cli2->tc_check_time)
534 /* May need more comparisons, e.g. request number in the rules */
539 * TBF binary heap operations
/* Binary-heap operations table; tbf_cli_compare() is the ordering predicate. */
541 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
544 .hop_compare = tbf_cli_compare,
/* Hash callback: djb2 hash over the NUL-terminated jobid string in \a key. */
547 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
550 return cfs_hash_djb2_hash(key, strlen(key), mask);
/* Key-compare callback: non-zero when the client's jobid string equals \a key. */
553 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
555 struct nrs_tbf_client *cli = hlist_entry(hnode,
556 struct nrs_tbf_client,
559 return (strcmp(cli->tc_jobid, key) == 0);
/* Key callback: the key of a jobid client is its tc_jobid string. */
562 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
564 struct nrs_tbf_client *cli = hlist_entry(hnode,
565 struct nrs_tbf_client,
568 return cli->tc_jobid;
/* Object callback, shared by all three hash ops tables in this file: maps a
 * hash node back to the client that embeds it as tc_hnode. */
571 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
573 return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
/* Get callback: takes one reference on the client (tc_ref). */
576 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
578 struct nrs_tbf_client *cli = hlist_entry(hnode,
579 struct nrs_tbf_client,
582 atomic_inc(&cli->tc_ref);
/* Put callback: drops one reference on the client.  Nothing is freed here,
 * even when the count reaches zero. */
585 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
587 struct nrs_tbf_client *cli = hlist_entry(hnode,
588 struct nrs_tbf_client,
591 atomic_dec(&cli->tc_ref);
/* Exit callback: the client must be unreferenced; it is finalized with
 * nrs_tbf_cli_fini(). */
595 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
598 struct nrs_tbf_client *cli = hlist_entry(hnode,
599 struct nrs_tbf_client,
602 LASSERT(atomic_read(&cli->tc_ref) == 0);
603 nrs_tbf_cli_fini(cli);
/* Hash operations for the jobid-keyed client hash.  The same function serves
 * as both the hs_put and hs_put_locked callbacks. */
606 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
607 .hs_hash = nrs_tbf_jobid_hop_hash,
608 .hs_keycmp = nrs_tbf_jobid_hop_keycmp,
609 .hs_key = nrs_tbf_jobid_hop_key,
610 .hs_object = nrs_tbf_hop_object,
611 .hs_get = nrs_tbf_jobid_hop_get,
612 .hs_put = nrs_tbf_jobid_hop_put,
613 .hs_put_locked = nrs_tbf_jobid_hop_put,
614 .hs_exit = nrs_tbf_jobid_hop_exit,
/* Creation flags for the jobid client hash (passed to cfs_hash_create() in
 * nrs_tbf_jobid_startup()).  The tail of this definition is not visible in
 * this listing. */
617 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
618 CFS_HASH_NO_ITEMREF | \
/*
 * Looks up a client by jobid in bucket \a bd using the *_locked lookup, so
 * the caller is expected to hold the bucket lock.  A client that is found
 * while sitting on an LRU list is removed from that list.
 */
621 static struct nrs_tbf_client *
622 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
623 struct cfs_hash_bd *bd,
626 struct hlist_node *hnode;
627 struct nrs_tbf_client *cli;
629 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
633 cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
634 if (!list_empty(&cli->tc_lru))
635 list_del_init(&cli->tc_lru);
/* Empty-string jobid used by nrs_tbf_jobid_cli_find() and
 * nrs_tbf_jobid_cli_init() as a substitute for the request's jobid. */
639 #define NRS_TBF_JOBID_NULL ""
/*
 * Finds the client for the jobid carried in \a req.  The bucket for the jobid
 * is locked, searched with nrs_tbf_jobid_hash_lookup() and unlocked again.
 * NRS_TBF_JOBID_NULL is substituted for the jobid on a path whose condition
 * is not visible in this listing.
 */
641 static struct nrs_tbf_client *
642 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
643 struct ptlrpc_request *req)
646 struct nrs_tbf_client *cli;
647 struct cfs_hash *hs = head->th_cli_hash;
648 struct cfs_hash_bd bd;
650 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
652 jobid = NRS_TBF_JOBID_NULL;
653 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
654 cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
655 cfs_hash_bd_unlock(hs, &bd, 1);
/*
 * Find-or-insert under a single bucket lock: looks up the jobid of \a cli in
 * the hash and, on the path where \a cli is to be used, adds it to the
 * bucket.  The condition and the returned value are not visible in this
 * listing.
 */
660 static struct nrs_tbf_client *
661 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
662 struct nrs_tbf_client *cli)
665 struct nrs_tbf_client *ret;
666 struct cfs_hash *hs = head->th_cli_hash;
667 struct cfs_hash_bd bd;
669 jobid = cli->tc_jobid;
670 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
671 ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
673 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
676 cfs_hash_bd_unlock(hs, &bd, 1);
/*
 * Releases a reference on a jobid client and maintains the per-bucket LRU.
 *
 * The reference is dropped with cfs_hash_bd_dec_and_lock(); past that check
 * the client is appended to the bucket's LRU list.  The bucket is then
 * trimmed: while it holds more than hw entries (tbf_jobid_cache_size scaled
 * down by hs_cur_bits - hs_bkt_bits), the oldest LRU client is removed from
 * the hash and moved to a local "zombies" list.  The zombies are finalized
 * with nrs_tbf_cli_fini() only after the bucket lock has been released.
 */
682 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
683 struct nrs_tbf_client *cli)
685 struct cfs_hash_bd bd;
686 struct cfs_hash *hs = head->th_cli_hash;
687 struct nrs_tbf_bucket *bkt;
691 cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
692 bkt = cfs_hash_bd_extra_get(hs, &bd);
693 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
695 LASSERT(list_empty(&cli->tc_lru));
696 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
699 * Check and purge the LRU, there is at least one client in the LRU.
701 hw = tbf_jobid_cache_size >>
702 (hs->hs_cur_bits - hs->hs_bkt_bits);
703 while (cfs_hash_bd_count_get(&bd) > hw) {
704 if (unlikely(list_empty(&bkt->ntb_lru)))
706 cli = list_entry(bkt->ntb_lru.next,
707 struct nrs_tbf_client,
709 LASSERT(atomic_read(&cli->tc_ref) == 0);
710 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
711 list_move(&cli->tc_lru, &zombies);
713 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
715 while (!list_empty(&zombies)) {
716 cli = container_of(zombies.next,
717 struct nrs_tbf_client, tc_lru);
718 list_del_init(&cli->tc_lru);
719 nrs_tbf_cli_fini(cli);
/*
 * Type-specific client initialization: copies the request's jobid (or
 * NRS_TBF_JOBID_NULL) into cli->tc_jobid and initializes the LRU linkage.
 * Only strlen() bytes are copied, without a terminator; the caller
 * nrs_tbf_cli_init() zeroes the whole client before invoking this hook, and
 * the assertion guarantees the string is shorter than LUSTRE_JOBID_SIZE.
 */
724 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
725 struct ptlrpc_request *req)
727 char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
730 jobid = NRS_TBF_JOBID_NULL;
731 LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
732 INIT_LIST_HEAD(&cli->tc_lru);
733 memcpy(cli->tc_jobid, jobid, strlen(jobid));
/*
 * Derives a bit count from tbf_jobid_cache_size: bits starts at 1 and grows
 * until (1 << bits) is no longer below the cache size.
 * NOTE(review): tbf_jobid_cache_size is a writable module parameter; a value
 * above 1 << 30 would make (1 << bits) overflow int - confirm whether the
 * parameter is range-checked elsewhere.
 */
736 static int nrs_tbf_jobid_hash_order(void)
740 for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
/* Bucket bits for the jobid client hash; also the lower bound that
 * nrs_tbf_jobid_startup() applies to its computed bit count. */
746 #define NRS_TBF_JOBID_BKT_BITS 10
/*
 * Startup for the jobid TBF type.  Creates the client hash (bit count from
 * nrs_tbf_jobid_hash_order(), clamped to at least NRS_TBF_JOBID_BKT_BITS),
 * initializes the LRU list of every bucket, and starts the default rule: name
 * NRS_TBF_DEFAULT_RULE, jobid expression "*", rate taken from the tbf_rate
 * tunable, flag NTRS_DEFAULT.  On the visible cleanup path the hash reference
 * is dropped and th_cli_hash is reset to NULL.
 */
749 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
750 struct nrs_tbf_head *head)
752 struct nrs_tbf_cmd start;
753 struct nrs_tbf_bucket *bkt;
757 struct cfs_hash_bd bd;
759 bits = nrs_tbf_jobid_hash_order();
760 if (bits < NRS_TBF_JOBID_BKT_BITS)
761 bits = NRS_TBF_JOBID_BKT_BITS;
762 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
765 NRS_TBF_JOBID_BKT_BITS,
769 &nrs_tbf_jobid_hash_ops,
770 NRS_TBF_JOBID_HASH_FLAGS);
771 if (head->th_cli_hash == NULL)
774 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
775 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
776 INIT_LIST_HEAD(&bkt->ntb_lru);
779 memset(&start, 0, sizeof(start));
780 start.u.tc_start.ts_jobids_str = "*";
782 start.u.tc_start.ts_rpc_rate = tbf_rate;
783 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
784 start.tc_name = NRS_TBF_DEFAULT_RULE;
/* the jobid list stays empty, so nrs_tbf_jobid_rule_init() skips parsing "*" */
785 INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
786 rc = nrs_tbf_rule_start(policy, head, &start);
788 cfs_hash_putref(head->th_cli_hash);
789 head->th_cli_hash = NULL;
796 * Frees jobid of \a list.
/*
 * Walks the list with the _safe iterator, frees each entry's id string
 * (allocated as strlen + 1 bytes) and unlinks the entry.  Freeing of the
 * entry itself is not visible in this listing.
 */
800 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
802 struct nrs_tbf_jobid *jobid, *n;
804 list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
805 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
806 list_del(&jobid->tj_linkage);
/*
 * Allocates a jobid entry for token \a id and appends it to \a jobid_list.
 * The id buffer is one byte larger than the token, and only the token bytes
 * are copied (presumably the allocation is zero-filled, which would supply
 * the terminator - TODO confirm).  The entry is marked for full or wildcard
 * matching depending on the search for "*" in the token; the test that
 * selects between the two flags is not visible in this listing.
 */
812 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
814 struct nrs_tbf_jobid *jobid;
817 OBD_ALLOC_PTR(jobid);
821 OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
822 if (jobid->tj_id == NULL) {
827 memcpy(jobid->tj_id, id->ls_str, id->ls_len);
828 ptr = lprocfs_strnstr(id->ls_str, "*", id->ls_len);
830 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
832 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
834 list_add_tail(&jobid->tj_linkage, jobid_list);
/*
 * Recursive wildcard matcher of \a content against \a pattern, where '*' in
 * the pattern is the wildcard.  Visible structure: early checks for both
 * strings being exhausted and for a non-trailing '*' meeting the end of the
 * content, a loop over the common prefix, and a two-way recursion that
 * either drops the '*' from the pattern or consumes one content character.
 * The return values of the individual branches are not visible in this
 * listing.
 */
839 cfs_match_wildcard(const char *pattern, const char *content)
841 if (*pattern == '\0' && *content == '\0')
844 if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
847 while (*pattern == *content) {
850 if (*pattern == '\0' && *content == '\0')
853 if (*pattern == '*' && *(pattern + 1) != '\0' &&
859 return (cfs_match_wildcard(pattern + 1, content) ||
860 cfs_match_wildcard(pattern, content + 1));
/*
 * Matches \a id against one jobid entry: exact string comparison for
 * NRS_TBF_MATCH_FULL entries, cfs_match_wildcard() for
 * NRS_TBF_MATCH_WILDCARD entries.
 */
866 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
868 if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
869 return strcmp(jobid->tj_id, id) == 0;
871 if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
872 return cfs_match_wildcard(jobid->tj_id, id);
/* Tests \a id against every entry of \a jobid_list using nrs_tbf_jobid_match(). */
878 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
880 struct nrs_tbf_jobid *jobid;
882 list_for_each_entry(jobid, jobid_list, tj_linkage) {
883 if (nrs_tbf_jobid_match(jobid, id))
/*
 * Splits the input on spaces with cfs_gettok() and adds each token to
 * \a jobid_list (which is initialized here first).  The list is freed again
 * on a path whose condition is not visible in this listing - presumably the
 * error path.
 */
890 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
899 INIT_LIST_HEAD(jobid_list);
901 rc = cfs_gettok(&src, ' ', &res);
906 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
911 nrs_tbf_jobid_list_free(jobid_list);
/* Releases what a jobid start command owns: the parsed jobid list (if
 * non-empty) and the jobid expression string (if set). */
915 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
917 if (!list_empty(&cmd->u.tc_start.ts_jobids))
918 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
919 if (cmd->u.tc_start.ts_jobids_str)
920 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
921 strlen(cmd->u.tc_start.ts_jobids_str) + 1);
/*
 * Validates an expression of the form <key>={...}.  The text before '=' must
 * equal \a key exactly, and the remainder must be longer than two characters,
 * start with '{' and end with '}'.  The braces are then skipped; the
 * statements doing so and the return values are not visible in this listing.
 */
924 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
927 int keylen = strlen(key);
930 rc = cfs_gettok(src, '=', &res);
931 if (rc == 0 || res.ls_len != keylen ||
932 strncmp(res.ls_str, key, keylen) != 0 ||
933 src->ls_len <= 2 || src->ls_str[0] != '{' ||
934 src->ls_str[src->ls_len - 1] != '}')
937 /* Skip '{' and '}' */
/*
 * Parses a "jobid={...}" expression into \a cmd: validates the wrapper with
 * nrs_tbf_check_id_value(), stores a copy of the inner text in ts_jobids_str
 * and parses that copy into the ts_jobids list.  nrs_tbf_jobid_cmd_fini() is
 * called on a path whose condition is not visible in this listing -
 * presumably when list parsing fails.
 */
943 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
949 src.ls_len = strlen(id);
950 rc = nrs_tbf_check_id_value(&src, "jobid");
954 OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
955 if (cmd->u.tc_start.ts_jobids_str == NULL)
958 memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
960 /* parse jobid list */
961 rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
962 strlen(cmd->u.tc_start.ts_jobids_str),
963 &cmd->u.tc_start.ts_jobids);
965 nrs_tbf_jobid_cmd_fini(cmd);
/*
 * Type-specific rule initialization for jobid rules.  Duplicates the start
 * command's jobid expression into the rule.  The expression is re-parsed into
 * the rule's own tr_jobids list only when the command carried a non-empty
 * list, so the default rule (started with an empty list) keeps an empty
 * tr_jobids.  The visible failure handling logs the illegal expression and
 * frees the duplicated string.
 */
970 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
971 struct nrs_tbf_rule *rule,
972 struct nrs_tbf_cmd *start)
976 LASSERT(start->u.tc_start.ts_jobids_str);
977 OBD_ALLOC(rule->tr_jobids_str,
978 strlen(start->u.tc_start.ts_jobids_str) + 1);
979 if (rule->tr_jobids_str == NULL)
982 memcpy(rule->tr_jobids_str,
983 start->u.tc_start.ts_jobids_str,
984 strlen(start->u.tc_start.ts_jobids_str));
986 INIT_LIST_HEAD(&rule->tr_jobids);
987 if (!list_empty(&start->u.tc_start.ts_jobids)) {
988 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
989 strlen(rule->tr_jobids_str),
992 CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
995 OBD_FREE(rule->tr_jobids_str,
996 strlen(start->u.tc_start.ts_jobids_str) + 1);
/* Prints "<name> {<jobid expression>} <rate>, ref <n>" for one rule; the
 * printed count is tr_ref minus one. */
1001 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1003 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1004 rule->tr_jobids_str, rule->tr_rpc_rate,
1005 atomic_read(&rule->tr_ref) - 1);
/* Rule-match hook: tests the client's jobid against the rule's jobid list. */
1010 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1011 struct nrs_tbf_client *cli)
1013 return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
/* Rule-fini hook: frees the rule's jobid list (if non-empty) and its jobid
 * expression string, which must be set. */
1016 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1018 if (!list_empty(&rule->tr_jobids))
1019 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1020 LASSERT(rule->tr_jobids_str != NULL);
1021 OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
/* Operations table of the jobid TBF type. */
1024 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1025 .o_name = NRS_TBF_TYPE_JOBID,
1026 .o_startup = nrs_tbf_jobid_startup,
1027 .o_cli_find = nrs_tbf_jobid_cli_find,
1028 .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1029 .o_cli_put = nrs_tbf_jobid_cli_put,
1030 .o_cli_init = nrs_tbf_jobid_cli_init,
1031 .o_rule_init = nrs_tbf_jobid_rule_init,
1032 .o_rule_dump = nrs_tbf_jobid_rule_dump,
1033 .o_rule_match = nrs_tbf_jobid_rule_match,
1034 .o_rule_fini = nrs_tbf_jobid_rule_fini,
1038 * libcfs_hash operations for nrs_tbf_head::th_cli_hash (NID type)
1040 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
1041 * nrs_tbf_client objects.
/* Size parameters of the NID-keyed client hash; NRS_TBF_NID_BKT_BITS is
 * passed to cfs_hash_create() in nrs_tbf_nid_startup(). */
1043 #define NRS_TBF_NID_BKT_BITS 8
1044 #define NRS_TBF_NID_BITS 16
/* Hash callback: djb2 hash over the sizeof(lnet_nid_t) bytes at \a key. */
1046 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1049 return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
/* Key-compare callback: non-zero when the NID at \a key equals the client's NID. */
1052 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1054 lnet_nid_t *nid = (lnet_nid_t *)key;
1055 struct nrs_tbf_client *cli = hlist_entry(hnode,
1056 struct nrs_tbf_client,
1059 return *nid == cli->tc_nid;
/* Key callback: the key of a NID client is the address of its tc_nid field. */
1062 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1064 struct nrs_tbf_client *cli = hlist_entry(hnode,
1065 struct nrs_tbf_client,
1068 return &cli->tc_nid;
/* Get callback: takes one reference on the client (tc_ref). */
1071 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1073 struct nrs_tbf_client *cli = hlist_entry(hnode,
1074 struct nrs_tbf_client,
1077 atomic_inc(&cli->tc_ref);
/* Put callback: drops one reference on the client; nothing is freed here. */
1080 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1082 struct nrs_tbf_client *cli = hlist_entry(hnode,
1083 struct nrs_tbf_client,
1086 atomic_dec(&cli->tc_ref);
/* Exit callback: asserts the client is unreferenced (the assertion message
 * reports the NID and the reference count) and finalizes it. */
1089 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1091 struct nrs_tbf_client *cli = hlist_entry(hnode,
1092 struct nrs_tbf_client,
1095 LASSERTF(atomic_read(&cli->tc_ref) == 0,
1096 "Busy TBF object from client with NID %s, with %d refs\n",
1097 libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1099 nrs_tbf_cli_fini(cli);
/* Hash operations for the NID-keyed client hash.  The same function serves
 * as both the hs_put and hs_put_locked callbacks. */
1102 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1103 .hs_hash = nrs_tbf_nid_hop_hash,
1104 .hs_keycmp = nrs_tbf_nid_hop_keycmp,
1105 .hs_key = nrs_tbf_nid_hop_key,
1106 .hs_object = nrs_tbf_hop_object,
1107 .hs_get = nrs_tbf_nid_hop_get,
1108 .hs_put = nrs_tbf_nid_hop_put,
1109 .hs_put_locked = nrs_tbf_nid_hop_put,
1110 .hs_exit = nrs_tbf_nid_hop_exit,
/* Looks up the client keyed by the request's peer NID in the client hash. */
1113 static struct nrs_tbf_client *
1114 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1115 struct ptlrpc_request *req)
1117 return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
/* Find-or-insert of \a cli by NID through cfs_hash_findadd_unique(); the
 * result of that call is returned as-is. */
1120 static struct nrs_tbf_client *
1121 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1122 struct nrs_tbf_client *cli)
1124 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
/* Releases a client reference through the hash (cfs_hash_put()). */
1129 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1130 struct nrs_tbf_client *cli)
1132 cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
/*
 * Startup for the NID TBF type.  Creates the NID-keyed client hash (with
 * CFS_HASH_RW_BKTLOCK) and starts the default rule: name
 * NRS_TBF_DEFAULT_RULE, NID expression "*", rate taken from the tbf_rate
 * tunable, flag NTRS_DEFAULT.  On the visible cleanup path the hash reference
 * is dropped and th_cli_hash is reset to NULL.
 */
1136 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1137 struct nrs_tbf_head *head)
1139 struct nrs_tbf_cmd start;
1142 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1145 NRS_TBF_NID_BKT_BITS, 0,
1148 &nrs_tbf_nid_hash_ops,
1149 CFS_HASH_RW_BKTLOCK);
1150 if (head->th_cli_hash == NULL)
1153 memset(&start, 0, sizeof(start));
1154 start.u.tc_start.ts_nids_str = "*";
1156 start.u.tc_start.ts_rpc_rate = tbf_rate;
1157 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1158 start.tc_name = NRS_TBF_DEFAULT_RULE;
/* the NID list stays empty, so nrs_tbf_nid_rule_init() skips parsing "*" */
1159 INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1160 rc = nrs_tbf_rule_start(policy, head, &start);
1162 cfs_hash_putref(head->th_cli_hash);
1163 head->th_cli_hash = NULL;
/* Type-specific client initialization: records the request's peer NID. */
1170 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1171 struct ptlrpc_request *req)
1173 cli->tc_nid = req->rq_peer.nid;
/*
 * Type-specific rule initialization for NID rules.  Duplicates the start
 * command's NID expression into the rule.  The expression is parsed into the
 * rule's own tr_nids list only when the command carried a non-empty NID
 * list; a parse result <= 0 is treated as failure, in which case the
 * expression is logged as illegal and the duplicated string is freed.
 */
1176 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1177 struct nrs_tbf_rule *rule,
1178 struct nrs_tbf_cmd *start)
1180 LASSERT(start->u.tc_start.ts_nids_str);
1181 OBD_ALLOC(rule->tr_nids_str,
1182 strlen(start->u.tc_start.ts_nids_str) + 1);
1183 if (rule->tr_nids_str == NULL)
1186 memcpy(rule->tr_nids_str,
1187 start->u.tc_start.ts_nids_str,
1188 strlen(start->u.tc_start.ts_nids_str));
1190 INIT_LIST_HEAD(&rule->tr_nids);
1191 if (!list_empty(&start->u.tc_start.ts_nids)) {
1192 if (cfs_parse_nidlist(rule->tr_nids_str,
1193 strlen(rule->tr_nids_str),
1194 &rule->tr_nids) <= 0) {
1195 CERROR("nids {%s} illegal\n",
1197 OBD_FREE(rule->tr_nids_str,
1198 strlen(start->u.tc_start.ts_nids_str) + 1);
/* Prints "<name> {<NID expression>} <rate>, ref <n>" for one rule; the
 * printed count is tr_ref minus one. */
1206 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1208 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
1209 rule->tr_nids_str, rule->tr_rpc_rate,
1210 atomic_read(&rule->tr_ref) - 1);
/* Rule-match hook: tests the client's NID against the rule's NID list. */
1215 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1216 struct nrs_tbf_client *cli)
1218 return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
/* Rule-fini hook: frees the rule's NID list (if non-empty) and its NID
 * expression string, which must be set. */
1221 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1223 if (!list_empty(&rule->tr_nids))
1224 cfs_free_nidlist(&rule->tr_nids);
1225 LASSERT(rule->tr_nids_str != NULL);
1226 OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
/* Releases what a NID start command owns: the parsed NID list (if non-empty)
 * and the NID expression string (if set). */
1229 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1231 if (!list_empty(&cmd->u.tc_start.ts_nids))
1232 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1233 if (cmd->u.tc_start.ts_nids_str)
1234 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1235 strlen(cmd->u.tc_start.ts_nids_str) + 1);
/*
 * Parses a "nid={...}" expression into \a cmd: validates the wrapper with
 * nrs_tbf_check_id_value(), stores a copy of the inner text in ts_nids_str
 * and parses that copy into the ts_nids list.  A parse result <= 0 triggers
 * nrs_tbf_nid_cmd_fini() to release what was allocated.
 */
1238 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1240 struct cfs_lstr src;
1244 src.ls_len = strlen(id);
1245 rc = nrs_tbf_check_id_value(&src, "nid");
1249 OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1250 if (cmd->u.tc_start.ts_nids_str == NULL)
1253 memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1255 /* parse NID list */
1256 if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1257 strlen(cmd->u.tc_start.ts_nids_str),
1258 &cmd->u.tc_start.ts_nids) <= 0) {
1259 nrs_tbf_nid_cmd_fini(cmd);
/* Operations table of the NID TBF type. */
1266 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1267 .o_name = NRS_TBF_TYPE_NID,
1268 .o_startup = nrs_tbf_nid_startup,
1269 .o_cli_find = nrs_tbf_nid_cli_find,
1270 .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1271 .o_cli_put = nrs_tbf_nid_cli_put,
1272 .o_cli_init = nrs_tbf_nid_cli_init,
1273 .o_rule_init = nrs_tbf_nid_rule_init,
1274 .o_rule_dump = nrs_tbf_nid_rule_dump,
1275 .o_rule_match = nrs_tbf_nid_rule_match,
1276 .o_rule_fini = nrs_tbf_nid_rule_fini,
/* Hash callback of the generic client hash: djb2 hash over the
 * NUL-terminated key string. */
1279 static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1282 return cfs_hash_djb2_hash(key, strlen(key), mask);
/* Key-compare callback: non-zero when the client's tc_key string equals \a key. */
1285 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1287 struct nrs_tbf_client *cli = hlist_entry(hnode,
1288 struct nrs_tbf_client,
1291 return (strcmp(cli->tc_key, key) == 0);
/* Key callback of the generic client hash.  The return statement is not
 * visible in this listing; presumably it yields cli->tc_key, the field that
 * nrs_tbf_hop_keycmp() compares - confirm. */
1294 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1296 struct nrs_tbf_client *cli = hlist_entry(hnode,
1297 struct nrs_tbf_client,
/* Get callback: takes one reference on the client (tc_ref). */
1302 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1304 struct nrs_tbf_client *cli = hlist_entry(hnode,
1305 struct nrs_tbf_client,
1308 atomic_inc(&cli->tc_ref);
/* Put callback: drops one reference on the client; nothing is freed here. */
1311 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1313 struct nrs_tbf_client *cli = hlist_entry(hnode,
1314 struct nrs_tbf_client,
1317 atomic_dec(&cli->tc_ref);
/* Exit callback: the client must be unreferenced; it is finalized with
 * nrs_tbf_cli_fini(). */
1320 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1323 struct nrs_tbf_client *cli = hlist_entry(hnode,
1324 struct nrs_tbf_client,
1327 LASSERT(atomic_read(&cli->tc_ref) == 0);
1328 nrs_tbf_cli_fini(cli);
/* Hash operations for the generic (tc_key-keyed) client hash.  The same
 * function serves as both the hs_put and hs_put_locked callbacks. */
1331 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1332 .hs_hash = nrs_tbf_hop_hash,
1333 .hs_keycmp = nrs_tbf_hop_keycmp,
1334 .hs_key = nrs_tbf_hop_key,
1335 .hs_object = nrs_tbf_hop_object,
1336 .hs_get = nrs_tbf_hop_get,
1337 .hs_put = nrs_tbf_hop_put,
1338 .hs_put_locked = nrs_tbf_hop_put,
1339 .hs_exit = nrs_tbf_hop_exit,
/* Bucket bits and creation flags of the generic client hash (used by
 * nrs_tbf_startup()).  The tail of the flags definition is not visible in
 * this listing. */
1342 #define NRS_TBF_GENERIC_BKT_BITS 10
1343 #define NRS_TBF_GENERIC_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
1344 CFS_HASH_NO_ITEMREF | \
/*
 * Startup for the generic TBF type.  Creates the client hash (bit count from
 * nrs_tbf_jobid_hash_order(), clamped to at least NRS_TBF_GENERIC_BKT_BITS),
 * initializes the LRU list of every bucket, and starts the default rule: name
 * NRS_TBF_DEFAULT_RULE, condition expression "*", rate taken from the
 * tbf_rate tunable, flag NTRS_DEFAULT.  The visible cleanup path drops the
 * hash reference.
 */
1348 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1350 struct nrs_tbf_cmd start;
1351 struct nrs_tbf_bucket *bkt;
1355 struct cfs_hash_bd bd;
1357 bits = nrs_tbf_jobid_hash_order();
1358 if (bits < NRS_TBF_GENERIC_BKT_BITS)
1359 bits = NRS_TBF_GENERIC_BKT_BITS;
1360 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1362 NRS_TBF_GENERIC_BKT_BITS,
1365 NRS_TBF_GENERIC_HASH_FLAGS);
1366 if (head->th_cli_hash == NULL)
1369 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1370 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1371 INIT_LIST_HEAD(&bkt->ntb_lru);
1374 memset(&start, 0, sizeof(start));
1375 start.u.tc_start.ts_conds_str = "*";
1377 start.u.tc_start.ts_rpc_rate = tbf_rate;
1378 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1379 start.tc_name = NRS_TBF_DEFAULT_RULE;
1380 INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1381 rc = nrs_tbf_rule_start(policy, head, &start);
1383 cfs_hash_putref(head->th_cli_hash);
/*
 * Generic counterpart of nrs_tbf_jobid_hash_lookup(): looks up a client by
 * key in bucket \a bd using the *_locked lookup, so the caller is expected to
 * hold the bucket lock.  A client found on an LRU list is removed from it.
 */
1388 static struct nrs_tbf_client *
1389 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1392 struct hlist_node *hnode;
1393 struct nrs_tbf_client *cli;
1395 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1399 cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1400 if (!list_empty(&cli->tc_lru))
1401 list_del_init(&cli->tc_lru);
1406 * Only the opcodes listed in this function are checked in
1407 * nrs_tbf_id_cli_set(). That means an opcode can be added here or
1408 * removed from here to enable or disable handling of its requests in nrs_tbf
/*
 * Maps an RPC opcode to the request format used to unpack it.  Most of the
 * case labels, and the result for opcodes that are not listed, are not
 * visible in this listing.
 */
1410 static struct req_format *req_fmt(__u32 opcode)
1414 return &RQF_OST_GETATTR;
1416 return &RQF_OST_SETATTR;
1418 return &RQF_OST_BRW_READ;
1420 return &RQF_OST_BRW_WRITE;
1421 /* FIXME: OST_CREATE and OST_DESTROY come from the MDS
1422 * in most cases. Should they be removed? */
1424 return &RQF_OST_CREATE;
1426 return &RQF_OST_DESTROY;
1428 return &RQF_OST_PUNCH;
1430 return &RQF_OST_SYNC;
1432 return &RQF_OST_LADVISE;
1434 return &RQF_MDS_GETATTR;
1435 case MDS_GETATTR_NAME:
1436 return &RQF_MDS_GETATTR_NAME;
1437 /* close is skipped to avoid LDLM cancel slowness */
1440 return &RQF_MDS_CLOSE;
1443 return &RQF_MDS_REINT;
1445 return &RQF_MDS_READPAGE;
1447 return &RQF_MDS_GET_ROOT;
1449 return &RQF_MDS_STATFS;
1451 return &RQF_MDS_SYNC;
1453 return &RQF_MDS_QUOTACTL;
1455 return &RQF_MDS_GETXATTR;
1457 return &RQF_MDS_GET_INFO;
1458 /* HSM op is skipped */
1460 case MDS_HSM_STATE_GET:
1461 return &RQF_MDS_HSM_STATE_GET;
1462 case MDS_HSM_STATE_SET:
1463 return &RQF_MDS_HSM_STATE_SET;
1464 case MDS_HSM_ACTION:
1465 return &RQF_MDS_HSM_ACTION;
1466 case MDS_HSM_CT_REGISTER:
1467 return &RQF_MDS_HSM_CT_REGISTER;
1468 case MDS_HSM_CT_UNREGISTER:
1469 return &RQF_MDS_HSM_CT_UNREGISTER;
1471 case MDS_SWAP_LAYOUTS:
1472 return &RQF_MDS_SWAP_LAYOUTS;
1474 return &RQF_LDLM_ENQUEUE;
1480 static struct req_format *intent_req_fmt(__u32 it_opc)
1482 if (it_opc & (IT_OPEN | IT_CREAT))
1483 return &RQF_LDLM_INTENT_OPEN;
1484 else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1485 return &RQF_LDLM_INTENT_GETATTR;
1486 else if (it_opc & IT_GETXATTR)
1487 return &RQF_LDLM_INTENT_GETXATTR;
1488 else if (it_opc & (IT_GLIMPSE | IT_BRW))
1489 return &RQF_LDLM_INTENT;
1494 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1497 struct ost_body *body;
1499 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1501 id->ti_uid = body->oa.o_uid;
1502 id->ti_gid = body->oa.o_gid;
1509 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1512 struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1516 /* TODO: nodemaping feature converts {ug}id from individual
1517 * clients to the actual ones of the file system. Some work
1518 * may be needed to fix this. */
1519 id->ti_uid = b->mbo_uid;
1520 id->ti_gid = b->mbo_gid;
1523 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1526 struct mdt_rec_reint *rec;
1528 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1529 LASSERT(rec != NULL);
1531 /* use the fs{ug}id as {ug}id of the process */
1532 id->ti_uid = rec->rr_fsuid;
1533 id->ti_gid = rec->rr_fsgid;
1536 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1539 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1544 case MDS_GETATTR_NAME:
1549 case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1550 unpack_ugid_from_mdt_body(req, id);
1554 unpack_ugid_from_mdt_rec_reint(req, id);
1563 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1566 struct ldlm_intent *lit;
1567 struct req_format *fmt;
1569 if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1572 req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1573 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1577 fmt = intent_req_fmt(lit->opc);
1581 req_capsule_extend(&req->rq_pill, fmt);
1583 if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1584 unpack_ugid_from_mdt_body(req, id);
1585 else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1586 unpack_ugid_from_mdt_rec_reint(req, id);
1592 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1593 enum nrs_tbf_flag ti_type)
1595 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1596 struct req_format *fmt = req_fmt(opc);
1597 bool fmt_unset = false;
1600 memset(id, 0, sizeof(struct tbf_id));
1601 id->ti_type = ti_type;
1605 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1606 if (req->rq_pill.rc_fmt == NULL) {
1607 req_capsule_set(&req->rq_pill, fmt);
1611 if (opc < OST_LAST_OPC)
1612 rc = ost_tbf_id_cli_set(req, id);
1613 else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1614 rc = mdt_tbf_id_cli_set(req, id);
1615 else if (opc == LDLM_ENQUEUE)
1616 rc = ldlm_tbf_id_cli_set(req, id);
1620 /* restore it to the initialized state */
1622 req->rq_pill.rc_fmt = NULL;
1626 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1627 struct ptlrpc_request *req,
1628 char *keystr, size_t keystr_sz)
1631 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1634 nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1635 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1637 jobid = NRS_TBF_JOBID_NULL;
1639 snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1640 libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1644 INIT_LIST_HEAD(&cli->tc_lru);
1645 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1646 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1647 cli->tc_nid = req->rq_peer.nid;
1648 cli->tc_opcode = opc;
1653 static struct nrs_tbf_client *
1654 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1656 struct nrs_tbf_client *cli;
1657 struct cfs_hash *hs = head->th_cli_hash;
1658 struct cfs_hash_bd bd;
1659 char keystr[NRS_TBF_KEY_LEN];
1661 nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1662 cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1663 cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1664 cfs_hash_bd_unlock(hs, &bd, 1);
1669 static struct nrs_tbf_client *
1670 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1671 struct nrs_tbf_client *cli)
1674 struct nrs_tbf_client *ret;
1675 struct cfs_hash *hs = head->th_cli_hash;
1676 struct cfs_hash_bd bd;
1679 cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1680 ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1682 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1685 cfs_hash_bd_unlock(hs, &bd, 1);
1691 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1693 struct cfs_hash_bd bd;
1694 struct cfs_hash *hs = head->th_cli_hash;
1695 struct nrs_tbf_bucket *bkt;
1699 cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1700 bkt = cfs_hash_bd_extra_get(hs, &bd);
1701 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1703 LASSERT(list_empty(&cli->tc_lru));
1704 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1707 * Check and purge the LRU, there is at least one client in the LRU.
1709 hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1710 while (cfs_hash_bd_count_get(&bd) > hw) {
1711 if (unlikely(list_empty(&bkt->ntb_lru)))
1713 cli = list_entry(bkt->ntb_lru.next,
1714 struct nrs_tbf_client,
1716 LASSERT(atomic_read(&cli->tc_ref) == 0);
1717 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1718 list_move(&cli->tc_lru, &zombies);
1720 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1722 while (!list_empty(&zombies)) {
1723 cli = container_of(zombies.next,
1724 struct nrs_tbf_client, tc_lru);
1725 list_del_init(&cli->tc_lru);
1726 nrs_tbf_cli_fini(cli);
1731 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1732 struct ptlrpc_request *req)
1734 char keystr[NRS_TBF_KEY_LEN];
1736 nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1740 nrs_tbf_id_list_free(struct list_head *uid_list)
1742 struct nrs_tbf_id *nti_id, *n;
1744 list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1745 list_del_init(&nti_id->nti_linkage);
1746 OBD_FREE_PTR(nti_id);
1751 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1753 LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1754 expr->te_field < NRS_TBF_FIELD_MAX);
1755 switch (expr->te_field) {
1756 case NRS_TBF_FIELD_NID:
1757 cfs_free_nidlist(&expr->te_cond);
1759 case NRS_TBF_FIELD_JOBID:
1760 nrs_tbf_jobid_list_free(&expr->te_cond);
1762 case NRS_TBF_FIELD_OPCODE:
1763 CFS_FREE_BITMAP(expr->te_opcodes);
1765 case NRS_TBF_FIELD_UID:
1766 case NRS_TBF_FIELD_GID:
1767 nrs_tbf_id_list_free(&expr->te_cond);
1776 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1778 struct nrs_tbf_expression *expression;
1779 struct nrs_tbf_expression *n;
1781 LASSERT(list_empty(&conjunction->tc_linkage));
1782 list_for_each_entry_safe(expression, n,
1783 &conjunction->tc_expressions,
1785 list_del_init(&expression->te_linkage);
1786 nrs_tbf_expression_free(expression);
1788 OBD_FREE_PTR(conjunction);
1792 nrs_tbf_conds_free(struct list_head *cond_list)
1794 struct nrs_tbf_conjunction *conjunction;
1795 struct nrs_tbf_conjunction *n;
1797 list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1798 list_del_init(&conjunction->tc_linkage);
1799 nrs_tbf_conjunction_free(conjunction);
1804 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1806 if (!list_empty(&cmd->u.tc_start.ts_conds))
1807 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1808 if (cmd->u.tc_start.ts_conds_str)
1809 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1810 strlen(cmd->u.tc_start.ts_conds_str) + 1);
/* Token separators used when parsing a generic TBF condition string. */
#define NRS_TBF_DISJUNCTION_DELIM	','	/* between conjunctions */
#define NRS_TBF_CONJUNCTION_DELIM	'&'	/* between expressions */
#define NRS_TBF_EXPRESSION_DELIM	'='	/* between field and value */
1818 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1820 int len = strlen(str);
1822 return (field->ls_len == len &&
1823 strncmp(field->ls_str, str, len) == 0);
1827 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1829 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1830 enum nrs_tbf_flag tif);
1833 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1835 struct nrs_tbf_expression *expr;
1836 struct cfs_lstr field;
1839 OBD_ALLOC_PTR(expr);
1843 rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1844 if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1845 src->ls_str[src->ls_len - 1] != '}')
1846 GOTO(out, rc = -EINVAL);
1848 /* Skip '{' and '}' */
1852 if (nrs_tbf_check_field(&field, "nid")) {
1853 if (cfs_parse_nidlist(src->ls_str,
1855 &expr->te_cond) <= 0)
1856 GOTO(out, rc = -EINVAL);
1857 expr->te_field = NRS_TBF_FIELD_NID;
1858 } else if (nrs_tbf_check_field(&field, "jobid")) {
1859 if (nrs_tbf_jobid_list_parse(src->ls_str,
1861 &expr->te_cond) < 0)
1862 GOTO(out, rc = -EINVAL);
1863 expr->te_field = NRS_TBF_FIELD_JOBID;
1864 } else if (nrs_tbf_check_field(&field, "opcode")) {
1865 if (nrs_tbf_opcode_list_parse(src->ls_str,
1867 &expr->te_opcodes) < 0)
1868 GOTO(out, rc = -EINVAL);
1869 expr->te_field = NRS_TBF_FIELD_OPCODE;
1870 } else if (nrs_tbf_check_field(&field, "uid")) {
1871 if (nrs_tbf_id_list_parse(src->ls_str,
1874 NRS_TBF_FLAG_UID) < 0)
1875 GOTO(out, rc = -EINVAL);
1876 expr->te_field = NRS_TBF_FIELD_UID;
1877 } else if (nrs_tbf_check_field(&field, "gid")) {
1878 if (nrs_tbf_id_list_parse(src->ls_str,
1881 NRS_TBF_FLAG_GID) < 0)
1882 GOTO(out, rc = -EINVAL);
1883 expr->te_field = NRS_TBF_FIELD_GID;
1885 GOTO(out, rc = -EINVAL);
1888 list_add_tail(&expr->te_linkage, cond_list);
1896 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1898 struct nrs_tbf_conjunction *conjunction;
1899 struct cfs_lstr expr;
1902 OBD_ALLOC_PTR(conjunction);
1903 if (conjunction == NULL)
1906 INIT_LIST_HEAD(&conjunction->tc_expressions);
1907 list_add_tail(&conjunction->tc_linkage, cond_list);
1909 while (src->ls_str) {
1910 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1915 rc = nrs_tbf_expression_parse(&expr,
1916 &conjunction->tc_expressions);
1924 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1926 struct cfs_lstr src;
1927 struct cfs_lstr res;
1932 INIT_LIST_HEAD(cond_list);
1933 while (src.ls_str) {
1934 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1939 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1947 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1951 OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1952 if (cmd->u.tc_start.ts_conds_str == NULL)
1955 memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1957 /* Parse hybird NID and JOBID conditions */
1958 rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1959 strlen(cmd->u.tc_start.ts_conds_str),
1960 &cmd->u.tc_start.ts_conds);
1962 nrs_tbf_generic_cmd_fini(cmd);
1968 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1971 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1972 struct nrs_tbf_rule *rule,
1973 struct nrs_tbf_client *cli)
1975 switch (expr->te_field) {
1976 case NRS_TBF_FIELD_NID:
1977 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1978 case NRS_TBF_FIELD_JOBID:
1979 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1980 case NRS_TBF_FIELD_OPCODE:
1981 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1982 case NRS_TBF_FIELD_UID:
1983 case NRS_TBF_FIELD_GID:
1984 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1991 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1992 struct nrs_tbf_rule *rule,
1993 struct nrs_tbf_client *cli)
1995 struct nrs_tbf_expression *expr;
1998 list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1999 matched = nrs_tbf_expression_match(expr, rule, cli);
2008 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2010 struct nrs_tbf_conjunction *conjunction;
2013 list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2014 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2023 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2025 if (!list_empty(&rule->tr_conds))
2026 nrs_tbf_conds_free(&rule->tr_conds);
2027 LASSERT(rule->tr_conds_str != NULL);
2028 OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2032 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2033 struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2037 LASSERT(start->u.tc_start.ts_conds_str);
2038 OBD_ALLOC(rule->tr_conds_str,
2039 strlen(start->u.tc_start.ts_conds_str) + 1);
2040 if (rule->tr_conds_str == NULL)
2043 memcpy(rule->tr_conds_str,
2044 start->u.tc_start.ts_conds_str,
2045 strlen(start->u.tc_start.ts_conds_str));
2047 INIT_LIST_HEAD(&rule->tr_conds);
2048 if (!list_empty(&start->u.tc_start.ts_conds)) {
2049 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2050 strlen(rule->tr_conds_str),
2054 nrs_tbf_generic_rule_fini(rule);
2060 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2062 seq_printf(m, "%s %s %u, ref %d\n", rule->tr_name,
2063 rule->tr_conds_str, rule->tr_rpc_rate,
2064 atomic_read(&rule->tr_ref) - 1);
/**
 * o_rule_match hook of the generic type; defers to nrs_tbf_cond_match().
 *
 * \param[in] rule	the rule whose conditions are evaluated
 * \param[in] cli	the client being classified
 *
 * \retval the result of nrs_tbf_cond_match()
 */
static int
nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
			   struct nrs_tbf_client *cli)
{
	int matched = nrs_tbf_cond_match(rule, cli);

	return matched;
}
2075 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2076 .o_name = NRS_TBF_TYPE_GENERIC,
2077 .o_startup = nrs_tbf_startup,
2078 .o_cli_find = nrs_tbf_cli_find,
2079 .o_cli_findadd = nrs_tbf_cli_findadd,
2080 .o_cli_put = nrs_tbf_cli_put,
2081 .o_cli_init = nrs_tbf_generic_cli_init,
2082 .o_rule_init = nrs_tbf_rule_init,
2083 .o_rule_dump = nrs_tbf_generic_rule_dump,
2084 .o_rule_match = nrs_tbf_generic_rule_match,
2085 .o_rule_fini = nrs_tbf_generic_rule_fini,
2088 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2090 if (rule->tr_opcodes != NULL)
2091 CFS_FREE_BITMAP(rule->tr_opcodes);
2093 LASSERT(rule->tr_opcodes_str != NULL);
2094 OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2097 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2100 return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2103 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2105 const __u32 *opc = key;
2106 struct nrs_tbf_client *cli = hlist_entry(hnode,
2107 struct nrs_tbf_client,
2110 return *opc == cli->tc_opcode;
2113 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2115 struct nrs_tbf_client *cli = hlist_entry(hnode,
2116 struct nrs_tbf_client,
2119 return &cli->tc_opcode;
2122 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2123 struct hlist_node *hnode)
2125 struct nrs_tbf_client *cli = hlist_entry(hnode,
2126 struct nrs_tbf_client,
2129 atomic_inc(&cli->tc_ref);
2132 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2133 struct hlist_node *hnode)
2135 struct nrs_tbf_client *cli = hlist_entry(hnode,
2136 struct nrs_tbf_client,
2139 atomic_dec(&cli->tc_ref);
2142 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2143 struct hlist_node *hnode)
2145 struct nrs_tbf_client *cli = hlist_entry(hnode,
2146 struct nrs_tbf_client,
2149 LASSERTF(atomic_read(&cli->tc_ref) == 0,
2150 "Busy TBF object from client with opcode %s, with %d refs\n",
2151 ll_opcode2str(cli->tc_opcode),
2152 atomic_read(&cli->tc_ref));
2154 nrs_tbf_cli_fini(cli);
2156 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2157 .hs_hash = nrs_tbf_opcode_hop_hash,
2158 .hs_keycmp = nrs_tbf_opcode_hop_keycmp,
2159 .hs_key = nrs_tbf_opcode_hop_key,
2160 .hs_object = nrs_tbf_hop_object,
2161 .hs_get = nrs_tbf_opcode_hop_get,
2162 .hs_put = nrs_tbf_opcode_hop_put,
2163 .hs_put_locked = nrs_tbf_opcode_hop_put,
2164 .hs_exit = nrs_tbf_opcode_hop_exit,
2168 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2169 struct nrs_tbf_head *head)
2171 struct nrs_tbf_cmd start = { 0 };
2174 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2177 NRS_TBF_NID_BKT_BITS, 0,
2180 &nrs_tbf_opcode_hash_ops,
2181 CFS_HASH_RW_BKTLOCK);
2182 if (head->th_cli_hash == NULL)
2185 start.u.tc_start.ts_opcodes_str = "*";
2187 start.u.tc_start.ts_rpc_rate = tbf_rate;
2188 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2189 start.tc_name = NRS_TBF_DEFAULT_RULE;
2190 rc = nrs_tbf_rule_start(policy, head, &start);
2195 static struct nrs_tbf_client *
2196 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2197 struct ptlrpc_request *req)
2201 opc = lustre_msg_get_opc(req->rq_reqmsg);
2202 return cfs_hash_lookup(head->th_cli_hash, &opc);
2205 static struct nrs_tbf_client *
2206 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2207 struct nrs_tbf_client *cli)
2209 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2214 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2215 struct ptlrpc_request *req)
2217 cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2220 #define MAX_OPCODE_LEN 32
2222 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2225 char opcode_str[MAX_OPCODE_LEN];
2227 if (id->ls_len + 1 > MAX_OPCODE_LEN)
2230 memcpy(opcode_str, id->ls_str, id->ls_len);
2231 opcode_str[id->ls_len] = '\0';
2233 op = ll_str2opcode(opcode_str);
2237 cfs_bitmap_set(opcodes, op);
2242 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2244 struct cfs_bitmap *opcodes;
2245 struct cfs_lstr src;
2246 struct cfs_lstr res;
2250 opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2251 if (opcodes == NULL)
2256 while (src.ls_str) {
2257 rc = cfs_gettok(&src, ' ', &res);
2262 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2267 if (rc == 0 && bitmaptr)
2268 *bitmaptr = opcodes;
2270 CFS_FREE_BITMAP(opcodes);
2275 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2277 if (cmd->u.tc_start.ts_opcodes_str)
2278 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2279 strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2283 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2285 struct cfs_lstr src;
2289 src.ls_len = strlen(id);
2290 rc = nrs_tbf_check_id_value(&src, "opcode");
2294 OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2295 if (cmd->u.tc_start.ts_opcodes_str == NULL)
2298 memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2300 /* parse opcode list */
2301 rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2302 strlen(cmd->u.tc_start.ts_opcodes_str),
2305 nrs_tbf_opcode_cmd_fini(cmd);
2311 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2312 struct nrs_tbf_client *cli)
2314 if (rule->tr_opcodes == NULL)
2317 return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2320 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2321 struct nrs_tbf_rule *rule,
2322 struct nrs_tbf_cmd *start)
2326 LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2327 OBD_ALLOC(rule->tr_opcodes_str,
2328 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2329 if (rule->tr_opcodes_str == NULL)
2332 strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2333 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2335 /* Default rule '*' */
2336 if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2339 rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2340 strlen(rule->tr_opcodes_str),
2343 OBD_FREE(rule->tr_opcodes_str,
2344 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2350 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2352 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2353 rule->tr_opcodes_str, rule->tr_rpc_rate,
2354 atomic_read(&rule->tr_ref) - 1);
2359 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2360 .o_name = NRS_TBF_TYPE_OPCODE,
2361 .o_startup = nrs_tbf_opcode_startup,
2362 .o_cli_find = nrs_tbf_opcode_cli_find,
2363 .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2364 .o_cli_put = nrs_tbf_nid_cli_put,
2365 .o_cli_init = nrs_tbf_opcode_cli_init,
2366 .o_rule_init = nrs_tbf_opcode_rule_init,
2367 .o_rule_dump = nrs_tbf_opcode_rule_dump,
2368 .o_rule_match = nrs_tbf_opcode_rule_match,
2369 .o_rule_fini = nrs_tbf_opcode_rule_fini,
2372 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2375 return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2378 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2380 const struct tbf_id *opc = key;
2381 enum nrs_tbf_flag ntf;
2382 struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2384 ntf = opc->ti_type & cli->tc_id.ti_type;
2385 if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2388 if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2394 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2396 struct nrs_tbf_client *cli = hlist_entry(hnode,
2397 struct nrs_tbf_client,
2402 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2404 struct nrs_tbf_client *cli = hlist_entry(hnode,
2405 struct nrs_tbf_client,
2408 atomic_inc(&cli->tc_ref);
2411 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2413 struct nrs_tbf_client *cli = hlist_entry(hnode,
2414 struct nrs_tbf_client,
2417 atomic_dec(&cli->tc_ref);
2421 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2424 struct nrs_tbf_client *cli = hlist_entry(hnode,
2425 struct nrs_tbf_client,
2428 LASSERT(atomic_read(&cli->tc_ref) == 0);
2429 nrs_tbf_cli_fini(cli);
2432 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2433 .hs_hash = nrs_tbf_id_hop_hash,
2434 .hs_keycmp = nrs_tbf_id_hop_keycmp,
2435 .hs_key = nrs_tbf_id_hop_key,
2436 .hs_object = nrs_tbf_hop_object,
2437 .hs_get = nrs_tbf_id_hop_get,
2438 .hs_put = nrs_tbf_id_hop_put,
2439 .hs_put_locked = nrs_tbf_id_hop_put,
2440 .hs_exit = nrs_tbf_id_hop_exit,
2444 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2445 struct nrs_tbf_head *head)
2447 struct nrs_tbf_cmd start;
2450 head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2453 NRS_TBF_NID_BKT_BITS, 0,
2456 &nrs_tbf_id_hash_ops,
2457 CFS_HASH_RW_BKTLOCK);
2458 if (head->th_cli_hash == NULL)
2461 memset(&start, 0, sizeof(start));
2462 start.u.tc_start.ts_ids_str = "*";
2463 start.u.tc_start.ts_rpc_rate = tbf_rate;
2464 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2465 start.tc_name = NRS_TBF_DEFAULT_RULE;
2466 INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2467 rc = nrs_tbf_rule_start(policy, head, &start);
2469 cfs_hash_putref(head->th_cli_hash);
2470 head->th_cli_hash = NULL;
2476 static struct nrs_tbf_client *
2477 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2478 struct ptlrpc_request *req)
2482 LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2483 head->th_type_flag == NRS_TBF_FLAG_GID);
2485 nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2486 return cfs_hash_lookup(head->th_cli_hash, &id);
2489 static struct nrs_tbf_client *
2490 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2491 struct nrs_tbf_client *cli)
2493 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2498 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2499 struct ptlrpc_request *req)
2501 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2505 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2506 struct ptlrpc_request *req)
2508 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2512 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2514 struct nrs_tbf_id *nti_id;
2515 enum nrs_tbf_flag flag;
2517 list_for_each_entry(nti_id, id_list, nti_linkage) {
2518 flag = id.ti_type & nti_id->nti_id.ti_type;
2522 if ((flag & NRS_TBF_FLAG_UID) &&
2523 (id.ti_uid != nti_id->nti_id.ti_uid))
2526 if ((flag & NRS_TBF_FLAG_GID) &&
2527 (id.ti_gid != nti_id->nti_id.ti_gid))
2536 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2537 struct nrs_tbf_client *cli)
2539 return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2542 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2544 nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2546 if (cmd->u.tc_start.ts_ids_str)
2547 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2548 strlen(cmd->u.tc_start.ts_ids_str) + 1);
2552 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2553 enum nrs_tbf_flag tif)
2555 struct cfs_lstr src;
2556 struct cfs_lstr res;
2558 struct tbf_id id = { 0 };
2561 if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2566 INIT_LIST_HEAD(id_list);
2567 while (src.ls_str) {
2568 struct nrs_tbf_id *nti_id;
2570 if (cfs_gettok(&src, ' ', &res) == 0)
2571 GOTO(out, rc = -EINVAL);
2574 if (tif == NRS_TBF_FLAG_UID) {
2575 if (!cfs_str2num_check(res.ls_str, res.ls_len,
2576 &id.ti_uid, 0, (u32)~0U))
2577 GOTO(out, rc = -EINVAL);
2579 if (!cfs_str2num_check(res.ls_str, res.ls_len,
2580 &id.ti_gid, 0, (u32)~0U))
2581 GOTO(out, rc = -EINVAL);
2584 OBD_ALLOC_PTR(nti_id);
2586 GOTO(out, rc = -ENOMEM);
2588 nti_id->nti_id = id;
2589 list_add_tail(&nti_id->nti_linkage, id_list);
2593 nrs_tbf_id_list_free(id_list);
2597 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2599 struct cfs_lstr src;
2601 enum nrs_tbf_flag tif;
2603 tif = cmd->u.tc_start.ts_valid_type;
2606 src.ls_len = strlen(id);
2608 rc = nrs_tbf_check_id_value(&src,
2609 tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2613 OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2614 if (cmd->u.tc_start.ts_ids_str == NULL)
2617 strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2619 rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2620 strlen(cmd->u.tc_start.ts_ids_str),
2621 &cmd->u.tc_start.ts_ids, tif);
2623 nrs_tbf_id_cmd_fini(cmd);
2629 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2630 struct nrs_tbf_rule *rule,
2631 struct nrs_tbf_cmd *start)
2633 struct nrs_tbf_head *head = rule->tr_head;
2635 enum nrs_tbf_flag tif = head->th_type_flag;
2636 int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2638 LASSERT(start->u.tc_start.ts_ids_str);
2639 INIT_LIST_HEAD(&rule->tr_ids);
2641 OBD_ALLOC(rule->tr_ids_str, ids_len);
2642 if (rule->tr_ids_str == NULL)
2645 strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2648 if (!list_empty(&start->u.tc_start.ts_ids)) {
2649 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2650 strlen(rule->tr_ids_str),
2651 &rule->tr_ids, tif);
2653 CERROR("%ss {%s} illegal\n",
2654 tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2658 OBD_FREE(rule->tr_ids_str, ids_len);
2659 rule->tr_ids_str = NULL;
2665 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2667 seq_printf(m, "%s {%s} %u, ref %d\n", rule->tr_name,
2668 rule->tr_ids_str, rule->tr_rpc_rate,
2669 atomic_read(&rule->tr_ref) - 1);
2673 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2675 nrs_tbf_id_list_free(&rule->tr_ids);
2676 if (rule->tr_ids_str != NULL)
2677 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2680 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2681 .o_name = NRS_TBF_TYPE_UID,
2682 .o_startup = nrs_tbf_id_startup,
2683 .o_cli_find = nrs_tbf_id_cli_find,
2684 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2685 .o_cli_put = nrs_tbf_nid_cli_put,
2686 .o_cli_init = nrs_tbf_uid_cli_init,
2687 .o_rule_init = nrs_tbf_id_rule_init,
2688 .o_rule_dump = nrs_tbf_id_rule_dump,
2689 .o_rule_match = nrs_tbf_id_rule_match,
2690 .o_rule_fini = nrs_tbf_id_rule_fini,
2693 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2694 .o_name = NRS_TBF_TYPE_GID,
2695 .o_startup = nrs_tbf_id_startup,
2696 .o_cli_find = nrs_tbf_id_cli_find,
2697 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2698 .o_cli_put = nrs_tbf_nid_cli_put,
2699 .o_cli_init = nrs_tbf_gid_cli_init,
2700 .o_rule_init = nrs_tbf_id_rule_init,
2701 .o_rule_dump = nrs_tbf_id_rule_dump,
2702 .o_rule_match = nrs_tbf_id_rule_match,
2703 .o_rule_fini = nrs_tbf_id_rule_fini,
2706 static struct nrs_tbf_type nrs_tbf_types[] = {
2708 .ntt_name = NRS_TBF_TYPE_JOBID,
2709 .ntt_flag = NRS_TBF_FLAG_JOBID,
2710 .ntt_ops = &nrs_tbf_jobid_ops,
2713 .ntt_name = NRS_TBF_TYPE_NID,
2714 .ntt_flag = NRS_TBF_FLAG_NID,
2715 .ntt_ops = &nrs_tbf_nid_ops,
2718 .ntt_name = NRS_TBF_TYPE_OPCODE,
2719 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2720 .ntt_ops = &nrs_tbf_opcode_ops,
2723 .ntt_name = NRS_TBF_TYPE_GENERIC,
2724 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2725 .ntt_ops = &nrs_tbf_generic_ops,
2728 .ntt_name = NRS_TBF_TYPE_UID,
2729 .ntt_flag = NRS_TBF_FLAG_UID,
2730 .ntt_ops = &nrs_tbf_uid_ops,
2733 .ntt_name = NRS_TBF_TYPE_GID,
2734 .ntt_flag = NRS_TBF_FLAG_GID,
2735 .ntt_ops = &nrs_tbf_gid_ops,
2740 * Is called before the policy transitions into
2741 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2742 * policy-specific private data structure.
2744 * \param[in] policy The policy to start
2746 * \retval -ENOMEM OOM error
2749 * \see nrs_policy_register()
2750 * \see nrs_policy_ctl()
2752 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2754 struct nrs_tbf_head *head;
2755 struct nrs_tbf_ops *ops;
2763 name = NRS_TBF_TYPE_GENERIC;
2764 else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2767 GOTO(out, rc = -EINVAL);
2769 for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2770 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2771 ops = nrs_tbf_types[i].ntt_ops;
2772 type = nrs_tbf_types[i].ntt_flag;
2778 GOTO(out, rc = -ENOTSUPP);
2780 OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2782 GOTO(out, rc = -ENOMEM);
2784 memcpy(head->th_type, name, strlen(name));
2785 head->th_type[strlen(name)] = '\0';
2787 head->th_type_flag = type;
2789 head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2790 CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2791 nrs_pol2cptab(policy),
2792 nrs_pol2cptid(policy));
2793 if (head->th_binheap == NULL)
2794 GOTO(out_free_head, rc = -ENOMEM);
2796 atomic_set(&head->th_rule_sequence, 0);
2797 spin_lock_init(&head->th_rule_lock);
2798 INIT_LIST_HEAD(&head->th_list);
2799 hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2800 head->th_timer.function = nrs_tbf_timer_cb;
2801 rc = head->th_ops->o_startup(policy, head);
2803 GOTO(out_free_heap, rc);
2805 policy->pol_private = head;
2808 cfs_binheap_destroy(head->th_binheap);
2816 * Is called before the policy transitions into
2817 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2818 * private data structure.
2820 * \param[in] policy The policy to stop
2822 * \see nrs_policy_stop0()
2824 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2826 struct nrs_tbf_head *head = policy->pol_private;
2827 struct ptlrpc_nrs *nrs = policy->pol_nrs;
2828 struct nrs_tbf_rule *rule, *n;
2830 LASSERT(head != NULL);
2831 LASSERT(head->th_cli_hash != NULL);
2832 hrtimer_cancel(&head->th_timer);
2833 /* Should cleanup hash first before free rules */
2834 cfs_hash_putref(head->th_cli_hash);
2835 list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2836 list_del_init(&rule->tr_linkage);
2837 nrs_tbf_rule_put(rule);
2839 LASSERT(list_empty(&head->th_list));
2840 LASSERT(head->th_binheap != NULL);
2841 LASSERT(cfs_binheap_is_empty(head->th_binheap));
2842 cfs_binheap_destroy(head->th_binheap);
2844 nrs->nrs_throttling = 0;
2845 wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2849 * Performs a policy-specific ctl function on TBF policy instances; similar
2852 * \param[in] policy the policy instance
2853 * \param[in] opc the opcode
2854 * \param[in,out] arg used for passing parameters and information
2856 * \pre assert_spin_locked(&policy->pol_nrs->nrs_lock)
2857 * \post assert_spin_locked(&policy->pol_nrs->nrs_lock)
2859 * \retval 0 operation carried out successfully
/*
 * Dispatches a TBF-specific control opcode. The visible cases are:
 *  - NRS_CTL_TBF_RD_RULE: \a arg is a struct seq_file; prints the CPT id
 *    and dumps all rules via nrs_tbf_rule_dump_all().
 *  - NRS_CTL_TBF_WR_RULE: \a arg is a struct nrs_tbf_cmd handed to
 *    nrs_tbf_command().
 *  - NRS_CTL_TBF_RD_TYPE_FLAG: \a arg points to a __u32 that receives
 *    head->th_type_flag.
 * The caller must hold policy->pol_nrs->nrs_lock (asserted below).
 */
2862 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2863 enum ptlrpc_nrs_ctl opc,
2869 assert_spin_locked(&policy->pol_nrs->nrs_lock);
/* opc arrives as the generic enum and is reinterpreted as a TBF opcode */
2871 switch ((enum nrs_ctl_tbf)opc) {
2876 * Read RPC rate size of a policy instance.
2878 case NRS_CTL_TBF_RD_RULE: {
2879 struct nrs_tbf_head *head = policy->pol_private;
2880 struct seq_file *m = arg;
2881 struct ptlrpc_service_part *svcpt;
2883 svcpt = policy->pol_nrs->nrs_svcpt;
/* Each service partition prints its own "CPT <id>:" header before its rules */
2884 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2886 rc = nrs_tbf_rule_dump_all(head, m);
2891 * Write RPC rate of a policy instance.
2893 case NRS_CTL_TBF_WR_RULE: {
2894 struct nrs_tbf_head *head = policy->pol_private;
2895 struct nrs_tbf_cmd *cmd;
2897 cmd = (struct nrs_tbf_cmd *)arg;
2898 rc = nrs_tbf_command(policy,
2904 * Read the TBF policy type of a policy instance.
2906 case NRS_CTL_TBF_RD_TYPE_FLAG: {
2907 struct nrs_tbf_head *head = policy->pol_private;
/* Out-parameter: the caller supplies storage for a __u32 */
2909 *(__u32 *)arg = head->th_type_flag;
2918 * Is called for obtaining a TBF policy resource.
2920 * \param[in] policy The policy on which the request is being asked for
2921 * \param[in] nrq The request for which resources are being taken
2922 * \param[in] parent Parent resource, unused in this policy
2923 * \param[out] resp Resources references are placed in this array
2924 * \param[in] moving_req Signifies limited caller context; unused in this
2928 * \see nrs_resource_get_safe()
/*
 * Resolves the TBF resource for request \a nrq.
 *
 * With no parent the policy-wide head resource (th_res) is handed back.
 * Otherwise the client class for the request is looked up through
 * th_ops->o_cli_find(); an existing class is refreshed under scp_req_lock
 * when its rule sequence or generation is stale or its rule is stopping.
 * A new class is allocated, initialised and registered through
 * th_ops->o_cli_findadd(), and the resulting class resource is stored in
 * \a resp.
 *
 * NOTE(review): the branch conditions and return statements separating the
 * lookup-hit path from the allocation path are not visible in this listing.
 */
2930 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2931 struct ptlrpc_nrs_request *nrq,
2932 const struct ptlrpc_nrs_resource *parent,
2933 struct ptlrpc_nrs_resource **resp,
2936 struct nrs_tbf_head *head;
2937 struct nrs_tbf_client *cli;
2938 struct nrs_tbf_client *tmp;
2939 struct ptlrpc_request *req;
/* Top of the hierarchy: the head resource embedded in the policy's private data */
2941 if (parent == NULL) {
2942 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2946 head = container_of(parent, struct nrs_tbf_head, th_res);
2947 req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2948 cli = head->th_ops->o_cli_find(head, req);
2950 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2951 LASSERT(cli->tc_rule);
/*
 * The class is stale if the head's rule sequence moved on since the class
 * was last matched, or if its current rule is flagged NTRS_STOPPING.
 */
2952 if (cli->tc_rule_sequence !=
2953 atomic_read(&head->th_rule_sequence) ||
2954 cli->tc_rule->tr_flags & NTRS_STOPPING) {
2955 struct nrs_tbf_rule *rule;
2958 "TBF class@%p rate %u sequence %d, "
2959 "rule flags %d, head sequence %d\n",
2960 cli, cli->tc_rpc_rate,
2961 cli->tc_rule_sequence,
2962 cli->tc_rule->tr_flags,
2963 atomic_read(&head->th_rule_sequence));
2964 rule = nrs_tbf_rule_match(head, cli);
/* A different rule now matches: rebind the class to it */
2965 if (rule != cli->tc_rule) {
2966 nrs_tbf_cli_reset(head, rule, cli);
/* Same rule: refresh values only if its generation changed, then drop the rule */
2968 if (cli->tc_rule_generation != rule->tr_generation)
2969 nrs_tbf_cli_reset_value(head, cli);
2970 nrs_tbf_rule_put(rule);
/* Sequence unchanged but the rule itself was modified: refresh values */
2972 } else if (cli->tc_rule_generation !=
2973 cli->tc_rule->tr_generation) {
2974 nrs_tbf_cli_reset_value(head, cli);
2976 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
/* moving_req selects GFP_ATOMIC for the allocation, otherwise __GFP_IO */
2980 OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2981 sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2985 nrs_tbf_cli_init(head, cli, req);
2986 tmp = head->th_ops->o_cli_findadd(head, cli);
/*
 * NOTE(review): presumably reached only when findadd returned a different,
 * already-registered class, so the fresh one is discarded - the guarding
 * condition is not visible here.
 */
2988 atomic_dec(&cli->tc_ref);
2989 nrs_tbf_cli_fini(cli);
2993 *resp = &cli->tc_res;
2999 * Called when releasing references to the resource hierarchy obtained for a
3000 * request for scheduling using the TBF policy.
3002 * \param[in] policy the policy the resource belongs to
3003 * \param[in] res the resource to be released
/*
 * Releases a TBF resource reference. A resource without a parent is the
 * head resource and is special-cased; otherwise \a res is embedded in a
 * client class, which is handed to the type-specific th_ops->o_cli_put().
 */
3005 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3006 const struct ptlrpc_nrs_resource *res)
3008 struct nrs_tbf_head *head;
3009 struct nrs_tbf_client *cli;
3012 * Do nothing for freeing parent, nrs_tbf_net resources
3014 if (res->res_parent == NULL)
/* Recover the enclosing class and head from the embedded resource members */
3017 cli = container_of(res, struct nrs_tbf_client, tc_res);
3018 head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3020 head->th_ops->o_cli_put(head, cli);
3024 * Called when getting a request from the TBF policy for handling, or just
3025 * peeking; removes the request from the policy when it is to be handled.
3027 * \param[in] policy The policy
3028 * \param[in] peek When set, signifies that we just want to examine the
3029 * request, and not handle it, so the request is not removed
3031 * \param[in] force Force the policy to return a request; unused in this
3034 * \retval The request to be handled; this is the next request in the TBF
3037 * \see ptlrpc_nrs_req_get_nolock()
3038 * \see nrs_request_get()
3041 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3042 bool peek, bool force)
3044 struct nrs_tbf_head *head = policy->pol_private;
3045 struct ptlrpc_nrs_request *nrq = NULL;
3046 struct nrs_tbf_client *cli;
3047 struct cfs_binheap_node *node;
3049 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3051 if (!peek && policy->pol_nrs->nrs_throttling)
3054 node = cfs_binheap_root(head->th_binheap);
3055 if (unlikely(node == NULL))
3058 cli = container_of(node, struct nrs_tbf_client, tc_node);
3059 LASSERT(cli->tc_in_heap);
3061 nrq = list_entry(cli->tc_list.next,
3062 struct ptlrpc_nrs_request,
3065 struct nrs_tbf_rule *rule = cli->tc_rule;
3066 __u64 now = ktime_to_ns(ktime_get());
3070 __u64 old_resid = 0;
3072 deadline = cli->tc_check_time +
3074 LASSERT(now >= cli->tc_check_time);
3075 passed = now - cli->tc_check_time;
3076 ntoken = passed * cli->tc_rpc_rate;
3077 do_div(ntoken, NSEC_PER_SEC);
3079 ntoken += cli->tc_ntoken;
3080 if (rule->tr_flags & NTRS_REALTIME) {
3081 LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3082 old_resid = cli->tc_nsecs_resid;
3083 cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3084 if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3086 cli->tc_nsecs_resid -= cli->tc_nsecs;
3088 } else if (ntoken > cli->tc_depth)
3089 ntoken = cli->tc_depth;
3092 struct ptlrpc_request *req;
3093 nrq = list_entry(cli->tc_list.next,
3094 struct ptlrpc_nrs_request,
3096 req = container_of(nrq,
3097 struct ptlrpc_request,
3100 cli->tc_ntoken = ntoken;
3101 cli->tc_check_time = now;
3102 list_del_init(&nrq->nr_u.tbf.tr_list);
3103 if (list_empty(&cli->tc_list)) {
3104 cfs_binheap_remove(head->th_binheap,
3106 cli->tc_in_heap = false;
3108 if (!(rule->tr_flags & NTRS_REALTIME))
3109 cli->tc_deadline = now + cli->tc_nsecs;
3110 cfs_binheap_relocate(head->th_binheap,
3114 "TBF dequeues: class@%p rate %u gen %llu "
3115 "token %llu, rule@%p rate %u gen %llu\n",
3116 cli, cli->tc_rpc_rate,
3117 cli->tc_rule_generation, cli->tc_ntoken,
3118 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3119 cli->tc_rule->tr_generation);
3123 if (rule->tr_flags & NTRS_REALTIME) {
3124 cli->tc_deadline = deadline;
3125 cli->tc_nsecs_resid = old_resid;
3126 cfs_binheap_relocate(head->th_binheap,
3128 if (node != cfs_binheap_root(head->th_binheap))
3129 return nrs_tbf_req_get(policy,
3132 policy->pol_nrs->nrs_throttling = 1;
3133 head->th_deadline = deadline;
3134 time = ktime_set(0, 0);
3135 time = ktime_add_ns(time, deadline);
3136 hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3144 * Adds request \a nrq to \a policy's list of queued requests
3146 * \param[in] policy The policy
3147 * \param[in] nrq The request to add
3149 * \retval 0 success; nrs_request_enqueue() assumes this function will always
/*
 * Queues \a nrq on its client class. A class whose request list is empty
 * is first given a deadline and inserted into the head's binheap; a class
 * that already has queued requests just gets the request appended. Each
 * request is stamped with the head's running sequence number. The caller
 * must hold scp_req_lock (asserted).
 */
3152 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3153 struct ptlrpc_nrs_request *nrq)
3155 struct nrs_tbf_head *head;
3156 struct nrs_tbf_client *cli;
3159 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3161 cli = container_of(nrs_request_resource(nrq),
3162 struct nrs_tbf_client, tc_res);
3163 head = container_of(nrs_request_resource(nrq)->res_parent,
3164 struct nrs_tbf_head, th_res);
/* First request for this class: it is not in the heap yet */
3165 if (list_empty(&cli->tc_list)) {
3166 LASSERT(!cli->tc_in_heap);
3167 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3168 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
3170 cli->tc_in_heap = true;
3171 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3172 list_add_tail(&nrq->nr_u.tbf.tr_list,
/*
 * While throttling, the timer is armed for head->th_deadline. If this class
 * is due earlier and the pending timer could be cancelled, re-arm the timer
 * for the earlier deadline.
 */
3174 if (policy->pol_nrs->nrs_throttling) {
3175 __u64 deadline = cli->tc_deadline;
3176 if ((head->th_deadline > deadline) &&
3177 (hrtimer_try_to_cancel(&head->th_timer)
3180 head->th_deadline = deadline;
3181 time = ktime_set(0, 0);
3182 time = ktime_add_ns(time, deadline);
3183 hrtimer_start(&head->th_timer, time,
/* Class already has queued requests, so it must already be in the heap */
3189 LASSERT(cli->tc_in_heap);
3190 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3191 list_add_tail(&nrq->nr_u.tbf.tr_list,
3197 "TBF enqueues: class@%p rate %u gen %llu "
3198 "token %llu, rule@%p rate %u gen %llu\n",
3199 cli, cli->tc_rpc_rate,
3200 cli->tc_rule_generation, cli->tc_ntoken,
3201 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3202 cli->tc_rule->tr_generation);
3208 * Removes request \a nrq from \a policy's list of queued requests.
3210 * \param[in] policy The policy
3211 * \param[in] nrq The request to remove
/*
 * Unlinks \a nrq from its client class's request list. If that leaves the
 * class with no queued requests the class is removed from the binheap;
 * the listing also shows a cfs_binheap_relocate() call for the other case.
 * The caller must hold scp_req_lock (asserted).
 */
3213 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3214 struct ptlrpc_nrs_request *nrq)
3216 struct nrs_tbf_head *head;
3217 struct nrs_tbf_client *cli;
3219 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3221 cli = container_of(nrs_request_resource(nrq),
3222 struct nrs_tbf_client, tc_res);
3223 head = container_of(nrs_request_resource(nrq)->res_parent,
3224 struct nrs_tbf_head, th_res);
/* The request must currently be queued */
3226 LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3227 list_del_init(&nrq->nr_u.tbf.tr_list);
3228 if (list_empty(&cli->tc_list)) {
3229 cfs_binheap_remove(head->th_binheap,
3231 cli->tc_in_heap = false;
3233 cfs_binheap_relocate(head->th_binheap,
3239 * Prints a debug statement right before the request \a nrq stops being
3242 * \param[in] policy The policy handling the request
3243 * \param[in] nrq The request being handled
3245 * \see ptlrpc_server_finish_request()
3246 * \see ptlrpc_nrs_req_stop_nolock()
/*
 * Emits a D_RPCTRACE debug line naming the policy, the peer the request
 * came from, and the request's TBF sequence number. No state is modified
 * in the visible code. The caller must hold scp_req_lock (asserted).
 */
3248 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3249 struct ptlrpc_nrs_request *nrq)
/* The NRS request is embedded in the ptlrpc request as rq_nrq */
3251 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3254 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3256 CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3257 policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3258 nrq->nr_u.tbf.tr_sequence);
3266 * The maximum RPC rate.
/* Used by nrs_tbf_parse_value_pair(): a "rate" value >= this limit (or <= 0) is treated as invalid */
3268 #define LPROCFS_NRS_RATE_MAX 65535
/*
 * seq_file show handler for the TBF rule file. Prints a
 * "regular_requests:" section by issuing NRS_CTL_TBF_RD_RULE against the
 * regular queue, then - when the service has a high-priority head - a
 * "high_priority_requests:" section against the HP queue. -ENOSPC and
 * -ENODEV from the control call are special-cased as the embedded notes
 * describe.
 */
3271 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
/* The service this file belongs to is stashed in the seq_file's private pointer */
3273 struct ptlrpc_service *svc = m->private;
3276 seq_printf(m, "regular_requests:\n");
3278 * Perform two separate calls to this as only one of the NRS heads'
3279 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3280 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3282 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3284 NRS_CTL_TBF_RD_RULE,
3288 * -ENOSPC means buf in the parameter m is overflow, return 0
3289 * here to let upper layer function seq_read alloc a larger
3290 * memory area and do this process again.
3292 } else if (rc == -ENOSPC) {
3296 * Ignore -ENODEV as the regular NRS head's policy may be in the
3297 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3299 } else if (rc != -ENODEV) {
/* Services without an HP head have nothing further to print */
3303 if (!nrs_svc_has_hp(svc))
3306 seq_printf(m, "high_priority_requests:\n");
3307 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3309 NRS_CTL_TBF_RD_RULE,
3313 * -ENOSPC means buf in the parameter m is overflow, return 0
3314 * here to let upper layer function seq_read alloc a larger
3315 * memory area and do this process again.
3317 } else if (rc == -ENOSPC) {
/*
 * Parses the ID-list token of a "start" command by dispatching on the TBF
 * type recorded in cmd->u.tc_start.ts_valid_type: jobid, NID, opcode,
 * generic, or UID/GID (the last two share nrs_tbf_ug_id_parse()).
 */
3326 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3331 switch (cmd->u.tc_start.ts_valid_type) {
3332 case NRS_TBF_FLAG_JOBID:
3333 rc = nrs_tbf_jobid_parse(cmd, token);
3335 case NRS_TBF_FLAG_NID:
3336 rc = nrs_tbf_nid_parse(cmd, token);
3338 case NRS_TBF_FLAG_OPCODE:
3339 rc = nrs_tbf_opcode_parse(cmd, token);
3341 case NRS_TBF_FLAG_GENERIC:
3342 rc = nrs_tbf_generic_parse(cmd, token);
/* UID and GID rules share one parser */
3344 case NRS_TBF_FLAG_UID:
3345 case NRS_TBF_FLAG_GID:
3346 rc = nrs_tbf_ug_id_parse(cmd, token);
/*
 * Releases the type-specific state held by a parsed command. Only "start"
 * commands are handled in the visible code; the matching *_cmd_fini()
 * helper is chosen by ts_valid_type, and an unrecognised type is reported
 * with CWARN. The cmd structure itself is not freed by the visible code.
 */
3355 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3357 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3358 switch (cmd->u.tc_start.ts_valid_type) {
3359 case NRS_TBF_FLAG_JOBID:
3360 nrs_tbf_jobid_cmd_fini(cmd);
3362 case NRS_TBF_FLAG_NID:
3363 nrs_tbf_nid_cmd_fini(cmd);
3365 case NRS_TBF_FLAG_OPCODE:
3366 nrs_tbf_opcode_cmd_fini(cmd);
3368 case NRS_TBF_FLAG_GENERIC:
3369 nrs_tbf_generic_cmd_fini(cmd);
/* UID and GID rules share one cleanup helper */
3371 case NRS_TBF_FLAG_UID:
3372 case NRS_TBF_FLAG_GID:
3373 nrs_tbf_id_cmd_fini(cmd);
3376 CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3377 cmd->u.tc_start.ts_valid_type);
/**
 * Checks that a rule name consists only of alphanumeric characters and '_'.
 *
 * \param[in] name	NUL-terminated name to validate; must not be NULL
 *
 * \retval true		every character is alphanumeric or '_' (an empty
 *			string is accepted)
 * \retval false	at least one character is outside that set
 */
static bool name_is_valid(const char *name)
{
	const char *p;

	/* Walk to the terminator rather than re-running strlen() per step */
	for (p = name; *p != '\0'; p++) {
		/* ctype classifiers expect an unsigned char value */
		if (!isalnum((unsigned char)*p) && *p != '_')
			return false;
	}
	return true;
}
/*
 * Parses one "key=value" token of a rule command and stores the result in
 * \a cmd. The token is split in place at the first '='. Keys visible here:
 *  - "rate":     decimal number parsed with kstrtoull(), range-checked
 *                against LPROCFS_NRS_RATE_MAX, stored for start/change.
 *  - "rank":     a rule name checked with name_is_valid(); the pointer
 *                into the caller's buffer is stored for start/change.
 *  - "realtime": decimal number parsed with kstrtoul(); sets
 *                NTRS_REALTIME in the start command's rule flags.
 */
3395 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
/* strsep() terminates the key and leaves val at the text after '=' */
3403 key = strsep(&val, "=");
/* A token with no '=' or with an empty value is rejected */
3404 if (val == NULL || strlen(val) == 0)
3407 /* Key of the value pair */
3408 if (strcmp(key, "rate") == 0) {
3409 rc = kstrtoull(val, 10, &rate);
/*
 * NOTE(review): rate is filled by kstrtoull(), so it is presumably
 * unsigned and "<= 0" can only mean "== 0" - confirm the declared type.
 */
3413 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3416 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3417 cmd->u.tc_start.ts_rpc_rate = rate;
3418 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3419 cmd->u.tc_change.tc_rpc_rate = rate;
3422 } else if (strcmp(key, "rank") == 0) {
3423 if (!name_is_valid(val))
/* val points into the caller's buffer; no copy is made */
3426 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3427 cmd->u.tc_start.ts_next_name = val;
3428 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3429 cmd->u.tc_change.tc_next_name = val;
3432 } else if (strcmp(key, "realtime") == 0) {
3433 unsigned long realtime;
3435 rc = kstrtoul(val, 10, &realtime);
/*
 * NOTE(review): this writes through u.tc_start without a visible check of
 * cmd->tc_cmd, unlike the "rate" and "rank" branches - confirm this is
 * intended for "change" commands, which use u.tc_change.
 */
3440 cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
/*
 * Parses the space-separated "key=value" tail of a rule command by
 * feeding each token to nrs_tbf_parse_value_pair(), then applies
 * per-command checks: a "start" command with no rate gets the module
 * default tbf_rate, and a "change" command is examined for having neither
 * a rate nor a next-rule name.
 */
3448 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
/* Tokenise in place; stops at the end of the string or on an empty remainder */
3455 while (val != NULL && strlen(val) != 0) {
3456 token = strsep(&val, " ");
3457 rc = nrs_tbf_parse_value_pair(cmd, token);
3462 switch (cmd->tc_cmd) {
3463 case NRS_CTL_TBF_START_RULE:
/* A rate of 0 means "not given": fall back to the tbf_rate module parameter */
3464 if (cmd->u.tc_start.ts_rpc_rate == 0)
3465 cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3467 case NRS_CTL_TBF_CHANGE_RULE:
/* A change that alters neither the rate nor the rank */
3468 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3469 cmd->u.tc_change.tc_next_name == NULL)
3472 case NRS_CTL_TBF_STOP_RULE:
3480 static struct nrs_tbf_cmd *
3481 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3483 static struct nrs_tbf_cmd *cmd;
3490 GOTO(out, rc = -ENOMEM);
3491 memset(cmd, 0, sizeof(*cmd));
3494 token = strsep(&val, " ");
3495 if (val == NULL || strlen(val) == 0)
3496 GOTO(out_free_cmd, rc = -EINVAL);
3498 /* Type of the command */
3499 if (strcmp(token, "start") == 0) {
3500 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3501 cmd->u.tc_start.ts_valid_type = type_flag;
3502 } else if (strcmp(token, "stop") == 0)
3503 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3504 else if (strcmp(token, "change") == 0)
3505 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3507 GOTO(out_free_cmd, rc = -EINVAL);
3509 /* Name of the rule */
3510 token = strsep(&val, " ");
3511 if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3512 !name_is_valid(token))
3513 GOTO(out_free_cmd, rc = -EINVAL);
3514 cmd->tc_name = token;
3516 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3520 val = strrchr(token, '}');
3522 GOTO(out_free_cmd, rc = -EINVAL);
3528 } else if (*val == ' ') {
3532 GOTO(out_free_cmd, rc = -EINVAL);
3534 rc = nrs_tbf_id_parse(cmd, token);
3536 GOTO(out_free_cmd, rc);
3539 rc = nrs_tbf_parse_value_pairs(cmd, val);
3541 GOTO(out_cmd_fini, rc = -EINVAL);
3544 nrs_tbf_cmd_fini(cmd);
3554 * Get the TBF policy type (nid, jobid, etc) preset by
3555 * proc entry 'nrs_policies' for command buffer parsing.
3557 * \param[in] svc the PTLRPC service
3558 * \param[in] queue the NRS queue type
3560 * \retval the preset TBF policy type flag
/*
 * Queries the TBF type flag of the policy on \a queue by issuing
 * NRS_CTL_TBF_RD_TYPE_FLAG through ptlrpc_nrs_policy_control().
 * NRS_TBF_FLAG_INVALID is the fallback value assigned below.
 * NOTE(review): presumably the fallback applies when the control call
 * fails - the guarding condition is not visible in this listing.
 */
3563 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3568 rc = ptlrpc_nrs_policy_control(svc, queue,
3570 NRS_CTL_TBF_RD_TYPE_FLAG,
3573 type = NRS_TBF_FLAG_INVALID;
/* Size of the kernel buffer for one written rule command; writes longer than this minus one byte get -EINVAL */
3578 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3580 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3581 const char __user *buffer,
3582 size_t count, loff_t *off)
3584 struct seq_file *m = file->private_data;
3585 struct ptlrpc_service *svc = m->private;
3589 static struct nrs_tbf_cmd *cmd;
3590 enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3591 unsigned long length;
3594 OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3595 if (kernbuf == NULL)
3596 GOTO(out, rc = -ENOMEM);
3598 if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3599 GOTO(out_free_kernbuff, rc = -EINVAL);
3601 if (copy_from_user(kernbuf, buffer, count))
3602 GOTO(out_free_kernbuff, rc = -EFAULT);
3605 token = strsep(&val, " ");
3607 GOTO(out_free_kernbuff, rc = -EINVAL);
3609 if (strcmp(token, "reg") == 0) {
3610 queue = PTLRPC_NRS_QUEUE_REG;
3611 } else if (strcmp(token, "hp") == 0) {
3612 queue = PTLRPC_NRS_QUEUE_HP;
3614 kernbuf[strlen(token)] = ' ';
3617 length = strlen(val);
3620 GOTO(out_free_kernbuff, rc = -EINVAL);
3622 if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3623 GOTO(out_free_kernbuff, rc = -ENODEV);
3624 else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3625 queue = PTLRPC_NRS_QUEUE_REG;
3627 cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3629 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3632 * Serialize NRS core lprocfs operations with policy registration/
3635 mutex_lock(&nrs_core.nrs_mutex);
3636 rc = ptlrpc_nrs_policy_control(svc, queue,
3638 NRS_CTL_TBF_WR_RULE,
3640 mutex_unlock(&nrs_core.nrs_mutex);
3642 nrs_tbf_cmd_fini(cmd);
3645 OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3647 return rc ? rc : count;
/*
 * NOTE(review): presumably generates ptlrpc_lprocfs_nrs_tbf_rule_fops (used
 * by nrs_tbf_lprocfs_init()) from the _seq_show/_seq_write handlers with
 * this name prefix - confirm against the macro definition.
 */
3650 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3653 * Initializes a TBF policy's lprocfs interface for service \a svc
3655 * \param[in] svc the service
3658 * \retval != 0 error
/*
 * Registers the "nrs_tbf_rule" entry, backed by
 * ptlrpc_lprocfs_nrs_tbf_rule_fops, under the service's debugfs directory.
 * A service without a debugfs entry is special-cased before registration.
 */
3660 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3662 struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3663 { .name = "nrs_tbf_rule",
3664 .fops = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
/* Nothing to attach the entry to when the service has no debugfs directory */
3669 if (!svc->srv_debugfs_entry)
3672 ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3678 * TBF policy operations
/*
 * Operations table binding the TBF implementations in this file to the
 * NRS policy hooks. Note that the enqueue and dequeue hooks map to
 * nrs_tbf_req_add() and nrs_tbf_req_del() respectively.
 */
3680 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3681 .op_policy_start = nrs_tbf_start,
3682 .op_policy_stop = nrs_tbf_stop,
3683 .op_policy_ctl = nrs_tbf_ctl,
3684 .op_res_get = nrs_tbf_res_get,
3685 .op_res_put = nrs_tbf_res_put,
3686 .op_req_get = nrs_tbf_req_get,
3687 .op_req_enqueue = nrs_tbf_req_add,
3688 .op_req_dequeue = nrs_tbf_req_del,
3689 .op_req_stop = nrs_tbf_req_stop,
3690 .op_lprocfs_init = nrs_tbf_lprocfs_init,
3694 * TBF policy configuration
/*
 * Policy configuration for TBF: the policy name is NRS_POL_NAME_TBF
 * ("tbf"), the operations are nrs_tbf_ops, and nrs_policy_compat_all is
 * used as the compatibility callback. This object has external linkage.
 */
3696 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3697 .nc_name = NRS_POL_NAME_TBF,
3698 .nc_ops = &nrs_tbf_ops,
3699 .nc_compat = nrs_policy_compat_all,
3706 #endif /* HAVE_SERVER_SUPPORT */