4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (C) 2013 DataDirect Networks, Inc.
25 * Copyright (c) 2014, 2016, Intel Corporation.
28 * lustre/ptlrpc/nrs_tbf.c
30 * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
39 #define DEBUG_SUBSYSTEM S_RPC
40 #include <obd_support.h>
41 #include <obd_class.h>
42 #include <libcfs/libcfs.h>
43 #include <lustre_req_layout.h>
44 #include "ptlrpc_internal.h"
49 * Token Bucket Filter over client NIDs
54 #define NRS_POL_NAME_TBF "tbf"
56 static int tbf_jobid_cache_size = 8192;
57 module_param(tbf_jobid_cache_size, int, 0644);
58 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
60 static int tbf_rate = 10000;
61 module_param(tbf_rate, int, 0644);
62 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
64 static int tbf_depth = 3;
65 module_param(tbf_depth, int, 0644);
66 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
/*
 * hrtimer callback of the TBF policy head.
 *
 * Recovers the owning nrs_tbf_head from the timer, clears the
 * nrs_throttling flag of the NRS head that the policy belongs to, and wakes
 * up the waiters on the service partition's scp_waitq.  The timer is not
 * re-armed from here (HRTIMER_NORESTART).
 */
68 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
70 struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
72 struct ptlrpc_nrs *nrs = head->th_res.res_policy->pol_nrs;
73 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
75 nrs->nrs_throttling = 0;
76 wake_up(&svcpt->scp_waitq);
78 return HRTIMER_NORESTART;
81 #define NRS_TBF_DEFAULT_RULE "default"
83 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
85 LASSERT(atomic_read(&rule->tr_ref) == 0);
86 LASSERT(list_empty(&rule->tr_cli_list));
87 LASSERT(list_empty(&rule->tr_linkage));
89 rule->tr_head->th_ops->o_rule_fini(rule);
94 * Decreases the rule's usage reference count, and stops the rule in case it
95 * was already stopping and have no more outstanding usage references (which
96 * indicates it has no more queued or started requests, and can be safely
99 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
101 if (atomic_dec_and_test(&rule->tr_ref))
102 nrs_tbf_rule_fini(rule);
106 * Increases the rule's usage reference count.
108 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
110 atomic_inc(&rule->tr_ref);
114 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
116 LASSERT(!list_empty(&cli->tc_linkage));
117 LASSERT(cli->tc_rule);
118 spin_lock(&cli->tc_rule->tr_rule_lock);
119 list_del_init(&cli->tc_linkage);
120 spin_unlock(&cli->tc_rule->tr_rule_lock);
121 nrs_tbf_rule_put(cli->tc_rule);
126 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
127 struct nrs_tbf_client *cli)
130 struct nrs_tbf_rule *rule = cli->tc_rule;
132 cli->tc_rpc_rate = rule->tr_rpc_rate;
133 cli->tc_nsecs = rule->tr_nsecs_per_rpc;
134 cli->tc_nsecs_resid = 0;
135 cli->tc_depth = rule->tr_depth;
136 cli->tc_ntoken = rule->tr_depth;
137 cli->tc_check_time = ktime_to_ns(ktime_get());
138 cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
139 cli->tc_rule_generation = rule->tr_generation;
142 binheap_relocate(head->th_binheap,
147 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
148 struct nrs_tbf_rule *rule,
149 struct nrs_tbf_client *cli)
151 spin_lock(&cli->tc_rule_lock);
152 if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
153 LASSERT(rule != cli->tc_rule);
154 nrs_tbf_cli_rule_put(cli);
156 LASSERT(cli->tc_rule == NULL);
157 LASSERT(list_empty(&cli->tc_linkage));
158 /* Rule's ref is added before called */
160 spin_lock(&rule->tr_rule_lock);
161 list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
162 spin_unlock(&rule->tr_rule_lock);
163 spin_unlock(&cli->tc_rule_lock);
164 nrs_tbf_cli_reset_value(head, cli);
168 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
170 return rule->tr_head->th_ops->o_rule_dump(rule, m);
174 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
176 struct nrs_tbf_rule *rule;
179 LASSERT(head != NULL);
180 spin_lock(&head->th_rule_lock);
181 /* List the rules from newest to oldest */
182 list_for_each_entry(rule, &head->th_list, tr_linkage) {
183 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
184 rc = nrs_tbf_rule_dump(rule, m);
190 spin_unlock(&head->th_rule_lock);
195 static struct nrs_tbf_rule *
196 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
199 struct nrs_tbf_rule *rule;
201 LASSERT(head != NULL);
202 list_for_each_entry(rule, &head->th_list, tr_linkage) {
203 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
204 if (strcmp(rule->tr_name, name) == 0) {
205 nrs_tbf_rule_get(rule);
212 static struct nrs_tbf_rule *
213 nrs_tbf_rule_find(struct nrs_tbf_head *head,
216 struct nrs_tbf_rule *rule;
218 LASSERT(head != NULL);
219 spin_lock(&head->th_rule_lock);
220 rule = nrs_tbf_rule_find_nolock(head, name);
221 spin_unlock(&head->th_rule_lock);
225 static struct nrs_tbf_rule *
226 nrs_tbf_rule_match(struct nrs_tbf_head *head,
227 struct nrs_tbf_client *cli)
229 struct nrs_tbf_rule *rule = NULL;
230 struct nrs_tbf_rule *tmp_rule;
232 spin_lock(&head->th_rule_lock);
233 /* Match the newest rule in the list */
234 list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
235 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
236 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
243 rule = head->th_rule;
245 nrs_tbf_rule_get(rule);
246 spin_unlock(&head->th_rule_lock);
251 nrs_tbf_cli_init(struct nrs_tbf_head *head,
252 struct nrs_tbf_client *cli,
253 struct ptlrpc_request *req)
255 struct nrs_tbf_rule *rule;
257 memset(cli, 0, sizeof(*cli));
258 cli->tc_in_heap = false;
259 head->th_ops->o_cli_init(cli, req);
260 INIT_LIST_HEAD(&cli->tc_list);
261 INIT_LIST_HEAD(&cli->tc_linkage);
262 spin_lock_init(&cli->tc_rule_lock);
263 atomic_set(&cli->tc_ref, 1);
264 rule = nrs_tbf_rule_match(head, cli);
265 nrs_tbf_cli_reset(head, rule, cli);
269 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
271 LASSERT(list_empty(&cli->tc_list));
272 LASSERT(!cli->tc_in_heap);
273 LASSERT(atomic_read(&cli->tc_ref) == 0);
274 spin_lock(&cli->tc_rule_lock);
275 nrs_tbf_cli_rule_put(cli);
276 spin_unlock(&cli->tc_rule_lock);
281 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
282 struct nrs_tbf_head *head,
283 struct nrs_tbf_cmd *start)
285 struct nrs_tbf_rule *rule;
286 struct nrs_tbf_rule *tmp_rule;
287 struct nrs_tbf_rule *next_rule;
288 char *next_name = start->u.tc_start.ts_next_name;
291 rule = nrs_tbf_rule_find(head, start->tc_name);
293 nrs_tbf_rule_put(rule);
297 OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
301 strlcpy(rule->tr_name, start->tc_name, sizeof(rule->tr_name));
302 rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
303 rule->tr_flags = start->u.tc_start.ts_rule_flags;
304 rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
305 rule->tr_depth = tbf_depth;
306 atomic_set(&rule->tr_ref, 1);
307 INIT_LIST_HEAD(&rule->tr_cli_list);
308 INIT_LIST_HEAD(&rule->tr_nids);
309 INIT_LIST_HEAD(&rule->tr_linkage);
310 spin_lock_init(&rule->tr_rule_lock);
311 rule->tr_head = head;
313 rc = head->th_ops->o_rule_init(policy, rule, start);
319 /* Add as the newest rule */
320 spin_lock(&head->th_rule_lock);
321 tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
323 spin_unlock(&head->th_rule_lock);
324 nrs_tbf_rule_put(tmp_rule);
325 nrs_tbf_rule_put(rule);
330 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
332 spin_unlock(&head->th_rule_lock);
333 nrs_tbf_rule_put(rule);
337 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
338 nrs_tbf_rule_put(next_rule);
340 /* Add on the top of the rule list */
341 list_add(&rule->tr_linkage, &head->th_list);
343 spin_unlock(&head->th_rule_lock);
344 atomic_inc(&head->th_rule_sequence);
345 if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
346 rule->tr_flags |= NTRS_DEFAULT;
347 LASSERT(head->th_rule == NULL);
348 head->th_rule = rule;
351 CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
352 rule, rule->tr_rpc_rate, rule->tr_generation);
358 * Change the rank of a rule in the rule list
360 * The matched rule will be moved to the position right before another
363 * \param[in] policy the policy instance
364 * \param[in] head the TBF policy instance
365 * \param[in] name the rule name to be moved
366 * \param[in] next_name the rule name before which the matched rule will be
371 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
372 struct nrs_tbf_head *head,
376 struct nrs_tbf_rule *rule = NULL;
377 struct nrs_tbf_rule *next_rule = NULL;
380 LASSERT(head != NULL);
382 spin_lock(&head->th_rule_lock);
383 rule = nrs_tbf_rule_find_nolock(head, name);
385 GOTO(out, rc = -ENOENT);
387 if (strcmp(name, next_name) == 0)
390 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
392 GOTO(out_put, rc = -ENOENT);
394 /* rules may be adjacent in same list, so list_move() isn't safe here */
395 list_move_tail(&rule->tr_linkage, &next_rule->tr_linkage);
396 nrs_tbf_rule_put(next_rule);
398 nrs_tbf_rule_put(rule);
400 spin_unlock(&head->th_rule_lock);
405 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
406 struct nrs_tbf_head *head,
410 struct nrs_tbf_rule *rule;
412 assert_spin_locked(&policy->pol_nrs->nrs_lock);
414 rule = nrs_tbf_rule_find(head, name);
418 rule->tr_rpc_rate = rate;
419 rule->tr_nsecs_per_rpc = NSEC_PER_SEC / rule->tr_rpc_rate;
420 rule->tr_generation++;
421 nrs_tbf_rule_put(rule);
427 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
428 struct nrs_tbf_head *head,
429 struct nrs_tbf_cmd *change)
431 __u64 rate = change->u.tc_change.tc_rpc_rate;
432 char *next_name = change->u.tc_change.tc_next_name;
436 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
443 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
453 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
454 struct nrs_tbf_head *head,
455 struct nrs_tbf_cmd *stop)
457 struct nrs_tbf_rule *rule;
459 assert_spin_locked(&policy->pol_nrs->nrs_lock);
461 if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
464 rule = nrs_tbf_rule_find(head, stop->tc_name);
468 list_del_init(&rule->tr_linkage);
469 rule->tr_flags |= NTRS_STOPPING;
470 nrs_tbf_rule_put(rule);
471 nrs_tbf_rule_put(rule);
477 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
478 struct nrs_tbf_head *head,
479 struct nrs_tbf_cmd *cmd)
483 assert_spin_locked(&policy->pol_nrs->nrs_lock);
485 switch (cmd->tc_cmd) {
486 case NRS_CTL_TBF_START_RULE:
487 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
490 spin_unlock(&policy->pol_nrs->nrs_lock);
491 rc = nrs_tbf_rule_start(policy, head, cmd);
492 spin_lock(&policy->pol_nrs->nrs_lock);
494 case NRS_CTL_TBF_CHANGE_RULE:
495 rc = nrs_tbf_rule_change(policy, head, cmd);
497 case NRS_CTL_TBF_STOP_RULE:
498 rc = nrs_tbf_rule_stop(policy, head, cmd);
499 /* Take it as a success, if not exists at all */
500 return rc == -ENOENT ? 0 : rc;
507 * Binary heap predicate.
509 * \param[in] e1 the first binheap node to compare
510 * \param[in] e2 the second binheap node to compare
/*
 * Orders two clients for the binary heap: tc_deadline is compared first and
 * tc_check_time is compared afterwards (presumably only as a tie-breaker
 * when the deadlines are equal -- TODO confirm).
 */
516 tbf_cli_compare(struct binheap_node *e1, struct binheap_node *e2)
518 struct nrs_tbf_client *cli1;
519 struct nrs_tbf_client *cli2;
521 cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
522 cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
524 if (cli1->tc_deadline < cli2->tc_deadline)
526 else if (cli1->tc_deadline > cli2->tc_deadline)
529 if (cli1->tc_check_time < cli2->tc_check_time)
531 else if (cli1->tc_check_time > cli2->tc_check_time)
534 /* Maybe need more comparison, e.g. request number in the rules */
539 * TBF binary heap operations
541 static struct binheap_ops nrs_tbf_heap_ops = {
544 .hop_compare = tbf_cli_compare,
547 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
550 return cfs_hash_djb2_hash(key, strlen(key), mask);
553 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
555 struct nrs_tbf_client *cli = hlist_entry(hnode,
556 struct nrs_tbf_client,
559 return (strcmp(cli->tc_jobid, key) == 0);
562 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
564 struct nrs_tbf_client *cli = hlist_entry(hnode,
565 struct nrs_tbf_client,
568 return cli->tc_jobid;
571 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
573 return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
576 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
578 struct nrs_tbf_client *cli = hlist_entry(hnode,
579 struct nrs_tbf_client,
582 atomic_inc(&cli->tc_ref);
585 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
587 struct nrs_tbf_client *cli = hlist_entry(hnode,
588 struct nrs_tbf_client,
591 atomic_dec(&cli->tc_ref);
595 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
598 struct nrs_tbf_client *cli = hlist_entry(hnode,
599 struct nrs_tbf_client,
602 LASSERT(atomic_read(&cli->tc_ref) == 0);
603 nrs_tbf_cli_fini(cli);
606 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
607 .hs_hash = nrs_tbf_jobid_hop_hash,
608 .hs_keycmp = nrs_tbf_jobid_hop_keycmp,
609 .hs_key = nrs_tbf_jobid_hop_key,
610 .hs_object = nrs_tbf_hop_object,
611 .hs_get = nrs_tbf_jobid_hop_get,
612 .hs_put = nrs_tbf_jobid_hop_put,
613 .hs_put_locked = nrs_tbf_jobid_hop_put,
614 .hs_exit = nrs_tbf_jobid_hop_exit,
617 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
618 CFS_HASH_NO_ITEMREF | \
621 static struct nrs_tbf_client *
622 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
623 struct cfs_hash_bd *bd,
626 struct hlist_node *hnode;
627 struct nrs_tbf_client *cli;
629 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
633 cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
634 if (!list_empty(&cli->tc_lru))
635 list_del_init(&cli->tc_lru);
639 #define NRS_TBF_JOBID_NULL ""
641 static struct nrs_tbf_client *
642 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
643 struct ptlrpc_request *req)
646 struct nrs_tbf_client *cli;
647 struct cfs_hash *hs = head->th_cli_hash;
648 struct cfs_hash_bd bd;
650 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
652 jobid = NRS_TBF_JOBID_NULL;
653 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
654 cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
655 cfs_hash_bd_unlock(hs, &bd, 1);
660 static struct nrs_tbf_client *
661 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
662 struct nrs_tbf_client *cli)
665 struct nrs_tbf_client *ret;
666 struct cfs_hash *hs = head->th_cli_hash;
667 struct cfs_hash_bd bd;
669 jobid = cli->tc_jobid;
670 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
671 ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
673 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
676 cfs_hash_bd_unlock(hs, &bd, 1);
682 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
683 struct nrs_tbf_client *cli)
685 struct cfs_hash_bd bd;
686 struct cfs_hash *hs = head->th_cli_hash;
687 struct nrs_tbf_bucket *bkt;
691 cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
692 bkt = cfs_hash_bd_extra_get(hs, &bd);
693 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
695 LASSERT(list_empty(&cli->tc_lru));
696 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
699 * Check and purge the LRU, there is at least one client in the LRU.
701 hw = tbf_jobid_cache_size >>
702 (hs->hs_cur_bits - hs->hs_bkt_bits);
703 while (cfs_hash_bd_count_get(&bd) > hw) {
704 if (unlikely(list_empty(&bkt->ntb_lru)))
706 cli = list_first_entry(&bkt->ntb_lru,
707 struct nrs_tbf_client,
709 LASSERT(atomic_read(&cli->tc_ref) == 0);
710 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
711 list_move(&cli->tc_lru, &zombies);
713 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
715 while (!list_empty(&zombies)) {
716 cli = container_of(zombies.next,
717 struct nrs_tbf_client, tc_lru);
718 list_del_init(&cli->tc_lru);
719 nrs_tbf_cli_fini(cli);
724 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
725 struct ptlrpc_request *req)
727 char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
730 jobid = NRS_TBF_JOBID_NULL;
731 LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
732 INIT_LIST_HEAD(&cli->tc_lru);
733 memcpy(cli->tc_jobid, jobid, strlen(jobid));
736 static int nrs_tbf_jobid_hash_order(void)
740 for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
746 #define NRS_TBF_JOBID_BKT_BITS 10
749 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
750 struct nrs_tbf_head *head)
752 struct nrs_tbf_cmd start;
753 struct nrs_tbf_bucket *bkt;
757 struct cfs_hash_bd bd;
759 bits = nrs_tbf_jobid_hash_order();
760 if (bits < NRS_TBF_JOBID_BKT_BITS)
761 bits = NRS_TBF_JOBID_BKT_BITS;
762 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
765 NRS_TBF_JOBID_BKT_BITS,
769 &nrs_tbf_jobid_hash_ops,
770 NRS_TBF_JOBID_HASH_FLAGS);
771 if (head->th_cli_hash == NULL)
774 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
775 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
776 INIT_LIST_HEAD(&bkt->ntb_lru);
779 memset(&start, 0, sizeof(start));
780 start.u.tc_start.ts_jobids_str = "*";
782 start.u.tc_start.ts_rpc_rate = tbf_rate;
783 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
784 start.tc_name = NRS_TBF_DEFAULT_RULE;
785 INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
786 rc = nrs_tbf_rule_start(policy, head, &start);
788 cfs_hash_putref(head->th_cli_hash);
789 head->th_cli_hash = NULL;
796 * Frees jobid of \a list.
800 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
802 struct nrs_tbf_jobid *jobid, *n;
804 list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
805 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
806 list_del(&jobid->tj_linkage);
812 nrs_tbf_jobid_list_add(char *id, struct list_head *jobid_list)
814 struct nrs_tbf_jobid *jobid;
817 OBD_ALLOC_PTR(jobid);
821 OBD_ALLOC(jobid->tj_id, strlen(id) + 1);
822 if (jobid->tj_id == NULL) {
827 strcpy(jobid->tj_id, id);
828 ptr = strchr(id, '*');
830 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
832 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
834 list_add_tail(&jobid->tj_linkage, jobid_list);
/**
 * Matches \a content against a glob-style \a pattern.
 *
 * Every '*' in \a pattern matches any run of characters, including an empty
 * one; all other characters match only themselves.  A '*' in the pattern is
 * always a wildcard, even when \a content contains a literal '*' at the
 * same position, and consecutive '*' behave like a single one.
 *
 * The matcher is iterative: it remembers the most recent '*' and, on a
 * mismatch, lets that '*' absorb one more character of \a content.  This
 * avoids recursion and keeps the worst case at
 * O(strlen(pattern) * strlen(content)).
 *
 * \param[in] pattern	NUL-terminated pattern, may contain '*'
 * \param[in] content	NUL-terminated string to test
 *
 * \retval 1	\a content matches \a pattern
 * \retval 0	otherwise
 */
static int
cfs_match_wildcard(const char *pattern, const char *content)
{
	const char *star = NULL;	/* most recent '*' seen in pattern */
	const char *resume = NULL;	/* first content char after that '*' */

	while (*content != '\0') {
		if (*pattern == '*') {
			/* Start with '*' matching nothing; widen on demand. */
			star = pattern++;
			resume = content;
		} else if (*pattern == *content) {
			pattern++;
			content++;
		} else if (star != NULL) {
			/* Mismatch: let the last '*' absorb one more char. */
			pattern = star + 1;
			content = ++resume;
		} else {
			return 0;
		}
	}

	/* Content is exhausted: only '*' may be left in the pattern. */
	while (*pattern == '*')
		pattern++;

	return *pattern == '\0';
}
866 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
868 if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
869 return strcmp(jobid->tj_id, id) == 0;
871 if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
872 return cfs_match_wildcard(jobid->tj_id, id);
878 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
880 struct nrs_tbf_jobid *jobid;
882 list_for_each_entry(jobid, jobid_list, tj_linkage) {
883 if (nrs_tbf_jobid_match(jobid, id))
890 nrs_tbf_jobid_list_parse(char *orig, struct list_head *jobid_list)
896 copy = kstrdup(orig, GFP_KERNEL);
900 INIT_LIST_HEAD(jobid_list);
901 while (str && rc == 0) {
902 char *tok = strsep(&str, " ");
905 rc = nrs_tbf_jobid_list_add(tok, jobid_list);
907 if (list_empty(jobid_list))
910 nrs_tbf_jobid_list_free(jobid_list);
915 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
917 if (!list_empty(&cmd->u.tc_start.ts_jobids))
918 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
919 if (cmd->u.tc_start.ts_jobids_str)
920 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
921 strlen(cmd->u.tc_start.ts_jobids_str) + 1);
924 static int nrs_tbf_check_id_value(char **strp, char *key)
930 tok = strim(strsep(&str, "="));
932 /* No LHS or no '=' */
936 if (strcmp(tok, key) != 0 ||
937 str[0] != '{' || str[len-1] != '}')
938 /* Wrong key, or RHS missing {} */
941 /* Skip '{' and '}' */
948 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
952 rc = nrs_tbf_check_id_value(&id, "jobid");
956 OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, strlen(id) + 1);
957 if (cmd->u.tc_start.ts_jobids_str == NULL)
960 strcpy(cmd->u.tc_start.ts_jobids_str, id);
962 /* parse jobid list */
963 rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
964 &cmd->u.tc_start.ts_jobids);
966 nrs_tbf_jobid_cmd_fini(cmd);
971 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
972 struct nrs_tbf_rule *rule,
973 struct nrs_tbf_cmd *start)
977 LASSERT(start->u.tc_start.ts_jobids_str);
978 OBD_ALLOC(rule->tr_jobids_str,
979 strlen(start->u.tc_start.ts_jobids_str) + 1);
980 if (rule->tr_jobids_str == NULL)
983 memcpy(rule->tr_jobids_str,
984 start->u.tc_start.ts_jobids_str,
985 strlen(start->u.tc_start.ts_jobids_str));
987 INIT_LIST_HEAD(&rule->tr_jobids);
988 if (!list_empty(&start->u.tc_start.ts_jobids)) {
989 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
992 CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
995 OBD_FREE(rule->tr_jobids_str,
996 strlen(start->u.tc_start.ts_jobids_str) + 1);
1001 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1003 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1004 rule->tr_jobids_str, rule->tr_rpc_rate,
1005 atomic_read(&rule->tr_ref) - 1);
1010 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1011 struct nrs_tbf_client *cli)
1013 return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1016 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1018 if (!list_empty(&rule->tr_jobids))
1019 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1020 LASSERT(rule->tr_jobids_str != NULL);
1021 OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1024 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1025 .o_name = NRS_TBF_TYPE_JOBID,
1026 .o_startup = nrs_tbf_jobid_startup,
1027 .o_cli_find = nrs_tbf_jobid_cli_find,
1028 .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1029 .o_cli_put = nrs_tbf_jobid_cli_put,
1030 .o_cli_init = nrs_tbf_jobid_cli_init,
1031 .o_rule_init = nrs_tbf_jobid_rule_init,
1032 .o_rule_dump = nrs_tbf_jobid_rule_dump,
1033 .o_rule_match = nrs_tbf_jobid_rule_match,
1034 .o_rule_fini = nrs_tbf_jobid_rule_fini,
1038 * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1040 * This uses ptlrpc_request::rq_peer.nid (as nid4) as its key, in order to hash
1041 * nrs_tbf_client objects.
1043 #define NRS_TBF_NID_BKT_BITS 8
1044 #define NRS_TBF_NID_BITS 16
/*
 * Hash callback of the NID-keyed client hash: feeds the first
 * sizeof(lnet_nid_t) bytes found at key to cfs_hash_djb2_hash().
 * NOTE(review): the hs_key and hs_keycmp callbacks of this hash treat the
 * key as a struct lnet_nid, so only the leading sizeof(lnet_nid_t) bytes of
 * that structure contribute to the hash -- confirm this is intended.
 */
1046 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1049 return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1052 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1054 const struct lnet_nid *nid = key;
1055 struct nrs_tbf_client *cli = hlist_entry(hnode,
1056 struct nrs_tbf_client,
1059 return nid_same(nid, &cli->tc_nid);
1062 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1064 struct nrs_tbf_client *cli = hlist_entry(hnode,
1065 struct nrs_tbf_client,
1068 return &cli->tc_nid;
1071 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1073 struct nrs_tbf_client *cli = hlist_entry(hnode,
1074 struct nrs_tbf_client,
1077 atomic_inc(&cli->tc_ref);
1080 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1082 struct nrs_tbf_client *cli = hlist_entry(hnode,
1083 struct nrs_tbf_client,
1086 atomic_dec(&cli->tc_ref);
1089 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1091 struct nrs_tbf_client *cli = hlist_entry(hnode,
1092 struct nrs_tbf_client,
1095 LASSERTF(atomic_read(&cli->tc_ref) == 0,
1096 "Busy TBF object from client with NID %s, with %d refs\n",
1097 libcfs_nidstr(&cli->tc_nid), atomic_read(&cli->tc_ref));
1099 nrs_tbf_cli_fini(cli);
1102 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1103 .hs_hash = nrs_tbf_nid_hop_hash,
1104 .hs_keycmp = nrs_tbf_nid_hop_keycmp,
1105 .hs_key = nrs_tbf_nid_hop_key,
1106 .hs_object = nrs_tbf_hop_object,
1107 .hs_get = nrs_tbf_nid_hop_get,
1108 .hs_put = nrs_tbf_nid_hop_put,
1109 .hs_put_locked = nrs_tbf_nid_hop_put,
1110 .hs_exit = nrs_tbf_nid_hop_exit,
1113 static struct nrs_tbf_client *
1114 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1115 struct ptlrpc_request *req)
1117 lnet_nid_t nid4 = lnet_nid_to_nid4(&req->rq_peer.nid);
1119 return cfs_hash_lookup(head->th_cli_hash, &nid4);
1122 static struct nrs_tbf_client *
1123 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1124 struct nrs_tbf_client *cli)
1126 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1131 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1132 struct nrs_tbf_client *cli)
1134 cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1138 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1139 struct nrs_tbf_head *head)
1141 struct nrs_tbf_cmd start;
1144 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1147 NRS_TBF_NID_BKT_BITS, 0,
1150 &nrs_tbf_nid_hash_ops,
1151 CFS_HASH_RW_BKTLOCK);
1152 if (head->th_cli_hash == NULL)
1155 memset(&start, 0, sizeof(start));
1156 start.u.tc_start.ts_nids_str = "*";
1158 start.u.tc_start.ts_rpc_rate = tbf_rate;
1159 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1160 start.tc_name = NRS_TBF_DEFAULT_RULE;
1161 INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1162 rc = nrs_tbf_rule_start(policy, head, &start);
1164 cfs_hash_putref(head->th_cli_hash);
1165 head->th_cli_hash = NULL;
1172 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1173 struct ptlrpc_request *req)
1175 cli->tc_nid = req->rq_peer.nid;
1178 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1179 struct nrs_tbf_rule *rule,
1180 struct nrs_tbf_cmd *start)
1182 LASSERT(start->u.tc_start.ts_nids_str);
1183 OBD_ALLOC(rule->tr_nids_str,
1184 strlen(start->u.tc_start.ts_nids_str) + 1);
1185 if (rule->tr_nids_str == NULL)
1188 memcpy(rule->tr_nids_str,
1189 start->u.tc_start.ts_nids_str,
1190 strlen(start->u.tc_start.ts_nids_str));
1192 INIT_LIST_HEAD(&rule->tr_nids);
1193 if (!list_empty(&start->u.tc_start.ts_nids)) {
1194 if (cfs_parse_nidlist(rule->tr_nids_str,
1195 &rule->tr_nids) < 0) {
1196 CERROR("nids {%s} illegal\n",
1198 OBD_FREE(rule->tr_nids_str,
1199 strlen(start->u.tc_start.ts_nids_str) + 1);
1207 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1209 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1210 rule->tr_nids_str, rule->tr_rpc_rate,
1211 atomic_read(&rule->tr_ref) - 1);
1216 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1217 struct nrs_tbf_client *cli)
1219 return cfs_match_nid(&cli->tc_nid, &rule->tr_nids);
1222 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1224 if (!list_empty(&rule->tr_nids))
1225 cfs_free_nidlist(&rule->tr_nids);
1226 LASSERT(rule->tr_nids_str != NULL);
1227 OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1230 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1232 if (!list_empty(&cmd->u.tc_start.ts_nids))
1233 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1234 if (cmd->u.tc_start.ts_nids_str)
1235 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1236 strlen(cmd->u.tc_start.ts_nids_str) + 1);
1239 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1243 rc = nrs_tbf_check_id_value(&id, "nid");
1247 OBD_ALLOC(cmd->u.tc_start.ts_nids_str, strlen(id) + 1);
1248 if (cmd->u.tc_start.ts_nids_str == NULL)
1251 strcpy(cmd->u.tc_start.ts_nids_str, id);
1253 /* parse NID list */
1254 if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1255 &cmd->u.tc_start.ts_nids) < 0) {
1256 nrs_tbf_nid_cmd_fini(cmd);
1263 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1264 .o_name = NRS_TBF_TYPE_NID,
1265 .o_startup = nrs_tbf_nid_startup,
1266 .o_cli_find = nrs_tbf_nid_cli_find,
1267 .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1268 .o_cli_put = nrs_tbf_nid_cli_put,
1269 .o_cli_init = nrs_tbf_nid_cli_init,
1270 .o_rule_init = nrs_tbf_nid_rule_init,
1271 .o_rule_dump = nrs_tbf_nid_rule_dump,
1272 .o_rule_match = nrs_tbf_nid_rule_match,
1273 .o_rule_fini = nrs_tbf_nid_rule_fini,
1276 static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1279 return cfs_hash_djb2_hash(key, strlen(key), mask);
1282 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1284 struct nrs_tbf_client *cli = hlist_entry(hnode,
1285 struct nrs_tbf_client,
1288 return (strcmp(cli->tc_key, key) == 0);
1291 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1293 struct nrs_tbf_client *cli = hlist_entry(hnode,
1294 struct nrs_tbf_client,
1299 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1301 struct nrs_tbf_client *cli = hlist_entry(hnode,
1302 struct nrs_tbf_client,
1305 atomic_inc(&cli->tc_ref);
1308 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1310 struct nrs_tbf_client *cli = hlist_entry(hnode,
1311 struct nrs_tbf_client,
1314 atomic_dec(&cli->tc_ref);
1317 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1320 struct nrs_tbf_client *cli = hlist_entry(hnode,
1321 struct nrs_tbf_client,
1324 LASSERT(atomic_read(&cli->tc_ref) == 0);
1325 nrs_tbf_cli_fini(cli);
1328 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1329 .hs_hash = nrs_tbf_hop_hash,
1330 .hs_keycmp = nrs_tbf_hop_keycmp,
1331 .hs_key = nrs_tbf_hop_key,
1332 .hs_object = nrs_tbf_hop_object,
1333 .hs_get = nrs_tbf_hop_get,
1334 .hs_put = nrs_tbf_hop_put,
1335 .hs_put_locked = nrs_tbf_hop_put,
1336 .hs_exit = nrs_tbf_hop_exit,
1339 #define NRS_TBF_GENERIC_BKT_BITS 10
1340 #define NRS_TBF_GENERIC_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
1341 CFS_HASH_NO_ITEMREF | \
1345 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1347 struct nrs_tbf_cmd start;
1348 struct nrs_tbf_bucket *bkt;
1352 struct cfs_hash_bd bd;
1354 bits = nrs_tbf_jobid_hash_order();
1355 if (bits < NRS_TBF_GENERIC_BKT_BITS)
1356 bits = NRS_TBF_GENERIC_BKT_BITS;
1357 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1359 NRS_TBF_GENERIC_BKT_BITS,
1362 NRS_TBF_GENERIC_HASH_FLAGS);
1363 if (head->th_cli_hash == NULL)
1366 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1367 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1368 INIT_LIST_HEAD(&bkt->ntb_lru);
1371 memset(&start, 0, sizeof(start));
1372 start.u.tc_start.ts_conds_str = "*";
1374 start.u.tc_start.ts_rpc_rate = tbf_rate;
1375 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1376 start.tc_name = NRS_TBF_DEFAULT_RULE;
1377 INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1378 rc = nrs_tbf_rule_start(policy, head, &start);
1380 cfs_hash_putref(head->th_cli_hash);
1385 static struct nrs_tbf_client *
1386 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1389 struct hlist_node *hnode;
1390 struct nrs_tbf_client *cli;
1392 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1396 cli = container_of(hnode, struct nrs_tbf_client, tc_hnode);
1397 if (!list_empty(&cli->tc_lru))
1398 list_del_init(&cli->tc_lru);
1403 * ONLY opcode presented in this function will be checked in
1404 * nrs_tbf_id_cli_set(). That means, we can add or remove an
1405 * opcode to enable or disable requests handled in nrs_tbf
1407 static struct req_format *req_fmt(__u32 opcode)
1411 return &RQF_OST_GETATTR;
1413 return &RQF_OST_SETATTR;
1415 return &RQF_OST_BRW_READ;
1417 return &RQF_OST_BRW_WRITE;
1418 /* FIXME: OST_CREATE and OST_DESTROY come from the MDS
1419 * in most cases. Should they be removed? */
1421 return &RQF_OST_CREATE;
1423 return &RQF_OST_DESTROY;
1425 return &RQF_OST_PUNCH;
1427 return &RQF_OST_SYNC;
1429 return &RQF_OST_LADVISE;
1431 return &RQF_MDS_GETATTR;
1432 case MDS_GETATTR_NAME:
1433 return &RQF_MDS_GETATTR_NAME;
1434 /* close is skipped to avoid LDLM cancel slowness */
1437 return &RQF_MDS_CLOSE;
1440 return &RQF_MDS_REINT;
1442 return &RQF_MDS_READPAGE;
1444 return &RQF_MDS_GET_ROOT;
1446 return &RQF_MDS_STATFS;
1448 return &RQF_MDS_SYNC;
1450 return &RQF_MDS_QUOTACTL;
1452 return &RQF_MDS_GETXATTR;
1454 return &RQF_MDS_GET_INFO;
1455 /* HSM op is skipped */
1457 case MDS_HSM_STATE_GET:
1458 return &RQF_MDS_HSM_STATE_GET;
1459 case MDS_HSM_STATE_SET:
1460 return &RQF_MDS_HSM_STATE_SET;
1461 case MDS_HSM_ACTION:
1462 return &RQF_MDS_HSM_ACTION;
1463 case MDS_HSM_CT_REGISTER:
1464 return &RQF_MDS_HSM_CT_REGISTER;
1465 case MDS_HSM_CT_UNREGISTER:
1466 return &RQF_MDS_HSM_CT_UNREGISTER;
1468 case MDS_SWAP_LAYOUTS:
1469 return &RQF_MDS_SWAP_LAYOUTS;
1471 return &RQF_LDLM_ENQUEUE;
/*
 * Choose the LDLM intent request format from the intent opcode bits.
 * The checks are ordered: open/create first, then getattr/lookup, then
 * getxattr, then glimpse/brw (which use the plain RQF_LDLM_INTENT format).
 * The result when no bit matches is not visible in this view.
 */
1477 static struct req_format *intent_req_fmt(__u32 it_opc)
1479 if (it_opc & (IT_OPEN | IT_CREAT))
1480 return &RQF_LDLM_INTENT_OPEN;
1481 else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1482 return &RQF_LDLM_INTENT_GETATTR;
1483 else if (it_opc & IT_GETXATTR)
1484 return &RQF_LDLM_INTENT_GETXATTR;
1485 else if (it_opc & (IT_GLIMPSE | IT_BRW))
1486 return &RQF_LDLM_INTENT;
/*
 * Fill id->ti_uid / id->ti_gid from the o_uid / o_gid fields of the OST body
 * (RMF_OST_BODY) carried by @req.
 * NOTE(review): any handling of a missing body is not visible in this view.
 */
1491 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1494 struct ost_body *body;
1496 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1498 id->ti_uid = body->oa.o_uid;
1499 id->ti_gid = body->oa.o_gid;
/*
 * Copy the uid/gid stored in the request's mdt_body (mbo_uid / mbo_gid)
 * into @id.
 */
1506 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1509 struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1513 /* TODO: nodemaping feature converts {ug}id from individual
1514 * clients to the actual ones of the file system. Some work
1515 * may be needed to fix this. */
1516 id->ti_uid = b->mbo_uid;
1517 id->ti_gid = b->mbo_gid;
/*
 * Copy the fsuid/fsgid stored in the request's reint record (RMF_REC_REINT)
 * into @id. The record is asserted to be present.
 */
1520 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1523 struct mdt_rec_reint *rec;
1525 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1526 LASSERT(rec != NULL);
1528 /* use the fs{ug}id as {ug}id of the process */
1529 id->ti_uid = rec->rr_fsuid;
1530 id->ti_gid = rec->rr_fsgid;
/*
 * Extract uid/gid from an MDS request into @id, dispatching on the request
 * opcode: one group of opcodes (including MDS_GETATTR_NAME and the range
 * MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS) reads them from the mdt_body,
 * another group reads them from the reint record.
 * NOTE(review): several case labels and the default branch are not visible
 * in this view.
 */
1533 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1536 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1541 case MDS_GETATTR_NAME:
1546 case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1547 unpack_ugid_from_mdt_body(req, id);
1551 unpack_ugid_from_mdt_rec_reint(req, id);
1560 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1563 struct ldlm_intent *lit;
1564 struct req_format *fmt;
1566 if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1569 req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1570 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1574 fmt = intent_req_fmt(lit->opc);
1578 req_capsule_extend(&req->rq_pill, fmt);
1580 if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1581 unpack_ugid_from_mdt_body(req, id);
1582 else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1583 unpack_ugid_from_mdt_rec_reint(req, id);
/*
 * Fill @id with the uid/gid that issued @req and tag it with @ti_type.
 *
 * @id is zeroed first. The ids are taken from the message header with
 * lustre_msg_get_uid_gid(); when that fails or either id is (u32)-1 the
 * code falls back to unpacking them from the request body, choosing the
 * OST, MDS or LDLM helper from the opcode range. The capsule format pointer
 * (rq_pill.rc_fmt) is saved beforehand and put back if the helpers changed
 * it.
 * NOTE(review): the selection of the request format and the final return
 * are on lines not visible in this view.
 */
1589 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1590 enum nrs_tbf_flag ti_type)
1593 struct req_format *fmt;
1594 const struct req_format *old_fmt;
1597 memset(id, 0, sizeof(struct tbf_id));
1598 id->ti_type = ti_type;
1600 rc = lustre_msg_get_uid_gid(req->rq_reqmsg, &id->ti_uid, &id->ti_gid);
1601 if (!rc && id->ti_uid != (u32) -1 && id->ti_gid != (u32) -1)
1604 /* client req doesn't have uid/gid pack in ptlrpc_body
1605 * --> fallback to the old method
1607 opc = lustre_msg_get_opc(req->rq_reqmsg);
1612 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1613 old_fmt = req->rq_pill.rc_fmt;
1614 if (old_fmt == NULL)
1615 req_capsule_set(&req->rq_pill, fmt);
1617 if (opc < OST_LAST_OPC)
1618 rc = ost_tbf_id_cli_set(req, id);
1619 else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1620 rc = mdt_tbf_id_cli_set(req, id);
1621 else if (opc == LDLM_ENQUEUE)
1622 rc = ldlm_tbf_id_cli_set(req, id);
1626 /* restore it to the original state */
1627 if (req->rq_pill.rc_fmt != old_fmt)
1628 req->rq_pill.rc_fmt = old_fmt;
/*
 * Build the hash key of the generic TBF type for @req into @keystr.
 *
 * The key has the form "<jobid>_<nid>_<opcode>_<uid>_<gid>"; a substitute
 * jobid (NRS_TBF_JOBID_NULL) is used on the fallback path. The same values
 * are also recorded in @cli (LRU linkage initialised, key, jobid, NID and
 * opcode copied).
 * NOTE(review): nrs_tbf_cli_find() passes a NULL @cli, so the stores into
 * @cli are presumably guarded by a check that is not visible in this view.
 */
1632 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1633 struct ptlrpc_request *req,
1634 char *keystr, size_t keystr_sz)
1637 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1640 nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1641 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1643 jobid = NRS_TBF_JOBID_NULL;
1645 snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1646 libcfs_nidstr(&req->rq_peer.nid), opc, id.ti_uid,
1650 INIT_LIST_HEAD(&cli->tc_lru);
1651 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1652 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1653 cli->tc_nid = req->rq_peer.nid;
1654 cli->tc_opcode = opc;
/*
 * Find the generic-type client that matches @req.
 * The key is generated on the stack (no client is filled in, hence the NULL
 * first argument), then the bucket is locked, searched with
 * nrs_tbf_cli_hash_lookup() and unlocked again.
 */
1659 static struct nrs_tbf_client *
1660 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1662 struct nrs_tbf_client *cli;
1663 struct cfs_hash *hs = head->th_cli_hash;
1664 struct cfs_hash_bd bd;
1665 char keystr[NRS_TBF_KEY_LEN];
1667 nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1668 cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1669 cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1670 cfs_hash_bd_unlock(hs, &bd, 1);
/*
 * Look up the key of @cli in the generic-type hash and add @cli under the
 * bucket lock. Lookup and insertion happen inside one locked section, so
 * they are atomic with respect to other users of the bucket.
 * NOTE(review): presumably @cli is only added when the lookup misses, and
 * the entry that ends up in the hash is returned; the deciding lines are
 * not visible in this view.
 */
1675 static struct nrs_tbf_client *
1676 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1677 struct nrs_tbf_client *cli)
1680 struct nrs_tbf_client *ret;
1681 struct cfs_hash *hs = head->th_cli_hash;
1682 struct cfs_hash_bd bd;
1685 cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1686 ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1688 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1691 cfs_hash_bd_unlock(hs, &bd, 1);
/*
 * Drop a reference on @cli in the generic-type client hash.
 *
 * cfs_hash_bd_dec_and_lock() decrements tc_ref; on the path that continues,
 * the client is appended to the LRU list of its bucket (ntb_lru). The LRU is
 * then trimmed while the bucket holds more than
 * tbf_jobid_cache_size >> (hs_cur_bits - hs_bkt_bits) entries: victims are
 * removed from the hash and moved to a local zombie list, and they are only
 * freed with nrs_tbf_cli_fini() after the bucket lock has been released.
 */
1697 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1699 struct cfs_hash_bd bd;
1700 struct cfs_hash *hs = head->th_cli_hash;
1701 struct nrs_tbf_bucket *bkt;
1705 cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1706 bkt = cfs_hash_bd_extra_get(hs, &bd);
1707 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1709 LASSERT(list_empty(&cli->tc_lru));
1710 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1713 * Check and purge the LRU, there is at least one client in the LRU.
1715 hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1716 while (cfs_hash_bd_count_get(&bd) > hw) {
1717 if (unlikely(list_empty(&bkt->ntb_lru)))
1719 cli = list_first_entry(&bkt->ntb_lru,
1720 struct nrs_tbf_client,
1722 LASSERT(atomic_read(&cli->tc_ref) == 0);
1723 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1724 list_move(&cli->tc_lru, &zombies);
1726 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1728 while (!list_empty(&zombies)) {
1729 cli = container_of(zombies.next,
1730 struct nrs_tbf_client, tc_lru);
1731 list_del_init(&cli->tc_lru);
1732 nrs_tbf_cli_fini(cli);
/*
 * Initialise the generic-type fields of @cli from @req by generating its
 * hash key; nrs_tbf_cli_gen_key() stores the key and related values in @cli.
 * The local key buffer is only scratch space.
 */
1737 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1738 struct ptlrpc_request *req)
1740 char keystr[NRS_TBF_KEY_LEN];
1742 nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
/*
 * Unlink and free every struct nrs_tbf_id on @uid_list. The safe iterator is
 * used because entries are removed while walking. Despite the parameter
 * name, the same helper is used for GID lists.
 */
1746 nrs_tbf_id_list_free(struct list_head *uid_list)
1748 struct nrs_tbf_id *nti_id, *n;
1750 list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1751 list_del_init(&nti_id->nti_linkage);
1752 OBD_FREE_PTR(nti_id);
/*
 * Release the condition data held by @expr according to its field type:
 * NID list, JobID list, opcode bitmap, or UID/GID id list.
 * The field value is asserted to be within the valid range first.
 * NOTE(review): whether @expr itself is freed here is not visible in this
 * view.
 */
1757 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1759 LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1760 expr->te_field < NRS_TBF_FIELD_MAX);
1761 switch (expr->te_field) {
1762 case NRS_TBF_FIELD_NID:
1763 cfs_free_nidlist(&expr->te_cond);
1765 case NRS_TBF_FIELD_JOBID:
1766 nrs_tbf_jobid_list_free(&expr->te_cond);
1768 case NRS_TBF_FIELD_OPCODE:
1769 bitmap_free(expr->te_opcodes);
1771 case NRS_TBF_FIELD_UID:
1772 case NRS_TBF_FIELD_GID:
1773 nrs_tbf_id_list_free(&expr->te_cond);
/*
 * Free @conjunction together with all of its expressions.
 * The conjunction must already be unlinked from its condition list
 * (asserted); each expression is unlinked and released in turn.
 */
1782 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1784 struct nrs_tbf_expression *expression;
1785 struct nrs_tbf_expression *n;
1787 LASSERT(list_empty(&conjunction->tc_linkage));
1788 list_for_each_entry_safe(expression, n,
1789 &conjunction->tc_expressions,
1791 list_del_init(&expression->te_linkage);
1792 nrs_tbf_expression_free(expression);
1794 OBD_FREE_PTR(conjunction);
/*
 * Free every conjunction on @cond_list. Each one is unlinked first, which
 * satisfies the precondition asserted by nrs_tbf_conjunction_free().
 */
1798 nrs_tbf_conds_free(struct list_head *cond_list)
1800 struct nrs_tbf_conjunction *conjunction;
1801 struct nrs_tbf_conjunction *n;
1803 list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1804 list_del_init(&conjunction->tc_linkage);
1805 nrs_tbf_conjunction_free(conjunction);
/*
 * Release what a generic-type start command owns: the parsed condition list
 * (when non-empty) and the condition string, whose allocation size is
 * recomputed from its length.
 */
1810 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1812 if (!list_empty(&cmd->u.tc_start.ts_conds))
1813 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1814 if (cmd->u.tc_start.ts_conds_str)
1815 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1816 strlen(cmd->u.tc_start.ts_conds_str) + 1);
/*
 * Delimiters of the generic rule condition syntax, used with strsep():
 * "," separates alternative conjunctions, "&" separates the expressions of
 * one conjunction, and "=" separates a field name from its value list.
 */
1819 #define NRS_TBF_DISJUNCTION_DELIM (",")
1820 #define NRS_TBF_CONJUNCTION_DELIM ("&")
1821 #define NRS_TBF_EXPRESSION_DELIM ("=")
1824 nrs_tbf_opcode_list_parse(char *str, unsigned long **bitmaptr);
1826 nrs_tbf_id_list_parse(char *str, struct list_head *id_list,
1827 enum nrs_tbf_flag tif);
/*
 * Parse one expression of the form <field>={<values>} from @str and append
 * the resulting struct nrs_tbf_expression to @cond_list.
 *
 * <field> must be one of nid, jobid, opcode, uid or gid; the value list is
 * handed to the list parser of that field. -EINVAL is set (via GOTO(out))
 * when the field name or the equals sign is missing, when the value list is
 * not enclosed in braces, when the field is unknown, or when the list parser
 * fails.
 * NOTE(review): the cleanup done at the "out" label is not visible in this
 * view.
 */
1830 nrs_tbf_expression_parse(char *str, struct list_head *cond_list)
1832 struct nrs_tbf_expression *expr;
1837 OBD_ALLOC_PTR(expr);
1841 field = strim(strsep(&str, NRS_TBF_EXPRESSION_DELIM));
1842 if (!*field || !str)
1843 /* No LHS or no '=' sign */
1844 GOTO(out, rc = -EINVAL);
1847 if (len < 2 || str[0] != '{' || str[len-1] != '}')
1848 /* No {} around RHS */
1849 GOTO(out, rc = -EINVAL);
1851 /* Skip '{' and '}' */
1856 if (strcmp(field, "nid") == 0) {
1857 if (cfs_parse_nidlist(str, &expr->te_cond) < 0)
1858 GOTO(out, rc = -EINVAL);
1859 expr->te_field = NRS_TBF_FIELD_NID;
1860 } else if (strcmp(field, "jobid") == 0) {
1861 if (nrs_tbf_jobid_list_parse(str, &expr->te_cond) < 0)
1862 GOTO(out, rc = -EINVAL);
1863 expr->te_field = NRS_TBF_FIELD_JOBID;
1864 } else if (strcmp(field, "opcode") == 0) {
1865 if (nrs_tbf_opcode_list_parse(str, &expr->te_opcodes) < 0)
1866 GOTO(out, rc = -EINVAL);
1867 expr->te_field = NRS_TBF_FIELD_OPCODE;
1868 } else if (strcmp(field, "uid") == 0) {
1869 if (nrs_tbf_id_list_parse(str, &expr->te_cond,
1870 NRS_TBF_FLAG_UID) < 0)
1871 GOTO(out, rc = -EINVAL);
1872 expr->te_field = NRS_TBF_FIELD_UID;
1873 } else if (strcmp(field, "gid") == 0) {
1874 if (nrs_tbf_id_list_parse(str, &expr->te_cond,
1875 NRS_TBF_FLAG_GID) < 0)
1876 GOTO(out, rc = -EINVAL);
1877 expr->te_field = NRS_TBF_FIELD_GID;
1879 GOTO(out, rc = -EINVAL);
1882 list_add_tail(&expr->te_linkage, cond_list);
/*
 * Parse one conjunction: expressions joined by "&".
 * A new conjunction is allocated and linked onto @cond_list before its
 * expressions are parsed, so it stays on that list even if a later
 * expression fails. Parsing stops at the first expression that returns
 * non-zero.
 */
1890 nrs_tbf_conjunction_parse(char *str, struct list_head *cond_list)
1892 struct nrs_tbf_conjunction *conjunction;
1895 OBD_ALLOC_PTR(conjunction);
1896 if (conjunction == NULL)
1899 INIT_LIST_HEAD(&conjunction->tc_expressions);
1900 list_add_tail(&conjunction->tc_linkage, cond_list);
1902 while (str && !rc) {
1903 char *expr = strsep(&str, NRS_TBF_CONJUNCTION_DELIM);
1905 rc = nrs_tbf_expression_parse(expr,
1906 &conjunction->tc_expressions);
/*
 * Parse a full condition string: conjunctions separated by ",".
 * The input is duplicated with kstrdup() before it is tokenised, so the
 * string passed in by the caller is not modified by strsep(). @cond_list is
 * (re)initialised and one conjunction is parsed per term until a term fails.
 * NOTE(review): release of the duplicate and error cleanup are not visible
 * in this view.
 */
1912 nrs_tbf_conds_parse(char *orig, struct list_head *cond_list)
1917 orig = kstrdup(orig, GFP_KERNEL);
1922 INIT_LIST_HEAD(cond_list);
1923 while (str && !rc) {
1924 char *term = strsep(&str, NRS_TBF_DISJUNCTION_DELIM);
1926 rc = nrs_tbf_conjunction_parse(term, cond_list);
/*
 * Fill a generic-type start command from the condition string @id.
 * A copy of @id is stored in ts_conds_str and then parsed into ts_conds.
 * The memcpy() copies strlen(id) bytes into a buffer one byte larger.
 * NOTE(review): this relies on OBD_ALLOC returning zeroed memory for the
 * terminating NUL - confirm. nrs_tbf_generic_cmd_fini() is presumably only
 * called when parsing fails; the guarding condition is not visible here.
 */
1934 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1938 OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1939 if (cmd->u.tc_start.ts_conds_str == NULL)
1942 memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1944 /* Parse hybrid NID and JOBID conditions */
1945 rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1946 &cmd->u.tc_start.ts_conds);
1948 nrs_tbf_generic_cmd_fini(cmd);
1954 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
/*
 * Test @cli against a single expression.
 * The check depends on the expression field: NID list match, JobID list
 * match, opcode bit test, or UID/GID id list match against cli->tc_id.
 * @rule is not used by the visible code.
 */
1957 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1958 struct nrs_tbf_rule *rule,
1959 struct nrs_tbf_client *cli)
1961 switch (expr->te_field) {
1962 case NRS_TBF_FIELD_NID:
1963 return cfs_match_nid(&cli->tc_nid, &expr->te_cond);
1964 case NRS_TBF_FIELD_JOBID:
1965 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1966 case NRS_TBF_FIELD_OPCODE:
1967 return test_bit(cli->tc_opcode, expr->te_opcodes);
1968 case NRS_TBF_FIELD_UID:
1969 case NRS_TBF_FIELD_GID:
1970 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
/*
 * Evaluate every expression of @conjunction against @cli.
 * NOTE(review): as the expressions are joined by "&" in the rule syntax, the
 * loop presumably stops at the first expression that does not match; the
 * loop exit and return are not visible in this view.
 */
1977 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1978 struct nrs_tbf_rule *rule,
1979 struct nrs_tbf_client *cli)
1981 struct nrs_tbf_expression *expr;
1984 list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
1985 matched = nrs_tbf_expression_match(expr, rule, cli);
/*
 * Evaluate the conjunctions of @rule (rule->tr_conds) against @cli.
 * NOTE(review): as conjunctions are alternatives separated by "," in the
 * rule syntax, the loop presumably stops at the first one that matches; the
 * loop exit and return are not visible in this view.
 */
1994 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
1996 struct nrs_tbf_conjunction *conjunction;
1999 list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2000 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
/*
 * Release the generic-type data of @rule: the parsed condition list (when
 * non-empty) and the condition string, which must be set (asserted).
 */
2009 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2011 if (!list_empty(&rule->tr_conds))
2012 nrs_tbf_conds_free(&rule->tr_conds);
2013 LASSERT(rule->tr_conds_str != NULL);
2014 OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
/*
 * Initialise the generic-type data of @rule from the start command.
 * The condition string is copied into rule->tr_conds_str and, when the
 * command carries a non-empty parsed condition list, the copy is parsed
 * again into rule->tr_conds (the list from the command is not reused).
 * NOTE(review): the copy relies on OBD_ALLOC zeroing the buffer for the
 * terminating NUL - confirm. nrs_tbf_generic_rule_fini() is presumably only
 * called on failure; the guarding condition is not visible in this view.
 */
2018 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2019 struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2023 LASSERT(start->u.tc_start.ts_conds_str);
2024 OBD_ALLOC(rule->tr_conds_str,
2025 strlen(start->u.tc_start.ts_conds_str) + 1);
2026 if (rule->tr_conds_str == NULL)
2029 memcpy(rule->tr_conds_str,
2030 start->u.tc_start.ts_conds_str,
2031 strlen(start->u.tc_start.ts_conds_str));
2033 INIT_LIST_HEAD(&rule->tr_conds);
2034 if (!list_empty(&start->u.tc_start.ts_conds)) {
2035 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2039 nrs_tbf_generic_rule_fini(rule);
/*
 * Print one generic-type rule to @m: name, condition string, RPC rate and
 * the reference count minus one.
 */
2045 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2047 seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2048 rule->tr_conds_str, rule->tr_rpc_rate,
2049 atomic_read(&rule->tr_ref) - 1);
/*
 * o_rule_match callback of the generic type: thin wrapper around
 * nrs_tbf_cond_match().
 */
2054 nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
2055 struct nrs_tbf_client *cli)
2057 return nrs_tbf_cond_match(rule, cli);
/* Operations table of the generic TBF type (NRS_TBF_TYPE_GENERIC). */
2060 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2061 .o_name = NRS_TBF_TYPE_GENERIC,
2062 .o_startup = nrs_tbf_startup,
2063 .o_cli_find = nrs_tbf_cli_find,
2064 .o_cli_findadd = nrs_tbf_cli_findadd,
2065 .o_cli_put = nrs_tbf_cli_put,
2066 .o_cli_init = nrs_tbf_generic_cli_init,
2067 .o_rule_init = nrs_tbf_rule_init,
2068 .o_rule_dump = nrs_tbf_generic_rule_dump,
2069 .o_rule_match = nrs_tbf_generic_rule_match,
2070 .o_rule_fini = nrs_tbf_generic_rule_fini,
/*
 * Release the opcode-type data of @rule: the opcode bitmap, when one was
 * allocated, and the opcode string, which must be set (asserted).
 */
2073 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2075 if (rule->tr_opcodes)
2076 bitmap_free(rule->tr_opcodes);
2078 LASSERT(rule->tr_opcodes_str != NULL);
2079 OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
/* Hash callback of the opcode hash: djb2 over the 4 bytes of the opcode. */
2082 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2085 return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
/*
 * Key comparison callback of the opcode hash: non-zero when the opcode that
 * @key points at equals the opcode of the client embedding @hnode.
 */
2088 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2090 const __u32 *opc = key;
2091 struct nrs_tbf_client *cli = hlist_entry(hnode,
2092 struct nrs_tbf_client,
2095 return *opc == cli->tc_opcode;
/* Key callback of the opcode hash: the key is the client's tc_opcode. */
2098 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2100 struct nrs_tbf_client *cli = hlist_entry(hnode,
2101 struct nrs_tbf_client,
2104 return &cli->tc_opcode;
/* Get callback of the opcode hash: take a reference on the client. */
2107 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2108 struct hlist_node *hnode)
2110 struct nrs_tbf_client *cli = hlist_entry(hnode,
2111 struct nrs_tbf_client,
2114 atomic_inc(&cli->tc_ref);
/*
 * Put callback of the opcode hash: drop a reference on the client. The
 * client is not freed here, even when the count reaches zero.
 */
2117 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2118 struct hlist_node *hnode)
2120 struct nrs_tbf_client *cli = hlist_entry(hnode,
2121 struct nrs_tbf_client,
2124 atomic_dec(&cli->tc_ref);
/*
 * Exit callback of the opcode hash: finalise a client with
 * nrs_tbf_cli_fini(). The client must have no references left; otherwise
 * the assertion reports its opcode and reference count.
 */
2127 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2128 struct hlist_node *hnode)
2130 struct nrs_tbf_client *cli = hlist_entry(hnode,
2131 struct nrs_tbf_client,
2134 LASSERTF(atomic_read(&cli->tc_ref) == 0,
2135 "Busy TBF object from client with opcode %s, with %d refs\n",
2136 ll_opcode2str(cli->tc_opcode),
2137 atomic_read(&cli->tc_ref));
2139 nrs_tbf_cli_fini(cli);
/*
 * Callbacks of the client hash keyed by opcode. The same put routine serves
 * both the locked and the unlocked put.
 */
2141 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2142 .hs_hash = nrs_tbf_opcode_hop_hash,
2143 .hs_keycmp = nrs_tbf_opcode_hop_keycmp,
2144 .hs_key = nrs_tbf_opcode_hop_key,
2145 .hs_object = nrs_tbf_hop_object,
2146 .hs_get = nrs_tbf_opcode_hop_get,
2147 .hs_put = nrs_tbf_opcode_hop_put,
2148 .hs_put_locked = nrs_tbf_opcode_hop_put,
2149 .hs_exit = nrs_tbf_opcode_hop_exit,
2153 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2154 struct nrs_tbf_head *head)
2156 struct nrs_tbf_cmd start = { 0 };
2159 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2162 NRS_TBF_NID_BKT_BITS, 0,
2165 &nrs_tbf_opcode_hash_ops,
2166 CFS_HASH_RW_BKTLOCK);
2167 if (head->th_cli_hash == NULL)
2170 start.u.tc_start.ts_opcodes_str = "*";
2172 start.u.tc_start.ts_rpc_rate = tbf_rate;
2173 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2174 start.tc_name = NRS_TBF_DEFAULT_RULE;
2175 rc = nrs_tbf_rule_start(policy, head, &start);
/* Look up the client that represents the opcode of @req in the hash. */
2180 static struct nrs_tbf_client *
2181 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2182 struct ptlrpc_request *req)
2186 opc = lustre_msg_get_opc(req->rq_reqmsg);
2187 return cfs_hash_lookup(head->th_cli_hash, &opc);
/*
 * Find-or-insert for the opcode hash, keyed by cli->tc_opcode, delegated to
 * cfs_hash_findadd_unique().
 */
2190 static struct nrs_tbf_client *
2191 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2192 struct nrs_tbf_client *cli)
2194 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
/* Record the opcode of @req in @cli; it is the client's hash key. */
2199 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2200 struct ptlrpc_request *req)
2202 cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2205 #define MAX_OPCODE_LEN 32
/*
 * Translate the opcode name @id with ll_str2opcode() and set the matching
 * bit in the @opcodes bitmap.
 * NOTE(review): validation of the translated value is not visible in this
 * view.
 */
2207 nrs_tbf_opcode_set_bit(char *id, unsigned long *opcodes)
2211 op = ll_str2opcode(id);
2215 set_bit(op, opcodes);
/*
 * Parse a space-separated list of opcode names into a bitmap of
 * LUSTRE_MAX_OPCODES bits.
 *
 * The input is duplicated with kstrdup() before it is tokenised, so the
 * string passed in by the caller is not modified. On success, if @bitmaptr
 * is non-NULL, the bitmap is handed to the caller through it; otherwise
 * (error, or @bitmaptr NULL for a pure syntax check) the bitmap is freed.
 */
2220 nrs_tbf_opcode_list_parse(char *orig, unsigned long **bitmaptr)
2222 unsigned long *opcodes;
2228 orig = kstrdup(orig, GFP_KERNEL);
2231 opcodes = bitmap_zalloc(LUSTRE_MAX_OPCODES, GFP_KERNEL);
2237 while (str && rc == 0) {
2238 char *tok = strsep(&str, " ");
2241 rc = nrs_tbf_opcode_set_bit(tok, opcodes);
2249 if (rc == 0 && bitmaptr)
2250 *bitmaptr = opcodes;
2252 bitmap_free(opcodes);
/* Free the opcode string owned by an opcode-type start command, if set. */
2257 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2259 if (cmd->u.tc_start.ts_opcodes_str)
2260 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2261 strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
/*
 * Fill an opcode-type start command from the user string @id.
 * The string is first checked by nrs_tbf_check_id_value() for the "opcode"
 * key (that helper may adjust @id, as it receives its address), then copied
 * into ts_opcodes_str and run through the list parser with a NULL bitmap
 * pointer, i.e. only to validate it.
 * NOTE(review): nrs_tbf_opcode_cmd_fini() is presumably only called when
 * validation fails; the guarding condition is not visible in this view.
 */
2265 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2269 rc = nrs_tbf_check_id_value(&id, "opcode");
2273 OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, strlen(id) + 1);
2274 if (cmd->u.tc_start.ts_opcodes_str == NULL)
2277 strcpy(cmd->u.tc_start.ts_opcodes_str, id);
2279 /* parse opcode list */
2280 rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str, NULL);
2282 nrs_tbf_opcode_cmd_fini(cmd);
/*
 * o_rule_match callback of the opcode type: test the client's opcode bit in
 * the rule bitmap.
 * NOTE(review): a rule without a bitmap is the default "*" rule (see
 * nrs_tbf_opcode_rule_init()); the value returned for it is not visible in
 * this view.
 */
