4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License version 2 for more details. A copy is
14 * included in the COPYING file that accompanied this code.
16 * You should have received a copy of the GNU General Public License
17 * along with this program; if not, write to the Free Software
18 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
23 * Copyright (C) 2013 DataDirect Networks, Inc.
25 * Copyright (c) 2014, 2016, Intel Corporation.
28 * lustre/ptlrpc/nrs_tbf.c
30 * Network Request Scheduler (NRS) Token Bucket Filter(TBF) policy
34 #ifdef HAVE_SERVER_SUPPORT
41 #define DEBUG_SUBSYSTEM S_RPC
42 #include <obd_support.h>
43 #include <obd_class.h>
44 #include <libcfs/libcfs.h>
45 #include <lustre_req_layout.h>
46 #include "ptlrpc_internal.h"
51 * Token Bucket Filter over client NIDs
56 #define NRS_POL_NAME_TBF "tbf"
58 static int tbf_jobid_cache_size = 8192;
59 module_param(tbf_jobid_cache_size, int, 0644);
60 MODULE_PARM_DESC(tbf_jobid_cache_size, "The size of jobid cache");
62 static int tbf_rate = 10000;
63 module_param(tbf_rate, int, 0644);
64 MODULE_PARM_DESC(tbf_rate, "Default rate limit in RPCs/s");
66 static int tbf_depth = 3;
67 module_param(tbf_depth, int, 0644);
68 MODULE_PARM_DESC(tbf_depth, "How many tokens that a client can save up");
70 static enum hrtimer_restart nrs_tbf_timer_cb(struct hrtimer *timer)
72 struct nrs_tbf_head *head = container_of(timer, struct nrs_tbf_head,
74 struct ptlrpc_nrs *nrs = head->th_res.res_policy->pol_nrs;
75 struct ptlrpc_service_part *svcpt = nrs->nrs_svcpt;
77 nrs->nrs_throttling = 0;
78 wake_up(&svcpt->scp_waitq);
80 return HRTIMER_NORESTART;
83 #define NRS_TBF_DEFAULT_RULE "default"
85 static void nrs_tbf_rule_fini(struct nrs_tbf_rule *rule)
87 LASSERT(atomic_read(&rule->tr_ref) == 0);
88 LASSERT(list_empty(&rule->tr_cli_list));
89 LASSERT(list_empty(&rule->tr_linkage));
91 rule->tr_head->th_ops->o_rule_fini(rule);
96 * Decreases the rule's usage reference count, and stops the rule in case it
97 * was already stopping and have no more outstanding usage references (which
98 * indicates it has no more queued or started requests, and can be safely
101 static void nrs_tbf_rule_put(struct nrs_tbf_rule *rule)
103 if (atomic_dec_and_test(&rule->tr_ref))
104 nrs_tbf_rule_fini(rule);
108 * Increases the rule's usage reference count.
110 static inline void nrs_tbf_rule_get(struct nrs_tbf_rule *rule)
112 atomic_inc(&rule->tr_ref);
116 nrs_tbf_cli_rule_put(struct nrs_tbf_client *cli)
118 LASSERT(!list_empty(&cli->tc_linkage));
119 LASSERT(cli->tc_rule);
120 spin_lock(&cli->tc_rule->tr_rule_lock);
121 list_del_init(&cli->tc_linkage);
122 spin_unlock(&cli->tc_rule->tr_rule_lock);
123 nrs_tbf_rule_put(cli->tc_rule);
128 nrs_tbf_cli_reset_value(struct nrs_tbf_head *head,
129 struct nrs_tbf_client *cli)
132 struct nrs_tbf_rule *rule = cli->tc_rule;
134 cli->tc_rpc_rate = rule->tr_rpc_rate;
135 cli->tc_nsecs = rule->tr_nsecs;
136 cli->tc_depth = rule->tr_depth;
137 cli->tc_ntoken = rule->tr_depth;
138 cli->tc_check_time = ktime_to_ns(ktime_get());
139 cli->tc_rule_sequence = atomic_read(&head->th_rule_sequence);
140 cli->tc_rule_generation = rule->tr_generation;
143 cfs_binheap_relocate(head->th_binheap,
148 nrs_tbf_cli_reset(struct nrs_tbf_head *head,
149 struct nrs_tbf_rule *rule,
150 struct nrs_tbf_client *cli)
152 spin_lock(&cli->tc_rule_lock);
153 if (cli->tc_rule != NULL && !list_empty(&cli->tc_linkage)) {
154 LASSERT(rule != cli->tc_rule);
155 nrs_tbf_cli_rule_put(cli);
157 LASSERT(cli->tc_rule == NULL);
158 LASSERT(list_empty(&cli->tc_linkage));
159 /* Rule's ref is added before called */
161 spin_lock(&rule->tr_rule_lock);
162 list_add_tail(&cli->tc_linkage, &rule->tr_cli_list);
163 spin_unlock(&rule->tr_rule_lock);
164 spin_unlock(&cli->tc_rule_lock);
165 nrs_tbf_cli_reset_value(head, cli);
169 nrs_tbf_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
171 return rule->tr_head->th_ops->o_rule_dump(rule, m);
175 nrs_tbf_rule_dump_all(struct nrs_tbf_head *head, struct seq_file *m)
177 struct nrs_tbf_rule *rule;
180 LASSERT(head != NULL);
181 spin_lock(&head->th_rule_lock);
182 /* List the rules from newest to oldest */
183 list_for_each_entry(rule, &head->th_list, tr_linkage) {
184 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
185 rc = nrs_tbf_rule_dump(rule, m);
191 spin_unlock(&head->th_rule_lock);
196 static struct nrs_tbf_rule *
197 nrs_tbf_rule_find_nolock(struct nrs_tbf_head *head,
200 struct nrs_tbf_rule *rule;
202 LASSERT(head != NULL);
203 list_for_each_entry(rule, &head->th_list, tr_linkage) {
204 LASSERT((rule->tr_flags & NTRS_STOPPING) == 0);
205 if (strcmp(rule->tr_name, name) == 0) {
206 nrs_tbf_rule_get(rule);
213 static struct nrs_tbf_rule *
214 nrs_tbf_rule_find(struct nrs_tbf_head *head,
217 struct nrs_tbf_rule *rule;
219 LASSERT(head != NULL);
220 spin_lock(&head->th_rule_lock);
221 rule = nrs_tbf_rule_find_nolock(head, name);
222 spin_unlock(&head->th_rule_lock);
226 static struct nrs_tbf_rule *
227 nrs_tbf_rule_match(struct nrs_tbf_head *head,
228 struct nrs_tbf_client *cli)
230 struct nrs_tbf_rule *rule = NULL;
231 struct nrs_tbf_rule *tmp_rule;
233 spin_lock(&head->th_rule_lock);
234 /* Match the newest rule in the list */
235 list_for_each_entry(tmp_rule, &head->th_list, tr_linkage) {
236 LASSERT((tmp_rule->tr_flags & NTRS_STOPPING) == 0);
237 if (head->th_ops->o_rule_match(tmp_rule, cli)) {
244 rule = head->th_rule;
246 nrs_tbf_rule_get(rule);
247 spin_unlock(&head->th_rule_lock);
252 nrs_tbf_cli_init(struct nrs_tbf_head *head,
253 struct nrs_tbf_client *cli,
254 struct ptlrpc_request *req)
256 struct nrs_tbf_rule *rule;
258 memset(cli, 0, sizeof(*cli));
259 cli->tc_in_heap = false;
260 head->th_ops->o_cli_init(cli, req);
261 INIT_LIST_HEAD(&cli->tc_list);
262 INIT_LIST_HEAD(&cli->tc_linkage);
263 spin_lock_init(&cli->tc_rule_lock);
264 atomic_set(&cli->tc_ref, 1);
265 rule = nrs_tbf_rule_match(head, cli);
266 nrs_tbf_cli_reset(head, rule, cli);
270 nrs_tbf_cli_fini(struct nrs_tbf_client *cli)
272 LASSERT(list_empty(&cli->tc_list));
273 LASSERT(!cli->tc_in_heap);
274 LASSERT(atomic_read(&cli->tc_ref) == 0);
275 spin_lock(&cli->tc_rule_lock);
276 nrs_tbf_cli_rule_put(cli);
277 spin_unlock(&cli->tc_rule_lock);
282 nrs_tbf_rule_start(struct ptlrpc_nrs_policy *policy,
283 struct nrs_tbf_head *head,
284 struct nrs_tbf_cmd *start)
286 struct nrs_tbf_rule *rule;
287 struct nrs_tbf_rule *tmp_rule;
288 struct nrs_tbf_rule *next_rule;
289 char *next_name = start->u.tc_start.ts_next_name;
292 rule = nrs_tbf_rule_find(head, start->tc_name);
294 nrs_tbf_rule_put(rule);
298 OBD_CPT_ALLOC_PTR(rule, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
302 memcpy(rule->tr_name, start->tc_name, strlen(start->tc_name));
303 rule->tr_rpc_rate = start->u.tc_start.ts_rpc_rate;
304 rule->tr_flags = start->u.tc_start.ts_rule_flags;
305 rule->tr_nsecs = NSEC_PER_SEC;
306 do_div(rule->tr_nsecs, rule->tr_rpc_rate);
307 rule->tr_depth = tbf_depth;
308 atomic_set(&rule->tr_ref, 1);
309 INIT_LIST_HEAD(&rule->tr_cli_list);
310 INIT_LIST_HEAD(&rule->tr_nids);
311 INIT_LIST_HEAD(&rule->tr_linkage);
312 spin_lock_init(&rule->tr_rule_lock);
313 rule->tr_head = head;
315 rc = head->th_ops->o_rule_init(policy, rule, start);
321 /* Add as the newest rule */
322 spin_lock(&head->th_rule_lock);
323 tmp_rule = nrs_tbf_rule_find_nolock(head, start->tc_name);
325 spin_unlock(&head->th_rule_lock);
326 nrs_tbf_rule_put(tmp_rule);
327 nrs_tbf_rule_put(rule);
332 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
334 spin_unlock(&head->th_rule_lock);
335 nrs_tbf_rule_put(rule);
339 list_add(&rule->tr_linkage, next_rule->tr_linkage.prev);
340 nrs_tbf_rule_put(next_rule);
342 /* Add on the top of the rule list */
343 list_add(&rule->tr_linkage, &head->th_list);
345 spin_unlock(&head->th_rule_lock);
346 atomic_inc(&head->th_rule_sequence);
347 if (start->u.tc_start.ts_rule_flags & NTRS_DEFAULT) {
348 rule->tr_flags |= NTRS_DEFAULT;
349 LASSERT(head->th_rule == NULL);
350 head->th_rule = rule;
353 CDEBUG(D_RPCTRACE, "TBF starts rule@%p rate %llu gen %llu\n",
354 rule, rule->tr_rpc_rate, rule->tr_generation);
360 * Change the rank of a rule in the rule list
362 * The matched rule will be moved to the position right before another
365 * \param[in] policy the policy instance
366 * \param[in] head the TBF policy instance
367 * \param[in] name the rule name to be moved
368 * \param[in] next_name the rule name before which the matched rule will be
373 nrs_tbf_rule_change_rank(struct ptlrpc_nrs_policy *policy,
374 struct nrs_tbf_head *head,
378 struct nrs_tbf_rule *rule = NULL;
379 struct nrs_tbf_rule *next_rule = NULL;
382 LASSERT(head != NULL);
384 spin_lock(&head->th_rule_lock);
385 rule = nrs_tbf_rule_find_nolock(head, name);
387 GOTO(out, rc = -ENOENT);
389 if (strcmp(name, next_name) == 0)
392 next_rule = nrs_tbf_rule_find_nolock(head, next_name);
394 GOTO(out_put, rc = -ENOENT);
396 list_move(&rule->tr_linkage, next_rule->tr_linkage.prev);
397 nrs_tbf_rule_put(next_rule);
399 nrs_tbf_rule_put(rule);
401 spin_unlock(&head->th_rule_lock);
406 nrs_tbf_rule_change_rate(struct ptlrpc_nrs_policy *policy,
407 struct nrs_tbf_head *head,
411 struct nrs_tbf_rule *rule;
413 assert_spin_locked(&policy->pol_nrs->nrs_lock);
415 rule = nrs_tbf_rule_find(head, name);
419 rule->tr_rpc_rate = rate;
420 rule->tr_nsecs = NSEC_PER_SEC;
421 do_div(rule->tr_nsecs, rule->tr_rpc_rate);
422 rule->tr_generation++;
423 nrs_tbf_rule_put(rule);
429 nrs_tbf_rule_change(struct ptlrpc_nrs_policy *policy,
430 struct nrs_tbf_head *head,
431 struct nrs_tbf_cmd *change)
433 __u64 rate = change->u.tc_change.tc_rpc_rate;
434 char *next_name = change->u.tc_change.tc_next_name;
438 rc = nrs_tbf_rule_change_rate(policy, head, change->tc_name,
445 rc = nrs_tbf_rule_change_rank(policy, head, change->tc_name,
455 nrs_tbf_rule_stop(struct ptlrpc_nrs_policy *policy,
456 struct nrs_tbf_head *head,
457 struct nrs_tbf_cmd *stop)
459 struct nrs_tbf_rule *rule;
461 assert_spin_locked(&policy->pol_nrs->nrs_lock);
463 if (strcmp(stop->tc_name, NRS_TBF_DEFAULT_RULE) == 0)
466 rule = nrs_tbf_rule_find(head, stop->tc_name);
470 list_del_init(&rule->tr_linkage);
471 rule->tr_flags |= NTRS_STOPPING;
472 nrs_tbf_rule_put(rule);
473 nrs_tbf_rule_put(rule);
479 nrs_tbf_command(struct ptlrpc_nrs_policy *policy,
480 struct nrs_tbf_head *head,
481 struct nrs_tbf_cmd *cmd)
485 assert_spin_locked(&policy->pol_nrs->nrs_lock);
487 switch (cmd->tc_cmd) {
488 case NRS_CTL_TBF_START_RULE:
489 if (cmd->u.tc_start.ts_valid_type != head->th_type_flag)
492 spin_unlock(&policy->pol_nrs->nrs_lock);
493 rc = nrs_tbf_rule_start(policy, head, cmd);
494 spin_lock(&policy->pol_nrs->nrs_lock);
496 case NRS_CTL_TBF_CHANGE_RULE:
497 rc = nrs_tbf_rule_change(policy, head, cmd);
499 case NRS_CTL_TBF_STOP_RULE:
500 rc = nrs_tbf_rule_stop(policy, head, cmd);
501 /* Take it as a success, if not exists at all */
502 return rc == -ENOENT ? 0 : rc;
509 * Binary heap predicate.
511 * \param[in] e1 the first binheap node to compare
512 * \param[in] e2 the second binheap node to compare
518 tbf_cli_compare(struct cfs_binheap_node *e1, struct cfs_binheap_node *e2)
520 struct nrs_tbf_client *cli1;
521 struct nrs_tbf_client *cli2;
523 cli1 = container_of(e1, struct nrs_tbf_client, tc_node);
524 cli2 = container_of(e2, struct nrs_tbf_client, tc_node);
526 if (cli1->tc_deadline < cli2->tc_deadline)
528 else if (cli1->tc_deadline > cli2->tc_deadline)
531 if (cli1->tc_check_time < cli2->tc_check_time)
533 else if (cli1->tc_check_time > cli2->tc_check_time)
536 /* Maybe need more comparasion, e.g. request number in the rules */
541 * TBF binary heap operations
543 static struct cfs_binheap_ops nrs_tbf_heap_ops = {
546 .hop_compare = tbf_cli_compare,
549 static unsigned nrs_tbf_jobid_hop_hash(struct cfs_hash *hs, const void *key,
552 return cfs_hash_djb2_hash(key, strlen(key), mask);
555 static int nrs_tbf_jobid_hop_keycmp(const void *key, struct hlist_node *hnode)
557 struct nrs_tbf_client *cli = hlist_entry(hnode,
558 struct nrs_tbf_client,
561 return (strcmp(cli->tc_jobid, key) == 0);
564 static void *nrs_tbf_jobid_hop_key(struct hlist_node *hnode)
566 struct nrs_tbf_client *cli = hlist_entry(hnode,
567 struct nrs_tbf_client,
570 return cli->tc_jobid;
573 static void *nrs_tbf_hop_object(struct hlist_node *hnode)
575 return hlist_entry(hnode, struct nrs_tbf_client, tc_hnode);
578 static void nrs_tbf_jobid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
580 struct nrs_tbf_client *cli = hlist_entry(hnode,
581 struct nrs_tbf_client,
584 atomic_inc(&cli->tc_ref);
587 static void nrs_tbf_jobid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
589 struct nrs_tbf_client *cli = hlist_entry(hnode,
590 struct nrs_tbf_client,
593 atomic_dec(&cli->tc_ref);
597 nrs_tbf_jobid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
600 struct nrs_tbf_client *cli = hlist_entry(hnode,
601 struct nrs_tbf_client,
604 LASSERT(atomic_read(&cli->tc_ref) == 0);
605 nrs_tbf_cli_fini(cli);
608 static struct cfs_hash_ops nrs_tbf_jobid_hash_ops = {
609 .hs_hash = nrs_tbf_jobid_hop_hash,
610 .hs_keycmp = nrs_tbf_jobid_hop_keycmp,
611 .hs_key = nrs_tbf_jobid_hop_key,
612 .hs_object = nrs_tbf_hop_object,
613 .hs_get = nrs_tbf_jobid_hop_get,
614 .hs_put = nrs_tbf_jobid_hop_put,
615 .hs_put_locked = nrs_tbf_jobid_hop_put,
616 .hs_exit = nrs_tbf_jobid_hop_exit,
619 #define NRS_TBF_JOBID_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
620 CFS_HASH_NO_ITEMREF | \
623 static struct nrs_tbf_client *
624 nrs_tbf_jobid_hash_lookup(struct cfs_hash *hs,
625 struct cfs_hash_bd *bd,
628 struct hlist_node *hnode;
629 struct nrs_tbf_client *cli;
631 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)jobid);
635 cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
636 if (!list_empty(&cli->tc_lru))
637 list_del_init(&cli->tc_lru);
641 #define NRS_TBF_JOBID_NULL ""
643 static struct nrs_tbf_client *
644 nrs_tbf_jobid_cli_find(struct nrs_tbf_head *head,
645 struct ptlrpc_request *req)
648 struct nrs_tbf_client *cli;
649 struct cfs_hash *hs = head->th_cli_hash;
650 struct cfs_hash_bd bd;
652 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
654 jobid = NRS_TBF_JOBID_NULL;
655 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
656 cli = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
657 cfs_hash_bd_unlock(hs, &bd, 1);
662 static struct nrs_tbf_client *
663 nrs_tbf_jobid_cli_findadd(struct nrs_tbf_head *head,
664 struct nrs_tbf_client *cli)
667 struct nrs_tbf_client *ret;
668 struct cfs_hash *hs = head->th_cli_hash;
669 struct cfs_hash_bd bd;
671 jobid = cli->tc_jobid;
672 cfs_hash_bd_get_and_lock(hs, (void *)jobid, &bd, 1);
673 ret = nrs_tbf_jobid_hash_lookup(hs, &bd, jobid);
675 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
678 cfs_hash_bd_unlock(hs, &bd, 1);
684 nrs_tbf_jobid_cli_put(struct nrs_tbf_head *head,
685 struct nrs_tbf_client *cli)
687 struct cfs_hash_bd bd;
688 struct cfs_hash *hs = head->th_cli_hash;
689 struct nrs_tbf_bucket *bkt;
691 struct list_head zombies;
693 INIT_LIST_HEAD(&zombies);
694 cfs_hash_bd_get(hs, &cli->tc_jobid, &bd);
695 bkt = cfs_hash_bd_extra_get(hs, &bd);
696 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
698 LASSERT(list_empty(&cli->tc_lru));
699 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
702 * Check and purge the LRU, there is at least one client in the LRU.
704 hw = tbf_jobid_cache_size >>
705 (hs->hs_cur_bits - hs->hs_bkt_bits);
706 while (cfs_hash_bd_count_get(&bd) > hw) {
707 if (unlikely(list_empty(&bkt->ntb_lru)))
709 cli = list_entry(bkt->ntb_lru.next,
710 struct nrs_tbf_client,
712 LASSERT(atomic_read(&cli->tc_ref) == 0);
713 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
714 list_move(&cli->tc_lru, &zombies);
716 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
718 while (!list_empty(&zombies)) {
719 cli = container_of0(zombies.next,
720 struct nrs_tbf_client, tc_lru);
721 list_del_init(&cli->tc_lru);
722 nrs_tbf_cli_fini(cli);
727 nrs_tbf_jobid_cli_init(struct nrs_tbf_client *cli,
728 struct ptlrpc_request *req)
730 char *jobid = lustre_msg_get_jobid(req->rq_reqmsg);
733 jobid = NRS_TBF_JOBID_NULL;
734 LASSERT(strlen(jobid) < LUSTRE_JOBID_SIZE);
735 INIT_LIST_HEAD(&cli->tc_lru);
736 memcpy(cli->tc_jobid, jobid, strlen(jobid));
739 static int nrs_tbf_jobid_hash_order(void)
743 for (bits = 1; (1 << bits) < tbf_jobid_cache_size; ++bits)
749 #define NRS_TBF_JOBID_BKT_BITS 10
752 nrs_tbf_jobid_startup(struct ptlrpc_nrs_policy *policy,
753 struct nrs_tbf_head *head)
755 struct nrs_tbf_cmd start;
756 struct nrs_tbf_bucket *bkt;
760 struct cfs_hash_bd bd;
762 bits = nrs_tbf_jobid_hash_order();
763 if (bits < NRS_TBF_JOBID_BKT_BITS)
764 bits = NRS_TBF_JOBID_BKT_BITS;
765 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
768 NRS_TBF_JOBID_BKT_BITS,
772 &nrs_tbf_jobid_hash_ops,
773 NRS_TBF_JOBID_HASH_FLAGS);
774 if (head->th_cli_hash == NULL)
777 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
778 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
779 INIT_LIST_HEAD(&bkt->ntb_lru);
782 memset(&start, 0, sizeof(start));
783 start.u.tc_start.ts_jobids_str = "*";
785 start.u.tc_start.ts_rpc_rate = tbf_rate;
786 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
787 start.tc_name = NRS_TBF_DEFAULT_RULE;
788 INIT_LIST_HEAD(&start.u.tc_start.ts_jobids);
789 rc = nrs_tbf_rule_start(policy, head, &start);
791 cfs_hash_putref(head->th_cli_hash);
792 head->th_cli_hash = NULL;
799 * Frees jobid of \a list.
803 nrs_tbf_jobid_list_free(struct list_head *jobid_list)
805 struct nrs_tbf_jobid *jobid, *n;
807 list_for_each_entry_safe(jobid, n, jobid_list, tj_linkage) {
808 OBD_FREE(jobid->tj_id, strlen(jobid->tj_id) + 1);
809 list_del(&jobid->tj_linkage);
810 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
815 nrs_tbf_jobid_list_add(struct cfs_lstr *id, struct list_head *jobid_list)
817 struct nrs_tbf_jobid *jobid;
821 OBD_ALLOC(jobid, sizeof(struct nrs_tbf_jobid));
825 OBD_ALLOC(jobid->tj_id, id->ls_len + 1);
826 if (jobid->tj_id == NULL) {
827 OBD_FREE(jobid, sizeof(struct nrs_tbf_jobid));
831 memcpy(jobid->tj_id, id->ls_str, id->ls_len);
832 rc = cfs_gettok(id, '*', &res);
834 jobid->tj_match_flag = NRS_TBF_MATCH_FULL;
836 jobid->tj_match_flag = NRS_TBF_MATCH_WILDCARD;
838 list_add_tail(&jobid->tj_linkage, jobid_list);
843 cfs_match_wildcard(const char *pattern, const char *content)
845 if (*pattern == '\0' && *content == '\0')
848 if (*pattern == '*' && *(pattern + 1) != '\0' && *content == '\0')
851 while (*pattern == *content) {
854 if (*pattern == '\0' && *content == '\0')
857 if (*pattern == '*' && *(pattern + 1) != '\0' &&
863 return (cfs_match_wildcard(pattern + 1, content) ||
864 cfs_match_wildcard(pattern, content + 1));
870 nrs_tbf_jobid_match(const struct nrs_tbf_jobid *jobid, const char *id)
872 if (jobid->tj_match_flag == NRS_TBF_MATCH_FULL)
873 return strcmp(jobid->tj_id, id) == 0;
875 if (jobid->tj_match_flag == NRS_TBF_MATCH_WILDCARD)
876 return cfs_match_wildcard(jobid->tj_id, id);
882 nrs_tbf_jobid_list_match(struct list_head *jobid_list, char *id)
884 struct nrs_tbf_jobid *jobid;
886 list_for_each_entry(jobid, jobid_list, tj_linkage) {
887 if (nrs_tbf_jobid_match(jobid, id))
894 nrs_tbf_jobid_list_parse(char *str, int len, struct list_head *jobid_list)
903 INIT_LIST_HEAD(jobid_list);
905 rc = cfs_gettok(&src, ' ', &res);
910 rc = nrs_tbf_jobid_list_add(&res, jobid_list);
915 nrs_tbf_jobid_list_free(jobid_list);
919 static void nrs_tbf_jobid_cmd_fini(struct nrs_tbf_cmd *cmd)
921 if (!list_empty(&cmd->u.tc_start.ts_jobids))
922 nrs_tbf_jobid_list_free(&cmd->u.tc_start.ts_jobids);
923 if (cmd->u.tc_start.ts_jobids_str)
924 OBD_FREE(cmd->u.tc_start.ts_jobids_str,
925 strlen(cmd->u.tc_start.ts_jobids_str) + 1);
928 static int nrs_tbf_check_id_value(struct cfs_lstr *src, char *key)
931 int keylen = strlen(key);
934 rc = cfs_gettok(src, '=', &res);
935 if (rc == 0 || res.ls_len != keylen ||
936 strncmp(res.ls_str, key, keylen) != 0 ||
937 src->ls_len <= 2 || src->ls_str[0] != '{' ||
938 src->ls_str[src->ls_len - 1] != '}')
941 /* Skip '{' and '}' */
947 static int nrs_tbf_jobid_parse(struct nrs_tbf_cmd *cmd, char *id)
953 src.ls_len = strlen(id);
954 rc = nrs_tbf_check_id_value(&src, "jobid");
958 OBD_ALLOC(cmd->u.tc_start.ts_jobids_str, src.ls_len + 1);
959 if (cmd->u.tc_start.ts_jobids_str == NULL)
962 memcpy(cmd->u.tc_start.ts_jobids_str, src.ls_str, src.ls_len);
964 /* parse jobid list */
965 rc = nrs_tbf_jobid_list_parse(cmd->u.tc_start.ts_jobids_str,
966 strlen(cmd->u.tc_start.ts_jobids_str),
967 &cmd->u.tc_start.ts_jobids);
969 nrs_tbf_jobid_cmd_fini(cmd);
974 static int nrs_tbf_jobid_rule_init(struct ptlrpc_nrs_policy *policy,
975 struct nrs_tbf_rule *rule,
976 struct nrs_tbf_cmd *start)
980 LASSERT(start->u.tc_start.ts_jobids_str);
981 OBD_ALLOC(rule->tr_jobids_str,
982 strlen(start->u.tc_start.ts_jobids_str) + 1);
983 if (rule->tr_jobids_str == NULL)
986 memcpy(rule->tr_jobids_str,
987 start->u.tc_start.ts_jobids_str,
988 strlen(start->u.tc_start.ts_jobids_str));
990 INIT_LIST_HEAD(&rule->tr_jobids);
991 if (!list_empty(&start->u.tc_start.ts_jobids)) {
992 rc = nrs_tbf_jobid_list_parse(rule->tr_jobids_str,
993 strlen(rule->tr_jobids_str),
996 CERROR("jobids {%s} illegal\n", rule->tr_jobids_str);
999 OBD_FREE(rule->tr_jobids_str,
1000 strlen(start->u.tc_start.ts_jobids_str) + 1);
1005 nrs_tbf_jobid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1007 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1008 rule->tr_jobids_str, rule->tr_rpc_rate,
1009 atomic_read(&rule->tr_ref) - 1);
1014 nrs_tbf_jobid_rule_match(struct nrs_tbf_rule *rule,
1015 struct nrs_tbf_client *cli)
1017 return nrs_tbf_jobid_list_match(&rule->tr_jobids, cli->tc_jobid);
1020 static void nrs_tbf_jobid_rule_fini(struct nrs_tbf_rule *rule)
1022 if (!list_empty(&rule->tr_jobids))
1023 nrs_tbf_jobid_list_free(&rule->tr_jobids);
1024 LASSERT(rule->tr_jobids_str != NULL);
1025 OBD_FREE(rule->tr_jobids_str, strlen(rule->tr_jobids_str) + 1);
1028 static struct nrs_tbf_ops nrs_tbf_jobid_ops = {
1029 .o_name = NRS_TBF_TYPE_JOBID,
1030 .o_startup = nrs_tbf_jobid_startup,
1031 .o_cli_find = nrs_tbf_jobid_cli_find,
1032 .o_cli_findadd = nrs_tbf_jobid_cli_findadd,
1033 .o_cli_put = nrs_tbf_jobid_cli_put,
1034 .o_cli_init = nrs_tbf_jobid_cli_init,
1035 .o_rule_init = nrs_tbf_jobid_rule_init,
1036 .o_rule_dump = nrs_tbf_jobid_rule_dump,
1037 .o_rule_match = nrs_tbf_jobid_rule_match,
1038 .o_rule_fini = nrs_tbf_jobid_rule_fini,
1042 * libcfs_hash operations for nrs_tbf_net::cn_cli_hash
1044 * This uses ptlrpc_request::rq_peer.nid as its key, in order to hash
1045 * nrs_tbf_client objects.
1047 #define NRS_TBF_NID_BKT_BITS 8
1048 #define NRS_TBF_NID_BITS 16
1050 static unsigned nrs_tbf_nid_hop_hash(struct cfs_hash *hs, const void *key,
1053 return cfs_hash_djb2_hash(key, sizeof(lnet_nid_t), mask);
1056 static int nrs_tbf_nid_hop_keycmp(const void *key, struct hlist_node *hnode)
1058 lnet_nid_t *nid = (lnet_nid_t *)key;
1059 struct nrs_tbf_client *cli = hlist_entry(hnode,
1060 struct nrs_tbf_client,
1063 return *nid == cli->tc_nid;
1066 static void *nrs_tbf_nid_hop_key(struct hlist_node *hnode)
1068 struct nrs_tbf_client *cli = hlist_entry(hnode,
1069 struct nrs_tbf_client,
1072 return &cli->tc_nid;
1075 static void nrs_tbf_nid_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1077 struct nrs_tbf_client *cli = hlist_entry(hnode,
1078 struct nrs_tbf_client,
1081 atomic_inc(&cli->tc_ref);
1084 static void nrs_tbf_nid_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1086 struct nrs_tbf_client *cli = hlist_entry(hnode,
1087 struct nrs_tbf_client,
1090 atomic_dec(&cli->tc_ref);
1093 static void nrs_tbf_nid_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1095 struct nrs_tbf_client *cli = hlist_entry(hnode,
1096 struct nrs_tbf_client,
1099 LASSERTF(atomic_read(&cli->tc_ref) == 0,
1100 "Busy TBF object from client with NID %s, with %d refs\n",
1101 libcfs_nid2str(cli->tc_nid), atomic_read(&cli->tc_ref));
1103 nrs_tbf_cli_fini(cli);
1106 static struct cfs_hash_ops nrs_tbf_nid_hash_ops = {
1107 .hs_hash = nrs_tbf_nid_hop_hash,
1108 .hs_keycmp = nrs_tbf_nid_hop_keycmp,
1109 .hs_key = nrs_tbf_nid_hop_key,
1110 .hs_object = nrs_tbf_hop_object,
1111 .hs_get = nrs_tbf_nid_hop_get,
1112 .hs_put = nrs_tbf_nid_hop_put,
1113 .hs_put_locked = nrs_tbf_nid_hop_put,
1114 .hs_exit = nrs_tbf_nid_hop_exit,
1117 static struct nrs_tbf_client *
1118 nrs_tbf_nid_cli_find(struct nrs_tbf_head *head,
1119 struct ptlrpc_request *req)
1121 return cfs_hash_lookup(head->th_cli_hash, &req->rq_peer.nid);
1124 static struct nrs_tbf_client *
1125 nrs_tbf_nid_cli_findadd(struct nrs_tbf_head *head,
1126 struct nrs_tbf_client *cli)
1128 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_nid,
1133 nrs_tbf_nid_cli_put(struct nrs_tbf_head *head,
1134 struct nrs_tbf_client *cli)
1136 cfs_hash_put(head->th_cli_hash, &cli->tc_hnode);
1140 nrs_tbf_nid_startup(struct ptlrpc_nrs_policy *policy,
1141 struct nrs_tbf_head *head)
1143 struct nrs_tbf_cmd start;
1146 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1149 NRS_TBF_NID_BKT_BITS, 0,
1152 &nrs_tbf_nid_hash_ops,
1153 CFS_HASH_RW_BKTLOCK);
1154 if (head->th_cli_hash == NULL)
1157 memset(&start, 0, sizeof(start));
1158 start.u.tc_start.ts_nids_str = "*";
1160 start.u.tc_start.ts_rpc_rate = tbf_rate;
1161 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1162 start.tc_name = NRS_TBF_DEFAULT_RULE;
1163 INIT_LIST_HEAD(&start.u.tc_start.ts_nids);
1164 rc = nrs_tbf_rule_start(policy, head, &start);
1166 cfs_hash_putref(head->th_cli_hash);
1167 head->th_cli_hash = NULL;
1174 nrs_tbf_nid_cli_init(struct nrs_tbf_client *cli,
1175 struct ptlrpc_request *req)
1177 cli->tc_nid = req->rq_peer.nid;
1180 static int nrs_tbf_nid_rule_init(struct ptlrpc_nrs_policy *policy,
1181 struct nrs_tbf_rule *rule,
1182 struct nrs_tbf_cmd *start)
1184 LASSERT(start->u.tc_start.ts_nids_str);
1185 OBD_ALLOC(rule->tr_nids_str,
1186 strlen(start->u.tc_start.ts_nids_str) + 1);
1187 if (rule->tr_nids_str == NULL)
1190 memcpy(rule->tr_nids_str,
1191 start->u.tc_start.ts_nids_str,
1192 strlen(start->u.tc_start.ts_nids_str));
1194 INIT_LIST_HEAD(&rule->tr_nids);
1195 if (!list_empty(&start->u.tc_start.ts_nids)) {
1196 if (cfs_parse_nidlist(rule->tr_nids_str,
1197 strlen(rule->tr_nids_str),
1198 &rule->tr_nids) <= 0) {
1199 CERROR("nids {%s} illegal\n",
1201 OBD_FREE(rule->tr_nids_str,
1202 strlen(start->u.tc_start.ts_nids_str) + 1);
1210 nrs_tbf_nid_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
1212 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
1213 rule->tr_nids_str, rule->tr_rpc_rate,
1214 atomic_read(&rule->tr_ref) - 1);
1219 nrs_tbf_nid_rule_match(struct nrs_tbf_rule *rule,
1220 struct nrs_tbf_client *cli)
1222 return cfs_match_nid(cli->tc_nid, &rule->tr_nids);
1225 static void nrs_tbf_nid_rule_fini(struct nrs_tbf_rule *rule)
1227 if (!list_empty(&rule->tr_nids))
1228 cfs_free_nidlist(&rule->tr_nids);
1229 LASSERT(rule->tr_nids_str != NULL);
1230 OBD_FREE(rule->tr_nids_str, strlen(rule->tr_nids_str) + 1);
1233 static void nrs_tbf_nid_cmd_fini(struct nrs_tbf_cmd *cmd)
1235 if (!list_empty(&cmd->u.tc_start.ts_nids))
1236 cfs_free_nidlist(&cmd->u.tc_start.ts_nids);
1237 if (cmd->u.tc_start.ts_nids_str)
1238 OBD_FREE(cmd->u.tc_start.ts_nids_str,
1239 strlen(cmd->u.tc_start.ts_nids_str) + 1);
1242 static int nrs_tbf_nid_parse(struct nrs_tbf_cmd *cmd, char *id)
1244 struct cfs_lstr src;
1248 src.ls_len = strlen(id);
1249 rc = nrs_tbf_check_id_value(&src, "nid");
1253 OBD_ALLOC(cmd->u.tc_start.ts_nids_str, src.ls_len + 1);
1254 if (cmd->u.tc_start.ts_nids_str == NULL)
1257 memcpy(cmd->u.tc_start.ts_nids_str, src.ls_str, src.ls_len);
1259 /* parse NID list */
1260 if (cfs_parse_nidlist(cmd->u.tc_start.ts_nids_str,
1261 strlen(cmd->u.tc_start.ts_nids_str),
1262 &cmd->u.tc_start.ts_nids) <= 0) {
1263 nrs_tbf_nid_cmd_fini(cmd);
1270 static struct nrs_tbf_ops nrs_tbf_nid_ops = {
1271 .o_name = NRS_TBF_TYPE_NID,
1272 .o_startup = nrs_tbf_nid_startup,
1273 .o_cli_find = nrs_tbf_nid_cli_find,
1274 .o_cli_findadd = nrs_tbf_nid_cli_findadd,
1275 .o_cli_put = nrs_tbf_nid_cli_put,
1276 .o_cli_init = nrs_tbf_nid_cli_init,
1277 .o_rule_init = nrs_tbf_nid_rule_init,
1278 .o_rule_dump = nrs_tbf_nid_rule_dump,
1279 .o_rule_match = nrs_tbf_nid_rule_match,
1280 .o_rule_fini = nrs_tbf_nid_rule_fini,
1283 static unsigned nrs_tbf_hop_hash(struct cfs_hash *hs, const void *key,
1286 return cfs_hash_djb2_hash(key, strlen(key), mask);
1289 static int nrs_tbf_hop_keycmp(const void *key, struct hlist_node *hnode)
1291 struct nrs_tbf_client *cli = hlist_entry(hnode,
1292 struct nrs_tbf_client,
1295 return (strcmp(cli->tc_key, key) == 0);
1298 static void *nrs_tbf_hop_key(struct hlist_node *hnode)
1300 struct nrs_tbf_client *cli = hlist_entry(hnode,
1301 struct nrs_tbf_client,
1306 static void nrs_tbf_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
1308 struct nrs_tbf_client *cli = hlist_entry(hnode,
1309 struct nrs_tbf_client,
1312 atomic_inc(&cli->tc_ref);
1315 static void nrs_tbf_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
1317 struct nrs_tbf_client *cli = hlist_entry(hnode,
1318 struct nrs_tbf_client,
1321 atomic_dec(&cli->tc_ref);
1324 static void nrs_tbf_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
1327 struct nrs_tbf_client *cli = hlist_entry(hnode,
1328 struct nrs_tbf_client,
1331 LASSERT(atomic_read(&cli->tc_ref) == 0);
1332 nrs_tbf_cli_fini(cli);
1335 static struct cfs_hash_ops nrs_tbf_hash_ops = {
1336 .hs_hash = nrs_tbf_hop_hash,
1337 .hs_keycmp = nrs_tbf_hop_keycmp,
1338 .hs_key = nrs_tbf_hop_key,
1339 .hs_object = nrs_tbf_hop_object,
1340 .hs_get = nrs_tbf_hop_get,
1341 .hs_put = nrs_tbf_hop_put,
1342 .hs_put_locked = nrs_tbf_hop_put,
1343 .hs_exit = nrs_tbf_hop_exit,
1346 #define NRS_TBF_GENERIC_BKT_BITS 10
1347 #define NRS_TBF_GENERIC_HASH_FLAGS (CFS_HASH_SPIN_BKTLOCK | \
1348 CFS_HASH_NO_ITEMREF | \
1352 nrs_tbf_startup(struct ptlrpc_nrs_policy *policy, struct nrs_tbf_head *head)
1354 struct nrs_tbf_cmd start;
1355 struct nrs_tbf_bucket *bkt;
1359 struct cfs_hash_bd bd;
1361 bits = nrs_tbf_jobid_hash_order();
1362 if (bits < NRS_TBF_GENERIC_BKT_BITS)
1363 bits = NRS_TBF_GENERIC_BKT_BITS;
1364 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
1366 NRS_TBF_GENERIC_BKT_BITS,
1369 NRS_TBF_GENERIC_HASH_FLAGS);
1370 if (head->th_cli_hash == NULL)
1373 cfs_hash_for_each_bucket(head->th_cli_hash, &bd, i) {
1374 bkt = cfs_hash_bd_extra_get(head->th_cli_hash, &bd);
1375 INIT_LIST_HEAD(&bkt->ntb_lru);
1378 memset(&start, 0, sizeof(start));
1379 start.u.tc_start.ts_conds_str = "*";
1381 start.u.tc_start.ts_rpc_rate = tbf_rate;
1382 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
1383 start.tc_name = NRS_TBF_DEFAULT_RULE;
1384 INIT_LIST_HEAD(&start.u.tc_start.ts_conds);
1385 rc = nrs_tbf_rule_start(policy, head, &start);
1387 cfs_hash_putref(head->th_cli_hash);
1392 static struct nrs_tbf_client *
1393 nrs_tbf_cli_hash_lookup(struct cfs_hash *hs, struct cfs_hash_bd *bd,
1396 struct hlist_node *hnode;
1397 struct nrs_tbf_client *cli;
1399 hnode = cfs_hash_bd_lookup_locked(hs, bd, (void *)key);
1403 cli = container_of0(hnode, struct nrs_tbf_client, tc_hnode);
1404 if (!list_empty(&cli->tc_lru))
1405 list_del_init(&cli->tc_lru);
1410 * ONLY opcode presented in this function will be checked in
1411 * nrs_tbf_id_cli_set(). That means, we can add or remove an
1412 * opcode to enable or disable requests handled in nrs_tbf
1414 static struct req_format *req_fmt(__u32 opcode)
1418 return &RQF_OST_GETATTR;
1420 return &RQF_OST_SETATTR;
1422 return &RQF_OST_BRW_READ;
1424 return &RQF_OST_BRW_WRITE;
1425 /* FIXME: OST_CREATE and OST_DESTROY comes from MDS
1426 * in most case. Should they be removed? */
1428 return &RQF_OST_CREATE;
1430 return &RQF_OST_DESTROY;
1432 return &RQF_OST_PUNCH;
1434 return &RQF_OST_SYNC;
1436 return &RQF_OST_LADVISE;
1438 return &RQF_MDS_GETATTR;
1439 case MDS_GETATTR_NAME:
1440 return &RQF_MDS_GETATTR_NAME;
1441 /* close is skipped to avoid LDLM cancel slowness */
1444 return &RQF_MDS_CLOSE;
1447 return &RQF_MDS_REINT;
1449 return &RQF_MDS_READPAGE;
1451 return &RQF_MDS_GET_ROOT;
1453 return &RQF_MDS_STATFS;
1455 return &RQF_MDS_SYNC;
1457 return &RQF_MDS_QUOTACTL;
1459 return &RQF_MDS_GETXATTR;
1461 return &RQF_MDS_GET_INFO;
1462 /* HSM op is skipped */
1464 case MDS_HSM_STATE_GET:
1465 return &RQF_MDS_HSM_STATE_GET;
1466 case MDS_HSM_STATE_SET:
1467 return &RQF_MDS_HSM_STATE_SET;
1468 case MDS_HSM_ACTION:
1469 return &RQF_MDS_HSM_ACTION;
1470 case MDS_HSM_CT_REGISTER:
1471 return &RQF_MDS_HSM_CT_REGISTER;
1472 case MDS_HSM_CT_UNREGISTER:
1473 return &RQF_MDS_HSM_CT_UNREGISTER;
1475 case MDS_SWAP_LAYOUTS:
1476 return &RQF_MDS_SWAP_LAYOUTS;
1478 return &RQF_LDLM_ENQUEUE;
1484 static struct req_format *intent_req_fmt(__u32 it_opc)
1486 if (it_opc & (IT_OPEN | IT_CREAT))
1487 return &RQF_LDLM_INTENT_OPEN;
1488 else if (it_opc & (IT_GETATTR | IT_LOOKUP))
1489 return &RQF_LDLM_INTENT_GETATTR;
1490 else if (it_opc & IT_UNLINK)
1491 return &RQF_LDLM_INTENT_UNLINK;
1492 else if (it_opc & IT_GETXATTR)
1493 return &RQF_LDLM_INTENT_GETXATTR;
1494 else if (it_opc & (IT_GLIMPSE | IT_BRW))
1495 return &RQF_LDLM_INTENT;
1500 static int ost_tbf_id_cli_set(struct ptlrpc_request *req,
1503 struct ost_body *body;
1505 body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
1507 id->ti_uid = body->oa.o_uid;
1508 id->ti_gid = body->oa.o_gid;
1515 static void unpack_ugid_from_mdt_body(struct ptlrpc_request *req,
1518 struct mdt_body *b = req_capsule_client_get(&req->rq_pill,
1522 /* TODO: nodemaping feature converts {ug}id from individual
1523 * clients to the actual ones of the file system. Some work
1524 * may be needed to fix this. */
1525 id->ti_uid = b->mbo_uid;
1526 id->ti_gid = b->mbo_gid;
1529 static void unpack_ugid_from_mdt_rec_reint(struct ptlrpc_request *req,
1532 struct mdt_rec_reint *rec;
1534 rec = req_capsule_client_get(&req->rq_pill, &RMF_REC_REINT);
1535 LASSERT(rec != NULL);
1537 /* use the fs{ug}id as {ug}id of the process */
1538 id->ti_uid = rec->rr_fsuid;
1539 id->ti_gid = rec->rr_fsgid;
1542 static int mdt_tbf_id_cli_set(struct ptlrpc_request *req,
1545 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1550 case MDS_GETATTR_NAME:
1555 case MDS_HSM_STATE_GET ... MDS_SWAP_LAYOUTS:
1556 unpack_ugid_from_mdt_body(req, id);
1560 unpack_ugid_from_mdt_rec_reint(req, id);
1569 static int ldlm_tbf_id_cli_set(struct ptlrpc_request *req,
1572 struct ldlm_intent *lit;
1573 struct req_format *fmt;
1575 if (req->rq_reqmsg->lm_bufcount <= DLM_INTENT_IT_OFF)
1578 req_capsule_extend(&req->rq_pill, &RQF_LDLM_INTENT_BASIC);
1579 lit = req_capsule_client_get(&req->rq_pill, &RMF_LDLM_INTENT);
1583 fmt = intent_req_fmt(lit->opc);
1587 req_capsule_extend(&req->rq_pill, fmt);
1589 if (lit->opc & (IT_GETXATTR | IT_GETATTR | IT_LOOKUP))
1590 unpack_ugid_from_mdt_body(req, id);
1591 else if (lit->opc & (IT_OPEN | IT_OPEN | IT_GLIMPSE | IT_BRW))
1592 unpack_ugid_from_mdt_rec_reint(req, id);
1598 static int nrs_tbf_id_cli_set(struct ptlrpc_request *req, struct tbf_id *id,
1599 enum nrs_tbf_flag ti_type)
1601 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1602 struct req_format *fmt = req_fmt(opc);
1603 bool fmt_unset = false;
1606 memset(id, 0, sizeof(struct tbf_id));
1607 id->ti_type = ti_type;
1611 req_capsule_init(&req->rq_pill, req, RCL_SERVER);
1612 if (req->rq_pill.rc_fmt == NULL) {
1613 req_capsule_set(&req->rq_pill, fmt);
1617 if (opc < OST_LAST_OPC)
1618 rc = ost_tbf_id_cli_set(req, id);
1619 else if (opc >= MDS_FIRST_OPC && opc < MDS_LAST_OPC)
1620 rc = mdt_tbf_id_cli_set(req, id);
1621 else if (opc == LDLM_ENQUEUE)
1622 rc = ldlm_tbf_id_cli_set(req, id);
1626 /* restore it to the initialized state */
1628 req->rq_pill.rc_fmt = NULL;
1632 static inline void nrs_tbf_cli_gen_key(struct nrs_tbf_client *cli,
1633 struct ptlrpc_request *req,
1634 char *keystr, size_t keystr_sz)
1637 u32 opc = lustre_msg_get_opc(req->rq_reqmsg);
1640 nrs_tbf_id_cli_set(req, &id, NRS_TBF_FLAG_UID | NRS_TBF_FLAG_GID);
1641 jobid = lustre_msg_get_jobid(req->rq_reqmsg);
1643 jobid = NRS_TBF_JOBID_NULL;
1645 snprintf(keystr, keystr_sz, "%s_%s_%d_%u_%u", jobid,
1646 libcfs_nid2str(req->rq_peer.nid), opc, id.ti_uid,
1650 INIT_LIST_HEAD(&cli->tc_lru);
1651 strlcpy(cli->tc_key, keystr, sizeof(cli->tc_key));
1652 strlcpy(cli->tc_jobid, jobid, sizeof(cli->tc_jobid));
1653 cli->tc_nid = req->rq_peer.nid;
1654 cli->tc_opcode = opc;
1659 static struct nrs_tbf_client *
1660 nrs_tbf_cli_find(struct nrs_tbf_head *head, struct ptlrpc_request *req)
1662 struct nrs_tbf_client *cli;
1663 struct cfs_hash *hs = head->th_cli_hash;
1664 struct cfs_hash_bd bd;
1665 char keystr[NRS_TBF_KEY_LEN];
1667 nrs_tbf_cli_gen_key(NULL, req, keystr, sizeof(keystr));
1668 cfs_hash_bd_get_and_lock(hs, (void *)keystr, &bd, 1);
1669 cli = nrs_tbf_cli_hash_lookup(hs, &bd, keystr);
1670 cfs_hash_bd_unlock(hs, &bd, 1);
1675 static struct nrs_tbf_client *
1676 nrs_tbf_cli_findadd(struct nrs_tbf_head *head,
1677 struct nrs_tbf_client *cli)
1680 struct nrs_tbf_client *ret;
1681 struct cfs_hash *hs = head->th_cli_hash;
1682 struct cfs_hash_bd bd;
1685 cfs_hash_bd_get_and_lock(hs, (void *)key, &bd, 1);
1686 ret = nrs_tbf_cli_hash_lookup(hs, &bd, key);
1688 cfs_hash_bd_add_locked(hs, &bd, &cli->tc_hnode);
1691 cfs_hash_bd_unlock(hs, &bd, 1);
1697 nrs_tbf_cli_put(struct nrs_tbf_head *head, struct nrs_tbf_client *cli)
1699 struct cfs_hash_bd bd;
1700 struct cfs_hash *hs = head->th_cli_hash;
1701 struct nrs_tbf_bucket *bkt;
1703 struct list_head zombies;
1705 INIT_LIST_HEAD(&zombies);
1706 cfs_hash_bd_get(hs, &cli->tc_key, &bd);
1707 bkt = cfs_hash_bd_extra_get(hs, &bd);
1708 if (!cfs_hash_bd_dec_and_lock(hs, &bd, &cli->tc_ref))
1710 LASSERT(list_empty(&cli->tc_lru));
1711 list_add_tail(&cli->tc_lru, &bkt->ntb_lru);
1714 * Check and purge the LRU, there is at least one client in the LRU.
1716 hw = tbf_jobid_cache_size >> (hs->hs_cur_bits - hs->hs_bkt_bits);
1717 while (cfs_hash_bd_count_get(&bd) > hw) {
1718 if (unlikely(list_empty(&bkt->ntb_lru)))
1720 cli = list_entry(bkt->ntb_lru.next,
1721 struct nrs_tbf_client,
1723 LASSERT(atomic_read(&cli->tc_ref) == 0);
1724 cfs_hash_bd_del_locked(hs, &bd, &cli->tc_hnode);
1725 list_move(&cli->tc_lru, &zombies);
1727 cfs_hash_bd_unlock(head->th_cli_hash, &bd, 1);
1729 while (!list_empty(&zombies)) {
1730 cli = container_of0(zombies.next,
1731 struct nrs_tbf_client, tc_lru);
1732 list_del_init(&cli->tc_lru);
1733 nrs_tbf_cli_fini(cli);
1738 nrs_tbf_generic_cli_init(struct nrs_tbf_client *cli,
1739 struct ptlrpc_request *req)
1741 char keystr[NRS_TBF_KEY_LEN];
1743 nrs_tbf_cli_gen_key(cli, req, keystr, sizeof(keystr));
1747 nrs_tbf_id_list_free(struct list_head *uid_list)
1749 struct nrs_tbf_id *nti_id, *n;
1751 list_for_each_entry_safe(nti_id, n, uid_list, nti_linkage) {
1752 list_del_init(&nti_id->nti_linkage);
1753 OBD_FREE_PTR(nti_id);
1758 nrs_tbf_expression_free(struct nrs_tbf_expression *expr)
1760 LASSERT(expr->te_field >= NRS_TBF_FIELD_NID &&
1761 expr->te_field < NRS_TBF_FIELD_MAX);
1762 switch (expr->te_field) {
1763 case NRS_TBF_FIELD_NID:
1764 cfs_free_nidlist(&expr->te_cond);
1766 case NRS_TBF_FIELD_JOBID:
1767 nrs_tbf_jobid_list_free(&expr->te_cond);
1769 case NRS_TBF_FIELD_OPCODE:
1770 CFS_FREE_BITMAP(expr->te_opcodes);
1772 case NRS_TBF_FIELD_UID:
1773 case NRS_TBF_FIELD_GID:
1774 nrs_tbf_id_list_free(&expr->te_cond);
1783 nrs_tbf_conjunction_free(struct nrs_tbf_conjunction *conjunction)
1785 struct nrs_tbf_expression *expression;
1786 struct nrs_tbf_expression *n;
1788 LASSERT(list_empty(&conjunction->tc_linkage));
1789 list_for_each_entry_safe(expression, n,
1790 &conjunction->tc_expressions,
1792 list_del_init(&expression->te_linkage);
1793 nrs_tbf_expression_free(expression);
1795 OBD_FREE_PTR(conjunction);
1799 nrs_tbf_conds_free(struct list_head *cond_list)
1801 struct nrs_tbf_conjunction *conjunction;
1802 struct nrs_tbf_conjunction *n;
1804 list_for_each_entry_safe(conjunction, n, cond_list, tc_linkage) {
1805 list_del_init(&conjunction->tc_linkage);
1806 nrs_tbf_conjunction_free(conjunction);
1811 nrs_tbf_generic_cmd_fini(struct nrs_tbf_cmd *cmd)
1813 if (!list_empty(&cmd->u.tc_start.ts_conds))
1814 nrs_tbf_conds_free(&cmd->u.tc_start.ts_conds);
1815 if (cmd->u.tc_start.ts_conds_str)
1816 OBD_FREE(cmd->u.tc_start.ts_conds_str,
1817 strlen(cmd->u.tc_start.ts_conds_str) + 1);
1820 #define NRS_TBF_DISJUNCTION_DELIM (',')
1821 #define NRS_TBF_CONJUNCTION_DELIM ('&')
1822 #define NRS_TBF_EXPRESSION_DELIM ('=')
1825 nrs_tbf_check_field(struct cfs_lstr *field, char *str)
1827 int len = strlen(str);
1829 return (field->ls_len == len &&
1830 strncmp(field->ls_str, str, len) == 0);
1834 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr);
1836 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
1837 enum nrs_tbf_flag tif);
1840 nrs_tbf_expression_parse(struct cfs_lstr *src, struct list_head *cond_list)
1842 struct nrs_tbf_expression *expr;
1843 struct cfs_lstr field;
1846 OBD_ALLOC(expr, sizeof(struct nrs_tbf_expression));
1850 rc = cfs_gettok(src, NRS_TBF_EXPRESSION_DELIM, &field);
1851 if (rc == 0 || src->ls_len <= 2 || src->ls_str[0] != '{' ||
1852 src->ls_str[src->ls_len - 1] != '}')
1853 GOTO(out, rc = -EINVAL);
1855 /* Skip '{' and '}' */
1859 if (nrs_tbf_check_field(&field, "nid")) {
1860 if (cfs_parse_nidlist(src->ls_str,
1862 &expr->te_cond) <= 0)
1863 GOTO(out, rc = -EINVAL);
1864 expr->te_field = NRS_TBF_FIELD_NID;
1865 } else if (nrs_tbf_check_field(&field, "jobid")) {
1866 if (nrs_tbf_jobid_list_parse(src->ls_str,
1868 &expr->te_cond) < 0)
1869 GOTO(out, rc = -EINVAL);
1870 expr->te_field = NRS_TBF_FIELD_JOBID;
1871 } else if (nrs_tbf_check_field(&field, "opcode")) {
1872 if (nrs_tbf_opcode_list_parse(src->ls_str,
1874 &expr->te_opcodes) < 0)
1875 GOTO(out, rc = -EINVAL);
1876 expr->te_field = NRS_TBF_FIELD_OPCODE;
1877 } else if (nrs_tbf_check_field(&field, "uid")) {
1878 if (nrs_tbf_id_list_parse(src->ls_str,
1881 NRS_TBF_FLAG_UID) < 0)
1882 GOTO(out, rc = -EINVAL);
1883 expr->te_field = NRS_TBF_FIELD_UID;
1884 } else if (nrs_tbf_check_field(&field, "gid")) {
1885 if (nrs_tbf_id_list_parse(src->ls_str,
1888 NRS_TBF_FLAG_GID) < 0)
1889 GOTO(out, rc = -EINVAL);
1890 expr->te_field = NRS_TBF_FIELD_GID;
1892 GOTO(out, rc = -EINVAL);
1895 list_add_tail(&expr->te_linkage, cond_list);
1903 nrs_tbf_conjunction_parse(struct cfs_lstr *src, struct list_head *cond_list)
1905 struct nrs_tbf_conjunction *conjunction;
1906 struct cfs_lstr expr;
1909 OBD_ALLOC(conjunction, sizeof(struct nrs_tbf_conjunction));
1910 if (conjunction == NULL)
1913 INIT_LIST_HEAD(&conjunction->tc_expressions);
1914 list_add_tail(&conjunction->tc_linkage, cond_list);
1916 while (src->ls_str) {
1917 rc = cfs_gettok(src, NRS_TBF_CONJUNCTION_DELIM, &expr);
1922 rc = nrs_tbf_expression_parse(&expr,
1923 &conjunction->tc_expressions);
1931 nrs_tbf_conds_parse(char *str, int len, struct list_head *cond_list)
1933 struct cfs_lstr src;
1934 struct cfs_lstr res;
1939 INIT_LIST_HEAD(cond_list);
1940 while (src.ls_str) {
1941 rc = cfs_gettok(&src, NRS_TBF_DISJUNCTION_DELIM, &res);
1946 rc = nrs_tbf_conjunction_parse(&res, cond_list);
1954 nrs_tbf_generic_parse(struct nrs_tbf_cmd *cmd, const char *id)
1958 OBD_ALLOC(cmd->u.tc_start.ts_conds_str, strlen(id) + 1);
1959 if (cmd->u.tc_start.ts_conds_str == NULL)
1962 memcpy(cmd->u.tc_start.ts_conds_str, id, strlen(id));
1964 /* Parse hybird NID and JOBID conditions */
1965 rc = nrs_tbf_conds_parse(cmd->u.tc_start.ts_conds_str,
1966 strlen(cmd->u.tc_start.ts_conds_str),
1967 &cmd->u.tc_start.ts_conds);
1969 nrs_tbf_generic_cmd_fini(cmd);
1975 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id);
1978 nrs_tbf_expression_match(struct nrs_tbf_expression *expr,
1979 struct nrs_tbf_rule *rule,
1980 struct nrs_tbf_client *cli)
1982 switch (expr->te_field) {
1983 case NRS_TBF_FIELD_NID:
1984 return cfs_match_nid(cli->tc_nid, &expr->te_cond);
1985 case NRS_TBF_FIELD_JOBID:
1986 return nrs_tbf_jobid_list_match(&expr->te_cond, cli->tc_jobid);
1987 case NRS_TBF_FIELD_OPCODE:
1988 return cfs_bitmap_check(expr->te_opcodes, cli->tc_opcode);
1989 case NRS_TBF_FIELD_UID:
1990 case NRS_TBF_FIELD_GID:
1991 return nrs_tbf_id_list_match(&expr->te_cond, cli->tc_id);
1998 nrs_tbf_conjunction_match(struct nrs_tbf_conjunction *conjunction,
1999 struct nrs_tbf_rule *rule,
2000 struct nrs_tbf_client *cli)
2002 struct nrs_tbf_expression *expr;
2005 list_for_each_entry(expr, &conjunction->tc_expressions, te_linkage) {
2006 matched = nrs_tbf_expression_match(expr, rule, cli);
2015 nrs_tbf_cond_match(struct nrs_tbf_rule *rule, struct nrs_tbf_client *cli)
2017 struct nrs_tbf_conjunction *conjunction;
2020 list_for_each_entry(conjunction, &rule->tr_conds, tc_linkage) {
2021 matched = nrs_tbf_conjunction_match(conjunction, rule, cli);
2030 nrs_tbf_generic_rule_fini(struct nrs_tbf_rule *rule)
2032 if (!list_empty(&rule->tr_conds))
2033 nrs_tbf_conds_free(&rule->tr_conds);
2034 LASSERT(rule->tr_conds_str != NULL);
2035 OBD_FREE(rule->tr_conds_str, strlen(rule->tr_conds_str) + 1);
2039 nrs_tbf_rule_init(struct ptlrpc_nrs_policy *policy,
2040 struct nrs_tbf_rule *rule, struct nrs_tbf_cmd *start)
2044 LASSERT(start->u.tc_start.ts_conds_str);
2045 OBD_ALLOC(rule->tr_conds_str,
2046 strlen(start->u.tc_start.ts_conds_str) + 1);
2047 if (rule->tr_conds_str == NULL)
2050 memcpy(rule->tr_conds_str,
2051 start->u.tc_start.ts_conds_str,
2052 strlen(start->u.tc_start.ts_conds_str));
2054 INIT_LIST_HEAD(&rule->tr_conds);
2055 if (!list_empty(&start->u.tc_start.ts_conds)) {
2056 rc = nrs_tbf_conds_parse(rule->tr_conds_str,
2057 strlen(rule->tr_conds_str),
2061 nrs_tbf_generic_rule_fini(rule);
2067 nrs_tbf_generic_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2069 seq_printf(m, "%s %s %llu, ref %d\n", rule->tr_name,
2070 rule->tr_conds_str, rule->tr_rpc_rate,
2071 atomic_read(&rule->tr_ref) - 1);
2076 nrs_tbf_generic_rule_match(struct nrs_tbf_rule *rule,
2077 struct nrs_tbf_client *cli)
2079 return nrs_tbf_cond_match(rule, cli);
2082 static struct nrs_tbf_ops nrs_tbf_generic_ops = {
2083 .o_name = NRS_TBF_TYPE_GENERIC,
2084 .o_startup = nrs_tbf_startup,
2085 .o_cli_find = nrs_tbf_cli_find,
2086 .o_cli_findadd = nrs_tbf_cli_findadd,
2087 .o_cli_put = nrs_tbf_cli_put,
2088 .o_cli_init = nrs_tbf_generic_cli_init,
2089 .o_rule_init = nrs_tbf_rule_init,
2090 .o_rule_dump = nrs_tbf_generic_rule_dump,
2091 .o_rule_match = nrs_tbf_generic_rule_match,
2092 .o_rule_fini = nrs_tbf_generic_rule_fini,
2095 static void nrs_tbf_opcode_rule_fini(struct nrs_tbf_rule *rule)
2097 if (rule->tr_opcodes != NULL)
2098 CFS_FREE_BITMAP(rule->tr_opcodes);
2100 LASSERT(rule->tr_opcodes_str != NULL);
2101 OBD_FREE(rule->tr_opcodes_str, strlen(rule->tr_opcodes_str) + 1);
2104 static unsigned nrs_tbf_opcode_hop_hash(struct cfs_hash *hs, const void *key,
2107 return cfs_hash_djb2_hash(key, sizeof(__u32), mask);
2110 static int nrs_tbf_opcode_hop_keycmp(const void *key, struct hlist_node *hnode)
2112 const __u32 *opc = key;
2113 struct nrs_tbf_client *cli = hlist_entry(hnode,
2114 struct nrs_tbf_client,
2117 return *opc == cli->tc_opcode;
2120 static void *nrs_tbf_opcode_hop_key(struct hlist_node *hnode)
2122 struct nrs_tbf_client *cli = hlist_entry(hnode,
2123 struct nrs_tbf_client,
2126 return &cli->tc_opcode;
2129 static void nrs_tbf_opcode_hop_get(struct cfs_hash *hs,
2130 struct hlist_node *hnode)
2132 struct nrs_tbf_client *cli = hlist_entry(hnode,
2133 struct nrs_tbf_client,
2136 atomic_inc(&cli->tc_ref);
2139 static void nrs_tbf_opcode_hop_put(struct cfs_hash *hs,
2140 struct hlist_node *hnode)
2142 struct nrs_tbf_client *cli = hlist_entry(hnode,
2143 struct nrs_tbf_client,
2146 atomic_dec(&cli->tc_ref);
2149 static void nrs_tbf_opcode_hop_exit(struct cfs_hash *hs,
2150 struct hlist_node *hnode)
2152 struct nrs_tbf_client *cli = hlist_entry(hnode,
2153 struct nrs_tbf_client,
2156 LASSERTF(atomic_read(&cli->tc_ref) == 0,
2157 "Busy TBF object from client with opcode %s, with %d refs\n",
2158 ll_opcode2str(cli->tc_opcode),
2159 atomic_read(&cli->tc_ref));
2161 nrs_tbf_cli_fini(cli);
2163 static struct cfs_hash_ops nrs_tbf_opcode_hash_ops = {
2164 .hs_hash = nrs_tbf_opcode_hop_hash,
2165 .hs_keycmp = nrs_tbf_opcode_hop_keycmp,
2166 .hs_key = nrs_tbf_opcode_hop_key,
2167 .hs_object = nrs_tbf_hop_object,
2168 .hs_get = nrs_tbf_opcode_hop_get,
2169 .hs_put = nrs_tbf_opcode_hop_put,
2170 .hs_put_locked = nrs_tbf_opcode_hop_put,
2171 .hs_exit = nrs_tbf_opcode_hop_exit,
2175 nrs_tbf_opcode_startup(struct ptlrpc_nrs_policy *policy,
2176 struct nrs_tbf_head *head)
2178 struct nrs_tbf_cmd start = { 0 };
2181 head->th_cli_hash = cfs_hash_create("nrs_tbf_hash",
2184 NRS_TBF_NID_BKT_BITS, 0,
2187 &nrs_tbf_opcode_hash_ops,
2188 CFS_HASH_RW_BKTLOCK);
2189 if (head->th_cli_hash == NULL)
2192 start.u.tc_start.ts_opcodes = NULL;
2193 start.u.tc_start.ts_opcodes_str = "*";
2195 start.u.tc_start.ts_rpc_rate = tbf_rate;
2196 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2197 start.tc_name = NRS_TBF_DEFAULT_RULE;
2198 rc = nrs_tbf_rule_start(policy, head, &start);
2203 static struct nrs_tbf_client *
2204 nrs_tbf_opcode_cli_find(struct nrs_tbf_head *head,
2205 struct ptlrpc_request *req)
2209 opc = lustre_msg_get_opc(req->rq_reqmsg);
2210 return cfs_hash_lookup(head->th_cli_hash, &opc);
2213 static struct nrs_tbf_client *
2214 nrs_tbf_opcode_cli_findadd(struct nrs_tbf_head *head,
2215 struct nrs_tbf_client *cli)
2217 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_opcode,
2222 nrs_tbf_opcode_cli_init(struct nrs_tbf_client *cli,
2223 struct ptlrpc_request *req)
2225 cli->tc_opcode = lustre_msg_get_opc(req->rq_reqmsg);
2228 #define MAX_OPCODE_LEN 32
2230 nrs_tbf_opcode_set_bit(const struct cfs_lstr *id, struct cfs_bitmap *opcodes)
2233 char opcode_str[MAX_OPCODE_LEN];
2235 if (id->ls_len + 1 > MAX_OPCODE_LEN)
2238 memcpy(opcode_str, id->ls_str, id->ls_len);
2239 opcode_str[id->ls_len] = '\0';
2241 op = ll_str2opcode(opcode_str);
2245 cfs_bitmap_set(opcodes, op);
2250 nrs_tbf_opcode_list_parse(char *str, int len, struct cfs_bitmap **bitmaptr)
2252 struct cfs_bitmap *opcodes;
2253 struct cfs_lstr src;
2254 struct cfs_lstr res;
2258 opcodes = CFS_ALLOCATE_BITMAP(LUSTRE_MAX_OPCODES);
2259 if (opcodes == NULL)
2264 while (src.ls_str) {
2265 rc = cfs_gettok(&src, ' ', &res);
2270 rc = nrs_tbf_opcode_set_bit(&res, opcodes);
2276 *bitmaptr = opcodes;
2278 CFS_FREE_BITMAP(opcodes);
2283 static void nrs_tbf_opcode_cmd_fini(struct nrs_tbf_cmd *cmd)
2285 if (cmd->u.tc_start.ts_opcodes)
2286 CFS_FREE_BITMAP(cmd->u.tc_start.ts_opcodes);
2288 if (cmd->u.tc_start.ts_opcodes_str)
2289 OBD_FREE(cmd->u.tc_start.ts_opcodes_str,
2290 strlen(cmd->u.tc_start.ts_opcodes_str) + 1);
2294 static int nrs_tbf_opcode_parse(struct nrs_tbf_cmd *cmd, char *id)
2296 struct cfs_lstr src;
2300 src.ls_len = strlen(id);
2301 rc = nrs_tbf_check_id_value(&src, "opcode");
2305 OBD_ALLOC(cmd->u.tc_start.ts_opcodes_str, src.ls_len + 1);
2306 if (cmd->u.tc_start.ts_opcodes_str == NULL)
2309 memcpy(cmd->u.tc_start.ts_opcodes_str, src.ls_str, src.ls_len);
2311 /* parse opcode list */
2312 rc = nrs_tbf_opcode_list_parse(cmd->u.tc_start.ts_opcodes_str,
2313 strlen(cmd->u.tc_start.ts_opcodes_str),
2314 &cmd->u.tc_start.ts_opcodes);
2316 nrs_tbf_opcode_cmd_fini(cmd);
2322 nrs_tbf_opcode_rule_match(struct nrs_tbf_rule *rule,
2323 struct nrs_tbf_client *cli)
2325 if (rule->tr_opcodes == NULL)
2328 return cfs_bitmap_check(rule->tr_opcodes, cli->tc_opcode);
2331 static int nrs_tbf_opcode_rule_init(struct ptlrpc_nrs_policy *policy,
2332 struct nrs_tbf_rule *rule,
2333 struct nrs_tbf_cmd *start)
2337 LASSERT(start->u.tc_start.ts_opcodes_str != NULL);
2338 OBD_ALLOC(rule->tr_opcodes_str,
2339 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2340 if (rule->tr_opcodes_str == NULL)
2343 strncpy(rule->tr_opcodes_str, start->u.tc_start.ts_opcodes_str,
2344 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2346 /* Default rule '*' */
2347 if (start->u.tc_start.ts_opcodes == NULL)
2350 rc = nrs_tbf_opcode_list_parse(rule->tr_opcodes_str,
2351 strlen(rule->tr_opcodes_str),
2354 OBD_FREE(rule->tr_opcodes_str,
2355 strlen(start->u.tc_start.ts_opcodes_str) + 1);
2361 nrs_tbf_opcode_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2363 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2364 rule->tr_opcodes_str, rule->tr_rpc_rate,
2365 atomic_read(&rule->tr_ref) - 1);
2370 struct nrs_tbf_ops nrs_tbf_opcode_ops = {
2371 .o_name = NRS_TBF_TYPE_OPCODE,
2372 .o_startup = nrs_tbf_opcode_startup,
2373 .o_cli_find = nrs_tbf_opcode_cli_find,
2374 .o_cli_findadd = nrs_tbf_opcode_cli_findadd,
2375 .o_cli_put = nrs_tbf_nid_cli_put,
2376 .o_cli_init = nrs_tbf_opcode_cli_init,
2377 .o_rule_init = nrs_tbf_opcode_rule_init,
2378 .o_rule_dump = nrs_tbf_opcode_rule_dump,
2379 .o_rule_match = nrs_tbf_opcode_rule_match,
2380 .o_rule_fini = nrs_tbf_opcode_rule_fini,
2383 static unsigned nrs_tbf_id_hop_hash(struct cfs_hash *hs, const void *key,
2386 return cfs_hash_djb2_hash(key, sizeof(struct tbf_id), mask);
2389 static int nrs_tbf_id_hop_keycmp(const void *key, struct hlist_node *hnode)
2391 const struct tbf_id *opc = key;
2392 enum nrs_tbf_flag ntf;
2393 struct nrs_tbf_client *cli = hlist_entry(hnode, struct nrs_tbf_client,
2395 ntf = opc->ti_type & cli->tc_id.ti_type;
2396 if ((ntf & NRS_TBF_FLAG_UID) && opc->ti_uid != cli->tc_id.ti_uid)
2399 if ((ntf & NRS_TBF_FLAG_GID) && opc->ti_gid != cli->tc_id.ti_gid)
2405 static void *nrs_tbf_id_hop_key(struct hlist_node *hnode)
2407 struct nrs_tbf_client *cli = hlist_entry(hnode,
2408 struct nrs_tbf_client,
2413 static void nrs_tbf_id_hop_get(struct cfs_hash *hs, struct hlist_node *hnode)
2415 struct nrs_tbf_client *cli = hlist_entry(hnode,
2416 struct nrs_tbf_client,
2419 atomic_inc(&cli->tc_ref);
2422 static void nrs_tbf_id_hop_put(struct cfs_hash *hs, struct hlist_node *hnode)
2424 struct nrs_tbf_client *cli = hlist_entry(hnode,
2425 struct nrs_tbf_client,
2428 atomic_dec(&cli->tc_ref);
2432 nrs_tbf_id_hop_exit(struct cfs_hash *hs, struct hlist_node *hnode)
2435 struct nrs_tbf_client *cli = hlist_entry(hnode,
2436 struct nrs_tbf_client,
2439 LASSERT(atomic_read(&cli->tc_ref) == 0);
2440 nrs_tbf_cli_fini(cli);
2443 static struct cfs_hash_ops nrs_tbf_id_hash_ops = {
2444 .hs_hash = nrs_tbf_id_hop_hash,
2445 .hs_keycmp = nrs_tbf_id_hop_keycmp,
2446 .hs_key = nrs_tbf_id_hop_key,
2447 .hs_object = nrs_tbf_hop_object,
2448 .hs_get = nrs_tbf_id_hop_get,
2449 .hs_put = nrs_tbf_id_hop_put,
2450 .hs_put_locked = nrs_tbf_id_hop_put,
2451 .hs_exit = nrs_tbf_id_hop_exit,
2455 nrs_tbf_id_startup(struct ptlrpc_nrs_policy *policy,
2456 struct nrs_tbf_head *head)
2458 struct nrs_tbf_cmd start;
2461 head->th_cli_hash = cfs_hash_create("nrs_tbf_id_hash",
2464 NRS_TBF_NID_BKT_BITS, 0,
2467 &nrs_tbf_id_hash_ops,
2468 CFS_HASH_RW_BKTLOCK);
2469 if (head->th_cli_hash == NULL)
2472 memset(&start, 0, sizeof(start));
2473 start.u.tc_start.ts_ids_str = "*";
2474 start.u.tc_start.ts_rpc_rate = tbf_rate;
2475 start.u.tc_start.ts_rule_flags = NTRS_DEFAULT;
2476 start.tc_name = NRS_TBF_DEFAULT_RULE;
2477 INIT_LIST_HEAD(&start.u.tc_start.ts_ids);
2478 rc = nrs_tbf_rule_start(policy, head, &start);
2480 cfs_hash_putref(head->th_cli_hash);
2481 head->th_cli_hash = NULL;
2487 static struct nrs_tbf_client *
2488 nrs_tbf_id_cli_find(struct nrs_tbf_head *head,
2489 struct ptlrpc_request *req)
2493 LASSERT(head->th_type_flag == NRS_TBF_FLAG_UID ||
2494 head->th_type_flag == NRS_TBF_FLAG_GID);
2496 nrs_tbf_id_cli_set(req, &id, head->th_type_flag);
2497 return cfs_hash_lookup(head->th_cli_hash, &id);
2500 static struct nrs_tbf_client *
2501 nrs_tbf_id_cli_findadd(struct nrs_tbf_head *head,
2502 struct nrs_tbf_client *cli)
2504 return cfs_hash_findadd_unique(head->th_cli_hash, &cli->tc_id,
2509 nrs_tbf_uid_cli_init(struct nrs_tbf_client *cli,
2510 struct ptlrpc_request *req)
2512 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_UID);
2516 nrs_tbf_gid_cli_init(struct nrs_tbf_client *cli,
2517 struct ptlrpc_request *req)
2519 nrs_tbf_id_cli_set(req, &cli->tc_id, NRS_TBF_FLAG_GID);
2523 nrs_tbf_id_list_match(struct list_head *id_list, struct tbf_id id)
2525 struct nrs_tbf_id *nti_id;
2526 enum nrs_tbf_flag flag;
2528 list_for_each_entry(nti_id, id_list, nti_linkage) {
2529 flag = id.ti_type & nti_id->nti_id.ti_type;
2533 if ((flag & NRS_TBF_FLAG_UID) &&
2534 (id.ti_uid != nti_id->nti_id.ti_uid))
2537 if ((flag & NRS_TBF_FLAG_GID) &&
2538 (id.ti_gid != nti_id->nti_id.ti_gid))
2547 nrs_tbf_id_rule_match(struct nrs_tbf_rule *rule,
2548 struct nrs_tbf_client *cli)
2550 return nrs_tbf_id_list_match(&rule->tr_ids, cli->tc_id);
2553 static void nrs_tbf_id_cmd_fini(struct nrs_tbf_cmd *cmd)
2555 nrs_tbf_id_list_free(&cmd->u.tc_start.ts_ids);
2557 if (cmd->u.tc_start.ts_ids_str)
2558 OBD_FREE(cmd->u.tc_start.ts_ids_str,
2559 strlen(cmd->u.tc_start.ts_ids_str) + 1);
2563 nrs_tbf_id_list_parse(char *str, int len, struct list_head *id_list,
2564 enum nrs_tbf_flag tif)
2566 struct cfs_lstr src;
2567 struct cfs_lstr res;
2569 struct tbf_id id = { 0 };
2572 if (tif != NRS_TBF_FLAG_UID && tif != NRS_TBF_FLAG_GID)
2577 INIT_LIST_HEAD(id_list);
2578 while (src.ls_str) {
2579 struct nrs_tbf_id *nti_id;
2581 if (cfs_gettok(&src, ' ', &res) == 0)
2582 GOTO(out, rc = -EINVAL);
2585 if (tif == NRS_TBF_FLAG_UID) {
2586 if (!cfs_str2num_check(res.ls_str, res.ls_len,
2587 &id.ti_uid, 0, (u32)~0U))
2588 GOTO(out, rc = -EINVAL);
2590 if (!cfs_str2num_check(res.ls_str, res.ls_len,
2591 &id.ti_gid, 0, (u32)~0U))
2592 GOTO(out, rc = -EINVAL);
2595 OBD_ALLOC_PTR(nti_id);
2597 GOTO(out, rc = -ENOMEM);
2599 nti_id->nti_id = id;
2600 list_add_tail(&nti_id->nti_linkage, id_list);
2604 nrs_tbf_id_list_free(id_list);
2608 static int nrs_tbf_ug_id_parse(struct nrs_tbf_cmd *cmd, char *id)
2610 struct cfs_lstr src;
2612 enum nrs_tbf_flag tif;
2614 tif = cmd->u.tc_start.ts_valid_type;
2617 src.ls_len = strlen(id);
2619 rc = nrs_tbf_check_id_value(&src,
2620 tif == NRS_TBF_FLAG_UID ? "uid" : "gid");
2624 OBD_ALLOC(cmd->u.tc_start.ts_ids_str, src.ls_len + 1);
2625 if (cmd->u.tc_start.ts_ids_str == NULL)
2628 strlcpy(cmd->u.tc_start.ts_ids_str, src.ls_str, src.ls_len + 1);
2630 rc = nrs_tbf_id_list_parse(cmd->u.tc_start.ts_ids_str,
2631 strlen(cmd->u.tc_start.ts_ids_str),
2632 &cmd->u.tc_start.ts_ids, tif);
2634 nrs_tbf_id_cmd_fini(cmd);
2640 nrs_tbf_id_rule_init(struct ptlrpc_nrs_policy *policy,
2641 struct nrs_tbf_rule *rule,
2642 struct nrs_tbf_cmd *start)
2644 struct nrs_tbf_head *head = rule->tr_head;
2646 enum nrs_tbf_flag tif = head->th_type_flag;
2647 int ids_len = strlen(start->u.tc_start.ts_ids_str) + 1;
2649 LASSERT(start->u.tc_start.ts_ids_str);
2650 INIT_LIST_HEAD(&rule->tr_ids);
2652 OBD_ALLOC(rule->tr_ids_str, ids_len);
2653 if (rule->tr_ids_str == NULL)
2656 strlcpy(rule->tr_ids_str, start->u.tc_start.ts_ids_str,
2659 if (!list_empty(&start->u.tc_start.ts_ids)) {
2660 rc = nrs_tbf_id_list_parse(rule->tr_ids_str,
2661 strlen(rule->tr_ids_str),
2662 &rule->tr_ids, tif);
2664 CERROR("%ss {%s} illegal\n",
2665 tif == NRS_TBF_FLAG_UID ? "uid" : "gid",
2669 OBD_FREE(rule->tr_ids_str, ids_len);
2670 rule->tr_ids_str = NULL;
2676 nrs_tbf_id_rule_dump(struct nrs_tbf_rule *rule, struct seq_file *m)
2678 seq_printf(m, "%s {%s} %llu, ref %d\n", rule->tr_name,
2679 rule->tr_ids_str, rule->tr_rpc_rate,
2680 atomic_read(&rule->tr_ref) - 1);
2684 static void nrs_tbf_id_rule_fini(struct nrs_tbf_rule *rule)
2686 nrs_tbf_id_list_free(&rule->tr_ids);
2687 if (rule->tr_ids_str != NULL)
2688 OBD_FREE(rule->tr_ids_str, strlen(rule->tr_ids_str) + 1);
2691 struct nrs_tbf_ops nrs_tbf_uid_ops = {
2692 .o_name = NRS_TBF_TYPE_UID,
2693 .o_startup = nrs_tbf_id_startup,
2694 .o_cli_find = nrs_tbf_id_cli_find,
2695 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2696 .o_cli_put = nrs_tbf_nid_cli_put,
2697 .o_cli_init = nrs_tbf_uid_cli_init,
2698 .o_rule_init = nrs_tbf_id_rule_init,
2699 .o_rule_dump = nrs_tbf_id_rule_dump,
2700 .o_rule_match = nrs_tbf_id_rule_match,
2701 .o_rule_fini = nrs_tbf_id_rule_fini,
2704 struct nrs_tbf_ops nrs_tbf_gid_ops = {
2705 .o_name = NRS_TBF_TYPE_GID,
2706 .o_startup = nrs_tbf_id_startup,
2707 .o_cli_find = nrs_tbf_id_cli_find,
2708 .o_cli_findadd = nrs_tbf_id_cli_findadd,
2709 .o_cli_put = nrs_tbf_nid_cli_put,
2710 .o_cli_init = nrs_tbf_gid_cli_init,
2711 .o_rule_init = nrs_tbf_id_rule_init,
2712 .o_rule_dump = nrs_tbf_id_rule_dump,
2713 .o_rule_match = nrs_tbf_id_rule_match,
2714 .o_rule_fini = nrs_tbf_id_rule_fini,
2717 static struct nrs_tbf_type nrs_tbf_types[] = {
2719 .ntt_name = NRS_TBF_TYPE_JOBID,
2720 .ntt_flag = NRS_TBF_FLAG_JOBID,
2721 .ntt_ops = &nrs_tbf_jobid_ops,
2724 .ntt_name = NRS_TBF_TYPE_NID,
2725 .ntt_flag = NRS_TBF_FLAG_NID,
2726 .ntt_ops = &nrs_tbf_nid_ops,
2729 .ntt_name = NRS_TBF_TYPE_OPCODE,
2730 .ntt_flag = NRS_TBF_FLAG_OPCODE,
2731 .ntt_ops = &nrs_tbf_opcode_ops,
2734 .ntt_name = NRS_TBF_TYPE_GENERIC,
2735 .ntt_flag = NRS_TBF_FLAG_GENERIC,
2736 .ntt_ops = &nrs_tbf_generic_ops,
2739 .ntt_name = NRS_TBF_TYPE_UID,
2740 .ntt_flag = NRS_TBF_FLAG_UID,
2741 .ntt_ops = &nrs_tbf_uid_ops,
2744 .ntt_name = NRS_TBF_TYPE_GID,
2745 .ntt_flag = NRS_TBF_FLAG_GID,
2746 .ntt_ops = &nrs_tbf_gid_ops,
2751 * Is called before the policy transitions into
2752 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED; allocates and initializes a
2753 * policy-specific private data structure.
2755 * \param[in] policy The policy to start
2757 * \retval -ENOMEM OOM error
2760 * \see nrs_policy_register()
2761 * \see nrs_policy_ctl()
2763 static int nrs_tbf_start(struct ptlrpc_nrs_policy *policy, char *arg)
2765 struct nrs_tbf_head *head;
2766 struct nrs_tbf_ops *ops;
2774 name = NRS_TBF_TYPE_GENERIC;
2775 else if (strlen(arg) < NRS_TBF_TYPE_MAX_LEN)
2778 GOTO(out, rc = -EINVAL);
2780 for (i = 0; i < ARRAY_SIZE(nrs_tbf_types); i++) {
2781 if (strcmp(name, nrs_tbf_types[i].ntt_name) == 0) {
2782 ops = nrs_tbf_types[i].ntt_ops;
2783 type = nrs_tbf_types[i].ntt_flag;
2789 GOTO(out, rc = -ENOTSUPP);
2791 OBD_CPT_ALLOC_PTR(head, nrs_pol2cptab(policy), nrs_pol2cptid(policy));
2793 GOTO(out, rc = -ENOMEM);
2795 memcpy(head->th_type, name, strlen(name));
2796 head->th_type[strlen(name)] = '\0';
2798 head->th_type_flag = type;
2800 head->th_binheap = cfs_binheap_create(&nrs_tbf_heap_ops,
2801 CBH_FLAG_ATOMIC_GROW, 4096, NULL,
2802 nrs_pol2cptab(policy),
2803 nrs_pol2cptid(policy));
2804 if (head->th_binheap == NULL)
2805 GOTO(out_free_head, rc = -ENOMEM);
2807 atomic_set(&head->th_rule_sequence, 0);
2808 spin_lock_init(&head->th_rule_lock);
2809 INIT_LIST_HEAD(&head->th_list);
2810 hrtimer_init(&head->th_timer, CLOCK_MONOTONIC, HRTIMER_MODE_ABS);
2811 head->th_timer.function = nrs_tbf_timer_cb;
2812 rc = head->th_ops->o_startup(policy, head);
2814 GOTO(out_free_heap, rc);
2816 policy->pol_private = head;
2819 cfs_binheap_destroy(head->th_binheap);
2827 * Is called before the policy transitions into
2828 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED; deallocates the policy-specific
2829 * private data structure.
2831 * \param[in] policy The policy to stop
2833 * \see nrs_policy_stop0()
2835 static void nrs_tbf_stop(struct ptlrpc_nrs_policy *policy)
2837 struct nrs_tbf_head *head = policy->pol_private;
2838 struct ptlrpc_nrs *nrs = policy->pol_nrs;
2839 struct nrs_tbf_rule *rule, *n;
2841 LASSERT(head != NULL);
2842 LASSERT(head->th_cli_hash != NULL);
2843 hrtimer_cancel(&head->th_timer);
2844 /* Should cleanup hash first before free rules */
2845 cfs_hash_putref(head->th_cli_hash);
2846 list_for_each_entry_safe(rule, n, &head->th_list, tr_linkage) {
2847 list_del_init(&rule->tr_linkage);
2848 nrs_tbf_rule_put(rule);
2850 LASSERT(list_empty(&head->th_list));
2851 LASSERT(head->th_binheap != NULL);
2852 LASSERT(cfs_binheap_is_empty(head->th_binheap));
2853 cfs_binheap_destroy(head->th_binheap);
2855 nrs->nrs_throttling = 0;
2856 wake_up(&policy->pol_nrs->nrs_svcpt->scp_waitq);
2860 * Performs a policy-specific ctl function on TBF policy instances; similar
2863 * \param[in] policy the policy instance
2864 * \param[in] opc the opcode
2865 * \param[in,out] arg used for passing parameters and information
2867 * \pre assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2868 * \post assert_spin_locked(&policy->pol_nrs->->nrs_lock)
2870 * \retval 0 operation carried out successfully
2873 static int nrs_tbf_ctl(struct ptlrpc_nrs_policy *policy,
2874 enum ptlrpc_nrs_ctl opc,
2880 assert_spin_locked(&policy->pol_nrs->nrs_lock);
2882 switch ((enum nrs_ctl_tbf)opc) {
2887 * Read RPC rate size of a policy instance.
2889 case NRS_CTL_TBF_RD_RULE: {
2890 struct nrs_tbf_head *head = policy->pol_private;
2891 struct seq_file *m = (struct seq_file *) arg;
2892 struct ptlrpc_service_part *svcpt;
2894 svcpt = policy->pol_nrs->nrs_svcpt;
2895 seq_printf(m, "CPT %d:\n", svcpt->scp_cpt);
2897 rc = nrs_tbf_rule_dump_all(head, m);
2902 * Write RPC rate of a policy instance.
2904 case NRS_CTL_TBF_WR_RULE: {
2905 struct nrs_tbf_head *head = policy->pol_private;
2906 struct nrs_tbf_cmd *cmd;
2908 cmd = (struct nrs_tbf_cmd *)arg;
2909 rc = nrs_tbf_command(policy,
2915 * Read the TBF policy type of a policy instance.
2917 case NRS_CTL_TBF_RD_TYPE_FLAG: {
2918 struct nrs_tbf_head *head = policy->pol_private;
2920 *(__u32 *)arg = head->th_type_flag;
2929 * Is called for obtaining a TBF policy resource.
2931 * \param[in] policy The policy on which the request is being asked for
2932 * \param[in] nrq The request for which resources are being taken
2933 * \param[in] parent Parent resource, unused in this policy
2934 * \param[out] resp Resources references are placed in this array
2935 * \param[in] moving_req Signifies limited caller context; unused in this
2939 * \see nrs_resource_get_safe()
2941 static int nrs_tbf_res_get(struct ptlrpc_nrs_policy *policy,
2942 struct ptlrpc_nrs_request *nrq,
2943 const struct ptlrpc_nrs_resource *parent,
2944 struct ptlrpc_nrs_resource **resp,
2947 struct nrs_tbf_head *head;
2948 struct nrs_tbf_client *cli;
2949 struct nrs_tbf_client *tmp;
2950 struct ptlrpc_request *req;
2952 if (parent == NULL) {
2953 *resp = &((struct nrs_tbf_head *)policy->pol_private)->th_res;
2957 head = container_of(parent, struct nrs_tbf_head, th_res);
2958 req = container_of(nrq, struct ptlrpc_request, rq_nrq);
2959 cli = head->th_ops->o_cli_find(head, req);
2961 spin_lock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2962 LASSERT(cli->tc_rule);
2963 if (cli->tc_rule_sequence !=
2964 atomic_read(&head->th_rule_sequence) ||
2965 cli->tc_rule->tr_flags & NTRS_STOPPING) {
2966 struct nrs_tbf_rule *rule;
2969 "TBF class@%p rate %llu sequence %d, "
2970 "rule flags %d, head sequence %d\n",
2971 cli, cli->tc_rpc_rate,
2972 cli->tc_rule_sequence,
2973 cli->tc_rule->tr_flags,
2974 atomic_read(&head->th_rule_sequence));
2975 rule = nrs_tbf_rule_match(head, cli);
2976 if (rule != cli->tc_rule) {
2977 nrs_tbf_cli_reset(head, rule, cli);
2979 if (cli->tc_rule_generation != rule->tr_generation)
2980 nrs_tbf_cli_reset_value(head, cli);
2981 nrs_tbf_rule_put(rule);
2983 } else if (cli->tc_rule_generation !=
2984 cli->tc_rule->tr_generation) {
2985 nrs_tbf_cli_reset_value(head, cli);
2987 spin_unlock(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
2991 OBD_CPT_ALLOC_GFP(cli, nrs_pol2cptab(policy), nrs_pol2cptid(policy),
2992 sizeof(*cli), moving_req ? GFP_ATOMIC : __GFP_IO);
2996 nrs_tbf_cli_init(head, cli, req);
2997 tmp = head->th_ops->o_cli_findadd(head, cli);
2999 atomic_dec(&cli->tc_ref);
3000 nrs_tbf_cli_fini(cli);
3004 *resp = &cli->tc_res;
3010 * Called when releasing references to the resource hierachy obtained for a
3011 * request for scheduling using the TBF policy.
3013 * \param[in] policy the policy the resource belongs to
3014 * \param[in] res the resource to be released
3016 static void nrs_tbf_res_put(struct ptlrpc_nrs_policy *policy,
3017 const struct ptlrpc_nrs_resource *res)
3019 struct nrs_tbf_head *head;
3020 struct nrs_tbf_client *cli;
3023 * Do nothing for freeing parent, nrs_tbf_net resources
3025 if (res->res_parent == NULL)
3028 cli = container_of(res, struct nrs_tbf_client, tc_res);
3029 head = container_of(res->res_parent, struct nrs_tbf_head, th_res);
3031 head->th_ops->o_cli_put(head, cli);
3035 * Called when getting a request from the TBF policy for handling, or just
3036 * peeking; removes the request from the policy when it is to be handled.
3038 * \param[in] policy The policy
3039 * \param[in] peek When set, signifies that we just want to examine the
3040 * request, and not handle it, so the request is not removed
3042 * \param[in] force Force the policy to return a request; unused in this
3045 * \retval The request to be handled; this is the next request in the TBF
3048 * \see ptlrpc_nrs_req_get_nolock()
3049 * \see nrs_request_get()
3052 struct ptlrpc_nrs_request *nrs_tbf_req_get(struct ptlrpc_nrs_policy *policy,
3053 bool peek, bool force)
3055 struct nrs_tbf_head *head = policy->pol_private;
3056 struct ptlrpc_nrs_request *nrq = NULL;
3057 struct nrs_tbf_client *cli;
3058 struct cfs_binheap_node *node;
3060 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3062 if (!peek && policy->pol_nrs->nrs_throttling)
3065 node = cfs_binheap_root(head->th_binheap);
3066 if (unlikely(node == NULL))
3069 cli = container_of(node, struct nrs_tbf_client, tc_node);
3070 LASSERT(cli->tc_in_heap);
3072 nrq = list_entry(cli->tc_list.next,
3073 struct ptlrpc_nrs_request,
3076 struct nrs_tbf_rule *rule = cli->tc_rule;
3077 __u64 now = ktime_to_ns(ktime_get());
3081 __u64 old_resid = 0;
3083 deadline = cli->tc_check_time +
3085 LASSERT(now >= cli->tc_check_time);
3086 passed = now - cli->tc_check_time;
3087 ntoken = passed * cli->tc_rpc_rate;
3088 do_div(ntoken, NSEC_PER_SEC);
3090 ntoken += cli->tc_ntoken;
3091 if (rule->tr_flags & NTRS_REALTIME) {
3092 LASSERT(cli->tc_nsecs_resid < cli->tc_nsecs);
3093 old_resid = cli->tc_nsecs_resid;
3094 cli->tc_nsecs_resid += passed % cli->tc_nsecs;
3095 if (cli->tc_nsecs_resid > cli->tc_nsecs) {
3097 cli->tc_nsecs_resid -= cli->tc_nsecs;
3099 } else if (ntoken > cli->tc_depth)
3100 ntoken = cli->tc_depth;
3103 struct ptlrpc_request *req;
3104 nrq = list_entry(cli->tc_list.next,
3105 struct ptlrpc_nrs_request,
3107 req = container_of(nrq,
3108 struct ptlrpc_request,
3111 cli->tc_ntoken = ntoken;
3112 cli->tc_check_time = now;
3113 list_del_init(&nrq->nr_u.tbf.tr_list);
3114 if (list_empty(&cli->tc_list)) {
3115 cfs_binheap_remove(head->th_binheap,
3117 cli->tc_in_heap = false;
3119 if (!(rule->tr_flags & NTRS_REALTIME))
3120 cli->tc_deadline = now + cli->tc_nsecs;
3121 cfs_binheap_relocate(head->th_binheap,
3125 "TBF dequeues: class@%p rate %llu gen %llu "
3126 "token %llu, rule@%p rate %llu gen %llu\n",
3127 cli, cli->tc_rpc_rate,
3128 cli->tc_rule_generation, cli->tc_ntoken,
3129 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3130 cli->tc_rule->tr_generation);
3134 if (rule->tr_flags & NTRS_REALTIME) {
3135 cli->tc_deadline = deadline;
3136 cli->tc_nsecs_resid = old_resid;
3137 cfs_binheap_relocate(head->th_binheap,
3139 if (node != cfs_binheap_root(head->th_binheap))
3140 return nrs_tbf_req_get(policy,
3143 policy->pol_nrs->nrs_throttling = 1;
3144 head->th_deadline = deadline;
3145 time = ktime_set(0, 0);
3146 time = ktime_add_ns(time, deadline);
3147 hrtimer_start(&head->th_timer, time, HRTIMER_MODE_ABS);
3155 * Adds request \a nrq to \a policy's list of queued requests
3157 * \param[in] policy The policy
3158 * \param[in] nrq The request to add
3160 * \retval 0 success; nrs_request_enqueue() assumes this function will always
3163 static int nrs_tbf_req_add(struct ptlrpc_nrs_policy *policy,
3164 struct ptlrpc_nrs_request *nrq)
3166 struct nrs_tbf_head *head;
3167 struct nrs_tbf_client *cli;
3170 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3172 cli = container_of(nrs_request_resource(nrq),
3173 struct nrs_tbf_client, tc_res);
3174 head = container_of(nrs_request_resource(nrq)->res_parent,
3175 struct nrs_tbf_head, th_res);
3176 if (list_empty(&cli->tc_list)) {
3177 LASSERT(!cli->tc_in_heap);
3178 cli->tc_deadline = cli->tc_check_time + cli->tc_nsecs;
3179 rc = cfs_binheap_insert(head->th_binheap, &cli->tc_node);
3181 cli->tc_in_heap = true;
3182 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3183 list_add_tail(&nrq->nr_u.tbf.tr_list,
3185 if (policy->pol_nrs->nrs_throttling) {
3186 __u64 deadline = cli->tc_deadline;
3187 if ((head->th_deadline > deadline) &&
3188 (hrtimer_try_to_cancel(&head->th_timer)
3191 head->th_deadline = deadline;
3192 time = ktime_set(0, 0);
3193 time = ktime_add_ns(time, deadline);
3194 hrtimer_start(&head->th_timer, time,
3200 LASSERT(cli->tc_in_heap);
3201 nrq->nr_u.tbf.tr_sequence = head->th_sequence++;
3202 list_add_tail(&nrq->nr_u.tbf.tr_list,
3208 "TBF enqueues: class@%p rate %llu gen %llu "
3209 "token %llu, rule@%p rate %llu gen %llu\n",
3210 cli, cli->tc_rpc_rate,
3211 cli->tc_rule_generation, cli->tc_ntoken,
3212 cli->tc_rule, cli->tc_rule->tr_rpc_rate,
3213 cli->tc_rule->tr_generation);
3219 * Removes request \a nrq from \a policy's list of queued requests.
3221 * \param[in] policy The policy
3222 * \param[in] nrq The request to remove
3224 static void nrs_tbf_req_del(struct ptlrpc_nrs_policy *policy,
3225 struct ptlrpc_nrs_request *nrq)
3227 struct nrs_tbf_head *head;
3228 struct nrs_tbf_client *cli;
3230 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3232 cli = container_of(nrs_request_resource(nrq),
3233 struct nrs_tbf_client, tc_res);
3234 head = container_of(nrs_request_resource(nrq)->res_parent,
3235 struct nrs_tbf_head, th_res);
3237 LASSERT(!list_empty(&nrq->nr_u.tbf.tr_list));
3238 list_del_init(&nrq->nr_u.tbf.tr_list);
3239 if (list_empty(&cli->tc_list)) {
3240 cfs_binheap_remove(head->th_binheap,
3242 cli->tc_in_heap = false;
3244 cfs_binheap_relocate(head->th_binheap,
3250 * Prints a debug statement right before the request \a nrq stops being
3253 * \param[in] policy The policy handling the request
3254 * \param[in] nrq The request being handled
3256 * \see ptlrpc_server_finish_request()
3257 * \see ptlrpc_nrs_req_stop_nolock()
3259 static void nrs_tbf_req_stop(struct ptlrpc_nrs_policy *policy,
3260 struct ptlrpc_nrs_request *nrq)
3262 struct ptlrpc_request *req = container_of(nrq, struct ptlrpc_request,
3265 assert_spin_locked(&policy->pol_nrs->nrs_svcpt->scp_req_lock);
3267 CDEBUG(D_RPCTRACE, "NRS stop %s request from %s, seq: %llu\n",
3268 policy->pol_desc->pd_name, libcfs_id2str(req->rq_peer),
3269 nrq->nr_u.tbf.tr_sequence);
3272 #ifdef CONFIG_PROC_FS
3279 * The maximum RPC rate.
3281 #define LPROCFS_NRS_RATE_MAX 65535
3284 ptlrpc_lprocfs_nrs_tbf_rule_seq_show(struct seq_file *m, void *data)
3286 struct ptlrpc_service *svc = m->private;
3289 seq_printf(m, "regular_requests:\n");
3291 * Perform two separate calls to this as only one of the NRS heads'
3292 * policies may be in the ptlrpc_nrs_pol_state::NRS_POL_STATE_STARTED or
3293 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPING state.
3295 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_REG,
3297 NRS_CTL_TBF_RD_RULE,
3301 * -ENOSPC means buf in the parameter m is overflow, return 0
3302 * here to let upper layer function seq_read alloc a larger
3303 * memory area and do this process again.
3305 } else if (rc == -ENOSPC) {
3309 * Ignore -ENODEV as the regular NRS head's policy may be in the
3310 * ptlrpc_nrs_pol_state::NRS_POL_STATE_STOPPED state.
3312 } else if (rc != -ENODEV) {
3316 if (!nrs_svc_has_hp(svc))
3319 seq_printf(m, "high_priority_requests:\n");
3320 rc = ptlrpc_nrs_policy_control(svc, PTLRPC_NRS_QUEUE_HP,
3322 NRS_CTL_TBF_RD_RULE,
3326 * -ENOSPC means buf in the parameter m is overflow, return 0
3327 * here to let upper layer function seq_read alloc a larger
3328 * memory area and do this process again.
3330 } else if (rc == -ENOSPC) {
3339 static int nrs_tbf_id_parse(struct nrs_tbf_cmd *cmd, char *token)
3344 switch (cmd->u.tc_start.ts_valid_type) {
3345 case NRS_TBF_FLAG_JOBID:
3346 rc = nrs_tbf_jobid_parse(cmd, token);
3348 case NRS_TBF_FLAG_NID:
3349 rc = nrs_tbf_nid_parse(cmd, token);
3351 case NRS_TBF_FLAG_OPCODE:
3352 rc = nrs_tbf_opcode_parse(cmd, token);
3354 case NRS_TBF_FLAG_GENERIC:
3355 rc = nrs_tbf_generic_parse(cmd, token);
3357 case NRS_TBF_FLAG_UID:
3358 case NRS_TBF_FLAG_GID:
3359 rc = nrs_tbf_ug_id_parse(cmd, token);
3368 static void nrs_tbf_cmd_fini(struct nrs_tbf_cmd *cmd)
3370 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3371 switch (cmd->u.tc_start.ts_valid_type) {
3372 case NRS_TBF_FLAG_JOBID:
3373 nrs_tbf_jobid_cmd_fini(cmd);
3375 case NRS_TBF_FLAG_NID:
3376 nrs_tbf_nid_cmd_fini(cmd);
3378 case NRS_TBF_FLAG_OPCODE:
3379 nrs_tbf_opcode_cmd_fini(cmd);
3381 case NRS_TBF_FLAG_GENERIC:
3382 nrs_tbf_generic_cmd_fini(cmd);
3384 case NRS_TBF_FLAG_UID:
3385 case NRS_TBF_FLAG_GID:
3386 nrs_tbf_id_cmd_fini(cmd);
3389 CWARN("unknown NRS_TBF_FLAGS:0x%x\n",
3390 cmd->u.tc_start.ts_valid_type);
3395 static bool name_is_valid(const char *name)
3399 for (i = 0; i < strlen(name); i++) {
3400 if ((!isalnum(name[i])) &&
3408 nrs_tbf_parse_value_pair(struct nrs_tbf_cmd *cmd, char *buffer)
3416 key = strsep(&val, "=");
3417 if (val == NULL || strlen(val) == 0)
3420 /* Key of the value pair */
3421 if (strcmp(key, "rate") == 0) {
3422 rc = kstrtoull(val, 10, &rate);
3426 if (rate <= 0 || rate >= LPROCFS_NRS_RATE_MAX)
3429 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3430 cmd->u.tc_start.ts_rpc_rate = rate;
3431 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3432 cmd->u.tc_change.tc_rpc_rate = rate;
3435 } else if (strcmp(key, "rank") == 0) {
3436 if (!name_is_valid(val))
3439 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE)
3440 cmd->u.tc_start.ts_next_name = val;
3441 else if (cmd->tc_cmd == NRS_CTL_TBF_CHANGE_RULE)
3442 cmd->u.tc_change.tc_next_name = val;
3445 } else if (strcmp(key, "realtime") == 0) {
3446 unsigned long realtime;
3448 rc = kstrtoul(val, 10, &realtime);
3453 cmd->u.tc_start.ts_rule_flags |= NTRS_REALTIME;
3461 nrs_tbf_parse_value_pairs(struct nrs_tbf_cmd *cmd, char *buffer)
3468 while (val != NULL && strlen(val) != 0) {
3469 token = strsep(&val, " ");
3470 rc = nrs_tbf_parse_value_pair(cmd, token);
3475 switch (cmd->tc_cmd) {
3476 case NRS_CTL_TBF_START_RULE:
3477 if (cmd->u.tc_start.ts_rpc_rate == 0)
3478 cmd->u.tc_start.ts_rpc_rate = tbf_rate;
3480 case NRS_CTL_TBF_CHANGE_RULE:
3481 if (cmd->u.tc_change.tc_rpc_rate == 0 &&
3482 cmd->u.tc_change.tc_next_name == NULL)
3485 case NRS_CTL_TBF_STOP_RULE:
3493 static struct nrs_tbf_cmd *
3494 nrs_tbf_parse_cmd(char *buffer, unsigned long count, __u32 type_flag)
3496 static struct nrs_tbf_cmd *cmd;
3503 GOTO(out, rc = -ENOMEM);
3504 memset(cmd, 0, sizeof(*cmd));
3507 token = strsep(&val, " ");
3508 if (val == NULL || strlen(val) == 0)
3509 GOTO(out_free_cmd, rc = -EINVAL);
3511 /* Type of the command */
3512 if (strcmp(token, "start") == 0) {
3513 cmd->tc_cmd = NRS_CTL_TBF_START_RULE;
3514 cmd->u.tc_start.ts_valid_type = type_flag;
3515 } else if (strcmp(token, "stop") == 0)
3516 cmd->tc_cmd = NRS_CTL_TBF_STOP_RULE;
3517 else if (strcmp(token, "change") == 0)
3518 cmd->tc_cmd = NRS_CTL_TBF_CHANGE_RULE;
3520 GOTO(out_free_cmd, rc = -EINVAL);
3522 /* Name of the rule */
3523 token = strsep(&val, " ");
3524 if ((val == NULL && cmd->tc_cmd != NRS_CTL_TBF_STOP_RULE) ||
3525 !name_is_valid(token))
3526 GOTO(out_free_cmd, rc = -EINVAL);
3527 cmd->tc_name = token;
3529 if (cmd->tc_cmd == NRS_CTL_TBF_START_RULE) {
3533 val = strrchr(token, '}');
3535 GOTO(out_free_cmd, rc = -EINVAL);
3541 } else if (*val == ' ') {
3545 GOTO(out_free_cmd, rc = -EINVAL);
3547 rc = nrs_tbf_id_parse(cmd, token);
3549 GOTO(out_free_cmd, rc);
3552 rc = nrs_tbf_parse_value_pairs(cmd, val);
3554 GOTO(out_cmd_fini, rc = -EINVAL);
3557 nrs_tbf_cmd_fini(cmd);
3567 * Get the TBF policy type (nid, jobid, etc) preset by
3568 * proc entry 'nrs_policies' for command buffer parsing.
3570 * \param[in] svc the PTLRPC service
3571 * \param[in] queue the NRS queue type
3573 * \retval the preset TBF policy type flag
3576 nrs_tbf_type_flag(struct ptlrpc_service *svc, enum ptlrpc_nrs_queue_type queue)
3581 rc = ptlrpc_nrs_policy_control(svc, queue,
3583 NRS_CTL_TBF_RD_TYPE_FLAG,
3586 type = NRS_TBF_FLAG_INVALID;
3591 extern struct nrs_core nrs_core;
3592 #define LPROCFS_WR_NRS_TBF_MAX_CMD (4096)
3594 ptlrpc_lprocfs_nrs_tbf_rule_seq_write(struct file *file,
3595 const char __user *buffer,
3596 size_t count, loff_t *off)
3598 struct seq_file *m = file->private_data;
3599 struct ptlrpc_service *svc = m->private;
3603 static struct nrs_tbf_cmd *cmd;
3604 enum ptlrpc_nrs_queue_type queue = PTLRPC_NRS_QUEUE_BOTH;
3605 unsigned long length;
3608 OBD_ALLOC(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3609 if (kernbuf == NULL)
3610 GOTO(out, rc = -ENOMEM);
3612 if (count > LPROCFS_WR_NRS_TBF_MAX_CMD - 1)
3613 GOTO(out_free_kernbuff, rc = -EINVAL);
3615 if (copy_from_user(kernbuf, buffer, count))
3616 GOTO(out_free_kernbuff, rc = -EFAULT);
3619 token = strsep(&val, " ");
3621 GOTO(out_free_kernbuff, rc = -EINVAL);
3623 if (strcmp(token, "reg") == 0) {
3624 queue = PTLRPC_NRS_QUEUE_REG;
3625 } else if (strcmp(token, "hp") == 0) {
3626 queue = PTLRPC_NRS_QUEUE_HP;
3628 kernbuf[strlen(token)] = ' ';
3631 length = strlen(val);
3634 GOTO(out_free_kernbuff, rc = -EINVAL);
3636 if (queue == PTLRPC_NRS_QUEUE_HP && !nrs_svc_has_hp(svc))
3637 GOTO(out_free_kernbuff, rc = -ENODEV);
3638 else if (queue == PTLRPC_NRS_QUEUE_BOTH && !nrs_svc_has_hp(svc))
3639 queue = PTLRPC_NRS_QUEUE_REG;
3641 cmd = nrs_tbf_parse_cmd(val, length, nrs_tbf_type_flag(svc, queue));
3643 GOTO(out_free_kernbuff, rc = PTR_ERR(cmd));
3646 * Serialize NRS core lprocfs operations with policy registration/
3649 mutex_lock(&nrs_core.nrs_mutex);
3650 rc = ptlrpc_nrs_policy_control(svc, queue,
3652 NRS_CTL_TBF_WR_RULE,
3654 mutex_unlock(&nrs_core.nrs_mutex);
3656 nrs_tbf_cmd_fini(cmd);
3659 OBD_FREE(kernbuf, LPROCFS_WR_NRS_TBF_MAX_CMD);
3661 return rc ? rc : count;
3663 LPROC_SEQ_FOPS(ptlrpc_lprocfs_nrs_tbf_rule);
3666 * Initializes a TBF policy's lprocfs interface for service \a svc
3668 * \param[in] svc the service
3671 * \retval != 0 error
3673 static int nrs_tbf_lprocfs_init(struct ptlrpc_service *svc)
3675 struct lprocfs_vars nrs_tbf_lprocfs_vars[] = {
3676 { .name = "nrs_tbf_rule",
3677 .fops = &ptlrpc_lprocfs_nrs_tbf_rule_fops,
3682 if (svc->srv_procroot == NULL)
3685 return lprocfs_add_vars(svc->srv_procroot, nrs_tbf_lprocfs_vars, NULL);
3689 * Cleans up a TBF policy's lprocfs interface for service \a svc
3691 * \param[in] svc the service
3693 static void nrs_tbf_lprocfs_fini(struct ptlrpc_service *svc)
3695 if (svc->srv_procroot == NULL)
3698 lprocfs_remove_proc_entry("nrs_tbf_rule", svc->srv_procroot);
3701 #endif /* CONFIG_PROC_FS */
3704 * TBF policy operations
3706 static const struct ptlrpc_nrs_pol_ops nrs_tbf_ops = {
3707 .op_policy_start = nrs_tbf_start,
3708 .op_policy_stop = nrs_tbf_stop,
3709 .op_policy_ctl = nrs_tbf_ctl,
3710 .op_res_get = nrs_tbf_res_get,
3711 .op_res_put = nrs_tbf_res_put,
3712 .op_req_get = nrs_tbf_req_get,
3713 .op_req_enqueue = nrs_tbf_req_add,
3714 .op_req_dequeue = nrs_tbf_req_del,
3715 .op_req_stop = nrs_tbf_req_stop,
3716 #ifdef CONFIG_PROC_FS
3717 .op_lprocfs_init = nrs_tbf_lprocfs_init,
3718 .op_lprocfs_fini = nrs_tbf_lprocfs_fini,
3723 * TBF policy configuration
3725 struct ptlrpc_nrs_pol_conf nrs_conf_tbf = {
3726 .nc_name = NRS_POL_NAME_TBF,
3727 .nc_ops = &nrs_tbf_ops,
3728 .nc_compat = nrs_policy_compat_all,
3735 #endif /* HAVE_SERVER_SUPPORT */