2288 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2289 struct nrs_tbf_client *cli)
2291 if (rule->tr_opcodes == NULL)
2294 return test_bit(cli->tc_opcode, rule->tr_opcodes);
2297 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2298 struct nrs_tbf_rule *rule,
2299 struct nrs_tbf_cmd *start)
2303 LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2304 OBD_ALLOC(rule->tr_opcodes_str,
2305 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2306 if (rule->tr_opcodes_str == NULL)
2309 strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2310 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2312 /* Default rule '*' */
2313 if (strcmp(start->u.tc_start.ts_opcodes_str, "*") == 0)
2316 rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2319 OBD_FREE(rule->tr_opcodes_str,
2320 strlen(start->u.tc_start.ts_opcodes_str) + 1);
/*
 * Print one opcode-type rule to @m: name, opcode list in braces, RPC rate
 * and the reference count minus one.
 */
2326 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2328 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2329 rule->tr_opcodes_str, rule->tr_rpc_rate,
2330 atomic_read(&rule->tr_ref) - 1);
/*
 * Operations table of the opcode TBF type (NRS_TBF_TYPE_OPCODE). The client
 * put routine is shared with the NID type.
 */
2335 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2336 .o_name = NRS_TBF_TYPE_OPCODE,
2337 .o_startup = nrs_tbf_opcode_startup,
2338 .o_cli_find = nrs_tbf_opcode_cli_find,
2339 .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2340 .o_cli_put = nrs_tbf_nid_cli_put,
2341 .o_cli_init = nrs_tbf_opcode_cli_init,
2342 .o_rule_init = nrs_tbf_opcode_rule_init,
2343 .o_rule_dump = nrs_tbf_opcode_rule_dump,
2344 .o_rule_match = nrs_tbf_opcode_rule_match,
2345 .o_rule_fini = nrs_tbf_opcode_rule_fini,
/*
 * Hash callback of the UID/GID hash: djb2 over the whole struct tbf_id, so
 * every member of the key contributes to the hash value.
 */
2348 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2351 return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
/*
 * Key comparison callback of the UID/GID hash.
 * Only the id kinds flagged in both the key and the client are compared:
 * the uid when NRS_TBF_FLAG_UID is common to both, the gid when
 * NRS_TBF_FLAG_GID is common to both.
 * NOTE(review): the values returned on mismatch and on match are not visible
 * in this view.
 */
2354 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2356 const struct tbf_id *opc = key;
2357 enum nrs_tbf_flag ntf;
2358 struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2360 ntf = opc->ti_type & cli->tc_id.ti_type;
2361 if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2364 if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
/*
 * Key callback of the UID/GID hash.
 * NOTE(review): presumably returns the address of cli->tc_id, which is the
 * key nrs_tbf_id_cli_findadd() uses; the return is not visible in this view.
 */
2370 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2372 struct nrs_tbf_client *cli = hlist_entry(hnode,
2373 struct nrs_tbf_client,
/* Get callback of the UID/GID hash: take a reference on the client. */
2378 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2380 struct nrs_tbf_client *cli = hlist_entry(hnode,
2381 struct nrs_tbf_client,
2384 atomic_inc(&cli->tc_ref);
/*
 * Put callback of the UID/GID hash: drop a reference on the client. The
 * client is not freed here, even when the count reaches zero.
 */
2387 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2389 struct nrs_tbf_client *cli = hlist_entry(hnode,
2390 struct nrs_tbf_client,
2393 atomic_dec(&cli->tc_ref);
/*
 * Exit callback of the UID/GID hash: finalise a client with
 * nrs_tbf_cli_fini(). The client must have no references left (asserted).
 */
2397 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2400 struct nrs_tbf_client *cli = hlist_entry(hnode,
2401 struct nrs_tbf_client,
2404 LASSERT(atomic_read(&cli->tc_ref) == 0);
2405 nrs_tbf_cli_fini(cli);
/*
 * Callbacks of the client hash keyed by struct tbf_id (UID/GID types). The
 * same put routine serves both the locked and the unlocked put.
 */
2408 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2409 .hs_hash = nrs_tbf_id_hop_hash,
2410 .hs_keycmp = nrs_tbf_id_hop_keycmp,
2411 .hs_key = nrs_tbf_id_hop_key,
2412 .hs_object = nrs_tbf_hop_object,
2413 .hs_get = nrs_tbf_id_hop_get,
2414 .hs_put = nrs_tbf_id_hop_put,
2415 .hs_put_locked = nrs_tbf_id_hop_put,
2416 .hs_exit = nrs_tbf_id_hop_exit,
/*
 * Start-up callback shared by the UID and GID TBF types.
 * Creates the client hash keyed by struct tbf_id and installs the default
 * rule: id list "*", rate tbf_rate, flag NTRS_DEFAULT, with an empty parsed
 * id list. The hash reference is dropped and the pointer cleared again in
 * the cleanup that follows nrs_tbf_rule_start().
 * NOTE(review): that cleanup is presumably guarded by a failure check which
 * is not visible in this view.
 */
2420 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2421 struct nrs_tbf_head *head)
2423 struct nrs_tbf_cmd start;
2426 head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2429 NRS_TBF_NID_BKT_BITS, 0,
2432 &nrs_tbf_id_hash_ops,
2433 CFS_HASH_RW_BKTLOCK);
2434 if (head->th_cli_hash == NULL)
2437 memset(&start, 0, sizeof(start));
2438 start.u.tc_start.ts_ids_str = "*";
2439 start.u.tc_start.ts_rpc_rate = tbf_rate;
2440 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2441 start.tc_name = NRS_TBF_DEFAULT_RULE;
2442 INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2443 rc = nrs_tbf_rule_start(policy, head, &start);
2445 cfs_hash_putref(head->th_cli_hash);
2446 head->th_cli_hash = NULL;
/*
 * Look up the client for the uid or gid that issued @req.
 * The head must be of exactly the UID or the GID type (asserted); the key is
 * built by nrs_tbf_id_cli_set() with that type flag. Its return value is not
 * checked here.
 */
2452 static struct nrs_tbf_client *
2453 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2454 struct ptlrpc_request *req)
2458 LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2459 head->th_type_flag == NRS_TBF_FLAG_GID);
2461 nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2462 return cfs_hash_lookup(head->th_cli_hash, &id);
/*
 * Find-or-insert for the UID/GID hash, keyed by cli->tc_id, delegated to
 * cfs_hash_findadd_unique().
 */
2465 static struct nrs_tbf_client *
2466 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2467 struct nrs_tbf_client *cli)
2469 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
/* Fill cli->tc_id from @req and tag it as a UID key. */
2474 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2475 struct ptlrpc_request *req)
2477 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
/* Fill cli->tc_id from @req and tag it as a GID key. */
2481 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2482 struct ptlrpc_request *req)
2484 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
/*
 * Test @id (passed by value) against the entries of @id_list.
 * For each entry only the id kinds flagged in both @id and the entry are
 * compared: uid when NRS_TBF_FLAG_UID is common, gid when NRS_TBF_FLAG_GID
 * is common.
 * NOTE(review): the handling of an empty flag intersection and the return
 * values are not visible in this view.
 */
2488 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2490 struct nrs_tbf_id *nti_id;
2491 enum nrs_tbf_flag flag;
2493 list_for_each_entry(nti_id, id_list, nti_linkage) {
2494 flag = id.ti_type & nti_id->nti_id.ti_type;
2498 if ((flag & NRS_TBF_FLAG_UID) &&
2499 (id.ti_uid != nti_id->nti_id.ti_uid))
2502 if ((flag & NRS_TBF_FLAG_GID) &&
2503 (id.ti_gid != nti_id->nti_id.ti_gid))
/*
 * o_rule_match callback of the UID/GID types: match the client's id against
 * the id list of the rule.
 */
2512 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2513 struct nrs_tbf_client *cli)
2515 return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
/*
 * Release what a UID/GID start command owns: the parsed id list and the id
 * string, if set.
 */
2518 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2520 nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2522 if (cmd->u.tc_start.ts_ids_str)
2523 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2524 strlen(cmd->u.tc_start.ts_ids_str) + 1);
/*
 * Parse a space-separated list of numeric ids into @id_list.
 *
 * @tif must be exactly NRS_TBF_FLAG_UID or NRS_TBF_FLAG_GID. The input is
 * duplicated with kstrdup() before it is tokenised, so the string passed in
 * by the caller is not modified. Each token is converted with kstrtoul()
 * using base 0 (so the usual 0x and 0 prefixes are honoured) and appended as
 * a newly allocated struct nrs_tbf_id. A list that yields no entry at all is
 * rejected with -EINVAL, a token that is not a number with -EINVAL, and an
 * allocation failure with -ENOMEM. Entries already added are released with
 * nrs_tbf_id_list_free() in the exit path.
 */
2528 nrs_tbf_id_list_parse(char *orig, struct list_head *id_list,
2529 enum nrs_tbf_flag tif)
2534 struct tbf_id id = { 0 };
2537 if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2540 orig = kstrdup(orig, GFP_KERNEL);
2544 INIT_LIST_HEAD(id_list);
2545 for (str = orig; str ; ) {
2546 struct nrs_tbf_id *nti_id;
2549 tok = strsep(&str, " ");
2551 /* Empty token - leading, trailing, or
2552 * multiple spaces in list
2557 rc = kstrtoul(tok, 0, &val);
2559 GOTO(out, rc = -EINVAL);
2560 if (tif == NRS_TBF_FLAG_UID)
2565 OBD_ALLOC_PTR(nti_id);
2567 GOTO(out, rc = -ENOMEM);
2569 nti_id->nti_id = id;
2570 list_add_tail(&nti_id->nti_linkage, id_list);
2572 if (list_empty(id_list))
2573 /* Only white space in the list */
2574 GOTO(out, rc = -EINVAL);
2578 nrs_tbf_id_list_free(id_list);
/*
 * Fill a UID or GID start command from the user string @id.
 * The kind is taken from cmd->u.tc_start.ts_valid_type; the string is
 * checked by nrs_tbf_check_id_value() for the "uid" or "gid" key (that
 * helper may adjust @id, as it receives its address), copied into
 * ts_ids_str and parsed into ts_ids.
 * NOTE(review): nrs_tbf_id_cmd_fini() is presumably only called when parsing
 * fails; the guarding condition is not visible in this view.
 */
2582 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2585 enum nrs_tbf_flag tif;
2587 tif = cmd->u.tc_start.ts_valid_type;
2589 rc = nrs_tbf_check_id_value(&id,
2590 tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2594 OBD_ALLOC(cmd->u.tc_start.ts_ids_str, strlen(id) + 1);
2595 if (cmd->u.tc_start.ts_ids_str == NULL)
2598 strcpy(cmd->u.tc_start.ts_ids_str, id);
2600 rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2601 &cmd->u.tc_start.ts_ids, tif);
2603 nrs_tbf_id_cmd_fini(cmd);
2609 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2610 struct nrs_tbf_rule *rule,
2611 struct nrs_tbf_cmd *start)
2613 struct nrs_tbf_head *head = rule->tr_head;
2615 enum nrs_tbf_flag tif = head->th_type_flag;
2616 int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2618 LASSERT(start->u.tc_start.ts_ids_str);
2619 INIT_LIST_HEAD(&rule->tr_ids);
2621 OBD_ALLOC(rule->tr_ids_str, ids_len);
2622 if (rule->tr_ids_str == NULL)
2625 strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2628 if (!list_empty(&start->u.tc_start.ts_ids)) {
2629 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2630 &rule->tr_ids, tif);
2632 CERROR("%ss {%s} illegal\n",
2633 tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2637 OBD_FREE(rule->tr_ids_str, ids_len);
2638 rule->tr_ids_str = NULL;
/*
 * Print one UID/GID rule to @m: name, id list in braces, RPC rate and the
 * reference count minus one.
 */
2644 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2646 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2647 rule->tr_ids_str, rule->tr_rpc_rate,
2648 atomic_read(&rule->tr_ref) - 1);
/*
 * Release the UID/GID data of @rule: the parsed id list and the id string.
 * A NULL string is tolerated here.
 */
2652 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2654 nrs_tbf_id_list_free(&rule->tr_ids);
2655 if (rule->tr_ids_str != NULL)
2656 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
/*
 * Operations table of the UID TBF type (NRS_TBF_TYPE_UID). It differs from
 * the GID table only in the name and the client init routine.
 */
2659 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2660 .o_name = NRS_TBF_TYPE_UID,
2661 .o_startup = nrs_tbf_id_startup,
2662 .o_cli_find = nrs_tbf_id_cli_find,
2663 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2664 .o_cli_put = nrs_tbf_nid_cli_put,
2665 .o_cli_init = nrs_tbf_uid_cli_init,
2666 .o_rule_init = nrs_tbf_id_rule_init,
2667 .o_rule_dump = nrs_tbf_id_rule_dump,
2668 .o_rule_match = nrs_tbf_id_rule_match,
2669 .o_rule_fini = nrs_tbf_id_rule_fini,
/*
 * Operations table of the GID TBF type (NRS_TBF_TYPE_GID). It differs from
 * the UID table only in the name and the client init routine.
 */
2672 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2673 .o_name = NRS_TBF_TYPE_GID,
2674 .o_startup = nrs_tbf_id_startup,
2675 .o_cli_find = nrs_tbf_id_cli_find,
2676 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2677 .o_cli_put = nrs_tbf_nid_cli_put,
2678 .o_cli_init = nrs_tbf_gid_cli_init,
2679 .o_rule_init = nrs_tbf_id_rule_init,
2680 .o_rule_dump = nrs_tbf_id_rule_dump,
2681 .o_rule_match = nrs_tbf_id_rule_match,
2682 .o_rule_fini = nrs_tbf_id_rule_fini,
/*
 * Table of the supported TBF types: type name, type flag and operations
 * table. nrs_tbf_start() searches it by name.
 */
2685 static struct nrs_tbf_type nrs_tbf_types[] = {
2687 .ntt_name = NRS_TBF_TYPE_JOBID,
2688 .ntt_flag = NRS_TBF_FLAG_JOBID,
2689 .ntt_ops = &nrs_tbf_jobid_ops,
2692 .ntt_name = NRS_TBF_TYPE_NID,
2693 .ntt_flag = NRS_TBF_FLAG_NID,
2694 .ntt_ops = &nrs_tbf_nid_ops,
2697 .ntt_name = NRS_TBF_TYPE_OPCODE,
2698 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2699 .ntt_ops = &nrs_tbf_opcode_ops,
2702 .ntt_name = NRS_TBF_TYPE_GENERIC,
2703 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2704 .ntt_ops = &nrs_tbf_generic_ops,
2707 .ntt_name = NRS_TBF_TYPE_UID,
2708 .ntt_flag = NRS_TBF_FLAG_UID,
2709 .ntt_ops = &nrs_tbf_uid_ops,
2712 .ntt_name = NRS_TBF_TYPE_GID,
2713 .ntt_flag = NRS_TBF_FLAG_GID,
2714 .ntt_ops = &nrs_tbf_gid_ops,
2719 * Is called before the policy transitions into
2720 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2721 * policy-specific private data structure.
2723 * \param[in] policy The policy to start
2725 * \retval -ENOMEM OOM error
2728 * \see nrs_policy_register()
2729 * \see nrs_policy_ctl()
/*
 * Policy start: pick the TBF type and build the per-policy head.
 *
 * The type name comes from @arg, which must be shorter than
 * NRS_TBF_TYPE_MAX_LEN (-EINVAL otherwise); NRS_TBF_TYPE_GENERIC is used on
 * the fallback path (presumably when @arg is NULL - the test is not visible
 * in this view). The name is looked up in nrs_tbf_types[] (-ENOTSUPP on the
 * failure path after the loop). The head is then allocated on the policy's
 * CPT, given a binary heap, rule sequence counter, rule lock, rule list and
 * an hrtimer whose callback is nrs_tbf_timer_cb(), and the type's o_startup
 * hook is run. On success the head is stored in policy->pol_private.
 * Failures unwind through the out_free_heap / out_free_head / out labels;
 * the heap is destroyed on the way.
 */
2731 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2733 struct nrs_tbf_head *head;
2734 struct nrs_tbf_ops *ops;
2742 name = NRS_TBF_TYPE_GENERIC;
2743 else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2746 GOTO(out, rc = -EINVAL);
2748 for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2749 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2750 ops = nrs_tbf_types[i].ntt_ops;
2751 type = nrs_tbf_types[i].ntt_flag;
2757 GOTO(out, rc = -ENOTSUPP);
2759 OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2761 GOTO(out, rc = -ENOMEM);
2763 memcpy(head->th_type, name, strlen(name));
2764 head->th_type[strlen(name)] = '\0';
2766 head->th_type_flag = type;
2768 head->th_binheap = binheap_create(&nrs_tbf_heap_ops,
2769 CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2770 nrs_pol2cptab(policy),
2771 nrs_pol2cptid(policy));
2772 if (head->th_binheap == NULL)
2773 GOTO(out_free_head, rc = -ENOMEM);
2775 atomic_set(&head->th_rule_sequence, 0);
2776 spin_lock_init(&head->th_rule_lock);
2777 INIT_LIST_HEAD(&head->th_list);
2778 hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2779 head->th_timer.function = nrs_tbf_timer_cb;
2780 rc = head->th_ops->o_startup(policy, head);
2782 GOTO(out_free_heap, rc);
2784 policy->pol_private = head;
2787 binheap_destroy(head->th_binheap);
2795 * Is called before the policy transitions into
2796 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2797 * private data structure.
2799 * \param[in] policy The policy to stop
2801 * \see nrs_policy_stop0()
/*
 * Policy stop: tear down the per-policy head.
 *
 * Order of operations: cancel the timer, drop the client hash reference,
 * unlink every rule and drop its reference, then destroy the binary heap,
 * which must be empty by now (asserted). Finally throttling is switched off
 * and the service partition wait queue is woken.
 */
2803 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2805 struct nrs_tbf_head *head = policy->pol_private;
2806 struct ptlrpc_nrs *nrs = policy->pol_nrs;
2807 struct nrs_tbf_rule *rule, *n;
2809 LASSERT(head != NULL);
2810 LASSERT(head->th_cli_hash != NULL);
2811 hrtimer_cancel(&head->th_timer);
2812 /* Should cleanup hash first before free rules */
2813 cfs_hash_putref(head->th_cli_hash);
2814 list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2815 list_del_init(&rule->tr_linkage);
2816 nrs_tbf_rule_put(rule);
2818 LASSERT(list_empty(&head->th_list));
2819 LASSERT(head->th_binheap != NULL);
2820 LASSERT(binheap_is_empty(head->th_binheap));
2821 binheap_destroy(head->th_binheap);
2823 nrs->nrs_throttling = 0;
2824 wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2828 * Performs a policy-specific ctl function on TBF policy instances; similar
2831 * \param[in] policy the policy instance
2832 * \param[in] opc the opcode
2833 * \param[in,out] arg used for passing parameters and information
2835 * \pre assert_spin_locked(&policy->pol_nrs->nrs_lock)
2836 * \post assert_spin_locked(&policy->pol_nrs->nrs_lock)
2838 * \retval 0 operation carried out successfully
/*
 * Control entry point of the TBF policy; must be called with the NRS lock
 * held (asserted).
 *
 * Visible operations:
 * - NRS_CTL_TBF_RD_RULE: @arg is a seq_file; prints "CPT <n>:" for the
 *   service partition and dumps all rules of this instance.
 * - NRS_CTL_TBF_WR_RULE: @arg is a struct nrs_tbf_cmd that is handed to
 *   nrs_tbf_command().
 * - NRS_CTL_TBF_RD_TYPE_FLAG: @arg points to a __u32 that receives the type
 *   flag of this instance.
 * NOTE(review): the handling of other opcodes is not visible in this view.
 */
2841 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2842 enum ptlrpc_nrs_ctl opc,
2848 assert_spin_locked(&policy->pol_nrs->nrs_lock);
2855 * Read RPC rate size of a policy instance.
2857 case NRS_CTL_TBF_RD_RULE: {
2858 struct nrs_tbf_head *head = policy->pol_private;
2859 struct seq_file *m = arg;
2860 struct ptlrpc_service_part *svcpt;
2862 svcpt = policy->pol_nrs->nrs_svcpt;
2863 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2865 rc = nrs_tbf_rule_dump_all(head, m);
2870 * Write RPC rate of a policy instance.
2872 case NRS_CTL_TBF_WR_RULE: {
2873 struct nrs_tbf_head *head = policy->pol_private;
2874 struct nrs_tbf_cmd *cmd;
2876 cmd = (struct nrs_tbf_cmd *)arg;
2877 rc = nrs_tbf_command(policy,
2883 * Read the TBF policy type of a policy instance.
2885 case NRS_CTL_TBF_RD_TYPE_FLAG: {
2886 struct nrs_tbf_head *head = policy->pol_private;
2888 *(__u32 *)arg = head->th_type_flag;
2897 * Is called for obtaining a TBF policy resource.
2899 * \param[in] policy The policy on which the request is being asked for
2900 * \param[in] nrq The request for which resources are being taken
2901 * \param[in] parent Parent resource; when NULL the head resource is returned, otherwise it identifies the TBF head
2902 * \param[out] resp Resources references are placed in this array
2903 * \param[in] moving_req Signifies limited caller context; selects GFP_ATOMIC for the client allocation in this
2907 * \see nrs_resource_get_safe()
/*
 * Resource lookup for a request. With parent == NULL the head resource
 * (th_res of the policy private data) is handed back. Otherwise the TBF
 * client for the request is looked up through th_ops->o_cli_find(); an
 * existing client has its rule binding refreshed under scp_req_lock, and the
 * remaining code allocates, initialises and registers a new client. The
 * resource stored in *resp for a client is its tc_res member.
 */
2909 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2910 struct ptlrpc_nrs_request *nrq,
2911 const struct ptlrpc_nrs_resource *parent,
2912 struct ptlrpc_nrs_resource **resp,
2915 struct nrs_tbf_head *head;
2916 struct nrs_tbf_client *cli;
2917 struct nrs_tbf_client *tmp;
2918 struct ptlrpc_request *req;
/* First level of the hierarchy: the policy head itself. */
2920 if (parent == NULL) {
2921 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
/* Second level: recover head and request from their embedded members. */
2925 head = container_of(parent, struct nrs_tbf_head, th_res);
2926 req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2927 cli = head->th_ops->o_cli_find(head, req);
/* Rule re-evaluation for a found client runs under the request lock. */
2929 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2930 LASSERT(cli->tc_rule);
/*
 * Re-match when the rule sequence cached in the client differs from the head
 * sequence, or when the bound rule carries NTRS_STOPPING.
 */
2931 if (cli->tc_rule_sequence !=
2932 atomic_read(&head->th_rule_sequence) ||
2933 cli->tc_rule->tr_flags & NTRS_STOPPING) {
2934 struct nrs_tbf_rule *rule;
2937 "TBF class@%p rate %llu sequence %d, "
2938 "rule flags %d, head sequence %d\n",
2939 cli, cli->tc_rpc_rate,
2940 cli->tc_rule_sequence,
2941 cli->tc_rule->tr_flags,
2942 atomic_read(&head->th_rule_sequence));
2943 rule = nrs_tbf_rule_match(head, cli);
/* A different rule now matches: rebind the client to it. */
2944 if (rule != cli->tc_rule) {
2945 nrs_tbf_cli_reset(head, rule, cli);
/* Generation mismatch: refresh the values cached in the client. */
2947 if (cli->tc_rule_generation != rule->tr_generation)
2948 nrs_tbf_cli_reset_value(head, cli);
/* NOTE(review): presumably drops the reference taken by nrs_tbf_rule_match() - confirm. */
2949 nrs_tbf_rule_put(rule);
2951 } else if (cli->tc_rule_generation !=
2952 cli->tc_rule->tr_generation) {
2953 nrs_tbf_cli_reset_value(head, cli);
2955 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
/*
 * Allocate a new client on the CPT of this policy. GFP_ATOMIC is used when
 * moving_req is set, __GFP_IO otherwise.
 */
2959 OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2960 sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2964 nrs_tbf_cli_init(head, cli, req);
2965 tmp = head->th_ops->o_cli_findadd(head, cli);
/*
 * NOTE(review): the new client is released again here, presumably when
 * o_cli_findadd() returned a different, already registered client - confirm
 * the guarding condition.
 */
2967 atomic_dec(&cli->tc_ref);
2968 nrs_tbf_cli_fini(cli);
2972 *resp = &cli->tc_res;
2978 * Called when releasing references to the resource hierarchy obtained for a
2979 * request for scheduling using the TBF policy.
2981 * \param[in] policy the policy the resource belongs to
2982 * \param[in] res the resource to be released
/*
 * Releases a resource obtained through nrs_tbf_res_get(). A resource without
 * a parent is the head resource and is tested for separately; a client
 * resource is given back through th_ops->o_cli_put().
 */
2984 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
2985 const struct ptlrpc_nrs_resource *res)
2987 struct nrs_tbf_head *head;
2988 struct nrs_tbf_client *cli;
2991 * Do nothing for freeing parent, nrs_tbf_net resources
2993 if (res->res_parent == NULL)
/* Recover client and head from the resources embedded in them. */
2996 cli = container_of(res, struct nrs_tbf_client, tc_res);
2997 head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
2999 head->th_ops->o_cli_put(head, cli);
3003 * Called when getting a request from the TBF policy for handling, or just
3004 * peeking; removes the request from the policy when it is to be handled.
3006 * \param[in] policy The policy
3007 * \param[in] peek When set, signifies that we just want to examine the
3008 * request, and not handle it, so the request is not removed
3010 * \param[in] force Force the policy to return a request
3012 * \retval The request to be handled; this is the next request in the TBF
3015 * \see ptlrpc_nrs_req_get_nolock()
3016 * \see nrs_request_get()
/*
 * Selects the next request from the client at the root of the binheap.
 * Must be called with scp_req_lock held (asserted). With peek set the first
 * queued request of that client is only looked at. Otherwise the token count
 * of the client is brought up to date; a request is unlinked when a token is
 * available, and when none is available the policy enters throttling mode
 * and arms th_timer for the computed deadline.
 */
3019 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3020 bool peek, bool force)
3022 struct nrs_tbf_head *head = policy->pol_private;
3023 struct ptlrpc_nrs_request *nrq = NULL;
3024 struct nrs_tbf_client *cli;
3025 struct binheap_node *node;
3027 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
/*
 * Plain calls (neither peek nor force) are treated specially while the
 * policy is throttling. NOTE(review): presumably no request is returned in
 * that case - confirm.
 */
3029 if (likely(!peek && !force) && policy->pol_nrs->nrs_throttling)
/* Candidate client: the root of the heap. */
3032 node = binheap_root(head->th_binheap);
3033 if (unlikely(node == NULL))
3036 cli = container_of(node, struct nrs_tbf_client, tc_node);
3037 LASSERT(cli->tc_in_heap);
/* Peek mode: look at the first queued request, change nothing. */
3038 if (unlikely(peek)) {
3039 nrq = list_first_entry(&cli->tc_list,
3040 struct ptlrpc_nrs_request,
3043 struct nrs_tbf_rule *rule = cli->tc_rule;
3044 __u64 now = ktime_to_ns(ktime_get());
3048 __u64 old_resid = 0;
3050 deadline = cli->tc_check_time +
3052 LASSERT(now >= cli->tc_check_time);
/*
 * Tokens earned since tc_check_time: elapsed nanoseconds times tc_rpc_rate,
 * divided by NSEC_PER_SEC, added to the tokens already held.
 */
3053 passed = now - cli->tc_check_time;
3054 ntoken = passed * cli->tc_rpc_rate;
3055 do_div(ntoken, NSEC_PER_SEC);
3057 ntoken += cli->tc_ntoken;
/*
 * Realtime rules track a nanosecond residue instead of capping the token
 * count at tc_depth; the cap applies only in the other branch.
 */
3058 if (rule->tr_flags & NTRS_REALTIME) {
3059 LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3060 old_resid = cli->tc_nsecs_resid;
3061 cli->tc_nsecs_resid += passed % cli->tc_nsecs;
/*
 * NOTE(review): if the residue becomes exactly tc_nsecs this test is false
 * and the residue stays equal to tc_nsecs, which would violate the LASSERT
 * above on a later call. Consider using >= here - confirm.
 */
3062 if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3064 cli->tc_nsecs_resid -= cli->tc_nsecs;
3066 } else if (ntoken > cli->tc_depth)
3067 ntoken = cli->tc_depth;
3069 /* give an extra token with force mode */
3070 if (unlikely(force) && ntoken == 0)
/* Token available: take the first request of the client off its list. */
3074 nrq = list_first_entry(&cli->tc_list,
3075 struct ptlrpc_nrs_request,
3078 cli->tc_ntoken = ntoken;
3079 cli->tc_check_time = now;
3080 list_del_init(&nrq->nr_u.tbf.tr_list);
/*
 * A client with nothing left queued leaves the heap. Otherwise it is
 * repositioned; non-realtime clients first get the deadline now + tc_nsecs.
 */
3081 if (list_empty(&cli->tc_list)) {
3082 binheap_remove(head->th_binheap,
3084 cli->tc_in_heap = false;
3086 if (!(rule->tr_flags & NTRS_REALTIME))
3087 cli->tc_deadline = now + cli->tc_nsecs;
3088 binheap_relocate(head->th_binheap,
3092 "TBF dequeues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3093 cli, cli->tc_rpc_rate,
3094 cli->tc_rule_generation, cli->tc_ntoken,
3095 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3096 cli->tc_rule->tr_generation);
/*
 * No token. For a realtime rule the residue saved above is restored, the
 * deadline is stored and the client repositioned; if a different client is
 * now at the root the selection is retried recursively.
 */
3100 if (rule->tr_flags & NTRS_REALTIME) {
3101 cli->tc_deadline = deadline;
3102 cli->tc_nsecs_resid = old_resid;
3103 binheap_relocate(head->th_binheap,
3105 if (node != binheap_root(head->th_binheap))
3106 return nrs_tbf_req_get(policy,
/*
 * Enter throttling mode and arm the timer with the absolute deadline;
 * nrs_tbf_timer_cb() clears nrs_throttling again when it fires.
 */
3109 policy->pol_nrs->nrs_throttling = 1;
3110 head->th_deadline = deadline;
3111 time = ktime_set(0, 0);
3112 time = ktime_add_ns(time, deadline);
3113 hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3121 * Adds request \a nrq to \a policy's list of queued requests
3123 * \param[in] policy The policy
3124 * \param[in] nrq The request to add
3126 * \retval 0 success; nrs_request_enqueue() assumes this function will always
/*
 * Queues a request on the TBF client it resolved to. Must be called with
 * scp_req_lock held (asserted). The first request of a client also inserts
 * the client into the binheap and may re-arm the throttling timer; later
 * requests are only appended to the list of the client.
 */
3129 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3130 struct ptlrpc_nrs_request *nrq)
3132 struct nrs_tbf_head *head;
3133 struct nrs_tbf_client *cli;
3136 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
/* Client and head are recovered from the resource attached to the request. */
3138 cli = container_of(nrs_request_resource(nrq),
3139 struct nrs_tbf_client, tc_res);
3140 head = container_of(nrs_request_resource(nrq)->res_parent,
3141 struct nrs_tbf_head, th_res);
/* Client currently idle: give it a deadline and put it into the heap. */
3142 if (list_empty(&cli->tc_list)) {
3143 LASSERT(!cli->tc_in_heap);
3144 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3145 rc = binheap_insert(head->th_binheap, &cli->tc_node);
3147 cli->tc_in_heap = true;
3148 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3149 list_add_tail(&nrq->nr_u.tbf.tr_list,
/*
 * While throttling, a client whose deadline is earlier than the one the timer
 * is armed for gets the timer re-armed for its own deadline. The re-arm also
 * depends on the result of hrtimer_try_to_cancel().
 */
3151 if (policy->pol_nrs->nrs_throttling) {
3152 __u64 deadline = cli->tc_deadline;
3153 if ((head->th_deadline > deadline) &&
3154 (hrtimer_try_to_cancel(&head->th_timer)
3157 head->th_deadline = deadline;
3158 time = ktime_set(0, 0);
3159 time = ktime_add_ns(time, deadline);
3160 hrtimer_start(&head->th_timer, time,
/* Client already has queued requests: it must be in the heap; just append. */
3166 LASSERT(cli->tc_in_heap);
3167 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3168 list_add_tail(&nrq->nr_u.tbf.tr_list,
3174 "TBF enqueues: class@%p rate %llu gen %llu token %llu, rule@%p rate %llu gen %llu\n",
3175 cli, cli->tc_rpc_rate,
3176 cli->tc_rule_generation, cli->tc_ntoken,
3177 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3178 cli->tc_rule->tr_generation);
3184 * Removes request \a nrq from \a policy's list of queued requests.
3186 * \param[in] policy The policy
3187 * \param[in] nrq The request to remove
/*
 * Unlinks a queued request from its TBF client. Must be called with
 * scp_req_lock held (asserted). When the client has no request left it is
 * removed from the binheap, otherwise its heap position is refreshed.
 */
3189 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3190 struct ptlrpc_nrs_request *nrq)
3192 struct nrs_tbf_head *head;
3193 struct nrs_tbf_client *cli;
3195 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3197 cli = container_of(nrs_request_resource(nrq),
3198 struct nrs_tbf_client, tc_res);
3199 head = container_of(nrs_request_resource(nrq)->res_parent,
3200 struct nrs_tbf_head, th_res);
/* The request must still be linked on a list at this point. */
3202 LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3203 list_del_init(&nrq->nr_u.tbf.tr_list);
3204 if (list_empty(&cli->tc_list)) {
3205 binheap_remove(head->th_binheap,
3207 cli->tc_in_heap = false;
3209 binheap_relocate(head->th_binheap,
3215 * Prints a debug statement right before the request \a nrq stops being
3218 * \param[in] policy The policy handling the request
3219 * \param[in] nrq The request being handled
3221 * \see ptlrpc_server_finish_request()
3222 * \see ptlrpc_nrs_req_stop_nolock()
/*
 * Request stop hook. Emits a D_RPCTRACE message with the policy name, the
 * peer of the request and the TBF sequence number assigned at enqueue time.
 * Must be called with scp_req_lock held (asserted).
 */
3224 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3225 struct ptlrpc_nrs_request *nrq)
3227 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3230 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3232 CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3233 policy->pol_desc->pd_name, libcfs_idstr(&req->rq_peer),
3234 nrq->nr_u.tbf.tr_sequence);
3242 * The maximum RPC rate.
/*
 * Exclusive upper bound for a rate value: nrs_tbf_parse_value_pair() tests
 * rate >= LPROCFS_NRS_RATE_MAX, so the bound itself is outside the accepted
 * range.
 */
3244 #define LPROCFS_NRS_RATE_MAX 1000000ULL /* 1rpc/us */
/*
 * seq_file show handler for the TBF rule file. The service is taken from
 * m->private. Rules of the regular NRS head are printed first through
 * ptlrpc_nrs_policy_control() with NRS_CTL_TBF_RD_RULE; when the service has
 * a high-priority head the same is done for it. Return codes -ENOSPC and
 * -ENODEV of the control call are handled separately from other values.
 */
3247 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3249 struct ptlrpc_service *svc = m->private;
3252 seq_printf(m, "regular_requests:\n");
3254 * Perform two separate calls to this as only one of the NRS heads'
3255 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3256 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3258 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3260 NRS_CTL_TBF_RD_RULE,
3264 * -ENOSPC means buf in the parameter m is overflow, return 0
3265 * here to let upper layer function seq_read alloc a larger
3266 * memory area and do this process again.
3268 } else if (rc == -ENOSPC) {
3272 * Ignore -ENODEV as the regular NRS head's policy may be in the
3273 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3275 } else if (rc != -ENODEV) {
3279 if (!nrs_svc_has_hp(svc))
3282 seq_printf(m, "high_priority_requests:\n");
3283 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3285 NRS_CTL_TBF_RD_RULE,
3289 * -ENOSPC means buf in the parameter m is overflow, return 0
3290 * here to let upper layer function seq_read alloc a larger
3291 * memory area and do this process again.
3293 } else if (rc == -ENOSPC) {
/*
 * Parses the id expression of a start command. The parser is chosen by
 * cmd->u.tc_start.ts_valid_type: jobid, nid, opcode, generic, or the shared
 * uid/gid parser. The result of the chosen parser is kept in rc.
 */
3302 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3307 switch (cmd->u.tc_start.ts_valid_type) {
3308 case NRS_TBF_FLAG_JOBID:
3309 rc = nrs_tbf_jobid_parse(cmd, token);
3311 case NRS_TBF_FLAG_NID:
3312 rc = nrs_tbf_nid_parse(cmd, token);
3314 case NRS_TBF_FLAG_OPCODE:
3315 rc = nrs_tbf_opcode_parse(cmd, token);
3317 case NRS_TBF_FLAG_GENERIC:
3318 rc = nrs_tbf_generic_parse(cmd, token);
/* UID and GID rules share one parser. */
3320 case NRS_TBF_FLAG_UID:
3321 case NRS_TBF_FLAG_GID:
3322 rc = nrs_tbf_ug_id_parse(cmd, token);
/*
 * Releases type-specific state of a parsed command. Only start commands are
 * handled; the finaliser is chosen by ts_valid_type in the same way
 * nrs_tbf_id_parse() chooses the parser. An unrecognised type is reported
 * with CWARN.
 */
3331 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3333 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3334 switch (cmd->u.tc_start.ts_valid_type) {
3335 case NRS_TBF_FLAG_JOBID:
3336 nrs_tbf_jobid_cmd_fini(cmd);
3338 case NRS_TBF_FLAG_NID:
3339 nrs_tbf_nid_cmd_fini(cmd);
3341 case NRS_TBF_FLAG_OPCODE:
3342 nrs_tbf_opcode_cmd_fini(cmd);
3344 case NRS_TBF_FLAG_GENERIC:
3345 nrs_tbf_generic_cmd_fini(cmd);
/* UID and GID rules share one finaliser. */
3347 case NRS_TBF_FLAG_UID:
3348 case NRS_TBF_FLAG_GID:
3349 nrs_tbf_id_cmd_fini(cmd);
3352 CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3353 cmd->u.tc_start.ts_valid_type);
/*
 * Validates a rule name: it must not be empty, every character must be
 * alphanumeric or an underscore, and -ENAMETOOLONG is returned when the scan
 * reaches MAX_TBF_NAME characters without finding the terminator.
 */
3358 static int check_rule_name(const char *name)
3362 if (name[0] == '\0')
/*
 * NOTE(review): name[i] is read before i < MAX_TBF_NAME is tested; swapping
 * the two operands would keep every read inside the bound.
 */
3365 for (i = 0; name[i] != '\0' && i < MAX_TBF_NAME; i++) {
3366 if (!isalnum(name[i]) && name[i] != '_')
3370 if (i == MAX_TBF_NAME)
3371 return -ENAMETOOLONG;
/*
 * Parses one key=value token of a rule command. Keys handled here:
 *   rate     - decimal number, range-checked against LPROCFS_NRS_RATE_MAX,
 *              stored for start and change commands.
 *   rank     - a rule name validated by check_rule_name(), stored as the
 *              next-rule name for start and change commands.
 *   realtime - decimal number; leads to NTRS_REALTIME being set in the
 *              start rule flags.
 * The buffer is modified in place by strsep().
 */
3377 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
/* Split at the first equals sign; a missing or empty value is tested next. */
3385 key = strsep(&val, "=");
3386 if (val == NULL || strlen(val) == 0)
3389 /* Key of the value pair */
3390 if (strcmp(key, "rate") == 0) {
3391 rc = kstrtoull(val, 10, &rate);
/*
 * NOTE(review): rate is filled by kstrtoull() and is therefore presumably
 * unsigned, in which case rate <= 0 can only mean rate == 0 - confirm.
 */
3395 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3398 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3399 cmd->u.tc_start.ts_rpc_rate = rate;
3400 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3401 cmd->u.tc_change.tc_rpc_rate = rate;
3404 } else if (strcmp(key, "rank") == 0) {
3405 rc = check_rule_name(val);
/* The stored pointer refers into the caller buffer; no copy is made here. */
3409 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3410 cmd->u.tc_start.ts_next_name = val;
3411 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3412 cmd->u.tc_change.tc_next_name = val;
3415 } else if (strcmp(key, "realtime") == 0) {
3416 unsigned long realtime;
3418 rc = kstrtoul(val, 10, &realtime);
3423 cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
/*
 * Parses a space separated list of key=value tokens with
 * nrs_tbf_parse_value_pair(), then applies per-command checks: a start
 * command without a rate falls back to the tbf_rate module parameter, and a
 * change command is tested for having neither a rate nor a next-rule name.
 */
3431 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3438 while (val != NULL && strlen(val) != 0) {
3439 token = strsep(&val, " ");
3440 rc = nrs_tbf_parse_value_pair(cmd, token);
3445 switch (cmd->tc_cmd) {
3446 case NRS_CTL_TBF_START_RULE:
/* A rate of zero means no rate was given: use the module default. */
3447 if (cmd->u.tc_start.ts_rpc_rate == 0)
3448 cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3450 case NRS_CTL_TBF_CHANGE_RULE:
/* NOTE(review): presumably a change that changes nothing is rejected - confirm. */
3451 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3452 cmd->u.tc_change.tc_next_name == NULL)
3455 case NRS_CTL_TBF_STOP_RULE:
/*
 * Builds a struct nrs_tbf_cmd from a command string. The first token selects
 * the command (start, stop or change), the second is the rule name, checked
 * with check_rule_name(). For a start command an id expression follows,
 * located through its last closing brace and parsed by nrs_tbf_id_parse();
 * what remains is handed to nrs_tbf_parse_value_pairs(). The buffer is
 * modified in place by strsep(), and cmd->tc_name points into it.
 */
3463 static struct nrs_tbf_cmd *
3464 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3466 struct nrs_tbf_cmd *cmd;
3473 GOTO(out, rc = -ENOMEM);
3474 memset(cmd, 0, sizeof(*cmd));
/* A command word with nothing after it is invalid. */
3477 token = strsep(&val, " ");
3478 if (val == NULL || strlen(val) == 0)
3479 GOTO(out_free_cmd, rc = -EINVAL);
3481 /* Type of the command */
3482 if (strcmp(token, "start") == 0) {
3483 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3484 cmd->u.tc_start.ts_valid_type = type_flag;
3485 } else if (strcmp(token, "stop") == 0)
3486 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3487 else if (strcmp(token, "change") == 0)
3488 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3490 GOTO(out_free_cmd, rc = -EINVAL);
3492 /* Name of the rule */
/* Only a stop command may end right after the rule name. */
3493 token = strsep(&val, " ");
3494 if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE))
3495 GOTO(out_free_cmd, rc = -EINVAL);
3497 rc = check_rule_name(token);
3499 GOTO(out_free_cmd, rc);
3501 cmd->tc_name = token;
3503 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
/* The id expression ends at the last closing brace. */
3507 val = strrchr(token, '}');
3509 GOTO(out_free_cmd, rc = -EINVAL);
3515 } else if (*val == ' ') {
3519 GOTO(out_free_cmd, rc = -EINVAL);
3521 rc = nrs_tbf_id_parse(cmd, token);
3523 GOTO(out_free_cmd, rc);
3526 rc = nrs_tbf_parse_value_pairs(cmd, val);
/*
 * NOTE(review): the error code from nrs_tbf_parse_value_pairs() is replaced
 * by -EINVAL here; passing rc through would keep the specific error.
 */
3528 GOTO(out_cmd_fini, rc = -EINVAL);
3531 nrs_tbf_cmd_fini(cmd);
3541 * Get the TBF policy type (nid, jobid, etc) preset by
3542 * proc entry 'nrs_policies' for command buffer parsing.
3544 * \param[in] svc the PTLRPC service
3545 * \param[in] queue the NRS queue type
3547 * \retval the preset TBF policy type flag
/*
 * Reads the TBF type flag of the given queue through
 * ptlrpc_nrs_policy_control() with NRS_CTL_TBF_RD_TYPE_FLAG.
 * NOTE(review): NRS_TBF_FLAG_INVALID is presumably the fallback when that
 * call fails - confirm the guarding condition.
 */
3550 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3555 rc = ptlrpc_nrs_policy_control(svc, queue,
3557 NRS_CTL_TBF_RD_TYPE_FLAG,
3560 type = NRS_TBF_FLAG_INVALID;
/* Size in bytes of the kernel buffer that receives a rule command. */
3565 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
/*
 * Write handler for the TBF rule file. Copies the user command into a kernel
 * buffer, consumes an optional leading queue selector (reg or hp), parses the
 * rest with nrs_tbf_parse_cmd() and applies it through
 * ptlrpc_nrs_policy_control() with NRS_CTL_TBF_WR_RULE under
 * nrs_core.nrs_mutex. Returns count on success, otherwise the error code.
 */
3567 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3568 const char __user *buffer,
3569 size_t count, loff_t *off)
3571 struct seq_file *m = file->private_data;
3572 struct ptlrpc_service *svc = m->private;
3576 struct nrs_tbf_cmd *cmd;
3577 enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3578 unsigned long length;
3581 OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3582 if (kernbuf == NULL)
3583 GOTO(out, rc = -ENOMEM);
/*
 * At most MAX_CMD - 1 bytes are accepted, which leaves one byte after the
 * copied data. NOTE(review): string termination relies on OBD_ALLOC
 * returning zeroed memory - confirm.
 */
3585 if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3586 GOTO(out_free_kernbuff, rc = -EINVAL);
3588 if (copy_from_user(kernbuf, buffer, count))
3589 GOTO(out_free_kernbuff, rc = -EFAULT);
3592 token = strsep(&val, " ");
3594 GOTO(out_free_kernbuff, rc = -EINVAL);
/* Optional first word selects the queue; the default is both queues. */
3596 if (strcmp(token, "reg") == 0) {
3597 queue = PTLRPC_NRS_QUEUE_REG;
3598 } else if (strcmp(token, "hp") == 0) {
3599 queue = PTLRPC_NRS_QUEUE_HP;
/*
 * Not a queue selector: write a space back at the end of the first token,
 * where strsep() cut the string, so the command is whole again.
 */
3601 kernbuf[strlen(token)] = ' ';
3604 length = strlen(val);
3607 GOTO(out_free_kernbuff, rc = -EINVAL);
/*
 * Without a high-priority head an explicit hp request fails with -ENODEV,
 * while the default of both queues is narrowed to the regular queue.
 */
3609 if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3610 GOTO(out_free_kernbuff, rc = -ENODEV);
3611 else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3612 queue = PTLRPC_NRS_QUEUE_REG;
3614 cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3616 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3619 * Serialize NRS core lprocfs operations with policy registration/
3622 mutex_lock(&nrs_core.nrs_mutex);
3623 rc = ptlrpc_nrs_policy_control(svc, queue,
3625 NRS_CTL_TBF_WR_RULE,
3627 mutex_unlock(&nrs_core.nrs_mutex);
3629 nrs_tbf_cmd_fini(cmd);
3632 OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
/* A write handler reports success as the number of bytes consumed. */
3634 return rc ? rc : count;
/*
 * NOTE(review): presumably defines ptlrpc_lprocfs_nrs_tbf_rule_fops, the
 * object referenced in nrs_tbf_lprocfs_init(), from the seq_show and
 * seq_write handlers sharing this name prefix - confirm against the macro
 * definition.
 */
3637 LDEBUGFS_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3640 * Initializes a TBF policy's lprocfs interface for service \a svc
3642 * \param[in] svc the service
3645 * \retval != 0 error
/*
 * Registers the nrs_tbf_rule entry, backed by
 * ptlrpc_lprocfs_nrs_tbf_rule_fops, under the debugfs directory of the
 * service. A service without a debugfs directory is tested for before the
 * registration.
 */
3647 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3649 struct ldebugfs_vars nrs_tbf_lprocfs_vars[] = {
3650 { .name = "nrs_tbf_rule",
3651 .fops = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3656 if (!svc->srv_debugfs_entry)
3659 ldebugfs_add_vars(svc->srv_debugfs_entry, nrs_tbf_lprocfs_vars, NULL);
3665 * TBF policy operations
/*
 * Operation table of the TBF policy, wiring the NRS framework hooks to the
 * functions of this file. Note that the enqueue and dequeue hooks map to
 * nrs_tbf_req_add() and nrs_tbf_req_del().
 */
3667 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3668 .op_policy_start = nrs_tbf_start,
3669 .op_policy_stop = nrs_tbf_stop,
3670 .op_policy_ctl = nrs_tbf_ctl,
3671 .op_res_get = nrs_tbf_res_get,
3672 .op_res_put = nrs_tbf_res_put,
3673 .op_req_get = nrs_tbf_req_get,
3674 .op_req_enqueue = nrs_tbf_req_add,
3675 .op_req_dequeue = nrs_tbf_req_del,
3676 .op_req_stop = nrs_tbf_req_stop,
3677 .op_lprocfs_init = nrs_tbf_lprocfs_init,
3681 * TBF policy configuration
3683 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3684 .nc_name = NRS_POL_NAME_TBF,
3685 .nc_ops = &nrs_tbf_ops,
3686 .nc_compat = nrs_policy_compat_all,